Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import sys
2import os
3import subprocess
4import inspect
5import time
6import functools
7import getpass
8import copy
9import io
10from logging import getLogger
12import json
13import yaml
14import toml
15import jsonpath_rw
16from threading import Lock
17from pkg_resources import resource_stream
18from lxml import etree
19from PIL import Image
20from selenium.webdriver.common.by import By
21import selenium.common.exceptions
22from jinja2 import Template
23from ..version import VERSION
class Base:
    """Core command runner shared by the concrete browser drivers.

    ``passcmd`` is the executable that ``getvalue`` runs to resolve
    ``password`` entries.  ``schema`` is the bundled base schema; it is the
    mapping that ``load_modules`` extends with per-module entries.
    """

    passcmd = "pass"
    # Parse the bundled schema and close the resource stream right away
    # instead of leaving the open handle to the garbage collector.
    with resource_stream(__name__, '../schema/base.yaml') as _schema_fp:
        schema = yaml.safe_load(_schema_fp)
    del _schema_fp  # keep the temporary handle off the class namespace
def __init__(self):
    """Initialise runner state; the browser is started lazily by ``driver``."""
    self._driver = None        # created on first access to ``driver``
    self.browser_args = {}
    self.funcs = {}            # name -> (args, return var, progn), see do2_defun
    self.variables = dict(selenible_version=VERSION)
    self.step = False          # when true, ``run`` prompts after each command
    self.save_every = False    # when true, ``run`` screenshots after each command
    self.lock = Lock()
    self.log = getLogger(self.__class__.__name__)
@property
def driver(self):
    """Return the browser driver, booting it on first access.

    On first use the driver comes from ``boot_driver`` and its ``name`` and
    ``desired_capabilities`` are published into ``self.variables``.
    """
    if self._driver is not None:
        return self._driver
    self._driver = self.boot_driver()
    self.log.info("driver started")
    published = self.variables
    published["driver"] = self._driver.name
    published["desired_capabilities"] = self._driver.desired_capabilities
    return self._driver
def get_options(self):
    """Return driver options; the base implementation returns an empty dict."""
    return dict()
def boot_driver(self):
    """Create and return the browser driver instance.

    The ``driver`` property calls this hook on first access.  The base
    implementation has no concrete driver to start.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    # NotImplementedError is still an Exception subclass, so handlers that
    # caught the previous bare Exception keep working.
    raise NotImplementedError("please implement")
def shutdown_driver(self):
    """Close and quit the driver if one was started, then forget it.

    Safe to call when ``_driver`` was never assigned or is already ``None``.
    An error raised by ``close()`` or ``quit()`` still propagates, but
    ``quit()`` is always attempted and ``_driver`` is always reset so a
    failed shutdown is not retried on a half-closed driver.
    """
    drv = getattr(self, "_driver", None)
    if drv is None:
        return
    try:
        try:
            drv.close()
        finally:
            # quit() must run even if closing the window failed.
            drv.quit()
    finally:
        self._driver = None
def printpdf(self, output_fn):
    """Write the current page as a PDF to ``output_fn``.

    The base implementation provides no PDF support.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    # Still an Exception subclass, so existing ``except Exception`` handlers
    # behave as before.
    raise NotImplementedError("please implement")
def __del__(self):
    """Finalizer: make sure a started driver is shut down with the object."""
    self.shutdown_driver()
@classmethod
def load_modules(cls, modname):
    """Register callables from a module as ``do_<name>`` methods on ``cls``.

    Every attribute of the module named ``<ClassName>_<name>`` that is
    callable is attached to the class as ``do_<name>``.  If the module also
    has a dict attribute ``<name>_schema`` it is stored under
    ``cls.schema["items"]["properties"][<name>]``.

    Args:
        modname: dotted module path; a bare name (no dot) is looked up
            inside ``selenible.modules``.

    Returns:
        None.  When the final component is not an attribute of the imported
        parent, this is logged at debug level and nothing is registered.
    """
    log = getLogger(cls.__name__)
    log.debug("load module %s", modname)
    pfx = cls.__name__ + "_"
    parts = modname.rsplit(".", 1)
    if len(parts) == 1:
        modfirst, modlast = "selenible.modules", parts[0]
    else:
        modfirst, modlast = parts
    try:
        parent = __import__(modfirst, globals(), locals(), [modlast], 0)
        mod = getattr(parent, modlast)
    except AttributeError:
        log.debug("cannot import %s from %s", modlast, modfirst)
        return
    log.debug("names: %s", dir(mod))
    registered = []
    for attr in dir(mod):
        if not attr.startswith(pfx):
            continue
        fn = getattr(mod, attr)
        if not callable(fn):
            # Logger.warn is a deprecated alias; use warning().
            log.warning("%s is not callable", fn)
            continue
        funcname = attr[len(pfx):]
        log.debug("register method: %s", funcname)
        setattr(cls, "do_%s" % (funcname), fn)
        registered.append(funcname)
        scm = getattr(mod, "%s_schema" % (funcname), None)
        if isinstance(scm, dict):
            cls.schema["items"]["properties"][funcname] = scm
    if registered:
        log.debug("register methods: %s", "/".join(registered))
