Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import sys 

2import os 

3import subprocess 

4import inspect 

5import time 

6import functools 

7import getpass 

8import copy 

9import io 

10from logging import getLogger 

11 

12import json 

13import yaml 

14import toml 

15import jsonpath_rw 

16from threading import Lock 

17from pkg_resources import resource_stream 

18from lxml import etree 

19from PIL import Image 

20from selenium.webdriver.common.by import By 

21import selenium.common.exceptions 

22from jinja2 import Template 

23from ..version import VERSION 

24 

25 

26class Base: 

# Executable that getvalue() runs (as [passcmd, label]) to resolve a
# "password" entry; its stripped output becomes the value.
passcmd = "pass"
# Schema loaded once, when the class body is executed. load_modules()
# later adds per-module entries under schema["items"]["properties"].
schema = yaml.safe_load(resource_stream(__name__, '../schema/base.yaml'))

29 

def __init__(self):
    """Set up the runner state; the browser itself is started lazily."""
    # The driver is created on first access of the ``driver`` property.
    self._driver = None
    self.browser_args = {}
    # Serialises module execution (taken around every module call in run1).
    self.lock = Lock()
    self.log = getLogger(type(self).__name__)
    # Interactive stepping / screenshot-after-every-command switches.
    self.step = False
    self.save_every = False
    # Template variables and user-defined functions (see do2_defun).
    self.variables = dict(selenible_version=VERSION)
    self.funcs = {}

41 

42 @property 

43 def driver(self): 

44 if self._driver is None: 

45 self._driver = self.boot_driver() 

46 self.log.info("driver started") 

47 self.variables["driver"] = self._driver.name 

48 self.variables["desired_capabilities"] = self._driver.desired_capabilities 

49 return self._driver 

50 

51 def get_options(self): 

52 return {} 

53 

54 def boot_driver(self): 

55 raise Exception("please implement") 

56 

57 def shutdown_driver(self): 

58 if hasattr(self, "_driver") and self._driver is not None: 

59 self._driver.close() 

60 self._driver.quit() 

61 self._driver = None 

62 

63 def printpdf(self, output_fn): 

64 raise Exception("please implement") 

65 

66 def __del__(self): 

67 self.shutdown_driver() 

68 

69 @classmethod 

70 def load_modules(cls, modname): 

71 log = getLogger(cls.__name__) 

72 log.debug("load module %s", modname) 

73 pfx = cls.__name__ + "_" 

74 try: 

75 mm = modname.rsplit(".", 1) 

76 if len(mm) == 1: 

77 modfirst = "selenible.modules" 

78 modlast = mm[0] 

79 else: 

80 modfirst = mm[0] 

81 modlast = mm[1] 

82 mod1 = __import__(modfirst, globals(), locals(), [modlast], 0) 

83 mod = getattr(mod1, modlast) 

84 except AttributeError: 

85 log.debug("cannot import %s from %s", modlast, modfirst) 

86 return 

87 log.debug("names: %s", dir(mod)) 

88 mtd = [] 

89 for m in filter(lambda f: f.startswith(pfx), dir(mod)): 

90 fn = getattr(mod, m) 

91 if callable(fn): 

92 log.debug("register method: %s", m[len(pfx):]) 

93 name = "do_%s" % (m[len(pfx):]) 

94 setattr(cls, name, fn) 

95 funcname = m[len(pfx):] 

96 mtd.append(funcname) 

97 scmname = "%s_schema" % (funcname) 

98 if hasattr(mod, scmname): 

99 scm = getattr(mod, scmname) 

100 if isinstance(scm, dict): 

101 cls.schema["items"]["properties"][funcname] = scm 

102 else: 

103 log.warn("%s is not callable", fn) 

104 if len(mtd) != 0: 

105 log.debug("register methods: %s", "/".join(mtd)) 

106 

def load_vars(self, fp):
    """Merge the variables found in the YAML stream ``fp`` into ``self.variables``.

    An empty document (which the YAML loader returns as None) is treated as
    "no variables" rather than failing inside ``dict.update``.
    """
    loaded = yaml.safe_load(fp)
    if loaded is None:
        return
    self.variables.update(loaded)

109 

