Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import lotrpc 

2import click 

3import json 

4import time 

5import copy 

6import asyncio 

7import inspect 

8import queue 

9import threading 

10from benchmarker import Benchmarker, Skip 

11from logging import getLogger, basicConfig, DEBUG 

12from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor 

13 

14log = getLogger(__name__) 

15 

16 

def setupLog(verbose):
    """Switch the root logging configuration to DEBUG when asked to.

    Args:
        verbose: truthy to call ``basicConfig(level=DEBUG)``; when falsy the
            logging configuration is left untouched.
    """
    if not verbose:
        return
    basicConfig(level=DEBUG)

20 

21 

class MyDispatcher(lotrpc.SimpleDispatcher):
    """Example dispatcher exposing a handful of demo RPC handlers.

    Every ``do_<name>`` method receives the request ``params`` and returns
    the response payload.  How method names are routed to these handlers is
    decided by ``lotrpc.SimpleDispatcher``, which is not visible here.
    """

    # Sequence number reported by the sleep handlers.  It starts out as a
    # class attribute; the first increment rebinds it on the instance.
    seq = 1

    def do_sleep(self, params):
        """Sleep about one second and return ``params`` with timing info.

        ``params`` is updated in place with the measured ``"sleep"``
        duration and the current ``"seq"`` value, then returned.
        """
        started = time.time()
        time.sleep(1)
        params.update(sleep=time.time() - started, seq=self.seq)
        self.seq = self.seq + 1
        return params

    def do_hello_sleep(self, params):
        """Sleep about one second and return only the timing info.

        Unlike ``do_sleep`` the incoming ``params`` are ignored.
        """
        started = time.time()
        time.sleep(1)
        elapsed = time.time() - started
        reply = {"sleep": elapsed, "seq": self.seq}
        self.seq = self.seq + 1
        return reply

    def do_Greeter_SayHello(self, params):
        """Return a "hello" message for ``params["name"]`` (default anonymous)."""
        log.debug("say hello to %s", params)
        who = params.get("name", "anonymous")
        return {"message": "hello {}".format(who)}

    def do_Greeter_SayHelloAgain(self, params):
        """Return a "bye" message for ``params["name"]`` (default anonymous)."""
        log.debug("say hello-again to %s", params)
        who = params.get("name", "anonymous")
        return {"message": "bye {}".format(who)}

    def do_Greeter_SayGoodMorning(self, params):
        """Return a "good morning" message for ``params["name"]``."""
        log.debug("say gm to %s", params)
        who = params.get("name", "anonymous")
        return {"message": "good morning {}".format(who)}

    def do_hello(self, params):
        """Return a constant OK reply; ``params`` is ignored."""
        return {"result": "OK"}

    def do_hello_world(self, params):
        """Return a constant OK reply; ``params`` is ignored."""
        return {"result": "OK"}

    def __call__(self, method, params):
        """Dispatch through the parent class, echoing the request on failure.

        Any ``Exception`` raised by the parent dispatch (the original author
        expected "method not found") is logged at debug level and replaced by
        a reply that echoes ``method`` and ``params``.
        """
        log.debug("my dispatch %s %s", method, params)
        try:
            reply = super().__call__(method, params)
        except Exception as err:
            # method not found
            log.debug("not found? %s", err)
            return {"rst": "method={}, params={}".format(method, params)}
        return reply

66 

67 

def do_call_pool(pool, cl, num, method, params):
    """Fan ``num`` calls of ``cl.call`` out over an executor.

    Args:
        pool: executor-like object providing ``submit``.
        cl: client object providing ``call(method, params)``.
        num: number of calls to submit.
        method: method name forwarded to every call.
        params: call parameters; every submission gets its own deep copy.

    Returns:
        The ``str()`` of each result, in submission order, joined by newlines.
    """
    pending = [
        pool.submit(cl.call, method, copy.deepcopy(params))
        for _ in range(num)
    ]
    # Collect every result first (blocking on each future in order),
    # then render them.
    outcomes = [future.result() for future in pending]
    return "\n".join(str(outcome) for outcome in outcomes)

74 

75 

def do_call(cl, num, method, params):
    """Call ``cl.call`` ``num`` times in sequence and join the results.

    Every call receives its own deep copy of ``params`` (as the pool and
    asyncio variants in this file do), so a client that mutates its
    arguments cannot leak state from one call into the next or back into
    the caller's object.

    Args:
        cl: client object providing ``call(method, params)``.
        num: number of sequential calls to make.
        method: method name forwarded to every call.
        params: call parameters; never mutated by this function.

    Returns:
        The ``str()`` of each result, in call order, joined by newlines.
    """
    log.debug("call%d %s %s", num, method, params)
    results = [cl.call(method, copy.deepcopy(params)) for _ in range(num)]
    return "\n".join(map(str, results))

