Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import functools
3from aiohttp_xmlrpc.handler import XMLRPCView
4from aiohttp_xmlrpc.client import ServerProxy
5from aiohttp import web
7from ..client import ClientIf
8from ..server import ServerIf
11class Server(ServerIf):
12 class MyHandler(XMLRPCView):
13 def _d1(self, method, **params):
14 return self.d(method, params)
16 def _lookup_method(self, method_name):
17 res = functools.partial(self._d1, method_name)
18 res.__module__ = ""
19 res.__name__ = method_name
20 return res
22 def serve(self, dispatcher):
23 hdl = type("CustomHandler", (self.MyHandler,), {"d": dispatcher})
24 app = web.Application()
25 app.router.add_route('*', self.addr_parsed.path, hdl)
26 web.run_app(app, host=self.addr_parsed.hostname,
27 port=self.addr_parsed.port)
30class Client(ClientIf):
31 def __init__(self, addr: str, params: dict = {}):
32 super().__init__(addr, params)
33 self.cl = ServerProxy(self.addr)
34 self.loop = asyncio.get_event_loop()
36 def call(self, method: str, params=None):
37 res = self.cl[method](params)
38 return self.loop.run_until_complete(res)
40 def __del__(self):
41 return self.loop.run_until_complete(self.cl.close())