def render(self, s):
    """Render the template string ``s`` against the current variables."""
    template = Template(s)
    return template.render(self.variables)

112 

113 def render_dict(self, d): 

114 if isinstance(d, dict): 

115 res = {} 

116 for k, v in d.items(): 

117 res[k] = self.render_dict(v) 

118 return res 

119 elif isinstance(d, (list, tuple)): 

120 return [self.render_dict(x) for x in d] 

121 elif isinstance(d, str): 

122 return self.render(d) 

123 return d 

124 

125 def run(self, prog): 

126 res = None 

127 for cmd in prog: 

128 self.log.debug("cmd %s", cmd) 

129 res = self.run1(cmd) 

130 if self.step: 

131 ans = input( 

132 "step(q=exit, s=screenshot, c=continue, other=continue):") 

133 if ans == "q": 

134 break 

135 elif ans == "s": 

136 self.saveshot_image().show() 

137 elif ans == "c": 

138 self.step = False 

139 if self.save_every: 

140 self.do_screenshot({}) 

141 return res 

142 

143 def run1(self, cmd): 

144 withitem = self.render_dict(cmd.pop("with_items", None)) 

145 delay = cmd.pop("delay", 0) 

146 if withitem is not None: 

147 loopctl = self.render_dict(cmd.pop("loop_control", {})) 

148 loopvar = loopctl.get("loop_var", "item") 

149 loopiter = loopctl.get("loop_iter", "iter") 

150 start = time.time() 

151 if isinstance(withitem, dict) and "range" in withitem: 

152 rg = withitem.get("range") 

153 if isinstance(rg, (list, tuple)): 

154 withitem = range(*rg) 

155 else: 

156 withitem = range(int(rg)) 

157 elif isinstance(withitem, str): 

158 withitem = self.variables.get(withitem, None) 

159 self.log.info("start loop: %d times", len(withitem)) 

160 for i, j in enumerate(withitem): 

161 self.variables[loopvar] = j 

162 self.variables[loopiter] = i 

163 self.log.info("loop by %d: %s", i, j) 

164 res = self.run1(cmd.copy()) 

165 time.sleep(delay) 

166 self.variables.pop(loopvar) 

167 self.variables.pop(loopiter) 

168 self.log.info("finish loop: %f second", time.time() - start) 

169 return res 

170 # cmd = self.render_dict(cmd) 

171 name = self.render_dict(cmd.pop("name", "")) 

172 condition = self.render_dict(cmd.pop("when", True)) 

173 ncondition = self.render_dict(cmd.pop("when_not", False)) 

174 if not self.eval_param(condition): 

175 self.log.info("skip(when) %s", repr(name)) 

176 return 

177 if self.eval_param(ncondition): 

178 self.log.info("skip(when_not) %s", repr(name)) 

179 return 

180 register = self.render_dict(cmd.pop("register", None)) 

181 ignoreerr = self.render_dict(cmd.pop("ignore_error", False)) 

182 if len(cmd) != 1: 

183 raise Exception("too many parameters: %s" % (cmd.keys())) 

184 self.variables["env"] = os.environ 

185 if self._driver is not None: 

186 # set driver related variables 

187 for v in ("current_url", "page_source", "title", 

188 "window_handles", "session_id", "current_window_handle", 

189 "capabilities", "log_types", "w3c"): 

190 try: 

191 self.variables[v] = getattr(self.driver, v) 

192 except selenium.common.exceptions.WebDriverException: 

193 self.log.info("cannot get attribute %s", v) 

194 for v in ("cookies", "window_size", "window_position"): 

195 try: 

196 self.variables[v] = getattr(self.driver, "get_" + v)() 

197 except selenium.common.exceptions.WebDriverException: 

198 self.log.info("cannot get attribute %s", v) 

199 self.variables["log"] = {} 

200 try: 

201 for logtype in self.driver.log_types: 

202 self.variables["log"][logtype] = self.driver.get_log( 

203 logtype) 

204 # phantomjs case 

205 try: 

206 if logtype == "har": 

207 logdata = json.loads( 

208 self.variables["log"][logtype][0]["message"]) 

209 self.variables["log"][logtype][0]["message"] = logdata 

