Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import asyncio 

2import functools 

3from aiohttp_jsonrpc.handler import JSONRPCView 

4from aiohttp_jsonrpc.client import ServerProxy 

5from aiohttp import web 

6 

7from ..client import ClientIf 

8from ..server import ServerIf 

9 

10 

class Server(ServerIf):
    """JSON-RPC server transport built on aiohttp and aiohttp_jsonrpc.

    ``serve`` registers a single route at the path taken from
    ``self.addr_parsed`` and forwards every incoming RPC method call to the
    dispatcher it was given.
    """

    class MyHandler(JSONRPCView):
        """View that funnels every RPC method name into one dispatcher.

        Subclasses must provide a ``d`` attribute: a callable invoked as
        ``d(method, params)``. ``Server.serve`` builds such a subclass.
        """

        def _d1(self, method, **params):
            """Invoke the dispatcher with ``method`` and the keyword
            parameters collected into a dict."""
            return self.d(method, params)

        def _lookup_method(self, method_name):
            """Return a callable that dispatches ``method_name``.

            NOTE(review): presumably overrides the method-lookup hook of
            JSONRPCView so no per-method handler has to be defined --
            confirm against the aiohttp_jsonrpc version in use.
            """
            res = functools.partial(self._d1, method_name)
            # A partial object has no ``__name__`` of its own; give it
            # function-like metadata. Presumably the base view reads these
            # attributes -- TODO confirm.
            res.__module__ = ""
            res.__name__ = method_name
            return res

    def serve(self, dispatcher):
        """Start an aiohttp application that routes all RPC calls to
        ``dispatcher``.

        Args:
            dispatcher: Callable invoked as ``dispatcher(method, params)``,
                where ``params`` is a dict of the call's keyword parameters.
        """
        # Wrap in staticmethod() so the callable is stored as-is on the
        # generated class. Without it, a plain function (and, on newer
        # Python versions, a functools.partial) would be bound to the
        # handler instance and receive it as an unexpected first argument.
        # Bound methods and other callables behave exactly as before.
        hdl = type("CustomHandler", (self.MyHandler,),
                   {"d": staticmethod(dispatcher)})
        app = web.Application()
        app.router.add_route('*', self.addr_parsed.path, hdl)
        web.run_app(app, host=self.addr_parsed.hostname,
                    port=self.addr_parsed.port)

28 

29 

class Client(ClientIf):
    """Synchronous JSON-RPC client wrapping aiohttp_jsonrpc's ServerProxy.

    Each call is driven to completion on the event loop obtained at
    construction time, so callers use a plain blocking API.
    """

    def __init__(self, addr: str, params: dict = None):
        """Create the proxy for ``addr``.

        Args:
            addr: Server address, forwarded to ``ClientIf.__init__``.
            params: Optional settings dict, forwarded to
                ``ClientIf.__init__``. ``None`` (the default) means an
                empty dict.
        """
        # Use a fresh dict per instance: a ``{}`` default would be a single
        # object shared (and possibly mutated) across all clients.
        super().__init__(addr, {} if params is None else params)
        # ``self.addr`` is presumably set by ClientIf.__init__ -- TODO confirm.
        self.cl = ServerProxy(self.addr)
        self.loop = asyncio.get_event_loop()

    def call(self, method: str, params=None):
        """Invoke remote ``method`` and block until its result is available.

        Args:
            method: Name of the remote method.
            params: A dict is sent as keyword arguments, a tuple/list as
                positional arguments, anything else as a single positional
                argument.

        Returns:
            Whatever the awaited proxy call returns.
        """
        if isinstance(params, dict):
            res = self.cl[method](**params)
        elif isinstance(params, (tuple, list)):
            res = self.cl[method](*params)
        else:
            # NOTE(review): with the default ``params=None`` this sends one
            # positional ``None`` argument -- confirm that is intended.
            res = self.cl[method](params)
        return self.loop.run_until_complete(res)

    def __del__(self):
        """Close the underlying proxy when the client is garbage collected.

        Cleanup is best-effort: it is skipped when construction never got
        as far as creating the proxy/loop, or when the loop can no longer
        run a coroutine to completion.
        """
        cl = getattr(self, "cl", None)
        loop = getattr(self, "loop", None)
        # __init__ may have raised before these attributes were assigned.
        if cl is None or loop is None:
            return None
        # run_until_complete() raises RuntimeError on a closed or already
        # running loop; check first so no coroutine is created and left
        # un-awaited.
        if loop.is_closed() or loop.is_running():
            return None
        return loop.run_until_complete(cl.close())