Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2import base64
3import shlex
4import inspect
5import yaml
6from PIL import Image
7from logging import getLogger, DEBUG, StreamHandler
8from ipykernel.kernelbase import Kernel
9from .version import VERSION
10from . import cli
11from selenium.webdriver.remote.webelement import WebElement
class SelenibleKernel(Kernel):
    """Jupyter kernel that feeds YAML cells to a Selenible browser driver.

    A cell that starts with ``%`` is a kernel command and is dispatched to the
    matching ``cmd_<name>`` method.  Any other cell is parsed as YAML and
    handed to the driver's ``run`` method; afterwards the captured driver log,
    the result and a PNG screenshot are published to the frontend.

    The driver is created lazily (see ``drv``) from ``driver_name`` and
    ``extensions`` and is kept until ``%shutdown`` or kernel shutdown.

    NOTE: the one-line docstrings of the ``cmd_*`` methods are printed verbatim
    by ``%help``; they are user-facing text, so extra documentation for those
    methods lives in ``#`` comments instead.
    """
    log = getLogger("selenible")
    implementation = 'Selenible'
    implementation_version = '0.0.1'
    language = 'selenible'
    language_version = VERSION
    language_info = {
        'name': 'Selenible',
        'mimetype': 'text/yaml',
        'file_extension': '.yaml',
    }
    banner = "Selenible kernel"
    # Defaults used the next time a driver is created; changed by %driver / %module.
    driver_name = "phantom"
    extensions = []

    def __init__(self, **kwargs):
        """Initialise the kernel with no driver and no thumbnail size."""
        super().__init__(**kwargs)
        self._drv = None
        # Handler that mirrors the driver log into ``logio``; kept so it can be
        # detached again when the driver is released.
        self._loghandler = None
        self.logio = io.StringIO()
        self.thumbnail = None
        self.log.setLevel(DEBUG)
        self.log.info("kernel started")

    @property
    def drv(self):
        """Return the driver instance, creating it on first access.

        The driver class comes from ``cli.loadmodules(driver_name, extensions)``.
        A fresh ``StringIO`` is attached to the driver's logger so that
        ``do_execute`` can forward the log text of each cell.
        """
        if self._drv is None:
            drvcls = cli.loadmodules(self.driver_name, self.extensions)
            self.log.info("driver: cls=%s, name=%s, exts=%s",
                          drvcls, self.driver_name, self.extensions)
            self._drv = drvcls()
            self.logio = io.StringIO()
            self._loghandler = StreamHandler(self.logio)
            self._drv.log.addHandler(self._loghandler)
        return self._drv

    def _release_driver(self):
        """Drop the current driver and detach the log handler added for it."""
        if self._drv is not None and self._loghandler is not None:
            # Without this, a logger that outlives the driver would keep
            # writing into (and keeping alive) every old buffer.
            self._drv.log.removeHandler(self._loghandler)
        self._loghandler = None
        self._drv = None

    def _error_reply(self, ename, evalue):
        """Show ``ename: evalue`` on stderr and return an error execute reply."""
        self.send_response(self.iopub_socket, 'stream',
                           {'name': 'stderr', 'text': "%s: %s\n" % (ename, evalue)})
        return {'status': 'error', 'execution_count': self.execution_count,
                'ename': ename, 'evalue': evalue, 'traceback': []}

    def do_shutdown(self, restart):
        """Release the driver and return the shutdown reply content.

        :param restart: restart flag passed in by the kernel machinery; it is
            echoed back in the reply.
        :returns: ``{'status': 'ok', 'restart': restart}``
        """
        self.log.info("kernel finished")
        self._release_driver()
        return {'status': 'ok', 'restart': restart}

    def cmd_driver(self, args):
        "set driver: phantom, chrome, firefox, etc..."
        # args[0] is the new driver name.  An already created driver is kept
        # until it is released (e.g. by %shutdown); the name is only read when
        # the next driver is created.
        self.log.info("driver: %s -> %s", self.driver_name, args[0])
        self.driver_name = args[0]

    def cmd_module(self, args):
        "load modules"
        # Stores the extension list used when the next driver is created.
        self.extensions = args

    def cmd_shutdown(self, args):
        "shutdown driver"
        # ``args`` is unused; the next access to ``drv`` creates a new driver.
        self._release_driver()

    def cmd_loglevel(self, args):
        "set log level"
        # args[0] is passed straight to the driver logger's setLevel().
        self.drv.log.setLevel(args[0])

    def cmd_thumbnail(self, args):
        "set thumbnail size"
        # Expects WIDTH HEIGHT.  A shorter tuple would break every later
        # screenshot, so it is rejected here instead of being stored.
        size = tuple(int(a) for a in args[:2])
        if len(size) != 2:
            raise ValueError("thumbnail requires WIDTH and HEIGHT")
        self.thumbnail = size

    def cmd_help(self, args=None):
        "show this help"
        # ``args`` is unused; it defaults to None so the method can also be
        # called without arguments.
        # startswith() is tested first so that getattr() never touches
        # unrelated attributes such as the lazy ``drv`` property.
        names = sorted(n for n in dir(self)
                       if n.startswith("cmd_") and callable(getattr(self, n)))
        txt = "\n".join("%s: %s" % (n.split("_", 1)[1], inspect.getdoc(getattr(self, n)))
                        for n in names)
        self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': txt})

    def _run_command(self, code):
        """Execute a ``%name arg ...`` cell and return the execute reply."""
        try:
            token = shlex.split(code)
        except ValueError as err:  # e.g. unbalanced quotes
            return self._error_reply("ValueError", str(err))
        cmd = token[0].lstrip("%")
        args = token[1:]
        if not hasattr(self, "cmd_" + cmd):
            return self._error_reply("NotFound", "not found")
        fn = getattr(self, "cmd_" + cmd)
        try:
            if callable(fn):
                fn(args)
            else:
                self.cmd_help(args)
        except (IndexError, ValueError) as err:
            # Missing or malformed command arguments: report them instead of
            # letting the exception escape do_execute.
            return self._error_reply(type(err).__name__, str(err))
        return {'status': 'ok', 'execution_count': self.execution_count}

    def _send_driver_log(self):
        """Publish and clear whatever the driver logged since the last cell."""
        logstr = self.logio.getvalue()
        self.logio.seek(0)
        self.logio.truncate(0)
        if logstr != "":
            content = {'data': {"text/plain": logstr}, 'metadata': {},
                       'execution_count': self.execution_count}
            self.send_response(self.iopub_socket, 'execute_result', content)

    def _send_result(self, res):
        """Publish the driver's return value, if there is one."""
        if res is None:
            return
        if isinstance(res, (str, dict, list, tuple)):
            ress = res
        else:
            self.log.info("not json serializeable?: %s", res)
            ress = str(res)
        content = {'data': {"text/plain": ress}, 'metadata': {},
                   'execution_count': self.execution_count}
        self.send_response(self.iopub_socket, 'execute_result', content)

    def _send_screenshot(self, res):
        """Publish a PNG of ``res`` (if it is a WebElement) or of the driver."""
        if isinstance(res, WebElement):
            imgdata = res.screenshot_as_png
        else:
            imgdata = self.drv.saveshot()
        img = Image.open(io.BytesIO(imgdata))
        if self.thumbnail is not None:
            olen = len(imgdata)
            # LANCZOS is the filter formerly also exposed as ANTIALIAS, a name
            # that newer Pillow releases no longer provide.
            img.thumbnail(self.thumbnail, Image.LANCZOS)
            buf = io.BytesIO()
            img.save(buf, format="png")
            imgdata = buf.getvalue()
            self.log.info("datasize: %d -> %d", olen, len(imgdata))
        imgdict = {
            "data": {
                "image/png": base64.b64encode(imgdata).decode("ascii"),
            },
            "metadata": {
                "image/png": {
                    "width": img.size[0],
                    "height": img.size[1],
                }
            }
        }
        self.send_response(self.iopub_socket, 'display_data', imgdict)
        self.log.info("image: %s", img.size)

    def do_execute(self, code, silent, store_history=True, user_expressions=None,
                   allow_stdin=False):
        """Run one cell and return the execute reply content.

        :param code: cell text; a leading ``%`` marks a kernel command,
            anything else is parsed with ``yaml.safe_load``.
        :param silent: when true, the YAML echo of the parsed cell is not sent.
        :param store_history: accepted for the kernel interface, unused here.
        :param user_expressions: accepted for the kernel interface, unused here.
        :param allow_stdin: accepted for the kernel interface, unused here.
        :returns: a dict with ``status`` ``'ok'`` or ``'error'``.

        A list is run as is, a mapping as a one-element list and a plain string
        ``s`` as ``[{s: None}]``.  Empty cells are a no-op; YAML syntax errors
        and other scalar types yield an error reply.
        """
        if code.startswith("%"):
            return self._run_command(code)
        try:
            v = yaml.safe_load(code)
        except yaml.YAMLError as err:
            return self._error_reply("YAMLError", str(err))
        self.log.info("yaml: %s", v)
        if v is None:
            # Empty or comment-only cell: nothing to run.
            return {'status': 'ok', 'execution_count': self.execution_count}
        if not silent:
            stream_content = {'name': 'stdout', 'text': yaml.dump(v, default_flow_style=False)}
            self.send_response(self.iopub_socket, 'stream', stream_content)
        if isinstance(v, (list, tuple)):
            steps = v
        elif isinstance(v, dict):
            steps = [v]
        elif isinstance(v, str):
            steps = [{v: None}]
        else:
            return self._error_reply("TypeError", "invalid type: %s : %s" % (type(v), v))
        res = self.drv.run(steps)

        self._send_driver_log()
        self._send_result(res)
        self._send_screenshot(res)
        return {'status': 'ok', 'execution_count': self.execution_count}
def main():
    """Start an IPython kernel application that serves SelenibleKernel."""
    # Imported lazily so that merely importing this module does not pull in
    # the kernel application machinery.
    from ipykernel import kernelapp
    kernelapp.IPKernelApp.launch_instance(kernel_class=SelenibleKernel)
# Script entry point: launch the kernel when the module is executed directly.
if __name__ == "__main__":
    main()