210 except (KeyError, IndexError, json.decoder.JSONDecodeError): 

211 self.log.debug( 

212 "log.har.0.message does not exists or not json") 

213 pass 

214 except selenium.common.exceptions.WebDriverException: 

215 self.log.info("cannot get log types") 

216 for c in cmd.keys(): 

217 mtdname = "do_%s" % (c) 

218 mtdname2 = "do2_%s" % (c) 

219 if hasattr(self, mtdname): 

220 mtd = getattr(self, mtdname) 

221 param = self.render_dict(cmd.get(c)) 

222 self.log.debug("%s %s %s", name, c, param) 

223 self.log.info("start %s", repr(name)) 

224 start = time.time() 

225 try: 

226 with self.lock: 

227 res = mtd(param) 

228 except Exception as e: 

229 if ignoreerr: 

230 self.log.info("error(ignored): %s", e) 

231 else: 

232 self.log.error("error: %s", e) 

233 raise e 

234 if register is not None: 

235 self.log.debug("register %s = %s", register, res) 

236 self.variables[register] = res 

237 self.log.info("finish %s %f second", 

238 repr(name), time.time() - start) 

239 elif hasattr(self, mtdname2): 

240 # 1st class module 

241 mtd = getattr(self, mtdname2) 

242 param = cmd.get(c) 

243 self.log.debug("%s %s %s", name, c, param) 

244 with self.lock: 

245 res = mtd(c, param) 

246 if register is not None: 

247 self.log.debug("register %s = %s", register, res) 

248 self.variables[register] = res 

249 else: 

250 raise Exception("module not found: %s" % (c)) 

251 time.sleep(delay) 

252 return res 

253 

254 def do2_defun(self, funcname, params): 

255 """ 

256 - name: define func1 

257 defun: 

258 name: func1 

259 args: [a1, a2] 

260 return: r 

261 progn: 

262 - name: hello 

263 echo: "{{a1}} is {{a2}}" 

264 - name: set-retval 

265 var: 

266 r: "hello" 

267 - name: call func1 

268 func1: 

269 a1: xyz 

270 a2: abc 

271 register: rval1 

272 - name: return value 

273 echo: "{{rval1}}" 

274 """ 

275 funcname = params.get("name") 

276 args = params.get("args", []) 

277 retvar = params.get("return", None) 

278 progn = params.get("progn", []) 

279 self.funcs[funcname] = (args, retvar, progn) 

280 setattr(self, "do2_" + funcname, self.run_func) 

281 

282 def run_func(self, funcname, params): 

283 params = self.render_dict(params) 

284 args, retvar, progn = self.funcs.get(funcname, ([], None, None)) 

285 oldvars = self.variables 

286 self.variables = copy.deepcopy(self.variables) 

287 for a in args: 

288 self.variables[a] = params.get(a) 

289 self.log.debug("running %s", progn) 

290 self.lock.release() 

291 res = self.run(progn) 

292 self.lock.acquire() 

293 newvars = self.variables 

294 self.variables = oldvars 

295 if retvar is not None: 

296 self.log.debug("return val %s -> %s", retvar, newvars.get(retvar)) 

297 return newvars.get(retvar) 

298 return res 

299 

300 @classmethod 

301 def listmodule(cls): 

302 pfx = ["do_", "do2_"] 

303 res = {} 

304 for x in dir(cls): 

305 for p in pfx: 

306 if x.startswith(p): 

307 doc = inspect.getdoc(getattr(cls, x)) 

308 if doc is None: 

309 doc = "(no document)" 

310 res[x[len(p):]] = doc 

311 return res 

312 

313 def execute(self, script, args): 

314 self.driver.execute_script(script, args) 

315 

316 def runcmd(self, cmd, encoding="utf-8", stdin=subprocess.DEVNULL, 

317 stderr=subprocess.DEVNULL): 

318 flag = False 

319 if isinstance(cmd, str): 

320 flag = True 

321 self.log.debug("run(%s) %s", flag, cmd) 

322 ret = subprocess.check_output(cmd, stdin=stdin, stderr=stderr, 

323 shell=flag).decode(encoding) 

324 self.log.debug("result: %s", ret) 

