Coverage for log2s3/common_stream.py: 82%
99 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 12:02 +0000
import heapq
import io
from logging import getLogger
from typing import Generator, Sequence
5_log = getLogger(__name__)
class Stream:
    """
    stream base class

    work as pass-through stream
    """

    def __init__(self, prev_stream):
        # upstream object; must provide a gen() -> Generator[bytes, ...] method
        self.prev = prev_stream

    def init_fp(self):
        """prepare self as file-like interface

        Must be called before read(). Primes the internal buffer with the
        first chunk of the upstream generator.
        """
        _log.debug("use as fp(%s)", self.__class__.__name__)
        self.gen1 = self.prev.gen()
        try:
            self.buf = [next(self.gen1)]
        except StopIteration:
            # upstream produced nothing: start with an empty buffer instead
            # of letting StopIteration escape to the caller
            self.buf = []
        self.eof = False

    # work as pass-thru stream
    def gen(self) -> Generator[bytes, None, None]:
        """read part generator"""
        yield from self.prev.gen()

    def read_all(self) -> bytes:
        """read all content into a single bytes object"""
        buf = io.BytesIO()
        for chunk in self.gen():
            _log.debug("read %d bytes", len(chunk))
            buf.write(chunk)
        _log.debug("finish read")
        return buf.getvalue()

    def text_gen(self) -> Generator[str, None, None]:
        """readline generator

        Re-chunks the byte stream at newline boundaries and decodes each
        complete-line span via TextIOWrapper; a trailing partial line is
        yielded at the end.
        """
        rest = b""
        for chunk in self.gen():
            d = chunk.rfind(b"\n")
            if d != -1:
                # everything up to the last newline is complete lines
                buf0 = io.BytesIO(rest + chunk[: d + 1])
                rest = chunk[d + 1 :]
                yield from io.TextIOWrapper(buf0)
            else:
                # no newline in this chunk; keep accumulating
                rest = rest + chunk
        if rest:
            # final partial line without a trailing newline
            yield from io.TextIOWrapper(io.BytesIO(rest))

    def read(self, sz: int = -1) -> bytes:
        """
        read up to size bytes

        used for file-like interface; init_fp() must be called first

        args:
            sz: read size. if size is not specified or -1, read all content.
        raises:
            RuntimeError: if init_fp() was not called first
        """
        if not hasattr(self, "eof"):
            # explicit error instead of assert (asserts vanish under -O)
            raise RuntimeError("init_fp() must be called before read()")
        if self.eof:
            return b""
        if sz == -1:
            _log.debug("read all")
            # drain the upstream generator completely
            try:
                while True:
                    self.buf.append(next(self.gen1))
            except StopIteration:
                pass
            _log.debug(
                "read %s / %s", len(self.buf), sum(len(x) for x in self.buf)
            )
            buf = self.buf
            self.buf = []
            self.eof = True
            return b"".join(buf)
        cur = sum(len(x) for x in self.buf)
        _log.debug("read part cur=%s / sz=%s", cur, sz)
        try:
            # pull chunks until we have at least sz bytes buffered
            while cur < sz:
                bt = next(self.gen1)
                _log.debug("read1 %d / cur=%s, sz=%s", len(bt), cur, sz)
                self.buf.append(bt)
                cur += len(bt)
        except StopIteration:
            # upstream ended before sz bytes: return what we have (short read)
            _log.debug("eof %s / %s", len(self.buf), sum(len(x) for x in self.buf))
            buf = self.buf
            self.buf = []
            self.eof = True
            return b"".join(buf)
        # keep the overshoot for the next call
        buf = b"".join(self.buf)
        self.buf = [buf[sz:]]
        _log.debug("return %s, rest=%s", sz, len(self.buf[0]))
        return buf[:sz]
class CatStream:
    """Concatenate several streams end-to-end into one byte stream."""

    def __init__(self, inputs: list[Stream]):
        self.inputs = inputs

    def gen(self) -> Generator[bytes, None, None]:
        """Yield every chunk of every input stream, in input order."""
        for stream in self.inputs:
            for chunk in stream.gen():
                yield chunk
class MergeStream:
    """Merge several line-sorted text streams into one sorted byte stream.

    Each input must yield its lines in sorted order (the usual contract for
    merging); the merged output preserves that order across all inputs.
    """

    def __init__(self, inputs: Sequence[Stream], bufsize: int = 4096):
        self.inputs = inputs
        # target chunk size for gen(); a chunk is flushed once it exceeds this
        self.bufsize = bufsize

    def gen(self) -> Generator[bytes, None, None]:
        """Yield the merged lines as utf-8 encoded chunks of ~bufsize bytes."""
        buf = io.BytesIO()
        for line in self.text_gen():
            buf.write(line.encode("utf-8"))
            if buf.tell() > self.bufsize:
                yield buf.getvalue()
                buf.truncate(0)
                buf.seek(0)
        # final chunk is yielded unconditionally (possibly empty) to keep the
        # original chunking behavior
        yield buf.getvalue()

    def text_gen(self) -> Generator[str, None, None]:
        """K-way merge of the inputs' line generators.

        Uses heapq.merge, which is a stable sorted merge and — unlike the
        previous hand-rolled version that called next() on every input up
        front — does not raise RuntimeError (PEP 479) when an input stream
        is empty.
        """
        yield from heapq.merge(*(s.text_gen() for s in self.inputs))