Coverage for log2s3/compr_stream.py: 91%

239 statements  

coverage.py v7.11.0, created at 2025-10-29 12:02 +0000

import lzma
import bz2
import gzip
import pathlib
from .common_stream import Stream
from typing import Optional, Callable
from logging import getLogger
import io
import os

try:
    from mypy_boto3_s3.client import S3Client as S3ClientType
except ImportError:
    from typing import Any as S3ClientType

_log = getLogger(__name__)


class FileReadStream(Stream):
    """
    Read from file, stream interface
    """

    def __init__(
        self, file_like: io.RawIOBase | io.BufferedReader, bufsize=10 * 1024 * 1024
    ):
        self.fd = file_like
        self.bufsize = bufsize

    def gen(self):
        while True:
            data = self.fd.read(self.bufsize)
            if data is None or len(data) == 0:
                break
            _log.debug("read file %d", len(data))
            yield data


class RawReadStream(Stream):
    """
    Read from bytes, stream interface
    """

    def __init__(self, data: bytes, bufsize=1024 * 1024):
        self.fd = io.BytesIO(data)
        self.bufsize = bufsize

    def gen(self):
        while True:
            data = self.fd.read(self.bufsize)
            _log.debug("read file %d", len(data))
            if len(data) == 0:
                break
            yield data


class FileWriteStream(Stream):
    """
    Read data from prev_stream and write to file-like object.
    """

    def __init__(
        self,
        prev_stream,
        file_like: io.RawIOBase | io.BufferedWriter,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.fd = file_like
        self.bufsize = bufsize

    def gen(self):
        for i in self.prev.gen():
            yield self.fd.write(i)

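# Sketch (not executed here; file names are placeholders): the chain is
# pull-based, so iterating the last stream's gen() drives the upstream reads.
# Copying a file through the stream interface looks like:
#
#   with open("in.log", "rb") as src, open("out.log", "wb") as dst:
#       for _ in FileWriteStream(FileReadStream(src), dst).gen():
#           pass
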

class S3GetStream(Stream):
    """
    Read data from S3 object with chunked read.
    """

    def __init__(
        self, s3_client: S3ClientType, bucket: str, key: str, bufsize=1024 * 1024
    ):
        self.obj = s3_client.get_object(Bucket=bucket, Key=key)
        self.bufsize = bufsize

    def gen(self):
        yield from self.obj["Body"].iter_chunks(self.bufsize)


class S3PutStream(Stream):
    """
    Read data from prev_stream and write to S3 object.
    """

    def __init__(
        self,
        prev_stream,
        s3_client: S3ClientType,
        bucket: str,
        key: str,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.client = s3_client
        self.bucket = bucket
        self.key = key
        self.bufsize = bufsize
        self.init_fp()
        _log.debug("eof is %s", self.eof)

    def gen(self):
        _log.debug("gen: bucket=%s, key=%s", self.bucket, self.key)
        self.client.upload_fileobj(self, self.bucket, self.key)  # type: ignore
        yield b""

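# S3PutStream hands itself to upload_fileobj(), so it relies on the file-like
# read() / init_fp() / eof support that appears to come from the Stream base
# class in common_stream (not shown here).  Hypothetical usage, assuming a
# configured boto3 S3 client named `s3` and placeholder bucket/key names:
#
#   up = S3PutStream(FileReadStream(open("app.log", "rb")), s3, "my-bucket", "app.log")
#   for _ in up.gen():
#       pass
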

class SimpleFilterStream(Stream):
    """
    Wrap a one-shot compress/decompress function in the stream interface (base class).
    """

    def __init__(self, prev_stream, filter_fn: Callable[[bytes], bytes]):
        super().__init__(prev_stream)
        self.filter_fn = filter_fn

    def gen(self):
        yield self.filter_fn(self.prev.read_all())


class ComprFlushStream(Stream):
    """
    Compress chunk by chunk, then flush() at the end.
    """

    def __init__(self, prev_stream, compressor):
        super().__init__(prev_stream)
        self.compr = compressor

    def gen(self):
        for i in self.prev.gen():
            _log.debug("compress %d", len(i))
            yield self.compr.compress(i)
        _log.debug("flush")
        yield self.compr.flush()


class DecompStream(Stream):
    """
    Decompress chunk by chunk.
    """

    def __init__(self, prev_stream, decompressor):
        super().__init__(prev_stream)
        self.decompr = decompressor

    def gen(self):
        for i in self.prev.gen():
            yield self.decompr.decompress(i)

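# SimpleFilterStream buffers the entire upstream payload and applies a one-shot
# compress/decompress function, while ComprFlushStream and DecompStream work
# incrementally, chunk by chunk, through a stateful (de)compressor object.
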

class XzCompressorStream(ComprFlushStream):
    """
    compressor stream for .xz format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, lzma.LZMACompressor(format=lzma.FORMAT_XZ))


class LzmaCompressorStream(ComprFlushStream):
    """
    compressor stream for .lzma format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, lzma.LZMACompressor(format=lzma.FORMAT_ALONE))


class XzDecompressorStream(DecompStream):
    """
    decompressor stream for .xz or .lzma format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, lzma.LZMADecompressor(format=lzma.FORMAT_AUTO))


LzmaDecompressorStream = XzDecompressorStream


class Bz2CompressorStream(ComprFlushStream):
    """
    compressor stream for .bz2 format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, bz2.BZ2Compressor())


class Bz2DecompressorStream(DecompStream):
    """
    decompressor stream for .bz2 format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, bz2.BZ2Decompressor())


class GzipCompressorStream(SimpleFilterStream):
    """
    compressor stream for .gz format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, gzip.compress)


class GzipDecompressorStream(SimpleFilterStream):
    """
    decompressor stream for .gz format
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, gzip.decompress)

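# Round-trip sketch (assumes read_all() on the Stream base collects a chain's
# output into a single bytes object, as SimpleFilterStream.gen() relies on):
#
#   data = XzDecompressorStream(XzCompressorStream(RawReadStream(b"hello"))).read_all()
#   assert data == b"hello"
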

stream_map: dict[str, tuple[str, type[Stream], type[Stream]]] = {
    "pass": ("", Stream, Stream),
    "gzip": (".gz", GzipCompressorStream, GzipDecompressorStream),
    "bzip2": (".bz2", Bz2CompressorStream, Bz2DecompressorStream),
    "xz": (".xz", XzCompressorStream, XzDecompressorStream),
    "lzma": (".lzma", LzmaCompressorStream, LzmaDecompressorStream),
}

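# stream_map maps a compression mode name to (file extension, compressor
# stream class, decompressor stream class).  The optional codecs below are
# registered only when their modules import successfully.  For example:
#
#   ext, comp_cls, _ = stream_map["xz"]   # (".xz", XzCompressorStream, XzDecompressorStream)
#   compressed = comp_cls(RawReadStream(b"payload")).read_all()
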

try:
    try:
        from compression import zstd
    except ImportError:
        import zstd

    class ZstdCompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zstd.compress)

    class ZstdDecompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zstd.decompress)

    stream_map["zstd"] = (".zstd", ZstdCompressorStream, ZstdDecompressorStream)

except ImportError:
    pass

try:
    import lz4.frame

    # lz4.frame.LZ4FrameCompressor does not work?
    class Lz4CompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, lz4.frame.compress)

    # lz4.frame.LZ4FrameDecompressor does not work?
    class Lz4DecompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, lz4.frame.decompress)

    stream_map["lz4"] = (".lz4", Lz4CompressorStream, Lz4DecompressorStream)

except ImportError:
    pass

try:
    import brotli

    class BrotliCompressorStream(Stream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.compr = brotli.Compressor()

        def gen(self):
            for i in self.prev.gen():
                yield self.compr.process(i)
            yield self.compr.flush()

    class BrotliDecompressorStream(Stream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.decompr = brotli.Decompressor()

        def gen(self):
            for i in self.prev.gen():
                yield self.decompr.process(i)

    stream_map["brotli"] = (".br", BrotliCompressorStream, BrotliDecompressorStream)

except ImportError:
    pass

try:
    import liblzfse

    class LzfseCompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, liblzfse.compress)

    class LzfseDecompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, liblzfse.decompress)

    stream_map["lzfse"] = (".lzfse", LzfseCompressorStream, LzfseDecompressorStream)

except ImportError:
    pass

try:
    import snappy  # type: ignore

    class SnappyCompressorStream(ComprFlushStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, snappy.StreamCompressor())

    class SnappyDecompressorStream(DecompStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, snappy.StreamDecompressor())

    # does not work?
    # stream_map["snappy"] = (".snappy", SnappyCompressorStream, SnappyDecompressorStream)
except ImportError:
    pass

try:
    import lzo  # type: ignore

    class LzoCompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, lzo.compress)

    class LzoDecompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, lzo.decompress)

    stream_map["lzo"] = (".lzo", LzoCompressorStream, LzoDecompressorStream)
except ImportError:
    pass

try:
    import zpaq  # type: ignore

    class ZpaqCompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zpaq.compress)

    class ZpaqDecompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zpaq.decompress)

    stream_map["zpaq"] = (".zpaq", ZpaqCompressorStream, ZpaqDecompressorStream)

except ImportError:
    pass

try:
    import zopfli.gzip

    class ZopfliCompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zopfli.gzip.compress)

    stream_map["zopfli"] = ("", ZopfliCompressorStream, GzipDecompressorStream)
except ImportError:
    pass


try:
    import zlib_ng.gzip_ng  # type: ignore

    class ZlibNgCompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zlib_ng.gzip_ng.compress)

    class ZlibNgDecompressorStream(SimpleFilterStream):
        def __init__(self, prev_stream):
            super().__init__(prev_stream, zlib_ng.gzip_ng.decompress)

    stream_map["zlib-ng"] = ("", ZlibNgCompressorStream, ZlibNgDecompressorStream)
except ImportError:
    pass

stream_ext: dict[str, tuple[str, type[Stream], type[Stream]]] = {
    v[0]: (k, *v[1:]) for k, v in stream_map.items()
}
stream_compress_modes = list(stream_map.keys()) + ["decompress", "raw"]

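# stream_ext is the reverse lookup, keyed by file extension, e.g.:
#
#   mode, comp_cls, decomp_cls = stream_ext[".gz"]   # ("gzip", GzipCompressorStream, GzipDecompressorStream)
#
# "pass", plus "zopfli" and "zlib-ng" when available, all register an empty
# extension, so only the last one registered survives under the "" key here.
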


def auto_compress_stream(
    ifname: pathlib.Path, mode: str, ifp: Optional[Stream] = None
) -> tuple[os.PathLike, Stream]:
    if ifp is None:
        ifp = FileReadStream(ifname.open("br"))
    if mode == "raw":
        return ifname, ifp
    base, ext = os.path.splitext(str(ifname))
    # decompress
    res: Stream = ifp
    if ext in stream_ext:
        imode, _, dst = stream_ext[ext]
        if imode == mode:
            return ifname, res
        _log.debug("input mode: %s", imode)
        res = dst(res)
    else:
        base = str(ifname)
    if mode == "decompress":
        return pathlib.Path(base), res
    # compress
    if mode in stream_map:
        ext, cst, _ = stream_map[mode]
        res = cst(res)
        base = base + ext
    return pathlib.Path(base), res
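
# Usage sketch for auto_compress_stream (paths are hypothetical): recompress an
# existing .gz log as .xz, then drain the returned chain into the new file.
#
#   out_path, st = auto_compress_stream(pathlib.Path("access.log.gz"), "xz")
#   with open(out_path, "wb") as f:           # out_path is access.log.xz
#       for chunk in st.gen():
#           f.write(chunk)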