Coverage for relppy/main.py: 46%

188 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 23:54 +0000

1import click 

2import functools 

3import socket 

4import socketserver 

5import ssl 

6import codecs 

7from typing import Type 

8from logging import getLogger 

9from .server import RelpStreamHandler 

10from .client import RelpTCPClient, RelpUnixClient, RelpTlsClient 

11from .protocol import process_io, Message, relp_ua 

12from .version import VERSION 

13 

# Logger for this module's own diagnostics.
_log = getLogger(__name__)

# Offer text sent as the payload of the RELP "open" command by raw_client.
relp_offer = f"\nrelp_version=1\nrelp_software={relp_ua}\ncommands=syslog,eventlog"

# Error-handler names offered as choices for --errors: every attribute of the
# codecs module named "<handler>_errors", with that suffix stripped.
errors_list = [
    attr.removesuffix("_errors")
    for attr in dir(codecs)
    if attr.endswith("_errors")
]

18 

19 

# Root command group. When no subcommand is given the help text is printed.
# (Documented with a comment rather than a docstring: click would display a
# docstring as the group's help text.)
@click.group(invoke_without_command=True)
@click.version_option(VERSION)
@click.pass_context
def cli(ctx):
    if ctx.invoked_subcommand is not None:
        return
    print(ctx.get_help())

26 

27 

def verbose_option(func):
    """Decorator adding ``--verbose/--quiet`` and configuring logging from it.

    The flag is consumed by the wrapper and is not forwarded to ``func``.
    Without either flag the log level is INFO; ``--quiet`` selects WARNING
    and ``--verbose`` selects DEBUG.
    """
    log_format = "%(asctime)s %(levelname)s %(name)s %(message)s"

    @click.option("--verbose/--quiet", default=None)
    @functools.wraps(func)
    def wrapper(verbose: bool | None, **kwargs):
        from logging import basicConfig

        # Tri-state flag: None means neither --verbose nor --quiet was given.
        if verbose is None:
            level = "INFO"
        elif verbose is False:
            level = "WARNING"
        else:
            level = "DEBUG"
        basicConfig(level=level, format=log_format)
        return func(**kwargs)

    return wrapper

42 

43 

def hostport_option(func):
    """Decorator adding ``--host``/``--port`` options.

    The two values are combined into a ``(host, port)`` tuple that is passed
    to ``func`` as the ``address`` keyword argument.
    """

    @click.option("--port", type=int, default=10514, show_default=True)
    @click.option("--host", default="localhost", show_default=True)
    @functools.wraps(func)
    def wrapper(host: str, port: int, **kwargs):
        endpoint = (host, port)
        return func(address=endpoint, **kwargs)

    return wrapper

51 

52 

def encoding_option(func):
    """Decorator adding ``--encoding``/``--errors`` and a logging RELP handler.

    ``func`` receives three extra keyword arguments: ``encoding``, ``errors``
    and ``handler``. ``handler`` is a ``RelpStreamHandler`` subclass whose
    ``do_syslog`` decodes each message's data with the chosen encoding and
    error handler and logs it at INFO level on the ``"syslog"`` logger.

    Raises:
        click.BadParameter: if ``--encoding`` does not name a known codec.
    """

    @click.option("--encoding", default="utf-8", show_default=True)
    @click.option("--errors", type=click.Choice(errors_list), default="replace", show_default=True)
    @functools.wraps(func)
    def _(encoding: str, errors: str, **kwargs):
        # Fail fast on an unknown codec. Otherwise the LookupError only
        # surfaces when the first message is encoded/decoded, which for the
        # servers happens inside a request handler.
        try:
            codecs.lookup(encoding)
        except LookupError as exc:
            raise click.BadParameter(str(exc), param_hint="--encoding") from exc

        syslog = getLogger("syslog")

        class MyHandler(RelpStreamHandler):
            def do_syslog(self, msg: Message) -> str:
                syslog.info(msg.data.decode(encoding, errors))
                # NOTE(review): presumably the empty string is the response
                # payload expected by RelpStreamHandler -- confirm in server.py.
                return ""

        return func(encoding=encoding, errors=errors, handler=MyHandler, **kwargs)

    return _

67 

68 

