Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# gRPC Server/Client 

2import os 

3import sys 

4import shutil 

5import tempfile 

6import importlib 

7import time 

8import functools 

9import concurrent.futures 

10from logging import getLogger 

11 

12from google.protobuf.json_format import MessageToDict, ParseDict 

13from google.protobuf.descriptor_pb2 import FileDescriptorSet 

14import grpc_tools.protoc 

15import grpc 

16 

17from ..server import ServerIf 

18from ..client import ClientIf 

19 

20log = getLogger(__name__) 

21 

22 

23def read_desc(src): 

24 with open(src, "rb") as f: 

25 return FileDescriptorSet.FromString(f.read()) 

26 

27 

28def compile(src, dest): 

29 shutil.copyfile(src, os.path.join(dest, os.path.basename(src))) 

30 orgdir = os.getcwd() 

31 os.chdir(dest) 

32 bn = os.path.splitext(os.path.basename(src))[0] 

33 dsname = bn + ".pb" 

34 options = { 

35 "proto_path": ".", 

36 "python_out": ".", 

37 "grpc_python_out": ".", 

38 "descriptor_set_out": dsname, 

39 } 

40 arg = ["--%s=%s" % (x[0], x[1]) for x in options.items()] 

41 arg.append(os.path.basename(src)) 

42 log.debug("pre-listdir %s", os.listdir(dest)) 

43 log.debug("compile %s", arg) 

44 rst = grpc_tools.protoc.main(arg) 

45 log.debug("compile result: %s", rst) 

46 ret = read_desc(dsname) 

47 os.chdir(orgdir) 

48 return ret 

49 

50 

51def desc2typemap(desc): 

52 typemap = {} 

53 for f in desc.file: 

54 for sv in f.service: 

55 svname = sv.name 

56 for m in sv.method: 

57 name = m.name 

58 ityp = m.input_type 

59 otyp = m.output_type 

60 typemap[svname + "." + name] = { 

61 "arg": ityp.split(".")[-1], 

62 "return": otyp.split(".")[-1], 

63 } 

64 log.debug("typemap %s", typemap) 

65 return typemap 

66 

67 

68def do_import(src, dest): 

69 bn = os.path.splitext(os.path.basename(src))[0] 

70 modname = bn + "_pb2" 

71 modname = modname.replace("/", ".") 

72 log.debug("listdir %s", os.listdir(dest)) 

73 sys.path.append(dest) 

74 log.debug("module load from %s %s", dest, modname) 

75 mod = importlib.import_module(modname) 

76 grpcmod = importlib.import_module(modname + "_grpc") 

77 log.debug("module %s %s", mod, grpcmod) 

78 return mod, grpcmod 

79 

80 

81def read_proto(src): 

82 tmpdir = tempfile.TemporaryDirectory() 

83 log.debug("tmpdir %s", tmpdir.name) 

84 desc = compile(src, tmpdir.name) 

85 typemap = desc2typemap(desc) 

86 mod, grpcmod = do_import(src, tmpdir.name) 

87 log.debug("moddir %s", dir(mod)) 

88 log.debug("grpcmoddir %s", dir(grpcmod)) 

89 for k, v in typemap.items(): 

90 if hasattr(mod, v.get("arg")): 

91 typemap[k]["argtype"] = getattr(mod, v.get("arg")) 

92 if hasattr(mod, v.get("return")): 

93 typemap[k]["rettype"] = getattr(mod, v.get("return")) 

94 return desc, typemap, mod, grpcmod 

95 

96 

97class Server(ServerIf): 

98 def __init__(self, addr: str, params: dict = {}): 

99 super().__init__(addr, params) 

100 src = self.params.get("source", None) 

101 if src is not None: 

102 self.desc, self.typemap, self.mod, self.grpcmod = read_proto(src) 

103 log.debug("typemap %s", self.typemap) 

104 

105 def mtd(yourself, myself, method, typeinfo, req, context): 

106 log.debug("method called you=%s, me=%s, method=%s, typeinfo=%s, req=%s, ctxt=%s", 

107 yourself, myself, method, typeinfo, req, context) 

108 res = yourself.d(method, MessageToDict(req)) 

109 restype = typeinfo.get("rettype") 

110 ret = ParseDict(res, restype()) 

111 return ret 

112 

113 def serve(self, dispatcher): 

114 self.d = dispatcher 

115 bn = list(set([x.split(".", 1)[0] for x in self.typemap.keys()]))[0] 

116 servicer = getattr(self.grpcmod, bn + "Servicer") 

117 funcs = { 

118 "d": dispatcher, 

119 } 

120 for k, v in self.typemap.items(): 

121 fname = k.split(".", 1)[-1] 

122 log.debug("func: %s %s %s", fname, k, v) 

123 funcs[fname] = functools.partialmethod(self.mtd, k, v) 

124 log.debug("servicer: %s funcs=%s", bn, funcs) 

125 myservicer = type(bn + "Servicer", (servicer,), funcs) 

126 server = grpc.server( 

127 concurrent.futures.ThreadPoolExecutor(max_workers=10)) 

128 addfn = getattr(self.grpcmod, "add_" + bn + "Servicer_to_server") 

129 log.debug("addfn: %s", addfn) 

130 addfn(myservicer(), server) 

131 server.add_insecure_port("%s:%s" % ( 

132 self.addr_parsed.hostname, self.addr_parsed.port)) 

133 log.debug("start server %s", server) 

134 server.start() 

135 try: 

136 while True: 

137 time.sleep(100) 

138 except KeyboardInterrupt: 

139 server.stop(0) 

140 

141 

142class Client(ClientIf): 

143 def __init__(self, addr: str, params: dict = {}): 

144 super().__init__(addr, params) 

145 src = self.params.get("source", None) 

146 if src is not None: 

147 self.desc, self.typemap, self.mod, self.grpcmod = read_proto(src) 

148 log.debug("typemap %s", self.typemap) 

149 self.channel = grpc.insecure_channel("%s:%s" % ( 

150 self.addr_parsed.hostname, self.addr_parsed.port)) 

151 

152 def call(self, method: str, params=None): 

153 sig = self.typemap.get(method, {}) 

154 log.debug("call %s %s sig=%s", method, params, sig) 

155 argtype = sig.get("argtype", None) 

156 restype = sig.get("rettype", None) 

157 if argtype is None or restype is None: 

158 raise Exception("no such method? %s" % (method)) 

159 arg = ParseDict(params, argtype()) 

160 svname, funcname = method.split(".", 1) 

161 log.debug("service=%s, func=%s", svname, funcname) 

162 stubcls = getattr(self.grpcmod, svname + "Stub") 

163 stub = stubcls(self.channel) 

164 mtd = getattr(stub, funcname) 

165 log.debug("method %s %s %s arg=%s", mtd, stub, funcname, arg) 

166 rsp = mtd(arg) 

167 return MessageToDict(rsp)