def load_vars(self, fp):
    """Merge variables parsed from YAML into ``self.variables``.

    Args:
        fp: YAML input handed to ``yaml.safe_load``.

    A document that parses to ``None`` (for example an empty file) leaves
    the variables unchanged.
    """
    loaded = yaml.safe_load(fp)
    if loaded is None:
        # dict.update(None) raises TypeError; nothing to merge.
        return
    self.variables.update(loaded)
def render(self, s):
    """Render ``s`` as a Jinja2 template against ``self.variables``."""
    template = Template(s)
    return template.render(self.variables)
def render_dict(self, d):
    """Recursively render every string inside ``d``.

    Dicts are rebuilt with rendered values (keys untouched), lists and
    tuples both come back as lists, strings go through ``render``, and any
    other value is returned as is.
    """
    if isinstance(d, str):
        return self.render(d)
    if isinstance(d, dict):
        return {key: self.render_dict(val) for key, val in d.items()}
    if isinstance(d, (list, tuple)):
        return list(map(self.render_dict, d))
    return d
def run(self, prog):
    """Run each command of ``prog`` in order and return the last result.

    In step mode the user is prompted after every command: ``q`` stops the
    run, ``s`` shows a screenshot, ``c`` leaves step mode.  When
    ``save_every`` is set a screenshot command is issued after each command.
    """
    last = None
    for command in prog:
        self.log.debug("cmd %s", command)
        last = self.run1(command)
        if self.step:
            answer = input(
                "step(q=exit, s=screenshot, c=continue, other=continue):")
            if answer == "q":
                break
            if answer == "s":
                self.saveshot_image().show()
            elif answer == "c":
                self.step = False
        if self.save_every:
            self.do_screenshot({})
    return last