325 return ret 

326 

def saveshot_image(self):
    """Take a screenshot and return it opened as an image object."""
    png = self.saveshot()
    return Image.open(io.BytesIO(png))

329 

330 def saveshot(self, fp=None): 

331 data = self.driver.get_screenshot_as_png() 

332 if fp is None: 

333 return data 

334 elif isinstance(fp, str): 

335 with open(fp, 'wb') as f: 

336 f.write(data) 

337 else: 

338 fp.write(data) 

339 return data 

340 

# Short locator keys accepted in module parameters, mapped to the
# Selenium ``By`` strategy they select. getlocator() consults this table
# first, in this order, and removelocator() strips these keys.
findmap = {
    "id": By.ID,
    "xpath": By.XPATH,
    "linktext": By.LINK_TEXT,
    "partlinktext": By.PARTIAL_LINK_TEXT,
    "name": By.NAME,
    "tag": By.TAG_NAME,
    "class": By.CLASS_NAME,
    "select": By.CSS_SELECTOR,
}

351 

def removelocator(self, param):
    """Return a deep copy of ``param`` without any element-locator keys.

    Removed are ``nth``, every ``findmap`` key, and for each public
    attribute of ``By``: its name, its lower-cased name and its value.
    The input mapping is not modified.
    """
    cleaned = copy.deepcopy(param)
    cleaned.pop("nth", None)
    for shortkey in self.findmap:
        cleaned.pop(shortkey, None)
    for attr in dir(By):
        if attr.startswith("_"):
            continue
        for candidate in (attr, attr.lower(), getattr(By, attr)):
            cleaned.pop(candidate, None)
    return cleaned

363 

def getlocator(self, param):
    """Extract a ``(strategy, value)`` locator pair from ``param``.

    ``findmap`` keys are tried first; after that each public attribute of
    ``By`` is matched by its name, its lower-cased name and finally its
    value. Returns ``(None, None)`` when ``param`` has no locator key.
    """
    for shortkey, strategy in self.findmap.items():
        if shortkey in param:
            return (strategy, param.get(shortkey))
    for attr in dir(By):
        if attr.startswith("_"):
            continue
        strategy = getattr(By, attr)
        for candidate in (attr, attr.lower(), strategy):
            if candidate in param:
                return (strategy, param.get(candidate))
    return (None, None)

376 

377 def findone(self, param): 

378 k, v = self.getlocator(param) 

379 if k is not None: 

380 return self.driver.find_element(k, v) 

381 if param.get("active", False): 

382 return self.driver.switch_to.active_element 

383 return None 

384 

385 def findmany2one(self, param): 

386 ret = self.findmany(param) 

387 nth = param.get("nth", 0) 

388 if isinstance(ret, (list, tuple)): 

389 self.log.debug("found %d elements. choose %d-th", len(ret), nth) 

390 if len(ret) > nth: 

391 return ret[nth] 

392 else: 

393 return None 

394 return ret 

395 

396 def findmany(self, param): 

397 k, v = self.getlocator(param) 

398 if k is not None: 

399 return self.driver.find_elements(k, v) 

400 if param.get("active", False): 

401 return [self.driver.switch_to.active_element] 

402 return [] 

403 

404 def getvalue(self, param): 

405 if isinstance(param, str): 

406 return param 

407 encoding = param.get("encoding", "utf-8") 

408 if "text" in param: 

409 return param.get("text") 

410 elif "password" in param: 

411 label = param.get("password") 

412 return self.runcmd([self.passcmd, label], encoding).strip() 

413 elif "pipe" in param: 

414 cmd = param.get("pipe") 

415 return self.runcmd(cmd, encoding).strip() 

416 elif "yaml" in param: 

417 p = param.get("yaml") 

418 with open(p.get("file")) as f: 

419 data = yaml.safe_load(f) 

420 return jsonpath_rw.parse(p.get("path", "*")).find(data)[0].value 

421 elif "json" in param: 

422 p = param.get("json") 

423 with open(p.get("file")) as f: 

424 data = json.load(f) 

425 return jsonpath_rw.parse(p.get("path", "*")).find(data)[0].value 

426 elif "toml" in param: 