def tlsserver_option(func):
    """Decorator adding the required ``--cert``/``--key`` options.

    Both must name existing files. They are loaded into a server-side SSL
    context that is passed to ``func`` as the ``context`` keyword argument.
    """
    existing_file = click.Path(exists=True, file_okay=True, dir_okay=False)

    @click.option("--cert", type=existing_file, required=True)
    @click.option("--key", type=existing_file, required=True)
    @functools.wraps(func)
    def wrapper(cert: str, key: str, **kwargs):
        # Purpose.CLIENT_AUTH builds a context for the server side of TLS.
        tls = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        tls.load_cert_chain(cert, key)
        return func(context=tls, **kwargs)

    return wrapper

78 

79 

def tlsclient_option(func):
    """Decorator adding ``--verify/--no-verify`` and ``--cafile`` options.

    Builds a client-side SSL context (optionally trusting ``--cafile``) and
    passes it to ``func`` as the ``context`` keyword argument. With
    ``--no-verify`` both hostname checking and certificate verification are
    switched off.
    """
    ca_path = click.Path(exists=True, file_okay=True, dir_okay=False)

    @click.option("--verify/--no-verify", default=True, show_default=True)
    @click.option("--cafile", type=ca_path)
    @functools.wraps(func)
    def wrapper(verify, cafile, **kwargs):
        tls = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
        if not verify:
            # Order matters: check_hostname has to be disabled before
            # verify_mode may be set to CERT_NONE.
            tls.check_hostname = False
            tls.verify_mode = ssl.CERT_NONE
        return func(context=tls, **kwargs)

    return wrapper

91 

92 

@cli.command()
@verbose_option
@hostport_option
def raw_server(address: tuple[str, int]):
    """RELP server (raw generator style)"""
    # Accepts one connection at a time and logs every message yielded by
    # process_io(); a "close" command is answered with "serverclose" and ends
    # the connection. Both sockets are managed with `with` so they are closed
    # on every exit path: a peer that goes away without sending "close", an
    # exception while processing, or Ctrl-C.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, proto=socket.IPPROTO_TCP) as sock:
        sock.bind(address)
        sock.listen(1024)
        _log.info("listen on %s", sock.getsockname())
        while True:
            client, addr = sock.accept()
            with client:
                _log.info("connected: %s", addr)
                for msg in process_io(client, auto_ack=True):
                    _log.info("received: %s", msg)
                    if msg.command == b"close":
                        Message(0, b"serverclose").send(client)
                        # Leaving the `with` block closes the connection.
                        break

111 

112 

@cli.command()
@verbose_option
@hostport_option
@encoding_option
@click.argument("message")
def raw_client(address: tuple[str, int], message: str, encoding: str, errors: str, **kwargs):
    """RELP client (raw send/recv)"""
    # Sends "open", one "syslog" message and "close" by hand, logging what is
    # received after each step. The socket is managed with `with` so it is
    # closed even when connect/send/recv raises.
    # NOTE(review): the first Message argument (1, 2, 3) is presumably the
    # RELP transaction number -- confirm in protocol.py.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, proto=socket.IPPROTO_TCP) as sock:
        sock.connect(address)
        Message(1, b"open", relp_offer.encode(encoding, errors)).send(sock)
        recv = Message()
        recv.recv(sock)
        _log.info("receive: %s", recv)
        Message(2, b"syslog", message.encode(encoding, errors)).send(sock)
        recv.recv(sock)
        _log.info("receive %s", recv)
        Message(3, b"close").send(sock)
        # Two reads after "close"; presumably the response to the close
        # command followed by the server's "serverclose" -- TODO confirm.
        rest = recv.recv(sock)
        _log.info("receive %s, rest=%s", recv, rest)
        rest = recv.recv(sock)
        _log.info("receive %s, rest=%s", recv, rest)

135 

136 

@cli.command()
@verbose_option
@hostport_option
@encoding_option
def server(address: tuple[str, int], handler: Type[RelpStreamHandler], **kwargs):
    """RELP server (TCP)"""
    # ThreadingMixIn must be listed BEFORE the server class. With the bases
    # the other way round, BaseServer.process_request comes first in the MRO
    # and every connection is handled serially in the accept loop.
    class _T(socketserver.ThreadingMixIn, socketserver.TCPServer):
        allow_reuse_address = True
        # Handler threads must not keep the process alive after Ctrl-C, which
        # matches how the previously single-threaded server stopped.
        daemon_threads = True

        def verify_request(self, request, client_address):
            # Used only as a logging hook; every connection is accepted.
            _log.info("connect from: %s", client_address)
            return True

    srv = _T(address, handler)
    srv.serve_forever()

152 

153 

