Coverage for log2s3/common_stream.py: 82%

99 statements  

coverage.py v7.11.0, created at 2025-10-29 12:02 +0000

  1  import io
  2  from typing import Generator, Sequence
  3  from logging import getLogger
  4
  5  _log = getLogger(__name__)
  6
  7
  8  class Stream:
  9      """
 10      stream base class
 11
 12      work as pass-through stream
 13      """
 14
 15      def __init__(self, prev_stream):
 16          self.prev = prev_stream
 17
 18      def init_fp(self):
 19          """prepare self as file-like interface"""
 20          _log.debug("use as fp(%s)", self.__class__.__name__)
 21          self.gen1 = self.prev.gen()
 22          self.buf = [next(self.gen1)]
 23          self.eof = False
 24
 25      # work as pass-thru stream
 26      def gen(self) -> Generator[bytes, None, None]:
 27          """read part generator"""
 28          yield from self.prev.gen()
 29
 30      def read_all(self) -> bytes:
 31          """read all content"""
 32          buf = io.BytesIO()
 33          for i in self.gen():
 34              _log.debug("read %d bytes", len(i))
 35              buf.write(i)
 36          _log.debug("finish read")
 37          return buf.getvalue()
 38
 39      def text_gen(self) -> Generator[str, None, None]:
 40          """readline generator"""
 41          rest = b""
 42          for i in self.gen():
 43              d = i.rfind(b"\n")
 44              if d != -1:
 45                  buf0 = io.BytesIO(rest + i[: d + 1])
 46                  rest = i[d + 1 :]
 47                  buf = io.TextIOWrapper(buf0)
 48                  yield from buf
 49              else:
 50                  rest = rest + i
 51          if rest:
 52              buf = io.TextIOWrapper(io.BytesIO(rest))
 53              yield from buf
 54
 55      def read(self, sz: int = -1) -> bytes:
 56          """
 57          read up to size bytes
 58
 59          used for file-like interface
 60
 61          args:
 62              sz: read size. if size is not specified or -1, read all content.
 63          """
 64          assert hasattr(self, "eof")
 65          if self.eof:
 66              return b""
 67          if sz == -1:    [67 ↛ 68: line 67 didn't jump to line 68 because the condition on line 67 was never true]
 68              _log.debug("read all")
 69              try:
 70                  while True:
 71                      self.buf.append(next(self.gen1))
 72              except StopIteration:
 73                  _log.debug(
 74                      "read %s / %s", len(self.buf), sum([len(x) for x in self.buf])
 75                  )
 76                  buf = self.buf
 77                  self.buf = []
 78                  self.eof = True
 79                  return b"".join(buf)
 80          cur = sum([len(x) for x in self.buf])
 81          try:
 82              _log.debug("read part cur=%s / sz=%s", cur, sz)
 83              while cur < sz:    [83 ↛ 88: line 83 didn't jump to line 88 because the condition on line 83 was always true]
 84                  bt = next(self.gen1)
 85                  _log.debug("read1 %d / cur=%s, sz=%s", len(bt), cur, sz)
 86                  self.buf.append(bt)
 87                  cur += len(bt)
 88              buf = b"".join(self.buf)
 89              self.buf = [buf[sz:]]
 90              _log.debug("return %s, rest=%s", sz, len(self.buf[0]))
 91              return buf[:sz]
 92          except StopIteration:
 93              _log.debug("eof %s / %s", len(self.buf), sum([len(x) for x in self.buf]))
 94              buf = self.buf
 95              self.buf = []
 96              self.eof = True
 97              return b"".join(buf)
 98
 99
100  class CatStream:
101      def __init__(self, inputs: list[Stream]):
102          self.inputs = inputs
103
104      def gen(self) -> Generator[bytes, None, None]:
105          for i in self.inputs:
106              yield from i.gen()
107
108
109  class MergeStream:
110      def __init__(self, inputs: Sequence[Stream], bufsize: int = 4096):
111          self.inputs = inputs
112          self.bufsize = bufsize
113
114      def gen(self) -> Generator[bytes, None, None]:
115          buf = io.BytesIO()
116          for i in self.text_gen():
117              buf.write(i.encode("utf-8"))
118              if buf.tell() > self.bufsize:    [118 ↛ 119: line 118 didn't jump to line 119 because the condition on line 118 was never true]
119                  yield buf.getvalue()
120                  buf.truncate(0)
121                  buf.seek(0)
122          yield buf.getvalue()
123
124      def text_gen(self) -> Generator[str, None, None]:
125          txt_gens = [x.text_gen() for x in self.inputs]
126          input_files = [[next(x), x] for x in txt_gens]
127          input_files.sort(key=lambda f: f[0])
128          while len(input_files) != 0:
129              yield input_files[0][0]
130              try:
131                  input_files[0][0] = next(input_files[0][1])
132                  if len(input_files) == 1 or input_files[0][0] < input_files[1][0]:    [132 ↛ 134: line 132 didn't jump to line 134 because the condition on line 132 was never true]
133                      # already sorted
134                      continue
135              except StopIteration:
136                  input_files.pop(0)
137              input_files.sort(key=lambda f: f[0])
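
The two partial branches flagged at lines 67 and 83 both sit on the file-like read() path. Below is a minimal sketch of driving that path, assuming the module is importable as log2s3.common_stream; BytesSource is a hypothetical helper, not part of the module (any object exposing a gen() generator that yields bytes chunks will do).

# Sketch only: BytesSource is hypothetical and the import path is assumed.
from typing import Generator

from log2s3.common_stream import Stream


class BytesSource:
    """Hypothetical source stream: yields an in-memory blob in fixed-size chunks."""

    def __init__(self, data: bytes, chunk: int = 8):
        self.data = data
        self.chunk = chunk

    def gen(self) -> Generator[bytes, None, None]:
        for off in range(0, len(self.data), self.chunk):
            yield self.data[off : off + self.chunk]


s = Stream(BytesSource(b"hello streaming world\n"))
s.init_fp()        # set up gen1/buf/eof for the file-like interface
head = s.read(5)   # b"hello"; unread bytes stay buffered in self.buf
tail = s.read(-1)  # sz == -1 drains the generator (the branch reported as never taken at line 67)
assert head + tail == b"hello streaming world\n"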
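
CatStream and MergeStream consume the same gen()/text_gen() interface, so they compose with any Stream. A self-contained sketch under the same assumptions (hypothetical OneShot source, assumed import path); note that MergeStream expects each input to already be sorted line by line.

# Sketch only: OneShot is hypothetical and the import path is assumed.
from typing import Generator

from log2s3.common_stream import CatStream, MergeStream, Stream


class OneShot:
    """Hypothetical source stream: yields a single bytes blob."""

    def __init__(self, data: bytes):
        self.data = data

    def gen(self) -> Generator[bytes, None, None]:
        yield self.data


a = Stream(OneShot(b"2025-10-29T10:00 alpha\n2025-10-29T12:00 gamma\n"))
b = Stream(OneShot(b"2025-10-29T11:00 beta\n2025-10-29T13:00 delta\n"))

# CatStream: plain concatenation, a's chunks followed by b's.
print(Stream(CatStream([a, b])).read_all())

# MergeStream: k-way merge of line-sorted inputs; lines come out in timestamp order.
for line in MergeStream([a, b]).text_gen():
    print(line, end="")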