Coverage for log2s3 / common_stream.py: 82%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
import heapq
import io
from logging import getLogger
from typing import Generator, Sequence
5_log = getLogger(__name__)
8class Stream:
9 """
10 stream base class
12 work as pass-through stream
13 """
15 def __init__(self, prev_stream):
16 self.prev = prev_stream
18 def init_fp(self):
19 """prepare self as file-like interface"""
20 _log.debug("use as fp(%s)", self.__class__.__name__)
21 self.gen1 = self.prev.gen()
22 self.buf = [next(self.gen1)]
23 self.eof = False
25 # work as pass-thru stream
26 def gen(self) -> Generator[bytes, None, None]:
27 """read part generator"""
28 yield from self.prev.gen()
30 def read_all(self) -> bytes:
31 """read all content"""
32 buf = io.BytesIO()
33 for i in self.gen():
34 _log.debug("read %d bytes", len(i))
35 buf.write(i)
36 _log.debug("finish read")
37 return buf.getvalue()
39 def text_gen(self) -> Generator[str, None, None]:
40 """readline generator"""
41 rest = b""
42 for i in self.gen():
43 d = i.rfind(b"\n")
44 if d != -1:
45 buf0 = io.BytesIO(rest + i[: d + 1])
46 rest = i[d + 1 :]
47 buf = io.TextIOWrapper(buf0)
48 yield from buf
49 else:
50 rest = rest + i
51 if rest:
52 buf = io.TextIOWrapper(io.BytesIO(rest))
53 yield from buf
55 def read(self, sz: int = -1) -> bytes:
56 """
57 read up to size bytes
59 used for file-like interface
61 args:
62 sz: read size. if size is not specified or -1, read all content.
63 """
64 assert hasattr(self, "eof")
65 if self.eof:
66 return b""
67 if sz == -1: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 _log.debug("read all")
69 try:
70 while True:
71 self.buf.append(next(self.gen1))
72 except StopIteration:
73 _log.debug("read %s / %s", len(self.buf), sum([len(x) for x in self.buf]))
74 buf = self.buf
75 self.buf = []
76 self.eof = True
77 return b"".join(buf)
78 cur = sum([len(x) for x in self.buf])
79 try:
80 _log.debug("read part cur=%s / sz=%s", cur, sz)
81 while cur < sz: 81 ↛ 86line 81 didn't jump to line 86 because the condition on line 81 was always true
82 bt = next(self.gen1)
83 _log.debug("read1 %d / cur=%s, sz=%s", len(bt), cur, sz)
84 self.buf.append(bt)
85 cur += len(bt)
86 buf = b"".join(self.buf)
87 self.buf = [buf[sz:]]
88 _log.debug("return %s, rest=%s", sz, len(self.buf[0]))
89 return buf[:sz]
90 except StopIteration:
91 _log.debug("eof %s / %s", len(self.buf), sum([len(x) for x in self.buf]))
92 buf = self.buf
93 self.buf = []
94 self.eof = True
95 return b"".join(buf)
98class CatStream:
99 def __init__(self, inputs: list[Stream]):
100 self.inputs = inputs
102 def gen(self) -> Generator[bytes, None, None]:
103 for i in self.inputs:
104 yield from i.gen()
107class MergeStream:
108 def __init__(self, inputs: Sequence[Stream], bufsize: int = 4096):
109 self.inputs = inputs
110 self.bufsize = bufsize
112 def gen(self) -> Generator[bytes, None, None]:
113 buf = io.BytesIO()
114 for i in self.text_gen():
115 buf.write(i.encode("utf-8"))
116 if buf.tell() > self.bufsize: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 yield buf.getvalue()
118 buf.truncate(0)
119 buf.seek(0)
120 yield buf.getvalue()
122 def text_gen(self) -> Generator[str, None, None]:
123 txt_gens = [x.text_gen() for x in self.inputs]
124 input_files = [[next(x), x] for x in txt_gens]
125 input_files.sort(key=lambda f: f[0])
126 while len(input_files) != 0:
127 yield input_files[0][0]
128 try:
129 input_files[0][0] = next(input_files[0][1])
130 if len(input_files) == 1 or input_files[0][0] < input_files[1][0]: 130 ↛ 132line 130 didn't jump to line 132 because the condition on line 130 was never true
131 # already sorted
132 continue
133 except StopIteration:
134 input_files.pop(0)
135 input_files.sort(key=lambda f: f[0])