Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# MessagePack-RPC Server/Client
2from ..server import ServerIf
3from ..client import ClientIf
4import msgpackrpc
5import functools
6import threading
7from logging import getLogger
8from concurrent.futures import ThreadPoolExecutor
10log = getLogger(__name__)
13def conv(s, encoding='utf-8'):
14 if isinstance(s, dict):
15 res = {}
16 for k, v in s.items():
17 if isinstance(k, bytes):
18 res[k.decode(encoding)] = conv(v, encoding)
19 else:
20 res[k] = conv(v, encoding)
21 return res
22 if isinstance(s, bytes):
23 return s.decode(encoding)
24 if isinstance(s, (list, tuple)):
25 return [conv(x, encoding) for x in s]
26 return s
29class Server(ServerIf):
30 class MPServ(msgpackrpc.Server):
31 executor = ThreadPoolExecutor()
33 def __init__(self, dispatcher):
34 super().__init__(dispatcher, pack_encoding='utf-8', unpack_encoding='utf-8')
36 def initialize(self, d):
37 self.d = d
39 def done_async(self, responder, result):
40 responder.set_result(result.result(), None)
42 def dispatch(self, method, param, responder):
43 log.debug("got %s %s %s", method, param, responder)
44 method = msgpackrpc.compat.force_str(method)
45 ft = Server.MPServ.executor.submit(self.d, method, *param)
46 ft.add_done_callback(functools.partial(
47 self.done_async, responder))
49 def serve(self, dispatcher):
50 mpsrv = Server.MPServ(None)
51 mpsrv.initialize(dispatcher)
52 # srv = msgpackrpc.Server(mpsrv)
53 mpsrv.listen(msgpackrpc.Address(
54 self.addr_parsed.hostname, self.addr_parsed.port))
55 try:
56 mpsrv.start()
57 except KeyboardInterrupt:
58 log.info("stop")
59 mpsrv.stop()
62class Client(ClientIf):
63 def __init__(self, addr: str, params: dict = {}):
64 super().__init__(addr, params)
65 self.tl = threading.local()
67 def call(self, method: str, params=None):
68 log.debug("call %s %s", method, params)
69 if not hasattr(self.tl, "cl"):
70 self.tl.cl = msgpackrpc.Client(msgpackrpc.Address(
71 self.addr_parsed.hostname, self.addr_parsed.port),
72 pack_encoding='utf-8', unpack_encoding='utf-8')
73 return self.tl.cl.call(method, params)