Coverage for log2s3 / compr_stream.py: 92%

239 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 04:42 +0000

1import lzma 

2import bz2 

3import gzip 

4import pathlib 

5from .common_stream import Stream 

6from typing import Optional, Callable 

7from logging import getLogger 

8import io 

9import os 

10 

11try: 

12 from mypy_boto3_s3.client import S3Client as S3ClientType 

13except ImportError: 

14 from typing import Any as S3ClientType 

15 

16_log = getLogger(__name__) 

17 

18 

class FileReadStream(Stream):
    """
    Stream source that reads a file-like object chunk by chunk.
    """

    def __init__(self, file_like: io.RawIOBase | io.BufferedReader, bufsize=10 * 1024 * 1024):
        # Source stage: there is no upstream stream, and Stream.__init__ is not invoked here.
        self.fd = file_like
        self.bufsize = bufsize

    def gen(self):
        """Yield chunks of up to ``bufsize`` from ``fd`` until a read returns ``None`` or nothing."""
        while chunk := self.fd.read(self.bufsize):
            _log.debug("read file %d", len(chunk))
            yield chunk

35 

36 

class RawReadStream(Stream):
    """
    Stream source that serves an in-memory ``bytes`` value chunk by chunk.
    """

    def __init__(self, data: bytes, bufsize=1024 * 1024):
        # The payload is wrapped in a BytesIO so it can be consumed with read().
        self.fd = io.BytesIO(data)
        self.bufsize = bufsize

    def gen(self):
        """Yield chunks of up to ``bufsize`` bytes; every read (including the final empty one) is logged."""
        chunk = self.fd.read(self.bufsize)
        _log.debug("read file %d", len(chunk))
        while chunk:
            yield chunk
            chunk = self.fd.read(self.bufsize)
            _log.debug("read file %d", len(chunk))

53 

54 

