Coverage for relppy/client.py: 66%

194 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 23:54 +0000

1import socket 

2from concurrent.futures import ThreadPoolExecutor, Future 

3import threading 

4import time 

5import ssl 

6from .protocol import Message, relp_ua 

7from logging import getLogger 

8 

9_log = getLogger(__name__) 

10 

11 

12class RelpTCPClient: 

13 MAX_TXNR = 999999999 

14 

15 def __init__( 

16 self, 

17 address: tuple[str, int | None], 

18 resend_size: int = 1024, 

19 resend_wait: float = 1.0, 

20 rbufsize: int = 1024 * 1024, 

21 wbufsize: int = 1024 * 1024, 

22 **kwargs, 

23 ): 

24 self.lock = threading.Lock() 

25 self.address = address 

26 self.kwargs = kwargs 

27 self.connected = False 

28 

29 self.cur_txnr = 1 

30 self.resend_bufsize = resend_size 

31 self.resend_wait = resend_wait 

32 self.rbufsize = rbufsize 

33 self.wbufsize = wbufsize 

34 self.resendbuf: dict[int, tuple[Message, Future]] = {} 

35 

36 self.sock = self.create_connection(address, **kwargs) 

37 self.wfile = self.sock.makefile("wb", self.wbufsize) 

38 self.rfile = self.sock.makefile("rb", self.rbufsize) 

39 _log.debug("connected: %s", self) 

40 

41 self.executor = ThreadPoolExecutor(1, "acker") 

42 self.executor.submit(self.acker) 

43 try: 

44 self.relp_nego() 

45 except Exception as e: 

46 _log.warning("Failed to negotiate connection: %s" % e) 

47 self.close() 

48 raise 

49 

50 def relp_nego(self): 

51 offer = f"\nrelp_version=1\nrelp_software={relp_ua}\ncommands=syslog" 

52 res: bytes = self.send_command(b"open", offer.encode("ascii"), skip_buffer=True).result() 

53 self.negodata: dict[str, list[str]] = {} 

54 for i in res.splitlines()[1:]: 

55 ll = i.split(b"=", 1) 

56 if len(ll) == 1: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true

57 self.negodata[ll[0].decode()] = [] 

58 else: 

59 self.negodata[ll[0].decode()] = ll[1].decode().split(",") 

60 _log.debug("negotiated: %s", self.negodata) 

61 

62 def create_connection(self, address, **kwargs): 

63 sock = socket.create_connection(address, **kwargs) 

64 self.connected = True 

65 return sock 

66 

67 def close(self): 

68 if hasattr(self, "sock"): 68 ↛ 79line 68 didn't jump to line 79 because the condition on line 68 was always true

69 _log.debug("closing %s", self) 

70 resendbuf = self.resendbuf 

71 self.resendbuf = {} 

72 _log.debug("resendbuf: %s", len(resendbuf)) 

73 for msgft in resendbuf.values(): 73 ↛ 74line 73 didn't jump to line 74 because the loop on line 73 never started

74 _log.info("cancel msg: %s", msgft[0]) 

75 msgft[1].cancel() 

76 _log.debug("sending close: %s", self) 

77 res = self.send_command(b"close", b"").result() 

78 _log.debug("close result: %s", res) 

79 with self.lock: 

80 if hasattr(self, "sock"): 

81 self.rfile.close() 

82 self.wfile.close() 

83 self.sock.close() 

84 del self.sock 

85 _log.debug("socket closed") 

86 self.executor.shutdown(wait=True) 

87 

88 def resend(self, txnr: int | None = None, new_conn: bool = False): 

89 if txnr: 

90 msg, ft = self.resendbuf[txnr] 

91 assert not ft.done() 

92 _log.info("resend %s", msg) 

93 self.wfile.write(msg.pack()) 

94 self.wfile.flush() 

95 else: 

96 cnt = 0 

97 for msg, ft in self.resendbuf.values(): 

98 if not ft.done(): 

99 if new_conn: 

100 msg.txnr = self.cur_txnr 

101 self.cur_txnr += 1 

102 _log.info("resend %s", msg) 

103 self.wfile.write(msg.pack()) 

104 cnt += 1 

105 else: 

106 _log.info("skip resend %s", msg) 

107 if cnt != 0: 

108 _log.info("resend %d messages", cnt) 

109 self.wfile.flush() 

110 

111 def __enter__(self): 

112 return self 

113 

114 def __exit__(self, ex_type, ex_value, trace): 

115 self.close() 

116 

117 def __str__(self): 

118 if hasattr(self, "sock"): 

119 return "%s(%s <- %s)" % (self.__class__.__name__, self.sock.getpeername(), self.sock.getsockname()) 

