Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# gRPC Server/Client
2import os
3import sys
4import shutil
5import tempfile
6import importlib
7import time
8import functools
9import concurrent.futures
10from logging import getLogger
12from google.protobuf.json_format import MessageToDict, ParseDict
13from google.protobuf.descriptor_pb2 import FileDescriptorSet
14import grpc_tools.protoc
15import grpc
17from ..server import ServerIf
18from ..client import ClientIf
20log = getLogger(__name__)
def read_desc(src):
    """Load a serialized ``FileDescriptorSet`` from the file at *src*.

    The file is opened in binary mode, read in full, and the bytes are
    handed to ``FileDescriptorSet.FromString``; the parsed message is
    returned.
    """
    with open(src, "rb") as stream:
        payload = stream.read()
    return FileDescriptorSet.FromString(payload)
def compile(src, dest):
    """Compile the .proto file *src* inside directory *dest*.

    *src* is copied into *dest*, then ``grpc_tools.protoc`` is run with
    *dest* as the working directory so that the generated ``*_pb2.py``,
    ``*_pb2_grpc.py`` and descriptor-set (``<basename>.pb``) files all land
    there.

    Args:
        src: path of the .proto file to compile.
        dest: existing directory that receives the copy and all outputs.

    Returns:
        The descriptor set written by protoc, as parsed by ``read_desc``.

    Raises:
        RuntimeError: protoc returned a non-zero status.
    """
    proto_name = os.path.basename(src)
    shutil.copyfile(src, os.path.join(dest, proto_name))
    dsname = os.path.splitext(proto_name)[0] + ".pb"
    options = {
        "proto_path": ".",
        "python_out": ".",
        "grpc_python_out": ".",
        "descriptor_set_out": dsname,
    }
    # protoc.main() mirrors a C ``main``: the first element is the program
    # name (argv[0]) and is not parsed as an option, so it must be supplied
    # explicitly or the first real option is silently dropped.
    arg = ["grpc_tools.protoc"]
    arg.extend("--%s=%s" % item for item in options.items())
    arg.append(proto_name)
    # List the directory before changing into it so a relative *dest* still
    # resolves.
    log.debug("pre-listdir %s", os.listdir(dest))
    orgdir = os.getcwd()
    os.chdir(dest)
    try:
        log.debug("compile %s", arg)
        rst = grpc_tools.protoc.main(arg)
        log.debug("compile result: %s", rst)
        if rst != 0:
            raise RuntimeError(
                "protoc failed for %s (exit status %s)" % (src, rst))
        return read_desc(dsname)
    finally:
        # Always restore the caller's working directory, even on failure.
        os.chdir(orgdir)
def desc2typemap(desc):
    """Build a ``"Service.Method"`` -> type-name mapping from a descriptor set.

    Every method of every service in every file of *desc* yields one entry
    keyed by ``<service name>.<method name>``.  The value is a dict holding
    the last dot-separated component of the method's input type under
    ``"arg"`` and of its output type under ``"return"``.

    Returns:
        The mapping (also written to the debug log).
    """
    def short_name(qualified):
        # Keep only the part after the final "." (whole string if no dot).
        return qualified.rpartition(".")[2]

    typemap = {
        service.name + "." + method.name: {
            "arg": short_name(method.input_type),
            "return": short_name(method.output_type),
        }
        for proto_file in desc.file
        for service in proto_file.service
        for method in service.method
    }
    log.debug("typemap %s", typemap)
    return typemap