82 

83 

def do_call_async(cl, num, method, params):
    """Start ``num`` asynchronous calls and wait for each one in order.

    Args:
        cl: client object providing ``asynccall(loop, method, params)``,
            which must return something ``loop.run_until_complete`` accepts.
        num: number of calls to start.
        method: method name forwarded to every call.
        params: call parameters; every call gets its own deep copy.

    Returns:
        The ``str()`` of each result, in call order, joined by newlines.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop (non-main thread, or a Python version where
        # get_event_loop() no longer creates one implicitly): make our own.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    pending = [
        cl.asynccall(loop, method, copy.deepcopy(params))
        for _ in range(num)
    ]
    results = [loop.run_until_complete(item) for item in pending]
    return "\n".join(map(str, results))

91 

92 

@click.group(invoke_without_command=True)
@click.pass_context
def cli(ctx):
    # Root command group.  When no sub-command is given, print the help text.
    # NOTE: documented with comments on purpose -- this group passes no
    # ``help=``, so click would show a docstring as the group's help text.
    if ctx.invoked_subcommand is not None:
        return
    print(ctx.get_help())

98 

99 

@cli.command(help="list rpc mode")
def listmode():
    """Print the name of every lotrpc sub-module that defines ``Server``.

    Only attributes of ``lotrpc`` that are modules whose ``__package__``
    starts with ``lotrpc.__name__`` are considered.
    """
    for name in dir(lotrpc):
        candidate = getattr(lotrpc, name)
        if not inspect.ismodule(candidate):
            continue
        if not candidate.__package__.startswith(lotrpc.__name__):
            continue
        if hasattr(candidate, "Server"):
            print(name)

107 

108 

@cli.command(help="start server")
@click.option("--options", default="{}")
@click.argument('mode', default='json')
@click.argument('addr', default="http://0.0.0.0:9999/")
@click.option("--verbose/--no-verbose", default=False)
def server(mode, addr, verbose=False, options="{}"):
    """Start a lotrpc server of the given mode, serving ``MyDispatcher``.

    Args:
        mode: name of the ``lotrpc`` attribute (sub-module) providing ``Server``.
        addr: address handed to ``Server``.
        verbose: enable DEBUG logging when true.
        options: JSON text decoded and passed to ``Server`` as its options.
            The default is the string ``"{}"`` (matching the ``--options``
            click default); the previous ``{}`` dict default was both a
            shared mutable default and not valid input for ``json.loads``.
    """
    setupLog(verbose)
    mod = getattr(lotrpc, mode)
    srv = mod.Server(addr, json.loads(options))
    srv.serve(MyDispatcher())

119 

120 

@cli.command(help="serialized client")
@click.argument('mode', default='json')
@click.argument('addr', default="http://localhost:9999/")
@click.option("--options", default="{}")
@click.option("--num", type=int, default=1)
@click.option("--method", default="hello", type=str)
@click.option("--params", default='{"p1":true,"p2":[1,2,3]}')
@click.option("--verbose/--no-verbose", default=False)
def client(mode, addr, method, num, params, options, verbose):
    """Make ``num`` sequential calls through ``do_call`` and print the results.

    ``options`` and ``params`` are JSON text; ``mode`` names the ``lotrpc``
    attribute that provides the ``Client`` class.
    """
    setupLog(verbose)
    rpc_module = getattr(lotrpc, mode)
    rpc_client = rpc_module.Client(addr, json.loads(options))
    call_params = json.loads(params)
    print(do_call(rpc_client, num, method, call_params))

135 

136 

@cli.command(help="client with thread pool")
@click.argument('mode', default='json')
@click.argument('addr', default="http://localhost:9999/")
@click.option("--options", default="{}")
@click.option("--num", type=int, default=1)
@click.option("--method", default="hello", type=str)
@click.option("--params", default='{"p1":true,"p2":[1,2,3]}')
@click.option("--verbose/--no-verbose", default=False)
def client_pool(mode, addr, method, num, params, options, verbose):
    """Make ``num`` calls through a ``ThreadPoolExecutor`` and print the results.

    ``options`` and ``params`` are JSON text; ``mode`` names the ``lotrpc``
    attribute that provides the ``Client`` class.
    """
    setupLog(verbose)
    rpc_module = getattr(lotrpc, mode)
    rpc_client = rpc_module.Client(addr, json.loads(options))
    with ThreadPoolExecutor() as executor:
        output = do_call_pool(executor, rpc_client, num, method, json.loads(params))
    print(output)

152 

153 

@cli.command(help="client with process pool")
@click.argument('mode', default='json')
@click.argument('addr', default="http://localhost:9999/")
@click.option("--options", default="{}")
@click.option("--num", type=int, default=1)
@click.option("--method", default="hello", type=str)
@click.option("--params", default='{"p1":true,"p2":[1,2,3]}')
@click.option("--verbose/--no-verbose", default=False)
def client_ppool(mode, addr, method, num, params, options, verbose):
    """Make ``num`` calls through a ``ProcessPoolExecutor`` and print the results.

    ``options`` and ``params`` are JSON text; ``mode`` names the ``lotrpc``
    attribute that provides the ``Client`` class.
    """
    setupLog(verbose)
    rpc_module = getattr(lotrpc, mode)
    rpc_client = rpc_module.Client(addr, json.loads(options))
    with ProcessPoolExecutor() as executor:
        output = do_call_pool(executor, rpc_client, num, method, json.loads(params))
    print(output)

169 

170 

@cli.command(help="client with asyncio")
@click.argument('mode', default='json')
@click.argument('addr', default="http://localhost:9999/")
@click.option("--options", default="{}")
@click.option("--num", type=int, default=1)
@click.option("--method", default="hello", type=str)
@click.option("--params", default='{"p1":true,"p2":[1,2,3]}')
@click.option("--verbose/--no-verbose", default=False)
def client_async(mode, addr, method, num, params, options, verbose):
    """Make ``num`` calls through ``do_call_async`` and print the results.

    ``options`` and ``params`` are JSON text; ``mode`` names the ``lotrpc``
    attribute that provides the ``Client`` class.
    """
    setupLog(verbose)
    rpc_module = getattr(lotrpc, mode)
    rpc_client = rpc_module.Client(addr, json.loads(options))
    call_params = json.loads(params)
    print(do_call_async(rpc_client, num, method, call_params))

185 

186 

@cli.command("proxy", help="start proxy server")
@click.argument('server', default='json')
@click.argument('addr_s', default="http://localhost:9999/")
@click.argument('client', default='msgpack')
@click.argument('addr_c', default="http://localhost:9998/")
@click.option("--client-options", default="{}")
@click.option("--server-options", default="{}")
@click.option("--accesslog/--no-accesslog", default=False)
@click.option("--verbose/--no-verbose", default=False)
def prox(server, addr_s, client, addr_c, client_options, server_options, verbose, accesslog):
    """Serve a proxy built from a lotrpc ``Server`` and a lotrpc ``Client``.

    ``server`` / ``client`` name the ``lotrpc`` attributes providing the
    ``Server`` and ``Client`` classes; the ``*_options`` values are JSON text.
    With ``accesslog`` set, ``lotrpc.LoggingProxy`` is used instead of
    ``lotrpc.Proxy``.
    """
    setupLog(verbose)
    server_module = getattr(lotrpc, server)
    front = server_module.Server(addr_s, json.loads(server_options))
    client_module = getattr(lotrpc, client)
    back = client_module.Client(addr_c, json.loads(client_options))
    proxy_class = lotrpc.LoggingProxy if accesslog else lotrpc.Proxy
    proxy_class(front, back).serve()

205 

206 

@cli.command(help="start proxy with authentication example")
@click.argument('server', default='json')
@click.argument('addr_s', default="http://localhost:9999/")
@click.argument('client', default='msgpack')
@click.argument('addr_c', default="http://localhost:9998/")
@click.option("--client-options", default="{}")
@click.option("--server-options", default="{}")
@click.option("--verbose/--no-verbose", default=False)
def proxy_auth(server, addr_s, client, addr_c, client_options, server_options, verbose):
    """Serve a ``lotrpc.proxyauth.LoginProxy`` between a Server and a Client.

    ``server`` / ``client`` name the ``lotrpc`` attributes providing the
    ``Server`` and ``Client`` classes; the ``*_options`` values are JSON text.
    """
    # Imported here, as in the original, so it is only needed by this command.
    from lotrpc.proxyauth import LoginProxy
    setupLog(verbose)
    server_module = getattr(lotrpc, server)
    front = server_module.Server(addr_s, json.loads(server_options))
    client_module = getattr(lotrpc, client)
    back = client_module.Client(addr_c, json.loads(client_options))
    LoginProxy(front, back).serve()

222 

223 

def qworker(q):
    """Consume future-like items from ``q`` until a ``None`` sentinel arrives.

    Each item's ``result()`` is awaited.  A failing item is logged and
    skipped instead of killing the consumer: a dead consumer would leave the
    producer blocked forever on a full bounded queue.  Every item taken from
    the queue, including the sentinel, is acknowledged with ``task_done()``.

    Args:
        q: queue of objects providing ``result()``, terminated by ``None``.
    """
    while True:
        item = q.get()
        if item is None:
            # Acknowledge the sentinel too so q.join() cannot hang on it.
            q.task_done()
            break
        try:
            item.result()
        except Exception as exc:
            log.warning("queued call failed: %s", exc)
        finally:
            q.task_done()

231 

232 

def qworker2(q, loop):
    """Consume awaitables from ``q`` until a ``None`` sentinel arrives.

    Each item is driven to completion with ``loop.run_until_complete``.  A
    failing item is logged and skipped instead of killing the consumer: a
    dead consumer would leave the producer blocked forever on a full bounded
    queue.  Every item taken from the queue, including the sentinel, is
    acknowledged with ``task_done()``.

    Args:
        q: queue of awaitables, terminated by ``None``.
        loop: event loop used to run each awaitable.
    """
    while True:
        item = q.get()
        if item is None:
            # Acknowledge the sentinel too so q.join() cannot hang on it.
            q.task_done()
            break
        try:
            loop.run_until_complete(item)
        except Exception as exc:
            log.warning("queued async call failed: %s", exc)
        finally:
            q.task_done()

240 

241 

@cli.command(help="start benchmark client")
@click.argument('mode', default='json')
@click.argument('addr', default="http://localhost:9999/")
@click.option("--options", default="{}")
@click.option("--loop", type=int, default=10000)
@click.option("--qsize", type=int, default=100)
@click.option("--method", default="hello", type=str)
@click.option("--params", default='{}')
@click.option("--filter", default=None)
@click.option("--verbose/--no-verbose", default=False)
def benchmark(mode, addr, method, loop, params, options, verbose, qsize, filter):
    """Benchmark one RPC method using four calling strategies.

    The strategies are registered with ``Benchmarker`` as ``sync``,
    ``thread-pool``, ``process-pool`` and ``async``.  The last three hand
    their pending calls to a consumer thread through a queue bounded to
    ``qsize`` entries.  Any exception inside a strategy turns into ``Skip``.

    Args:
        mode: name of the ``lotrpc`` attribute providing ``Client``.
        addr: address handed to ``Client``.
        method: RPC method name to call.
        loop: iteration count passed to ``Benchmarker``.
        params: JSON text decoded once and reused for every call.
        options: JSON text decoded and passed to ``Client``.
        verbose: enable DEBUG logging when true.
        qsize: maximum size of the hand-off queue.
        filter: passed through to ``Benchmarker(filter=...)``; the name
            shadows the builtin but is fixed by the ``--filter`` option.
    """
    setupLog(verbose)
    mod = getattr(lotrpc, mode)
    cl = mod.Client(addr, json.loads(options))
    arg = json.loads(params)
    q = queue.Queue(qsize)

    with Benchmarker(loop, filter=filter) as bench:
        @bench('sync')
        def callapi(bm):
            try:
                for _ in bm:
                    cl.call(method, arg)
            except Exception as e:
                raise Skip("error {}".format(e))

        @bench('thread-pool')
        def tpcall(bm):
            try:
                t = threading.Thread(target=qworker, args=(q,))
                t.start()
                try:
                    with ThreadPoolExecutor() as pool:
                        for _ in bm:
                            q.put(pool.submit(cl.call, method, arg))
                finally:
                    # Always stop the consumer: without the sentinel the
                    # non-daemon thread would block on q.get() forever and
                    # keep the process alive after a failure.
                    q.put(None)
                    t.join()
            except Exception as e:
                raise Skip("error {}".format(e))

        @bench('process-pool')
        def ppcall(bm):
            try:
                t = threading.Thread(target=qworker, args=(q,))
                t.start()
                try:
                    with ProcessPoolExecutor() as pool:
                        for _ in bm:
                            q.put(pool.submit(cl.call, method, arg))
                finally:
                    # Same as above: never leave the consumer thread waiting.
                    q.put(None)
                    t.join()
            except Exception as e:
                raise Skip("error {}".format(e))

        @bench('async')
        def asynccall(bm):
            try:
                # Named ev_loop so it is not confused with the ``loop``
                # iteration-count parameter of the enclosing command.
                try:
                    ev_loop = asyncio.get_event_loop()
                except RuntimeError:
                    # No current event loop available: create and install one.
                    ev_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(ev_loop)
                t = threading.Thread(target=qworker2, args=(q, ev_loop))
                t.start()
                try:
                    for _ in bm:
                        q.put(cl.asynccall(ev_loop, method, arg))
                finally:
                    # Same as above: never leave the consumer thread waiting.
                    q.put(None)
                    t.join()
            except Exception as e:
                raise Skip("error {}".format(e))

306 

307 

# Run the click command group when this file is executed as a script.
if __name__ == "__main__":
    cli()