Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import asyncio 

2import functools 

3from aiohttp_xmlrpc.handler import XMLRPCView 

4from aiohttp_xmlrpc.client import ServerProxy 

5from aiohttp import web 

6 

7from ..client import ClientIf 

8from ..server import ServerIf 

9 

10 

11class Server(ServerIf): 

12 class MyHandler(XMLRPCView): 

13 def _d1(self, method, **params): 

14 return self.d(method, params) 

15 

16 def _lookup_method(self, method_name): 

17 res = functools.partial(self._d1, method_name) 

18 res.__module__ = "" 

19 res.__name__ = method_name 

20 return res 

21 

22 def serve(self, dispatcher): 

23 hdl = type("CustomHandler", (self.MyHandler,), {"d": dispatcher}) 

24 app = web.Application() 

25 app.router.add_route('*', self.addr_parsed.path, hdl) 

26 web.run_app(app, host=self.addr_parsed.hostname, 

27 port=self.addr_parsed.port) 

28 

29 

30class Client(ClientIf): 

31 def __init__(self, addr: str, params: dict = {}): 

32 super().__init__(addr, params) 

33 self.cl = ServerProxy(self.addr) 

34 self.loop = asyncio.get_event_loop() 

35 

36 def call(self, method: str, params=None): 

37 res = self.cl[method](params) 

38 return self.loop.run_until_complete(res) 

39 

40 def __del__(self): 

41 return self.loop.run_until_complete(self.cl.close())