Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import io 

2import base64 

3import shlex 

4import inspect 

5import yaml 

6from PIL import Image 

7from logging import getLogger, DEBUG, StreamHandler 

8from ipykernel.kernelbase import Kernel 

9from .version import VERSION 

10from . import cli 

11from selenium.webdriver.remote.webelement import WebElement 

12 

13 

class SelenibleKernel(Kernel):
    """Jupyter kernel that feeds YAML cells to a Selenible driver.

    A cell starting with ``%`` is treated as a kernel command and dispatched
    to the matching ``cmd_<name>`` method.  Any other cell is parsed with
    ``yaml.safe_load`` and passed to the driver's ``run`` method; afterwards
    the captured driver log, the result, and a PNG screenshot are published
    on the IOPub socket.
    """

    log = getLogger("selenible")
    implementation = 'Selenible'
    implementation_version = '0.0.1'
    language = 'selenible'
    language_version = VERSION
    language_info = {
        'name': 'Selenible',
        'mimetype': 'text/yaml',
        'file_extension': '.yaml',
    }
    banner = "Selenible kernel"
    # Defaults used the next time the driver is (re)created; changed through
    # the %driver and %module commands, which rebind them on the instance.
    driver_name = "phantom"
    extensions = []

    def __init__(self, **kwargs):
        """Initialise kernel state; the driver itself is created lazily."""
        super().__init__(**kwargs)
        self._drv = None
        self.thumbnail = None
        self.log.setLevel(DEBUG)
        self.log.info("kernel started")

    @property
    def drv(self):
        """Return the driver instance, creating it on first access.

        Creation loads the driver class via ``cli.loadmodules`` using the
        current ``driver_name``/``extensions`` and attaches a fresh
        ``StringIO`` handler (``self.logio``) to the driver's logger so its
        output can be forwarded to the frontend after each execution.
        """
        if self._drv is None:
            drvcls = cli.loadmodules(self.driver_name, self.extensions)
            self.log.info("driver: cls=%s, name=%s, exts=%s",
                          drvcls, self.driver_name, self.extensions)
            self._drv = drvcls()
            drvlog = self._drv.log
            self.logio = io.StringIO()
            drvlog.addHandler(StreamHandler(self.logio))
        return self._drv

    def do_shutdown(self, restart):
        """Drop the driver and return the shutdown reply content.

        Returns the same ``{'status': 'ok', 'restart': restart}`` dictionary
        as the base ``Kernel.do_shutdown`` so the reply message has content.
        """
        self.log.info("kernel finished")
        # NOTE(review): only the reference is dropped; whether the driver
        # closes its browser on garbage collection is not visible here.
        self._drv = None
        return {'status': 'ok', 'restart': restart}

    # The one-line docstrings of the cmd_* methods are printed verbatim by
    # %help, so they are kept short and any extra notes go into comments.

    def cmd_driver(self, args):
        "set driver: phantom, chrome, firefox, etc..."
        # Takes effect the next time the driver is created (see ``drv``).
        self.log.info("driver: %s -> %s", self.driver_name, args[0])
        self.driver_name = args[0]

    def cmd_module(self, args):
        "load modules"
        # Stored only; passed to cli.loadmodules when the driver is created.
        self.extensions = args

    def cmd_shutdown(self, args):
        "shutdown driver"
        # Dropping the reference makes ``drv`` build a new driver next time.
        self._drv = None

    def cmd_loglevel(self, args):
        "set log level"
        # Accessing ``self.drv`` creates the driver if it does not exist yet.
        self.drv.log.setLevel(args[0])

    def cmd_thumbnail(self, args):
        "set thumbnail size"
        # At most the first two arguments are used; each must parse as int.
        self.thumbnail = tuple(int(f) for f in args[:2])

    def cmd_help(self, args):
        "show this help"
        names = sorted(n for n in dir(self)
                       if n.startswith("cmd_") and callable(getattr(self, n)))
        # One "<command>: <docstring>" line per cmd_* method.
        txt = "\n".join(
            "%s: %s" % (n.split("_", 1)[1], inspect.getdoc(getattr(self, n)))
            for n in names)
        self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': txt})

    def _run_command(self, code):
        """Dispatch a ``%command arg ...`` cell to its ``cmd_<command>`` method.

        Returns an ``ok`` reply after the command ran, or an ``error`` reply
        with ename ``NotFound`` when no such attribute exists.
        """
        token = shlex.split(code)
        cmd = token[0].lstrip("%")
        args = token[1:]
        if not hasattr(self, "cmd_" + cmd):
            return {'status': 'error', 'ename': "NotFound", "evalue": "not found", "traceback": []}
        fn = getattr(self, "cmd_" + cmd)
        if callable(fn):
            fn(args)
        else:
            # cmd_help requires the argument list; calling it without one
            # raised TypeError instead of showing the help text.
            self.cmd_help(args)
        return {'status': 'ok', 'execution_count': self.execution_count}

    def _publish_screenshot(self, res):
        """Publish a PNG of ``res`` (if it is a WebElement) or of the page.

        When ``self.thumbnail`` is set the image is shrunk to fit that size
        and re-encoded before being sent as ``display_data``.
        """
        if isinstance(res, WebElement):
            imgdata = res.screenshot_as_png
        else:
            imgdata = self.drv.saveshot()
        img = Image.open(io.BytesIO(imgdata))
        if self.thumbnail is not None:
            olen = len(imgdata)
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
            # filter under its current name.
            img.thumbnail(self.thumbnail, Image.LANCZOS)
            buf = io.BytesIO()
            img.save(buf, format="png")
            imgdata = buf.getvalue()
            self.log.info("datasize: %d -> %d", olen, len(imgdata))
        imgdict = {
            "data": {
                "image/png": base64.b64encode(imgdata).decode("ascii"),
            },
            "metadata": {
                "image/png": {
                    "width": img.size[0],
                    "height": img.size[1],
                }
            }
        }
        self.send_response(self.iopub_socket, 'display_data', imgdict)
        self.log.info("image: %s", img.size)

    def do_execute(self, code, silent, store_history=True, user_expressions=None,
                   allow_stdin=False):
        """Execute one cell: a ``%`` command or a YAML playbook fragment.

        A list/tuple is run as-is, a mapping is wrapped in a one-element
        list, and a bare string ``s`` is run as ``[{s: None}]``.  A cell that
        parses to nothing (empty or comments only) is a successful no-op.

        Returns the reply content dictionary for the execute request.

        Raises:
            Exception: if the YAML parses to any other type (e.g. a number).
        """
        if code.startswith("%"):
            return self._run_command(code)

        v = yaml.safe_load(code)
        self.log.info("yaml: %s", v)
        if v is None:
            # Nothing to run; previously this fell through to the
            # "invalid type" exception below.
            return {'status': 'ok', 'execution_count': self.execution_count}
        if not silent:
            # Echo the parsed document back in block style.
            stream_content = {'name': 'stdout', 'text': yaml.dump(v, default_flow_style=False)}
            self.send_response(self.iopub_socket, 'stream', stream_content)

        if isinstance(v, (list, tuple)):
            steps = v
        elif isinstance(v, dict):
            steps = [v]
        elif isinstance(v, str):
            steps = [{v: None}]
        else:
            raise Exception("invalid type: %s : %s" % (type(v), v))
        res = self.drv.run(steps)

        # Forward whatever the driver logged during this run, then reset the
        # buffer so the next execution starts empty.
        logstr = self.logio.getvalue()
        self.logio.seek(0)
        self.logio.truncate(0)
        if logstr != "":
            stream_content = {'data': {"text/plain": logstr},
                              'execution_count': self.execution_count}
            self.send_response(self.iopub_socket, 'execute_result', stream_content)

        if not isinstance(res, (str, dict, list, tuple)):
            self.log.info("not json serializeable?: %s", res)
            ress = str(res)
        else:
            # NOTE(review): dict/list results are sent unconverted under
            # "text/plain"; confirm frontends accept a non-string there.
            ress = res
        if res is not None:
            stream_content = {'data': {"text/plain": ress}, 'execution_count': self.execution_count}
            self.send_response(self.iopub_socket, 'execute_result', stream_content)

        self._publish_screenshot(res)
        return {'status': 'ok', 'execution_count': self.execution_count}

153 

154 

def main():
    """Start a Jupyter kernel application that serves SelenibleKernel."""
    # Imported inside the function so that merely importing this module does
    # not pull in the kernel application module.
    from ipykernel import kernelapp

    kernelapp.IPKernelApp.launch_instance(kernel_class=SelenibleKernel)


# Allow the module to be run directly as the kernel entry point.
if __name__ == '__main__':
    main()