class FileWriteStream(Stream):
    """
    Pull chunks from prev_stream and write each one to a file-like object.
    """

    def __init__(
        self,
        prev_stream,
        file_like: io.RawIOBase | io.BufferedWriter,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.fd = file_like
        # Stored for interface symmetry with the other streams; gen() does not read it.
        self.bufsize = bufsize

    def gen(self):
        """Write every upstream chunk and yield the value returned by each ``write`` call."""
        for chunk in self.prev.gen():
            write_result = self.fd.write(chunk)
            yield write_result

73 

74 

class S3GetStream(Stream):
    """
    Stream source that reads an S3 object body in chunks.
    """

    def __init__(self, s3_client: S3ClientType, bucket: str, key: str, bufsize=1024 * 1024):
        # The get_object request is issued at construction time, not on first read.
        self.obj = s3_client.get_object(Bucket=bucket, Key=key)
        self.bufsize = bufsize

    def gen(self):
        """Yield the chunks produced by the response body's ``iter_chunks(bufsize)``."""
        body = self.obj["Body"]
        for chunk in body.iter_chunks(self.bufsize):
            yield chunk

86 

87 

class S3PutStream(Stream):
    """
    Upload the data coming from prev_stream to an S3 object.
    """

    def __init__(
        self,
        prev_stream,
        s3_client: S3ClientType,
        bucket: str,
        key: str,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.client = s3_client
        self.bucket = bucket
        self.key = key
        self.bufsize = bufsize
        # init_fp() and the eof attribute are not defined in this class;
        # presumably they come from the Stream base class -- confirm there.
        self.init_fp()
        _log.debug("eof is %s", self.eof)

    def gen(self):
        """Hand this stream itself to ``upload_fileobj`` as the file object, then yield ``b""`` once."""
        target_bucket, target_key = self.bucket, self.key
        _log.debug("gen: bucket=%s, key=%s", target_bucket, target_key)
        self.client.upload_fileobj(self, target_bucket, target_key)  # type: ignore
        yield b""

113 

114 

class SimpleFilterStream(Stream):
    """
    Base class applying a one-shot bytes -> bytes function to the whole input.
    """

    def __init__(self, prev_stream, filter_fn: Callable[[bytes], bytes]):
        super().__init__(prev_stream)
        self.filter_fn = filter_fn

    def gen(self):
        """Read everything from the previous stream, filter it in one call, and yield the single result."""
        whole_input = self.prev.read_all()
        yield self.filter_fn(whole_input)

126 

127 

class ComprFlushStream(Stream):
    """
    Feed each upstream chunk to an incremental compressor, then flush it.
    """

    def __init__(self, prev_stream, compressor):
        super().__init__(prev_stream)
        # Any object exposing compress(bytes) and flush() as used by gen().
        self.compr = compressor

    def gen(self):
        """Yield ``compress(chunk)`` for every upstream chunk, followed by the final ``flush()`` output."""
        for chunk in self.prev.gen():
            _log.debug("compress %d", len(chunk))
            compressed = self.compr.compress(chunk)
            yield compressed
        _log.debug("flush")
        tail = self.compr.flush()
        yield tail

143 

144 

class DecompStream(Stream):
    """
    Feed each upstream chunk to an incremental decompressor.
    """

    def __init__(self, prev_stream, decompressor):
        super().__init__(prev_stream)
        # Any object exposing decompress(bytes) as used by gen().
        self.decompr = decompressor

    def gen(self):
        """Yield ``decompress(chunk)`` for every chunk of the previous stream."""
        for chunk in self.prev.gen():
            plain = self.decompr.decompress(chunk)
            yield plain

157 

158 

class XzCompressorStream(ComprFlushStream):
    """
    Compressing stream producing the .xz container format.
    """

    def __init__(self, prev_stream):
        xz_compressor = lzma.LZMACompressor(format=lzma.FORMAT_XZ)
        super().__init__(prev_stream, compressor=xz_compressor)

166 

167 

class LzmaCompressorStream(ComprFlushStream):
    """
    Compressing stream producing the legacy .lzma (FORMAT_ALONE) format.
    """

    def __init__(self, prev_stream):
        lzma_compressor = lzma.LZMACompressor(format=lzma.FORMAT_ALONE)
        super().__init__(prev_stream, compressor=lzma_compressor)

175 

176 

class XzDecompressorStream(DecompStream):
    """
    Decompressing stream for .xz or .lzma input (format is auto-detected).
    """

    def __init__(self, prev_stream):
        auto_decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO)
        super().__init__(prev_stream, decompressor=auto_decompressor)


# FORMAT_AUTO covers both containers, so .lzma input reuses the same class.
LzmaDecompressorStream = XzDecompressorStream

187 

188 

class Bz2CompressorStream(ComprFlushStream):
    """
    Compressing stream producing the .bz2 format.
    """

    def __init__(self, prev_stream):
        bz2_compressor = bz2.BZ2Compressor()
        super().__init__(prev_stream, compressor=bz2_compressor)

196 

197 

class Bz2DecompressorStream(DecompStream):
    """
    Decompressing stream for .bz2 input.
    """

    def __init__(self, prev_stream):
        bz2_decompressor = bz2.BZ2Decompressor()
        super().__init__(prev_stream, decompressor=bz2_decompressor)

205 

206 

class GzipCompressorStream(SimpleFilterStream):
    """
    Compressing stream producing the .gz format via one-shot ``gzip.compress``.
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, filter_fn=gzip.compress)

214 

215 

class GzipDecompressorStream(SimpleFilterStream):
    """
    Decompressing stream for .gz input via one-shot ``gzip.decompress``.
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, filter_fn=gzip.decompress)

223 

224 

# Registry of compression modes:
#   mode name -> (file extension, compressor stream class, decompressor stream class).
# Optional codecs add their own entries further down when their module imports.
# Insertion order is significant: it determines the order of stream_compress_modes.
stream_map: dict[str, tuple[str, type[Stream], type[Stream]]] = {
    # "pass" uses the plain Stream class in both directions and adds no extension.
    "pass": ("", Stream, Stream),
    "gzip": (".gz", GzipCompressorStream, GzipDecompressorStream),
    "bzip2": (".bz2", Bz2CompressorStream, Bz2DecompressorStream),
    "xz": (".xz", XzCompressorStream, XzDecompressorStream),
    "lzma": (".lzma", LzmaCompressorStream, LzmaDecompressorStream),
}