def run1(self, cmd):
    """Execute one command mapping and return the module's result.

    Control keys are consumed first: ``with_items``/``loop_control``/
    ``delay`` (looping), ``name``, ``when``/``when_not`` (conditions),
    ``register`` (variable that receives the result) and ``ignore_error``.
    Exactly one key must remain; it selects the ``do_<key>`` method
    (called with rendered parameters) or the ``do2_<key>`` method (called
    with the key and the raw parameters).

    Args:
        cmd: command mapping.  It is copied before use, so the caller's
            mapping is left untouched and can be executed again.

    Returns:
        The module's return value; ``None`` when the command is skipped,
        when an error was ignored, or when a loop had no items.

    Raises:
        Exception: if more than one module key remains or no matching
            ``do_``/``do2_`` method exists; module errors propagate unless
            ``ignore_error`` is set.
    """
    # Work on a private copy: the keys below are consumed with pop(), and a
    # command stored in a ``defun`` body is executed on every call.
    cmd = dict(cmd)
    withitem = self.render_dict(cmd.pop("with_items", None))
    delay = cmd.pop("delay", 0)
    if withitem is not None:
        loopctl = self.render_dict(cmd.pop("loop_control", {}))
        loopvar = loopctl.get("loop_var", "item")
        loopiter = loopctl.get("loop_iter", "iter")
        start = time.time()
        if isinstance(withitem, dict) and "range" in withitem:
            rg = withitem.get("range")
            if isinstance(rg, (list, tuple)):
                withitem = range(*rg)
            else:
                withitem = range(int(rg))
        elif isinstance(withitem, str):
            # A plain string names a variable holding the items.
            withitem = self.variables.get(withitem, None)
        self.log.info("start loop: %d times", len(withitem))
        res = None  # an empty loop runs nothing and yields no result
        for i, j in enumerate(withitem):
            self.variables[loopvar] = j
            self.variables[loopiter] = i
            self.log.info("loop by %d: %s", i, j)
            res = self.run1(cmd.copy())
            time.sleep(delay)
        # With no items the loop variables were never set.
        self.variables.pop(loopvar, None)
        self.variables.pop(loopiter, None)
        self.log.info("finish loop: %f second", time.time() - start)
        return res
    name = self.render_dict(cmd.pop("name", ""))
    condition = self.render_dict(cmd.pop("when", True))
    ncondition = self.render_dict(cmd.pop("when_not", False))
    if not self.eval_param(condition):
        self.log.info("skip(when) %s", repr(name))
        return
    if self.eval_param(ncondition):
        self.log.info("skip(when_not) %s", repr(name))
        return
    register = self.render_dict(cmd.pop("register", None))
    ignoreerr = self.render_dict(cmd.pop("ignore_error", False))
    if len(cmd) != 1:
        raise Exception("too many parameters: %s" % (cmd.keys()))
    self.variables["env"] = os.environ
    if self._driver is not None:
        # set driver related variables (only when a driver already runs,
        # so that this refresh never boots one)
        for v in ("current_url", "page_source", "title",
                  "window_handles", "session_id", "current_window_handle",
                  "capabilities", "log_types", "w3c"):
            try:
                self.variables[v] = getattr(self.driver, v)
            except selenium.common.exceptions.WebDriverException:
                self.log.info("cannot get attribute %s", v)
        for v in ("cookies", "window_size", "window_position"):
            try:
                self.variables[v] = getattr(self.driver, "get_" + v)()
            except selenium.common.exceptions.WebDriverException:
                self.log.info("cannot get attribute %s", v)
        self.variables["log"] = {}
        try:
            for logtype in self.driver.log_types:
                self.variables["log"][logtype] = self.driver.get_log(
                    logtype)
                # phantomjs case: decode the JSON payload of the first
                # "har" entry in place when it is present
                try:
                    if logtype == "har":
                        logdata = json.loads(
                            self.variables["log"][logtype][0]["message"])
                        self.variables["log"][logtype][0]["message"] = logdata
                except (KeyError, IndexError, json.decoder.JSONDecodeError):
                    self.log.debug(
                        "log.har.0.message does not exists or not json")
        except selenium.common.exceptions.WebDriverException:
            self.log.info("cannot get log types")
    # Stays None when the module fails and the error is ignored.
    res = None
    for c in cmd.keys():
        mtdname = "do_%s" % (c)
        mtdname2 = "do2_%s" % (c)
        if hasattr(self, mtdname):
            mtd = getattr(self, mtdname)
            param = self.render_dict(cmd.get(c))
            self.log.debug("%s %s %s", name, c, param)
            self.log.info("start %s", repr(name))
            start = time.time()
            try:
                with self.lock:
                    res = mtd(param)
            except Exception as e:
                if ignoreerr:
                    self.log.info("error(ignored): %s", e)
                else:
                    self.log.error("error: %s", e)
                    raise
            if register is not None:
                self.log.debug("register %s = %s", register, res)
                self.variables[register] = res
            self.log.info("finish %s %f second",
                          repr(name), time.time() - start)
        elif hasattr(self, mtdname2):
            # 1st class module: receives its own key and unrendered params
            mtd = getattr(self, mtdname2)
            param = cmd.get(c)
            self.log.debug("%s %s %s", name, c, param)
            with self.lock:
                res = mtd(c, param)
            if register is not None:
                self.log.debug("register %s = %s", register, res)
                self.variables[register] = res
        else:
            raise Exception("module not found: %s" % (c))
    time.sleep(delay)
    return res
def do2_defun(self, funcname, params):
    """
    - name: define func1
      defun:
        name: func1
        args: [a1, a2]
        return: r
        progn:
          - name: hello
            echo: "{{a1}} is {{a2}}"
          - name: set-retval
            var:
              r: "hello"
    - name: call func1
      func1:
        a1: xyz
        a2: abc
      register: rval1
    - name: return value
      echo: "{{rval1}}"
    """
    # The incoming ``funcname`` is the command key; the function's own name
    # comes from the parameters and replaces it.
    funcname = params.get("name")
    self.funcs[funcname] = (
        params.get("args", []),
        params.get("return", None),
        params.get("progn", []),
    )
    # Expose the function as a first-class module handled by run_func.
    setattr(self, "do2_" + funcname, self.run_func)