def do_import(src, dest):
    """Import the modules protoc generated for *src* from directory *dest*.

    The module names are derived from the basename of *src*:
    ``<basename>_pb2`` and ``<basename>_pb2_grpc``.  *dest* is put on
    ``sys.path`` so both modules can be found by name.

    Note that ``importlib.import_module`` returns an already-imported module
    from ``sys.modules``; a second .proto with the same basename therefore
    yields the first one's modules.

    Args:
        src: path of the .proto file the modules were generated from.
        dest: directory containing the generated ``*.py`` files.

    Returns:
        Tuple ``(pb2_module, pb2_grpc_module)``.
    """
    modname = os.path.splitext(os.path.basename(src))[0] + "_pb2"
    log.debug("listdir %s", os.listdir(dest))
    # Avoid piling up duplicate entries when called repeatedly for the
    # same directory.
    if dest not in sys.path:
        sys.path.append(dest)
    # The files were written after the interpreter started; make sure the
    # import system's directory caches notice them.
    importlib.invalidate_caches()
    log.debug("module load from %s %s", dest, modname)
    mod = importlib.import_module(modname)
    grpcmod = importlib.import_module(modname + "_grpc")
    log.debug("module %s %s", mod, grpcmod)
    return mod, grpcmod
def read_proto(src):
    """Compile and import the .proto file *src*, returning everything needed
    to serve or call its RPCs.

    The file is compiled in a temporary directory, the generated modules
    are imported from it, and each typemap entry is extended with the
    message classes found in the ``*_pb2`` module: ``"argtype"`` for the
    name stored under ``"arg"`` and ``"rettype"`` for the name stored under
    ``"return"``.  An entry whose class is not an attribute of the module is
    left without that key.

    Returns:
        Tuple ``(desc, typemap, mod, grpcmod)``.
    """
    # Use the context manager so the directory is removed deterministically
    # instead of by the object's finaliser (which emits a ResourceWarning).
    # The modules are fully imported before the directory disappears.
    with tempfile.TemporaryDirectory() as tmpname:
        log.debug("tmpdir %s", tmpname)
        desc = compile(src, tmpname)
        typemap = desc2typemap(desc)
        mod, grpcmod = do_import(src, tmpname)
    log.debug("moddir %s", dir(mod))
    log.debug("grpcmoddir %s", dir(grpcmod))
    for typeinfo in typemap.values():
        argname = typeinfo.get("arg")
        retname = typeinfo.get("return")
        if hasattr(mod, argname):
            typeinfo["argtype"] = getattr(mod, argname)
        if hasattr(mod, retname):
            typeinfo["rettype"] = getattr(mod, retname)
    return desc, typemap, mod, grpcmod