@cli.command()
@verbose_option
@click.option("--sock", type=click.Path(), required=True)
@encoding_option
def server_unix(sock: str, handler: Type[RelpStreamHandler], **kwargs):
    """RELP server (unix socket)"""
    # ThreadingMixIn must be listed BEFORE the server class. With the bases
    # the other way round, BaseServer.process_request comes first in the MRO
    # and every connection is handled serially in the accept loop.
    class _T(socketserver.ThreadingMixIn, socketserver.UnixStreamServer):
        allow_reuse_address = True
        # Handler threads must not keep the process alive after Ctrl-C, which
        # matches how the previously single-threaded server stopped.
        daemon_threads = True

    srv = _T(sock, handler)
    srv.serve_forever()

165 

166 

@cli.command()
@verbose_option
@hostport_option
@tlsserver_option
@encoding_option
def server_tls(address: tuple[str, int], context: ssl.SSLContext, handler: Type[RelpStreamHandler], **kwargs):
    """RELP server (TLS)"""
    # ThreadingMixIn must be listed BEFORE the server class. With the bases
    # the other way round, BaseServer.process_request comes first in the MRO
    # and every connection is handled serially in the accept loop.
    class _T(socketserver.ThreadingMixIn, socketserver.TCPServer):
        allow_reuse_address = True
        # Handler threads must not keep the process alive after Ctrl-C, which
        # matches how the previously single-threaded server stopped.
        daemon_threads = True

        def verify_request(self, request, client_address):
            # Used only as a logging hook; every connection is accepted.
            _log.info("connect from: %s", client_address)
            _log.debug("ssl: version=%s, cipher=%s", request.version(), request.cipher())
            return True

    # Create the server unbound so the listening socket can be wrapped with
    # TLS first; binding and activation are then done explicitly.
    srv = _T(address, handler, bind_and_activate=False)
    srv.socket = context.wrap_socket(srv.socket, server_side=True)
    srv.server_bind()
    srv.server_activate()
    srv.serve_forever()

187 

188 

@cli.command()
@verbose_option
@hostport_option
@encoding_option
@click.argument("message", nargs=-1)
def client(address: tuple[str, int], message: tuple[str], encoding: str, errors: str, **kwargs):
    """RELP client (TCP)"""
    # Each MESSAGE argument is sent as one "syslog" command; the result of
    # every command is awaited and logged before the next one is sent.
    with RelpTCPClient(address=address) as relp:
        for text in message:
            payload = text.encode(encoding, errors)
            pending = relp.send_command(b"syslog", payload)
            reply = pending.result()
            _log.info("sent: %s -> %s", text, reply)
        _log.debug("finalize %s", relp)
    _log.debug("finished %s", relp)

202 

203 

@cli.command()
@verbose_option
@click.option("--sock", type=click.Path(), required=True)
@encoding_option
@click.argument("message", nargs=-1)
def client_unix(sock: str, message: tuple[str], encoding: str, errors: str, **kwargs):
    """RELP client (unix socket)"""
    # Each MESSAGE argument is sent as one "syslog" command; the result of
    # every command is awaited and logged before the next one is sent.
    with RelpUnixClient(address=sock) as relp:
        for text in message:
            payload = text.encode(encoding, errors)
            pending = relp.send_command(b"syslog", payload)
            reply = pending.result()
            _log.info("sent: %s -> %s", text, reply)
        _log.debug("finalize %s", relp)
    _log.debug("finished %s", relp)

217 

218 

@cli.command()
@verbose_option
@hostport_option
@tlsclient_option
@encoding_option
@click.argument("message", nargs=-1)
def client_tls(address: tuple[str, int], message: tuple[str], encoding: str, errors: str,
               context: ssl.SSLContext, **kwargs):
    """RELP client (TLS)"""
    # The host part of the address is also passed as the TLS server hostname.
    hostname = address[0]
    # Each MESSAGE argument is sent as one "syslog" command; the result of
    # every command is awaited and logged before the next one is sent.
    with RelpTlsClient(address=address, context=context, server_hostname=hostname) as relp:
        for text in message:
            payload = text.encode(encoding, errors)
            pending = relp.send_command(b"syslog", payload)
            reply = pending.result()
            _log.info("sent: %s -> %s", text, reply)
        _log.debug("finalize %s", relp)
    _log.debug("finished %s", relp)

234 

235 

# Script entry point: when the module is executed directly, hand control to
# the click command group.
if __name__ == "__main__":
    cli()