def run_func(self, funcname, params):
    """Call a function registered by ``do2_defun``.

    The body runs against a deep copy of ``self.variables`` with the
    declared arguments bound from ``params``; the caller's variables are
    put back afterwards.  ``self.lock`` is released while the body runs
    and re-acquired before returning, since the body's commands take the
    lock themselves.

    Args:
        funcname: name the function was registered under.
        params: mapping of argument name to value (rendered first).

    Returns:
        The value of the declared return variable in the function's scope
        when one was declared, otherwise the result of the last command.
    """
    params = self.render_dict(params)
    args, retvar, progn = self.funcs.get(funcname, ([], None, None))
    oldvars = self.variables
    self.variables = copy.deepcopy(oldvars)
    try:
        for a in args:
            self.variables[a] = params.get(a)
        self.log.debug("running %s", progn)
        self.lock.release()
        try:
            res = self.run(progn)
        finally:
            # The caller's ``with self.lock`` releases on exit, so the lock
            # must be held again even when the body raised.
            self.lock.acquire()
        newvars = self.variables
    finally:
        # Never leak the function-local scope into the caller.
        self.variables = oldvars
    if retvar is not None:
        self.log.debug("return val %s -> %s", retvar, newvars.get(retvar))
        return newvars.get(retvar)
    return res
@classmethod
def listmodule(cls):
    """Map each module name to its documentation.

    A module is any class attribute whose name starts with ``do_`` or
    ``do2_``; the key is the name without that prefix and the value is the
    attribute's docstring, or ``"(no document)"`` when it has none.
    """
    catalog = {}
    for attr in dir(cls):
        for prefix in ("do_", "do2_"):
            if not attr.startswith(prefix):
                continue
            text = inspect.getdoc(getattr(cls, attr))
            catalog[attr[len(prefix):]] = "(no document)" if text is None else text
    return catalog
def execute(self, script, args):
    """Run JavaScript in the current page through the driver.

    Args:
        script: JavaScript source passed to ``execute_script``.
        args: passed to ``execute_script`` as one positional argument.

    Returns:
        Whatever ``execute_script`` returns (previously this was dropped,
        so callers always received ``None``).
    """
    return self.driver.execute_script(script, args)
def runcmd(self, cmd, encoding="utf-8", stdin=subprocess.DEVNULL,
           stderr=subprocess.DEVNULL):
    """Run an external command and return its decoded standard output.

    A string ``cmd`` is executed through the shell; any other value is
    passed to ``subprocess.check_output`` as is with ``shell=False``.
    """
    # NOTE(review): string commands go through the shell; make sure they
    # never come from untrusted input.
    use_shell = isinstance(cmd, str)
    self.log.debug("run(%s) %s", use_shell, cmd)
    raw = subprocess.check_output(cmd, stdin=stdin, stderr=stderr,
                                  shell=use_shell)
    text = raw.decode(encoding)
    self.log.debug("result: %s", text)
    return text
def saveshot_image(self):
    """Take a screenshot with ``saveshot`` and open it as a PIL image."""
    png = self.saveshot()
    return Image.open(io.BytesIO(png))
def saveshot(self, fp=None):
    """Take a PNG screenshot, optionally writing it out, and return the bytes.

    ``fp`` may be ``None`` (nothing is written), a string path (opened in
    binary write mode) or any object with a ``write`` method.
    """
    data = self.driver.get_screenshot_as_png()
    if isinstance(fp, str):
        with open(fp, 'wb') as out:
            out.write(data)
    elif fp is not None:
        fp.write(data)
    return data
# Short locator aliases accepted in command parameters, mapped to Selenium
# ``By`` strategies.  Insertion order matters: ``getlocator`` returns the
# first alias it finds in the parameters.
findmap = dict((
    ("id", By.ID),
    ("xpath", By.XPATH),
    ("linktext", By.LINK_TEXT),
    ("partlinktext", By.PARTIAL_LINK_TEXT),
    ("name", By.NAME),
    ("tag", By.TAG_NAME),
    ("class", By.CLASS_NAME),
    ("select", By.CSS_SELECTOR),
))
def removelocator(self, param):
    """Return a deep copy of ``param`` without any locator keys.

    Removed keys: ``nth``, every ``findmap`` alias, and for each public
    attribute of ``By`` its name, its lower-cased name and its value.
    """
    cleaned = copy.deepcopy(param)
    cleaned.pop("nth", None)
    for alias in self.findmap:
        cleaned.pop(alias, None)
    for attr in dir(By):
        if attr.startswith("_"):
            continue
        for key in (attr, attr.lower(), getattr(By, attr)):
            cleaned.pop(key, None)
    return cleaned