232 

233 

# Optional zstd support: prefer the ``compression.zstd`` module and fall back to a
# top-level ``zstd`` module; the mode is registered only if one of them imports.
try:
    try:
        from compression import zstd
    except ImportError:
        import zstd
except ImportError:
    pass
else:

    class ZstdCompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``zstd.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zstd.compress)

    class ZstdDecompressorStream(SimpleFilterStream):
        """Decompressing stream using one-shot ``zstd.decompress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zstd.decompress)

    stream_map["zstd"] = (".zstd", ZstdCompressorStream, ZstdDecompressorStream)

252 

# Optional lz4 support: registered only when ``lz4.frame`` can be imported.
try:
    import lz4.frame
except ImportError:
    pass
else:

    # The original author's note: lz4.frame.LZ4FrameCompressor does not work?
    # Hence the one-shot lz4.frame.compress function is used instead.
    class Lz4CompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``lz4.frame.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lz4.frame.compress)

    # The original author's note: lz4.frame.LZ4FrameDecompressor does not work?
    class Lz4DecompressorStream(SimpleFilterStream):
        """Decompressing stream using one-shot ``lz4.frame.decompress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lz4.frame.decompress)

    stream_map["lz4"] = (".lz4", Lz4CompressorStream, Lz4DecompressorStream)

270 

# Optional brotli support: registered only when ``brotli`` can be imported.
try:
    import brotli
except ImportError:
    pass
else:

    class BrotliCompressorStream(Stream):
        """Compressing stream driving an incremental ``brotli.Compressor``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.compr = brotli.Compressor()

        def gen(self):
            """Yield ``process(chunk)`` for every upstream chunk, then the ``flush()`` output."""
            for chunk in self.prev.gen():
                compressed = self.compr.process(chunk)
                yield compressed
            tail = self.compr.flush()
            yield tail

    class BrotliDecompressorStream(Stream):
        """Decompressing stream driving an incremental ``brotli.Decompressor``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.decompr = brotli.Decompressor()

        def gen(self):
            """Yield ``process(chunk)`` for every upstream chunk."""
            for chunk in self.prev.gen():
                plain = self.decompr.process(chunk)
                yield plain

    stream_map["brotli"] = (".br", BrotliCompressorStream, BrotliDecompressorStream)

297 

# Optional lzfse support: registered only when ``liblzfse`` can be imported.
try:
    import liblzfse
except ImportError:
    pass
else:

    class LzfseCompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``liblzfse.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=liblzfse.compress)

    class LzfseDecompressorStream(SimpleFilterStream):
        """Decompressing stream using one-shot ``liblzfse.decompress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=liblzfse.decompress)

    stream_map["lzfse"] = (".lzfse", LzfseCompressorStream, LzfseDecompressorStream)

313 

# Optional snappy support: the classes are defined when ``snappy`` imports,
# but the mode is deliberately NOT added to stream_map (see the note below).
try:
    import snappy  # type: ignore
except ImportError:
    pass
else:

    class SnappyCompressorStream(ComprFlushStream):
        """Compressing stream driving a ``snappy.StreamCompressor``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, compressor=snappy.StreamCompressor())

    class SnappyDecompressorStream(DecompStream):
        """Decompressing stream driving a ``snappy.StreamDecompressor``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, decompressor=snappy.StreamDecompressor())

    # Registration left disabled by the original author, who was unsure it works:
    # stream_map["snappy"] = (".snappy", SnappyCompressorStream, SnappyDecompressorStream)

329 

# Optional lzo support: registered only when ``lzo`` can be imported.
try:
    import lzo  # type: ignore
except ImportError:
    pass
else:

    class LzoCompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``lzo.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lzo.compress)

    class LzoDecompressorStream(SimpleFilterStream):
        """Decompressing stream using one-shot ``lzo.decompress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lzo.decompress)

    stream_map["lzo"] = (".lzo", LzoCompressorStream, LzoDecompressorStream)

344 

# Optional zpaq support: registered only when ``zpaq`` can be imported.
try:
    import zpaq  # type: ignore
except ImportError:
    pass