427 p = param.get("toml") 

428 with open(p.get("file")) as f: 

429 data = toml.load(f) 

430 return jsonpath_rw.parse(p.get("path", "*")).find(data)[0].value 

431 elif "input" in param: 

432 return input(param.get("input")) 

433 elif "input_password" in param: 

434 return getpass.getpass(param.get("input_password")) 

435 elif "input_multiline" in param: 

436 print(param.get("input_multiline")) 

437 res = sys.stdin.read() 

438 sys.stdin.seek(0) 

439 return res 

440 return None 

441 

442 def eval_param(self, param): 

443 if isinstance(param, (list, tuple)): 

444 return [self.eval_param(x) for x in param] 

445 elif isinstance(param, dict): 

446 res = [] 

447 for k, v in param.items(): 

448 if k in ("eq", "equals", "==", "is"): 

449 res.append(len(set(self.eval_param(v))) == 1) 

450 elif k in ("neq", "not_equals", "!=", "is_not"): 

451 res.append(len(set(self.eval_param(v))) >= 2) 

452 elif k in ("not"): 

453 res.append(not self.eval_param(v)) 

454 elif k in ("and", "&", "&&"): 

455 res.append(functools.reduce( 

456 lambda a, b: a and b, self.eval_param(v))) 

457 elif k in ("or", "|", "||"): 

458 res.append(functools.reduce( 

459 lambda a, b: a or b, self.eval_param(v))) 

460 elif k in ("xor", "^"): 

461 res.append(functools.reduce(lambda a, b: bool(a) ^ bool(b), self.eval_param(v))) 

462 elif k in ("add", "sum", "plus", "+"): 

463 res.append(functools.reduce( 

464 lambda a, b: a + b, self.eval_param(v))) 

465 elif k in ("sub", "minus", "-"): 

466 res.append(functools.reduce( 

467 lambda a, b: a - b, self.eval_param(v))) 

468 elif k in ("mul", "times", "*"): 

469 res.append(functools.reduce( 

470 lambda a, b: a * b, self.eval_param(v))) 

471 elif k in ("div", "/"): 

472 res.append(functools.reduce( 

473 lambda a, b: a / b, self.eval_param(v))) 

474 elif k in ("selected",): 

475 for e in self.findmany(v): 

476 res.append(e.is_selected()) 

477 elif k in ("not_selected", "unselected"): 

478 for e in self.findmany(v): 

479 res.append(not e.is_selected()) 

480 elif k in ("enabled",): 

481 for e in self.findmany(v): 

482 res.append(e.is_enabled()) 

483 elif k in ("not_enabled", "disabled"): 

484 for e in self.findmany(v): 

485 res.append(not e.is_enabled()) 

486 elif k in ("displayed",): 

487 for e in self.findmany(v): 

488 res.append(e.is_displayed()) 

489 elif k in ("not_displayed", "undisplayed"): 

490 for e in self.findmany(v): 

491 res.append(not e.is_displayed()) 

492 elif k in ("defined",): 

493 if isinstance(v, (tuple, list)): 

494 res.extend([x in self.variables for x in v]) 

495 elif isinstance(v, str): 

496 res.append(v in self.variables) 

497 else: 

498 raise Exception("invalid argument: %s" % (v)) 

499 elif k in ("not_defined", "undefined"): 

500 if isinstance(v, (tuple, list)): 

501 res.extend([x not in self.variables for x in v]) 

502 elif isinstance(v, str): 

503 res.append(v not in self.variables) 

504 else: 

505 raise Exception("invalid argument: %s" % (v)) 

506 else: 

507 raise Exception("operator not supported: %s (%s)" % (k, v)) 

508 return functools.reduce(lambda a, b: a and b, res) 

509 return param 

510 

511 def return_element(self, param, elem): 

512 if elem is None: 

513 return elem 

514 if not param.get("parseHTML", False): 

515 return elem 

516 if isinstance(elem, (tuple, list)): 

517 return [etree.fromstring(x.get_attribute("outerHTML")) for x in elem] 

518 elif isinstance(elem, str): 

519 return etree.fromstring(elem) 

520 return etree.fromstring(elem.get_attribute("outerHTML"))