Coverage for relppy/client.py: 66%
194 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 23:54 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 23:54 +0000
1import socket
2from concurrent.futures import ThreadPoolExecutor, Future
3import threading
4import time
5import ssl
6from .protocol import Message, relp_ua
7from logging import getLogger
9_log = getLogger(__name__)
12class RelpTCPClient:
13 MAX_TXNR = 999999999
15 def __init__(
16 self,
17 address: tuple[str, int | None],
18 resend_size: int = 1024,
19 resend_wait: float = 1.0,
20 rbufsize: int = 1024 * 1024,
21 wbufsize: int = 1024 * 1024,
22 **kwargs,
23 ):
24 self.lock = threading.Lock()
25 self.address = address
26 self.kwargs = kwargs
27 self.connected = False
29 self.cur_txnr = 1
30 self.resend_bufsize = resend_size
31 self.resend_wait = resend_wait
32 self.rbufsize = rbufsize
33 self.wbufsize = wbufsize
34 self.resendbuf: dict[int, tuple[Message, Future]] = {}
36 self.sock = self.create_connection(address, **kwargs)
37 self.wfile = self.sock.makefile("wb", self.wbufsize)
38 self.rfile = self.sock.makefile("rb", self.rbufsize)
39 _log.debug("connected: %s", self)
41 self.executor = ThreadPoolExecutor(1, "acker")
42 self.executor.submit(self.acker)
43 try:
44 self.relp_nego()
45 except Exception as e:
46 _log.warning("Failed to negotiate connection: %s" % e)
47 self.close()
48 raise
50 def relp_nego(self):
51 offer = f"\nrelp_version=1\nrelp_software={relp_ua}\ncommands=syslog"
52 res: bytes = self.send_command(b"open", offer.encode("ascii"), skip_buffer=True).result()
53 self.negodata: dict[str, list[str]] = {}
54 for i in res.splitlines()[1:]:
55 ll = i.split(b"=", 1)
56 if len(ll) == 1: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 self.negodata[ll[0].decode()] = []
58 else:
59 self.negodata[ll[0].decode()] = ll[1].decode().split(",")
60 _log.debug("negotiated: %s", self.negodata)
62 def create_connection(self, address, **kwargs):
63 sock = socket.create_connection(address, **kwargs)
64 self.connected = True
65 return sock
67 def close(self):
68 if hasattr(self, "sock"): 68 ↛ 79line 68 didn't jump to line 79 because the condition on line 68 was always true
69 _log.debug("closing %s", self)
70 resendbuf = self.resendbuf
71 self.resendbuf = {}
72 _log.debug("resendbuf: %s", len(resendbuf))
73 for msgft in resendbuf.values(): 73 ↛ 74line 73 didn't jump to line 74 because the loop on line 73 never started
74 _log.info("cancel msg: %s", msgft[0])
75 msgft[1].cancel()
76 _log.debug("sending close: %s", self)
77 res = self.send_command(b"close", b"").result()
78 _log.debug("close result: %s", res)
79 with self.lock:
80 if hasattr(self, "sock"):
81 self.rfile.close()
82 self.wfile.close()
83 self.sock.close()
84 del self.sock
85 _log.debug("socket closed")
86 self.executor.shutdown(wait=True)
88 def resend(self, txnr: int | None = None, new_conn: bool = False):
89 if txnr:
90 msg, ft = self.resendbuf[txnr]
91 assert not ft.done()
92 _log.info("resend %s", msg)
93 self.wfile.write(msg.pack())
94 self.wfile.flush()
95 else:
96 cnt = 0
97 for msg, ft in self.resendbuf.values():
98 if not ft.done():
99 if new_conn:
100 msg.txnr = self.cur_txnr
101 self.cur_txnr += 1
102 _log.info("resend %s", msg)
103 self.wfile.write(msg.pack())
104 cnt += 1
105 else:
106 _log.info("skip resend %s", msg)
107 if cnt != 0:
108 _log.info("resend %d messages", cnt)
109 self.wfile.flush()
111 def __enter__(self):
112 return self
114 def __exit__(self, ex_type, ex_value, trace):
115 self.close()
117 def __str__(self):
118 if hasattr(self, "sock"):
119 return "%s(%s <- %s)" % (self.__class__.__name__, self.sock.getpeername(), self.sock.getsockname())
120 return "%s(not connected)" % (self.__class__.__name__)
122 def _gotack(self, txnr: int, data: bytes):
123 try:
124 _, f = self.resendbuf.pop(txnr)
125 _log.debug("setting result %s <- %s", txnr, data)
126 f.set_result(data)
127 except KeyError:
128 _log.warning("txnr does not found: %s", txnr)
130 def acker(self):
131 _log.debug("acker started: %s", self)
132 for bin in self.rfile: 132 ↛ 171line 132 didn't jump to line 171 because the loop on line 132 didn't complete
133 _log.debug("got line: %s", bin)
134 token = bin.split(b" ", 3)
135 if len(token) < 3: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 _log.warning("invalid message: %s", bin)
137 continue
138 _log.debug("token: %s", token)
139 if len(token) == 3 and token[2] == b"0\n":
140 txnr = int(token[0])
141 command = token[1]
142 datalen = 0
143 data = b""
144 _log.debug("zero length message: %s", command)
145 else:
146 txnr = int(token[0])
147 command = token[1]
148 datalen = int(token[2])
149 data = token[3]
150 if datalen > len(data):
151 data += self.rfile.read(datalen - len(data) + 1)
152 _log.debug("message: %s msglen=%s", command, len(data))
153 data = data.removesuffix(b"\n")
154 _log.debug("got txnr=%s, command=%s, datalen=%s/%s", txnr, command, datalen, len(data))
155 if command == b"rsp":
156 _log.debug("ack %d", txnr)
157 self._gotack(txnr, data)
158 elif command == b"serverclose": 158 ↛ 170line 158 didn't jump to line 170 because the condition on line 158 was always true
159 _log.info("server close: %s", bin)
160 if txnr != 0: 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true
161 _log.warning("txnr is not 0: %s", bin)
162 with self.lock:
163 if hasattr(self, "sock"):
164 self.rfile.close()
165 self.wfile.close()
166 self.sock.close()
167 del self.sock
168 break
169 else:
170 _log.warning("got not ack: %s (%s)", command, bin)
171 self.connected = False
172 _log.warning("connection closed")
174 def send_command(self, command: bytes, data: bytes, skip_buffer: bool = False) -> Future:
175 _log.debug("send %s msglen=%s (%s)", command, len(data), data)
176 # Check if we are connected.
177 new_conn = False
178 if not self.connected: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 self.executor.shutdown(wait=True)
180 with self.lock:
181 if hasattr(self, "sock"):
182 self.rfile.close()
183 self.wfile.close()
184 self.sock.close()
185 del self.sock
186 self.cur_txnr = 1
187 self.sock = self.create_connection(self.address, **self.kwargs)
188 self.wfile = self.sock.makefile("wb", self.wbufsize)
189 self.rfile = self.sock.makefile("rb", self.rbufsize)
190 self.executor = ThreadPoolExecutor(1, "acker")
191 self.executor.submit(self.acker)
192 try:
193 self.relp_nego()
194 except Exception as e:
195 _log.warning("Failed to negotiate connection: %s" % e)
196 raise
197 new_conn = True
199 if len(self.resendbuf) > self.resend_bufsize and not skip_buffer: 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true
200 _log.warning("buffer full: bufsize=%s", len(self.resendbuf))
201 self.resend(new_conn=new_conn)
202 _log.info("sleep %f second", self.resend_wait)
203 time.sleep(self.resend_wait)
205 msg = Message(self.cur_txnr, command, data)
206 self.cur_txnr += 1
207 if self.cur_txnr > self.MAX_TXNR:
208 self.cur_txnr = 1
209 f = Future()
210 self.resendbuf[msg.txnr] = [msg, f]
211 self.wfile.write(msg.pack())
212 self.wfile.flush()
213 _log.debug("message sent: %s", msg.txnr)
214 return f
class RelpUnixClient(RelpTCPClient):
    """RELP client that connects over a Unix-domain stream socket."""

    def create_connection(self, address, **kwargs):
        """Connect an ``AF_UNIX`` stream socket to *address*.

        Extra keyword arguments are accepted for signature compatibility
        with the base class and are not used.  The socket is closed again
        if connecting fails, so a failed attempt does not leak a descriptor.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(address)
        except BaseException:
            # Cleanup-and-re-raise: nothing is swallowed here.
            sock.close()
            raise
        self.connected = True
        return sock
class RelpTlsClient(RelpTCPClient):
    """RELP client that wraps the TCP connection in TLS."""

    def create_connection(self, address, context: ssl.SSLContext, server_hostname=None, **kwargs):
        """Open a TCP connection to *address* and wrap it with *context*.

        Args:
            address: Passed to :func:`socket.create_connection`.
            context: TLS context used to wrap the socket.
            server_hostname: Forwarded to ``context.wrap_socket``.
            **kwargs: Forwarded to :func:`socket.create_connection`.

        Returns:
            The wrapped socket.

        Raises:
            Exception: Re-raised from connecting or wrapping; the plain
                socket is closed first when wrapping fails.
        """
        sock = socket.create_connection(address, **kwargs)
        try:
            tls_sock = context.wrap_socket(sock, server_hostname=server_hostname)
        except BaseException:
            # wrap_socket can fail before it takes over the descriptor;
            # close the plain socket so it is not leaked, then re-raise.
            sock.close()
            raise
        _log.debug("ssl: version=%s, cipher=%s", tls_sock.version(), tls_sock.cipher())
        self.connected = True
        return tls_sock