Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# MessagePack-RPC Server/Client 

2from ..server import ServerIf 

3from ..client import ClientIf 

4import msgpackrpc 

5import functools 

6import threading 

7from logging import getLogger 

8from concurrent.futures import ThreadPoolExecutor 

9 

10log = getLogger(__name__) 

11 

12 

def conv(s, encoding='utf-8'):
    """Recursively decode every ``bytes`` object inside *s* to ``str``.

    Args:
        s: Arbitrary value. ``dict``, ``list`` and ``tuple`` containers are
            walked recursively; anything else that is not ``bytes`` is
            returned untouched.
        encoding: Codec name passed to ``bytes.decode``.

    Returns:
        A decoded copy of *s*. Dicts come back as new dicts (``bytes`` keys
        decoded, other keys kept as they are); lists *and* tuples both come
        back as lists.
    """
    # Scalars first: a bytes payload is simply decoded.
    if isinstance(s, bytes):
        return s.decode(encoding)

    if isinstance(s, dict):
        decoded = {}
        for key, value in s.items():
            # Convert the value before touching the key, mirroring the
            # evaluation order of a subscript assignment.
            converted = conv(value, encoding)
            if isinstance(key, bytes):
                key = key.decode(encoding)
            decoded[key] = converted
        return decoded

    if isinstance(s, (list, tuple)):
        items = []
        for element in s:
            items.append(conv(element, encoding))
        return items

    # Anything else (int, str, None, ...) passes through unchanged.
    return s

27 

28 

class Server(ServerIf):
    """MessagePack-RPC server that runs each request on a thread pool."""

    class MPServ(msgpackrpc.Server):
        """``msgpackrpc.Server`` subclass that dispatches through an executor.

        Instead of looking the method up on a dispatcher object, every
        request is forwarded to the callable given to :meth:`initialize`
        as ``d(method, *params)``.
        """

        # Created once at class-definition time and shared by all instances.
        executor = ThreadPoolExecutor()

        def __init__(self, dispatcher):
            super().__init__(dispatcher, pack_encoding='utf-8', unpack_encoding='utf-8')

        def initialize(self, d):
            """Store the callable *d* that handles ``(method, *params)``."""
            self.d = d

        def done_async(self, responder, result):
            """Send the outcome of a finished call back through *responder*.

            Args:
                responder: Object whose ``set_result(value, error)`` sends
                    the reply for one request.
                result: The completed ``concurrent.futures.Future``.
            """
            try:
                value = result.result()
            except Exception as exc:
                # Exceptions escaping a done-callback are only logged by
                # concurrent.futures, so the request would get no reply.
                # Report the failure in the error slot instead.
                # NOTE(review): second positional argument is assumed to be
                # the error field, matching the ``None`` passed on success.
                log.exception("dispatch failed")
                responder.set_result(
                    None, "%s: %s" % (type(exc).__name__, exc))
            else:
                responder.set_result(value, None)

        def dispatch(self, method, param, responder):
            """Submit one request to the executor and reply when it is done."""
            log.debug("got %s %s %s", method, param, responder)
            method = msgpackrpc.compat.force_str(method)
            ft = Server.MPServ.executor.submit(self.d, method, *param)
            ft.add_done_callback(functools.partial(
                self.done_async, responder))

    def serve(self, dispatcher):
        """Listen on the parsed address and serve until interrupted.

        Args:
            dispatcher: Callable invoked as ``dispatcher(method, *params)``
                for every incoming request.
        """
        mpsrv = Server.MPServ(None)
        mpsrv.initialize(dispatcher)
        # ``addr_parsed`` is not set in this class; presumably provided by
        # ServerIf — TODO confirm.
        mpsrv.listen(msgpackrpc.Address(
            self.addr_parsed.hostname, self.addr_parsed.port))
        try:
            mpsrv.start()
        except KeyboardInterrupt:
            log.info("stop")
            mpsrv.stop()

60 

61 

class Client(ClientIf):
    """MessagePack-RPC client that keeps one connection per thread."""

    def __init__(self, addr: str, params: dict = None):
        """Initialise the client.

        Args:
            addr: Server address, forwarded to ``ClientIf.__init__``.
            params: Optional settings dict, forwarded to
                ``ClientIf.__init__``. A fresh empty dict is used when
                omitted, so instances never share one default object.
        """
        super().__init__(addr, {} if params is None else params)
        # Holds the per-thread ``msgpackrpc.Client`` in its ``cl`` attribute.
        self.tl = threading.local()

    def call(self, method: str, params=None):
        """Invoke *method* on the server and return its result.

        Args:
            method: Remote method name.
            params: Passed as the single positional argument of the remote
                call (``None`` is sent as-is).

        Returns:
            Whatever the underlying ``msgpackrpc.Client.call`` returns.
        """
        log.debug("call %s %s", method, params)
        # Create this thread's connection on first use.
        # ``addr_parsed`` is not set in this class; presumably provided by
        # ClientIf — TODO confirm.
        if not hasattr(self.tl, "cl"):
            self.tl.cl = msgpackrpc.Client(msgpackrpc.Address(
                self.addr_parsed.hostname, self.addr_parsed.port),
                pack_encoding='utf-8', unpack_encoding='utf-8')
        return self.tl.cl.call(method, params)