def getlocator(self, param):
    """Find the first locator in ``param`` as a ``(strategy, value)`` pair.

    ``findmap`` aliases are checked first, in order.  Then, for each public
    attribute of ``By``, the attribute name, its lower-cased name and its
    value are tried as keys.  Returns ``(None, None)`` when nothing matches.
    """
    for alias, strategy in self.findmap.items():
        if alias in param:
            return (strategy, param.get(alias))
    for attr in dir(By):
        if attr.startswith("_"):
            continue
        strategy = getattr(By, attr)
        for key in (attr, attr.lower(), strategy):
            if key in param:
                return (strategy, param.get(key))
    return (None, None)
def findone(self, param):
    """Return the single element addressed by ``param``.

    Uses the locator from ``getlocator`` when there is one; otherwise the
    active element if ``param["active"]`` is truthy, else ``None``.
    """
    by, value = self.getlocator(param)
    if by is not None:
        return self.driver.find_element(by, value)
    if not param.get("active", False):
        return None
    return self.driver.switch_to.active_element
def findmany2one(self, param):
    """Pick the ``nth`` element (default 0) from the ``findmany`` result.

    Returns:
        The selected element, or ``None`` when ``nth`` is out of range in
        either direction (including any index into an empty result).  A
        result that is not a list or tuple is returned unchanged.
    """
    ret = self.findmany(param)
    nth = param.get("nth", 0)
    if not isinstance(ret, (list, tuple)):
        return ret
    self.log.debug("found %d elements. choose %d-th", len(ret), nth)
    try:
        # Indexing directly also covers negative ``nth``, which a plain
        # ``len(ret) > nth`` guard lets through.
        return ret[nth]
    except IndexError:
        return None
def findmany(self, param):
    """Return the list of elements addressed by ``param``.

    Uses the locator from ``getlocator`` when there is one; otherwise a
    one-element list with the active element if ``param["active"]`` is
    truthy, else an empty list.
    """
    by, value = self.getlocator(param)
    if by is not None:
        return self.driver.find_elements(by, value)
    if not param.get("active", False):
        return []
    return [self.driver.switch_to.active_element]
def getvalue(self, param):
    """Resolve a value specification to the actual value.

    A plain string is returned as is.  Otherwise ``param`` is a mapping and
    the first of these keys that is present decides the source:

    - ``text``: the literal value
    - ``password``: stripped output of ``[self.passcmd, <label>]``
    - ``pipe``: stripped output of the given command
    - ``yaml`` / ``json`` / ``toml``: ``{"file": ..., "path": ...}``; the
      file is parsed and the first JSONPath match is returned (``path``
      defaults to ``"*"``)
    - ``input`` / ``input_password``: interactive prompt
    - ``input_multiline``: prints the prompt and reads stdin until EOF

    ``encoding`` (default ``utf-8``) is used to decode command output.

    Returns:
        The resolved value, or ``None`` when no known key is present.
    """
    if isinstance(param, str):
        return param
    encoding = param.get("encoding", "utf-8")

    def from_file(spec, load):
        # Parse spec["file"] with ``load``; return the first JSONPath match.
        with open(spec.get("file")) as f:
            data = load(f)
        return jsonpath_rw.parse(spec.get("path", "*")).find(data)[0].value

    if "text" in param:
        return param.get("text")
    elif "password" in param:
        label = param.get("password")
        return self.runcmd([self.passcmd, label], encoding).strip()
    elif "pipe" in param:
        cmd = param.get("pipe")
        return self.runcmd(cmd, encoding).strip()
    elif "yaml" in param:
        return from_file(param.get("yaml"), yaml.safe_load)
    elif "json" in param:
        return from_file(param.get("json"), json.load)
    elif "toml" in param:
        return from_file(param.get("toml"), toml.load)
    elif "input" in param:
        return input(param.get("input"))
    elif "input_password" in param:
        return getpass.getpass(param.get("input_password"))
    elif "input_multiline" in param:
        print(param.get("input_multiline"))
        res = sys.stdin.read()
        try:
            sys.stdin.seek(0)
        except (OSError, ValueError):
            # Rewinding is best-effort: a non-seekable stdin raises
            # io.UnsupportedOperation, which must not discard the text
            # that was just read.
            pass
        return res
    return None