else:

    class ZpaqCompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``zpaq.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zpaq.compress)

    class ZpaqDecompressorStream(SimpleFilterStream):
        """Decompressing stream using one-shot ``zpaq.decompress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zpaq.decompress)

    stream_map["zpaq"] = (".zpaq", ZpaqCompressorStream, ZpaqDecompressorStream)

360 

# Optional zopfli support: registered only when ``zopfli.gzip`` can be imported.
# Only a compressor is defined; decompression reuses GzipDecompressorStream.
# The mode is registered with an empty file extension.
try:
    import zopfli.gzip
except ImportError:
    pass
else:

    class ZopfliCompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``zopfli.gzip.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zopfli.gzip.compress)

    stream_map["zopfli"] = ("", ZopfliCompressorStream, GzipDecompressorStream)

371 

372 

# Optional zlib-ng support: registered only when ``zlib_ng.gzip_ng`` can be imported.
# The mode is registered with an empty file extension.
try:
    import zlib_ng.gzip_ng  # type: ignore
except ImportError:
    pass
else:

    class ZlibNgCompressorStream(SimpleFilterStream):
        """Compressing stream using one-shot ``zlib_ng.gzip_ng.compress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zlib_ng.gzip_ng.compress)

    class ZlibNgDecompressorStream(SimpleFilterStream):
        """Decompressing stream using one-shot ``zlib_ng.gzip_ng.decompress``."""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zlib_ng.gzip_ng.decompress)

    stream_map["zlib-ng"] = ("", ZlibNgCompressorStream, ZlibNgDecompressorStream)

387 

# Reverse lookup: file extension -> (mode name, compressor class, decompressor class).
#
# Several modes can share one extension: "pass", "zopfli" and "zlib-ng" are all
# registered with "". A plain dict comprehension lets the LAST registration win,
# so with zopfli or zlib-ng installed the "" entry pointed at a gzip decompressor
# and auto_compress_stream tried to gunzip every extension-less input file.
# Keeping the FIRST registration per extension preserves the built-in mapping
# ("" -> "pass") regardless of which optional codecs are installed.
stream_ext: dict[str, tuple[str, type[Stream], type[Stream]]] = {}
for _mode, _entry in stream_map.items():
    stream_ext.setdefault(_entry[0], (_mode, *_entry[1:]))

# Every mode accepted by auto_compress_stream: the registered codecs plus the
# two pseudo-modes it handles itself.
stream_compress_modes = list(stream_map.keys()) + ["decompress", "raw"]

390 

391 

def auto_compress_stream(ifname: pathlib.Path, mode: str, ifp: Optional[Stream] = None) -> tuple[os.PathLike, Stream]:
    """
    Build a stream that converts ``ifname`` to the requested compression mode.

    The input's current format is looked up from its file extension in
    ``stream_ext``; the target format is looked up from ``mode`` in ``stream_map``.

    :param ifname: input path; its extension selects the decompressor
    :param mode: a key of ``stream_map``, or ``"decompress"`` / ``"raw"``
    :param ifp: stream to read from; when None, ``ifname`` is opened in binary mode
    :return: ``(output path, stream)``. With ``mode="raw"``, or when the input is
        already in ``mode``, the input path and the unmodified stream are returned.
    """
    source: Stream = FileReadStream(ifname.open("br")) if ifp is None else ifp
    if mode == "raw":
        return ifname, source

    full_name = str(ifname)
    stem, suffix = os.path.splitext(full_name)

    # Step 1: undo the existing compression, if the extension is a known one.
    known = stream_ext.get(suffix)
    if known is None:
        out_name = full_name
    else:
        current_mode, _, decompressor_cls = known
        if current_mode == mode:
            # Already in the requested format: nothing to convert.
            return ifname, source
        _log.debug("input mode: %s", current_mode)
        source = decompressor_cls(source)
        out_name = stem

    if mode == "decompress":
        return pathlib.Path(out_name), source

    # Step 2: apply the requested compression; an unknown mode leaves the data as is.
    if mode in stream_map:
        new_ext, compressor_cls, _ = stream_map[mode]
        source = compressor_cls(source)
        out_name = out_name + new_ext
    return pathlib.Path(out_name), source