Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# XML-RPC Server/Client
2from ..server import ServerIf
3from ..client import ClientIf
4import threading
5import xmlrpc.server
6import xmlrpc.client
7from logging import getLogger
# Module-level logger, named after this module so output can be filtered per transport.
log = getLogger(__name__)
class Server(ServerIf):
    """XML-RPC server transport built on the standard library's ``xmlrpc.server``.

    ``serve`` reads ``self.params`` (a mapping) and ``self.addr_parsed``
    (an object with ``hostname``, ``port`` and ``path`` attributes).
    NOTE(review): both are presumably set up by ``ServerIf`` -- confirm, it is
    not visible from this file.
    """

    class XServ:
        """Object registered with the XML-RPC server; funnels every call to one callable."""

        def initialize(self, d):
            """Remember the dispatcher callable ``d`` that ``_dispatch`` forwards to."""
            self.d = d

        def _dispatch(self, method, params):
            """Forward an incoming RPC to the stored dispatcher.

            Args:
                method: Name of the requested method, as sent by the client.
                params: Sequence of positional parameters of the request.

            Returns:
                Whatever ``self.d(method, *params)`` returns.
            """
            log.debug("got %s %s", method, params)
            return self.d(method, *params)

    def serve(self, d):
        """Serve XML-RPC requests forever, forwarding each call to ``d``.

        Args:
            d: Callable invoked as ``d(method, *params)`` for every request.

        This call blocks in ``serve_forever()``; it only returns by raising
        (for example on ``KeyboardInterrupt``).
        """
        dispatcher = self.XServ()
        dispatcher.initialize(d)

        # An empty path (address given without one) would produce
        # rpc_paths == ("",), which no HTTP request path can match, so every
        # call would be answered with 404.  Fall back to "/RPC2", the path
        # xmlrpc.client.ServerProxy uses when its URI has no path.
        rpc_path = self.params.get("baseurl", self.addr_parsed.path) or "/RPC2"

        # Build a one-off handler class so the accepted path is per-server
        # instead of being patched onto the shared stdlib handler class.
        handler_cls = type(
            "XHandler",
            (xmlrpc.server.SimpleXMLRPCRequestHandler,),
            {"rpc_paths": (rpc_path,)},
        )

        bind_addr = (self.addr_parsed.hostname, self.addr_parsed.port)
        # The context manager closes the listening socket when serve_forever()
        # is left through an exception, instead of leaking it.
        with xmlrpc.server.SimpleXMLRPCServer(
            bind_addr, requestHandler=handler_cls, logRequests=False
        ) as srv:
            srv.register_instance(dispatcher)
            srv.serve_forever()
class Client(ClientIf):
    """XML-RPC client transport that keeps one ``ServerProxy`` per calling thread."""

    def __init__(self, addr: str, params: dict = None):
        """Create the client.

        Args:
            addr: Server address; later handed to ``xmlrpc.client.ServerProxy``
                via ``self.addr``.  NOTE(review): ``self.addr`` is presumably
                set by ``ClientIf.__init__`` -- confirm.
            params: Optional transport parameters, passed on to ``ClientIf``.
                When omitted, a fresh empty dict is used.
        """
        # A literal ``{}`` default would be a single dict shared by every
        # Client created without params; build a new one per instance instead.
        super().__init__(addr, {} if params is None else params)
        # Thread-local storage: each thread lazily creates and then reuses
        # its own ServerProxy (see ``call``).
        self.tl = threading.local()

    def call(self, method: str, params=None):
        """Invoke ``method`` on the remote server and return its result.

        Args:
            method: Remote method name; dotted names such as ``"a.b"`` are
                resolved attribute by attribute on the proxy.
            params: Sent as the single positional argument of the call.
                NOTE(review): ``xmlrpc.client`` cannot marshal ``None`` unless
                ``allow_none`` is enabled, so the default of ``None`` will
                raise when sent -- confirm callers always pass a value.

        Returns:
            The value returned by the remote method.
        """
        log.debug("call %s %s", method, params)
        proxy = getattr(self.tl, "sp", None)
        if proxy is None:
            log.debug("create proxy")
            proxy = self.tl.sp = xmlrpc.client.ServerProxy(self.addr)

        # Walk the dotted name one component at a time, starting at the proxy.
        target = proxy
        for part in method.split("."):
            target = getattr(target, part)
        return target(params)