Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# XML-RPC Server/Client 

2from ..server import ServerIf 

3from ..client import ClientIf 

4import threading 

5import xmlrpc.server 

6import xmlrpc.client 

7from logging import getLogger 

8 

9log = getLogger(__name__) 

10 

11 

class Server(ServerIf):
    """XML-RPC server that forwards every incoming call to a dispatcher callable."""

    class XServ:
        """Object registered with the XML-RPC server; routes calls via ``_dispatch``."""

        def initialize(self, d):
            """Store the dispatcher callable ``d`` used by ``_dispatch``."""
            self.d = d

        def _dispatch(self, method, params):
            """Log the incoming call and return ``self.d(method, *params)``.

            ``method`` is the requested method name and ``params`` the
            positional arguments, which are unpacked for the dispatcher.
            """
            log.debug("got %s %s", method, params)
            return self.d(method, *params)

    def serve(self, d):
        """Run a ``SimpleXMLRPCServer`` that passes every request to ``d``.

        Blocks in ``serve_forever()``. The listening socket is closed when the
        loop exits, whether normally or through an exception.

        ``self.params`` and ``self.addr_parsed`` are read here but not set in
        this class -- presumably provided by ``ServerIf``; verify there.
        """
        xs = self.XServ()
        xs.initialize(d)
        # Build a request handler class whose only accepted RPC path is the
        # configured "baseurl", falling back to the path of the parsed address.
        hdl = type("XHandler", (xmlrpc.server.SimpleXMLRPCRequestHandler,), {
            "rpc_paths": (self.params.get("baseurl", self.addr_parsed.path),)})
        srv = xmlrpc.server.SimpleXMLRPCServer(
            (self.addr_parsed.hostname, self.addr_parsed.port),
            requestHandler=hdl, logRequests=False)
        try:
            srv.register_instance(xs)
            srv.serve_forever()
        finally:
            # Release the bound socket so the address can be reused.
            srv.server_close()

30 

31 

class Client(ClientIf):
    """XML-RPC client that keeps one ``ServerProxy`` per calling thread."""

    def __init__(self, addr: str, params: dict = None):
        """Initialise the base client and the thread-local proxy storage.

        ``params`` defaults to a fresh empty dict for each instance; a ``{}``
        default in the signature would be one dict shared by all instances.
        """
        super().__init__(addr, {} if params is None else params)
        self.tl = threading.local()

    def call(self, method: str, params=None):
        """Invoke the remote ``method`` with ``params`` and return its result.

        ``method`` may be dotted (``"a.b.c"``); each component is resolved as
        an attribute, starting from the proxy.

        NOTE(review): ``params`` is forwarded as ONE positional argument, even
        when it is ``None``. ``ServerProxy`` does not marshal ``None`` unless
        ``allow_none`` is enabled -- confirm callers always pass a value.

        ``self.addr`` is not set in this class -- presumably stored by
        ``ClientIf.__init__``; verify there.
        """
        log.debug("call %s %s", method, params)
        # Create the proxy lazily, once per thread, on that thread's first call.
        if not hasattr(self.tl, "sp"):
            log.debug("create proxy")
            self.tl.sp = xmlrpc.client.ServerProxy(self.addr)
        target = self.tl.sp
        for part in method.split("."):
            target = getattr(target, part)
        return target(params)