Coverage for log2s3 / common_stream.py: 82%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 04:42 +0000

1import io 

2from typing import Generator, Sequence 

3from logging import getLogger 

4 

5_log = getLogger(__name__) 

6 

7 

class Stream:
    """
    stream base class

    work as pass-through stream
    """

    def __init__(self, prev_stream):
        self.prev = prev_stream

    def init_fp(self):
        """prepare self as file-like interface"""
        _log.debug("use as fp(%s)", self.__class__.__name__)
        # prime the buffer with the first chunk so read() has data to slice
        self.gen1 = self.prev.gen()
        self.buf = [next(self.gen1)]
        self.eof = False

    # work as pass-thru stream
    def gen(self) -> Generator[bytes, None, None]:
        """read part generator"""
        yield from self.prev.gen()

    def read_all(self) -> bytes:
        """read all content and return it as a single bytes object"""
        buf = io.BytesIO()
        for i in self.gen():
            _log.debug("read %d bytes", len(i))
            buf.write(i)
        _log.debug("finish read")
        return buf.getvalue()

    def text_gen(self) -> Generator[str, None, None]:
        """readline generator

        Splits the byte stream on newlines and yields decoded text lines.
        Decodes as UTF-8 (fix: was locale-dependent via TextIOWrapper's
        default encoding, inconsistent with MergeStream.gen which encodes
        UTF-8).
        """
        rest = b""
        for i in self.gen():
            d = i.rfind(b"\n")
            if d != -1:
                # everything up to (and including) the last newline is
                # complete lines; the tail is carried over to the next chunk
                buf0 = io.BytesIO(rest + i[: d + 1])
                rest = i[d + 1 :]
                buf = io.TextIOWrapper(buf0, encoding="utf-8")
                yield from buf
            else:
                # no newline in this chunk; keep accumulating
                rest = rest + i
        if rest:
            # trailing data without a final newline
            buf = io.TextIOWrapper(io.BytesIO(rest), encoding="utf-8")
            yield from buf

    def read(self, sz: int = -1) -> bytes:
        """
        read up to size bytes

        used for file-like interface; init_fp() must be called first

        args:
            sz: read size. if size is not specified or -1, read all content.
        """
        assert hasattr(self, "eof")  # set by init_fp()
        if self.eof:
            return b""
        if sz == -1:
            _log.debug("read all")
            try:
                # drain the upstream generator completely
                while True:
                    self.buf.append(next(self.gen1))
            except StopIteration:
                _log.debug("read %s / %s", len(self.buf), sum(len(x) for x in self.buf))
                buf = self.buf
                self.buf = []
                self.eof = True
                return b"".join(buf)
        cur = sum(len(x) for x in self.buf)
        try:
            _log.debug("read part cur=%s / sz=%s", cur, sz)
            while cur < sz:
                bt = next(self.gen1)
                _log.debug("read1 %d / cur=%s, sz=%s", len(bt), cur, sz)
                self.buf.append(bt)
                cur += len(bt)
            # enough buffered: return exactly sz bytes, keep the remainder
            buf = b"".join(self.buf)
            self.buf = [buf[sz:]]
            _log.debug("return %s, rest=%s", sz, len(self.buf[0]))
            return buf[:sz]
        except StopIteration:
            # source exhausted before sz bytes: return whatever is buffered
            _log.debug("eof %s / %s", len(self.buf), sum(len(x) for x in self.buf))
            buf = self.buf
            self.buf = []
            self.eof = True
            return b"".join(buf)

96 

97 

class CatStream:
    """Concatenate several streams and serve them as one sequential stream."""

    def __init__(self, inputs: list[Stream]):
        self.inputs = inputs

    def gen(self) -> Generator[bytes, None, None]:
        """yield every chunk of every input stream, in input order"""
        for stream in self.inputs:
            for chunk in stream.gen():
                yield chunk

105 

106 

class MergeStream:
    """
    k-way merge of line-sorted input streams into one sorted stream

    Inputs are merged line-by-line on decoded text; each input is expected
    to already be sorted.
    """

    def __init__(self, inputs: Sequence["Stream"], bufsize: int = 4096):
        self.inputs = inputs
        self.bufsize = bufsize  # flush threshold for gen() output chunks

    def gen(self) -> Generator[bytes, None, None]:
        """yield merged output as UTF-8 encoded chunks of roughly bufsize bytes"""
        buf = io.BytesIO()
        for line in self.text_gen():
            buf.write(line.encode("utf-8"))
            if buf.tell() > self.bufsize:
                yield buf.getvalue()
                buf.truncate(0)
                buf.seek(0)
        # always yield the final chunk (possibly b"") so consumers that
        # prime with next() never see an empty generator
        yield buf.getvalue()

    def text_gen(self) -> Generator[str, None, None]:
        """yield merged lines in sorted order (inputs must be pre-sorted)"""
        txt_gens = [x.text_gen() for x in self.inputs]
        # fix: prime each generator individually and skip empty inputs.
        # next() raising StopIteration inside the old list comprehension
        # propagated out of this generator, which PEP 479 turns into
        # "RuntimeError: generator raised StopIteration".
        input_files = []
        for g in txt_gens:
            try:
                input_files.append([next(g), g])
            except StopIteration:
                pass
        input_files.sort(key=lambda f: f[0])
        while len(input_files) != 0:
            yield input_files[0][0]
            try:
                input_files[0][0] = next(input_files[0][1])
                if len(input_files) == 1 or input_files[0][0] < input_files[1][0]:
                    # already sorted
                    continue
            except StopIteration:
                # this input is exhausted; drop it and re-sort the rest
                input_files.pop(0)
            input_files.sort(key=lambda f: f[0])