120 return "%s(not connected)" % (self.__class__.__name__) 

121 

122 def _gotack(self, txnr: int, data: bytes): 

123 try: 

124 _, f = self.resendbuf.pop(txnr) 

125 _log.debug("setting result %s <- %s", txnr, data) 

126 f.set_result(data) 

127 except KeyError: 

128 _log.warning("txnr does not found: %s", txnr) 

129 

130 def acker(self): 

131 _log.debug("acker started: %s", self) 

132 for bin in self.rfile: 132 ↛ 171line 132 didn't jump to line 171 because the loop on line 132 didn't complete

133 _log.debug("got line: %s", bin) 

134 token = bin.split(b" ", 3) 

135 if len(token) < 3: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 _log.warning("invalid message: %s", bin) 

137 continue 

138 _log.debug("token: %s", token) 

139 if len(token) == 3 and token[2] == b"0\n": 

140 txnr = int(token[0]) 

141 command = token[1] 

142 datalen = 0 

143 data = b"" 

144 _log.debug("zero length message: %s", command) 

145 else: 

146 txnr = int(token[0]) 

147 command = token[1] 

148 datalen = int(token[2]) 

149 data = token[3] 

150 if datalen > len(data): 

151 data += self.rfile.read(datalen - len(data) + 1) 

152 _log.debug("message: %s msglen=%s", command, len(data)) 

153 data = data.removesuffix(b"\n") 

154 _log.debug("got txnr=%s, command=%s, datalen=%s/%s", txnr, command, datalen, len(data)) 

155 if command == b"rsp": 

156 _log.debug("ack %d", txnr) 

157 self._gotack(txnr, data) 

158 elif command == b"serverclose": 158 ↛ 170line 158 didn't jump to line 170 because the condition on line 158 was always true

159 _log.info("server close: %s", bin) 

160 if txnr != 0: 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true

161 _log.warning("txnr is not 0: %s", bin) 

162 with self.lock: 

163 if hasattr(self, "sock"): 

164 self.rfile.close() 

165 self.wfile.close() 

166 self.sock.close() 

167 del self.sock 

168 break 

169 else: 

170 _log.warning("got not ack: %s (%s)", command, bin) 

171 self.connected = False 

172 _log.warning("connection closed") 

173 

174 def send_command(self, command: bytes, data: bytes, skip_buffer: bool = False) -> Future: 

175 _log.debug("send %s msglen=%s (%s)", command, len(data), data) 

176 # Check if we are connected. 

177 new_conn = False 

178 if not self.connected: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 self.executor.shutdown(wait=True) 

180 with self.lock: 

181 if hasattr(self, "sock"): 

182 self.rfile.close() 

183 self.wfile.close() 

184 self.sock.close() 

185 del self.sock 

186 self.cur_txnr = 1 

187 self.sock = self.create_connection(self.address, **self.kwargs) 

188 self.wfile = self.sock.makefile("wb", self.wbufsize) 

189 self.rfile = self.sock.makefile("rb", self.rbufsize) 

190 self.executor = ThreadPoolExecutor(1, "acker") 

191 self.executor.submit(self.acker) 

192 try: 

193 self.relp_nego() 

194 except Exception as e: 

195 _log.warning("Failed to negotiate connection: %s" % e) 

196 raise 

197 new_conn = True 

198 

199 if len(self.resendbuf) > self.resend_bufsize and not skip_buffer: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 _log.warning("buffer full: bufsize=%s", len(self.resendbuf)) 

201 self.resend(new_conn=new_conn) 

202 _log.info("sleep %f second", self.resend_wait) 

203 time.sleep(self.resend_wait) 

204 

205 msg = Message(self.cur_txnr, command, data) 

206 self.cur_txnr += 1 

207 if self.cur_txnr > self.MAX_TXNR: 

208 self.cur_txnr = 1 

209 f = Future() 

210 self.resendbuf[msg.txnr] = [msg, f] 

211 self.wfile.write(msg.pack()) 

212 self.wfile.flush() 

213 _log.debug("message sent: %s", msg.txnr) 

214 return f 

215 

216 

217class RelpUnixClient(RelpTCPClient): 

218 def create_connection(self, address, **kwargs): 

219 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

220 sock.connect(address) 

221 self.connected = True 

222 return sock 

223 

224 

225class RelpTlsClient(RelpTCPClient): 

226 def create_connection(self, address, context: ssl.SSLContext, server_hostname=None, **kwargs): 

227 sock = socket.create_connection(address, **kwargs) 

228 sock = context.wrap_socket(sock, server_hostname=server_hostname) 

229 _log.debug("ssl: version=%s, cipher=%s", sock.version(), sock.cipher()) 

230 self.connected = True 

231 return sock