class Server(ServerIf):
    """gRPC server whose handlers are generated from a .proto file.

    When ``params["source"]`` is given, the .proto file is compiled and
    imported via ``read_proto``.  ``serve`` then builds one servicer class
    per service and routes every RPC to a single dispatcher callable.
    """

    def __init__(self, addr: str, params: dict = None):
        """Initialise the base class and load the .proto named by
        ``params["source"]``, if any.

        Args:
            addr: server address, passed to the base class.
            params: options dict, passed to the base class; a fresh empty
                dict is used when omitted so no default is shared between
                instances.
        """
        super().__init__(addr, {} if params is None else params)
        src = self.params.get("source", None)
        if src is not None:
            self.desc, self.typemap, self.mod, self.grpcmod = read_proto(src)
            log.debug("typemap %s", self.typemap)

    def mtd(yourself, myself, method, typeinfo, req, context):
        """Handle one RPC by delegating to the dispatcher.

        Args:
            yourself: this ``Server`` instance (holds the dispatcher ``d``).
            myself: the generated servicer instance the RPC arrived on.
            method: ``"Service.Method"`` key of the RPC.
            typeinfo: typemap entry for *method*; ``"rettype"`` is the
                response message class.
            req: request message.
            context: gRPC call context (only logged).

        Returns:
            The dispatcher's result dict parsed into a ``rettype`` message.
        """
        log.debug("method called you=%s, me=%s, method=%s, typeinfo=%s, req=%s, ctxt=%s",
                  yourself, myself, method, typeinfo, req, context)
        res = yourself.d(method, MessageToDict(req))
        restype = typeinfo.get("rettype")
        ret = ParseDict(res, restype())
        return ret

    def serve(self, dispatcher):
        """Start the server and block until interrupted.

        Every service present in the typemap is registered (in sorted name
        order), each with a dynamically created subclass of its generated
        ``<Service>Servicer`` whose methods all forward to ``mtd``.

        Args:
            dispatcher: callable invoked as ``dispatcher(method, request_dict)``
                and expected to return a dict for the response message.

        Raises:
            ValueError: the typemap contains no RPC methods.
        """
        self.d = dispatcher
        # Group the handlers by service so that every declared service is
        # served, not just an arbitrary one.
        handlers = {}
        for key, typeinfo in self.typemap.items():
            svname, fname = key.split(".", 1)
            log.debug("func: %s %s %s", fname, key, typeinfo)
            handlers.setdefault(svname, {})[fname] = functools.partialmethod(
                self.mtd, key, typeinfo)
        if not handlers:
            raise ValueError("no gRPC service found in typemap")
        server = grpc.server(
            concurrent.futures.ThreadPoolExecutor(max_workers=10))
        for svname in sorted(handlers):
            # The dispatcher is also exposed as attribute ``d`` of the
            # generated class; an RPC named ``d`` takes precedence.
            funcs = {
                "d": dispatcher,
            }
            funcs.update(handlers[svname])
            log.debug("servicer: %s funcs=%s", svname, funcs)
            servicer = getattr(self.grpcmod, svname + "Servicer")
            myservicer = type(svname + "Servicer", (servicer,), funcs)
            addfn = getattr(
                self.grpcmod, "add_" + svname + "Servicer_to_server")
            log.debug("addfn: %s", addfn)
            addfn(myservicer(), server)
        server.add_insecure_port("%s:%s" % (
            self.addr_parsed.hostname, self.addr_parsed.port))
        log.debug("start server %s", server)
        server.start()
        # server.start() does not block; idle here until Ctrl-C.
        try:
            while True:
                time.sleep(100)
        except KeyboardInterrupt:
            server.stop(0)
class Client(ClientIf):
    """gRPC client that calls RPCs described by a .proto file using plain
    dicts for requests and responses.
    """

    def __init__(self, addr: str, params: dict = None):
        """Initialise the base class, load the .proto named by
        ``params["source"]`` (if any) and open an insecure channel.

        Args:
            addr: server address, passed to the base class.
            params: options dict, passed to the base class; a fresh empty
                dict is used when omitted so no default is shared between
                instances.
        """
        super().__init__(addr, {} if params is None else params)
        src = self.params.get("source", None)
        if src is not None:
            self.desc, self.typemap, self.mod, self.grpcmod = read_proto(src)
            log.debug("typemap %s", self.typemap)
        self.channel = grpc.insecure_channel("%s:%s" % (
            self.addr_parsed.hostname, self.addr_parsed.port))

    def call(self, method: str, params=None):
        """Invoke the RPC *method* and return its response as a dict.

        Args:
            method: ``"Service.Method"`` key as found in the typemap.
            params: dict parsed into the request message; ``None`` (the
                default) sends a request with no fields set.

        Returns:
            The response message converted with ``MessageToDict``.

        Raises:
            Exception: *method* has no request or response class in the
                typemap.
        """
        sig = self.typemap.get(method, {})
        log.debug("call %s %s sig=%s", method, params, sig)
        argtype = sig.get("argtype", None)
        restype = sig.get("rettype", None)
        if argtype is None or restype is None:
            raise Exception("no such method? %s" % (method))
        # ParseDict cannot consume None, so treat a missing params as an
        # empty request.
        arg = ParseDict({} if params is None else params, argtype())
        svname, funcname = method.split(".", 1)
        log.debug("service=%s, func=%s", svname, funcname)
        stubcls = getattr(self.grpcmod, svname + "Stub")
        stub = stubcls(self.channel)
        mtd = getattr(stub, funcname)
        log.debug("method %s %s %s arg=%s", mtd, stub, funcname, arg)
        rsp = mtd(arg)
        return MessageToDict(rsp)
167 return MessageToDict(rsp)