def eval_param(self, param):
    """Evaluate a condition/expression tree.

    Lists and tuples evaluate element-wise to a list; any non-dict,
    non-sequence value is returned unchanged.  For a dict every key is an
    operator applied to its value, and the per-key results are folded
    together with ``and``:

    - ``eq``/``neq`` families: all / not all evaluated operands equal
    - ``not``: logical negation of the evaluated operand
    - ``and``, ``or``, ``xor``, ``add``, ``sub``, ``mul``, ``div``
      families: left fold over the evaluated operand list
    - ``selected``, ``enabled``, ``displayed`` (and negated forms): one
      result per element found by ``findmany``
    - ``defined`` / ``undefined``: variable name(s) present / absent in
      ``self.variables``

    Raises:
        Exception: for an unknown operator, or a ``defined``-style operand
            that is neither a string nor a list/tuple.
        TypeError: from ``functools.reduce`` when there is nothing to fold
            (for example an empty dict or no matching elements).
    """
    if isinstance(param, (list, tuple)):
        return [self.eval_param(x) for x in param]
    if not isinstance(param, dict):
        return param
    reducers = (
        (("and", "&", "&&"), lambda a, b: a and b),
        (("or", "|", "||"), lambda a, b: a or b),
        (("xor", "^"), lambda a, b: bool(a) ^ bool(b)),
        (("add", "sum", "plus", "+"), lambda a, b: a + b),
        (("sub", "minus", "-"), lambda a, b: a - b),
        (("mul", "times", "*"), lambda a, b: a * b),
        (("div", "/"), lambda a, b: a / b),
    )
    # (operator names, element method to call, negate the result?)
    element_checks = (
        (("selected",), "is_selected", False),
        (("not_selected", "unselected"), "is_selected", True),
        (("enabled",), "is_enabled", False),
        (("not_enabled", "disabled"), "is_enabled", True),
        (("displayed",), "is_displayed", False),
        (("not_displayed", "undisplayed"), "is_displayed", True),
    )
    res = []
    for k, v in param.items():
        fold = next((fn for names, fn in reducers if k in names), None)
        check = next((c for c in element_checks if k in c[0]), None)
        if k in ("eq", "equals", "==", "is"):
            res.append(len(set(self.eval_param(v))) == 1)
        elif k in ("neq", "not_equals", "!=", "is_not"):
            res.append(len(set(self.eval_param(v))) >= 2)
        elif k in ("not",):
            # One-element tuple: a bare ("not") is just the string "not",
            # which would turn this into a substring test.
            res.append(not self.eval_param(v))
        elif fold is not None:
            res.append(functools.reduce(fold, self.eval_param(v)))
        elif check is not None:
            _, method, negate = check
            for e in self.findmany(v):
                state = getattr(e, method)()
                res.append(not state if negate else state)
        elif k in ("defined", "not_defined", "undefined"):
            negate = k != "defined"
            if isinstance(v, (tuple, list)):
                names = v
            elif isinstance(v, str):
                names = [v]
            else:
                raise Exception("invalid argument: %s" % (v))
            res.extend((x in self.variables) != negate for x in names)
        else:
            raise Exception("operator not supported: %s (%s)" % (k, v))
    return functools.reduce(lambda a, b: a and b, res)
def return_element(self, param, elem):
    """Return ``elem`` as is, or parsed with lxml when ``parseHTML`` is set.

    With ``param["parseHTML"]`` truthy, a string is parsed directly, a
    list/tuple yields a list with each item's ``outerHTML`` parsed, and any
    other element has its ``outerHTML`` parsed.  ``None`` is passed through.
    """
    if elem is None or not param.get("parseHTML", False):
        return elem
    if isinstance(elem, str):
        return etree.fromstring(elem)
    if isinstance(elem, (tuple, list)):
        return [etree.fromstring(item.get_attribute("outerHTML"))
                for item in elem]
    return etree.fromstring(elem.get_attribute("outerHTML"))