Coverage for log2s3 / compr_stream.py: 92%
239 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
1import lzma
2import bz2
3import gzip
4import pathlib
5from .common_stream import Stream
6from typing import Optional, Callable
7from logging import getLogger
8import io
9import os
11try:
12 from mypy_boto3_s3.client import S3Client as S3ClientType
13except ImportError:
14 from typing import Any as S3ClientType
# Module-level logger; the stream classes below emit debug traces through it.
_log = getLogger(__name__)
class FileReadStream(Stream):
    """
    Read from file, stream interface

    Wraps an already-opened binary file-like object and yields its contents in
    chunks of at most ``bufsize`` bytes. The file object is not closed here.
    """

    def __init__(self, file_like: io.RawIOBase | io.BufferedReader, bufsize=10 * 1024 * 1024):
        self.bufsize = bufsize
        self.fd = file_like

    def gen(self):
        # read() returning None or an empty bytes object both end the stream
        while chunk := self.fd.read(self.bufsize):
            _log.debug("read file %d", len(chunk))
            yield chunk
class RawReadStream(Stream):
    """
    Read from bytes, stream interface

    Serves an in-memory ``bytes`` object in chunks of at most ``bufsize`` bytes.
    """

    def __init__(self, data: bytes, bufsize=1024 * 1024):
        self.fd = io.BytesIO(data)
        self.bufsize = bufsize

    def gen(self):
        while True:
            piece = self.fd.read(self.bufsize)
            # logged before the end check, so the final empty read is logged too
            _log.debug("read file %d", len(piece))
            if not piece:
                return
            yield piece
class FileWriteStream(Stream):
    """
    Read data from prev_stream and write to file-like object.

    Every chunk produced upstream is passed to ``file_like.write()`` and the
    value returned by ``write()`` is yielded. ``bufsize`` is stored but not
    used by this class.
    """

    def __init__(
        self,
        prev_stream,
        file_like: io.RawIOBase | io.BufferedWriter,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.fd = file_like
        self.bufsize = bufsize

    def gen(self):
        for chunk in self.prev.gen():
            written = self.fd.write(chunk)
            yield written
class S3GetStream(Stream):
    """
    Read data from S3 object with chunked read.

    ``get_object`` is issued in the constructor; ``gen()`` yields the response
    body in chunks of ``bufsize`` bytes via ``iter_chunks``.
    """

    def __init__(self, s3_client: S3ClientType, bucket: str, key: str, bufsize=1024 * 1024):
        self.obj = s3_client.get_object(Bucket=bucket, Key=key)
        self.bufsize = bufsize

    def gen(self):
        body = self.obj["Body"]
        for chunk in body.iter_chunks(self.bufsize):
            yield chunk
class S3PutStream(Stream):
    """
    Read data from prev_stream and write to S3 object.

    The instance itself is handed to ``upload_fileobj`` as the file-like source
    (presumably via the read interface provided by ``Stream``), so the upload
    runs when ``gen()`` is first advanced; ``gen()`` then yields a single ``b""``.
    """

    def __init__(
        self,
        prev_stream,
        s3_client: S3ClientType,
        bucket: str,
        key: str,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.client, self.bucket, self.key = s3_client, bucket, key
        self.bufsize = bufsize
        self.init_fp()
        _log.debug("eof is %s", self.eof)

    def gen(self):
        bucket, key = self.bucket, self.key
        _log.debug("gen: bucket=%s, key=%s", bucket, key)
        self.client.upload_fileobj(self, bucket, key)  # type: ignore
        yield b""
class SimpleFilterStream(Stream):
    """
    simple compress/decompress function as stream interface, base class

    The upstream payload is collected with ``read_all()`` and passed to a
    one-shot ``filter_fn(bytes) -> bytes``; the result is yielded as a single
    chunk, so input and output are held in memory as a whole.
    """

    def __init__(self, prev_stream, filter_fn: Callable[[bytes], bytes]):
        super().__init__(prev_stream)
        self.filter_fn = filter_fn

    def gen(self):
        payload = self.prev.read_all()
        yield self.filter_fn(payload)
class ComprFlushStream(Stream):
    """
    repeat compress, and finally flush()

    Works with an incremental compressor object exposing ``compress(bytes)`` and
    ``flush()`` (e.g. ``lzma.LZMACompressor``, ``bz2.BZ2Compressor``). One
    output chunk is yielded per input chunk, then the flushed tail.
    """

    def __init__(self, prev_stream, compressor):
        super().__init__(prev_stream)
        self.compr = compressor

    def gen(self):
        for block in self.prev.gen():
            _log.debug("compress %d", len(block))
            compressed = self.compr.compress(block)
            yield compressed
        _log.debug("flush")
        tail = self.compr.flush()
        yield tail
class DecompStream(Stream):
    """
    repeat decompress

    Feeds each upstream chunk to ``decompressor.decompress()`` and yields the output.

    Decompressors that expose ``eof``/``unused_data`` (``lzma.LZMADecompressor``,
    ``bz2.BZ2Decompressor``) stop at the end of the first compressed stream. Any
    data after that point would otherwise be dropped silently (same chunk) or
    raise ``EOFError`` (later chunk). This affects concatenated .xz/.bz2 streams.
    A fresh decompressor of the same type is therefore started for each following
    stream. Undecodable trailing data after at least one complete stream is
    ignored, which matches the policy of ``bz2.decompress()`` / ``lzma.decompress()``.

    The restart calls ``type(decompressor)()`` without arguments, so it assumes
    the default constructor is appropriate. That holds for ``BZ2Decompressor``
    and for ``LZMADecompressor``, whose default format is FORMAT_AUTO.
    Decompressors without an ``eof`` attribute are fed chunk by chunk as before.
    """

    def __init__(self, prev_stream, decompressor):
        super().__init__(prev_stream)
        self.decompr = decompressor

    def gen(self):
        # True once at least one complete compressed stream has been decoded;
        # decode errors after that point are treated as trailing garbage.
        seen_complete_stream = False
        for chunk in self.prev.gen():
            while chunk:
                if getattr(self.decompr, "eof", False):
                    # previous stream ended: start a new decompressor for the next one
                    seen_complete_stream = True
                    self.decompr = type(self.decompr)()
                try:
                    out = self.decompr.decompress(chunk)
                except (lzma.LZMAError, OSError):
                    # bz2 reports invalid data as OSError, lzma as LZMAError
                    if not seen_complete_stream:
                        raise
                    _log.debug("ignoring trailing data after compressed stream")
                    return
                yield out
                if not getattr(self.decompr, "eof", False):
                    break
                # bytes past the end of the finished stream belong to the next stream
                chunk = self.decompr.unused_data
class XzCompressorStream(ComprFlushStream):
    """
    compressor stream for .xz format

    Incremental ``lzma.LZMACompressor`` writing the .xz container (FORMAT_XZ).
    """

    def __init__(self, prev_stream):
        compressor = lzma.LZMACompressor(format=lzma.FORMAT_XZ)
        super().__init__(prev_stream, compressor)
class LzmaCompressorStream(ComprFlushStream):
    """
    compressor stream for .lzma format

    Incremental ``lzma.LZMACompressor`` writing the legacy .lzma container
    (FORMAT_ALONE).
    """

    def __init__(self, prev_stream):
        compressor = lzma.LZMACompressor(format=lzma.FORMAT_ALONE)
        super().__init__(prev_stream, compressor)
class XzDecompressorStream(DecompStream):
    """
    decompressor stream for .xz or .lzma format

    ``lzma.FORMAT_AUTO`` lets the decompressor detect either container.
    """

    def __init__(self, prev_stream):
        decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO)
        super().__init__(prev_stream, decompressor)


# FORMAT_AUTO handles both containers, so .lzma input reuses the xz decompressor.
LzmaDecompressorStream = XzDecompressorStream
class Bz2CompressorStream(ComprFlushStream):
    """
    compressor stream for .bz2 format

    Incremental ``bz2.BZ2Compressor`` with default settings.
    """

    def __init__(self, prev_stream):
        compressor = bz2.BZ2Compressor()
        super().__init__(prev_stream, compressor)
class Bz2DecompressorStream(DecompStream):
    """
    decompressor stream for .bz2 format

    Incremental ``bz2.BZ2Decompressor``.
    """

    def __init__(self, prev_stream):
        decompressor = bz2.BZ2Decompressor()
        super().__init__(prev_stream, decompressor)
class GzipCompressorStream(SimpleFilterStream):
    """
    compressor stream for .gz format

    One-shot ``gzip.compress`` over the whole upstream payload.
    """

    def __init__(self, prev_stream):
        compress_fn = gzip.compress
        super().__init__(prev_stream, compress_fn)
class GzipDecompressorStream(SimpleFilterStream):
    """
    decompressor stream for .gz format

    One-shot ``gzip.decompress`` over the whole upstream payload.
    """

    def __init__(self, prev_stream):
        decompress_fn = gzip.decompress
        super().__init__(prev_stream, decompress_fn)
# Registry of compression modes: name -> (file extension, compressor stream class,
# decompressor stream class). "pass" uses the base Stream for both directions
# (presumably a pass-through). Optional codecs below add entries when importable.
stream_map: dict[str, tuple[str, type[Stream], type[Stream]]] = {
    name: (suffix, compressor, decompressor)
    for name, suffix, compressor, decompressor in (
        ("pass", "", Stream, Stream),
        ("gzip", ".gz", GzipCompressorStream, GzipDecompressorStream),
        ("bzip2", ".bz2", Bz2CompressorStream, Bz2DecompressorStream),
        ("xz", ".xz", XzCompressorStream, XzDecompressorStream),
        ("lzma", ".lzma", LzmaCompressorStream, LzmaDecompressorStream),
    )
}
try:
    # Prefer the stdlib module (Python 3.14+), else the third-party "zstd" package.
    try:
        from compression import zstd
    except ImportError:
        import zstd
except ImportError:
    pass
else:

    class ZstdCompressorStream(SimpleFilterStream):
        """one-shot zstd compression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zstd.compress)

    class ZstdDecompressorStream(SimpleFilterStream):
        """one-shot zstd decompression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zstd.decompress)

    # NOTE: registered suffix is ".zstd" (the zstd CLI uses ".zst"); changing it
    # would rename produced objects and stop suffix detection of existing ones.
    stream_map["zstd"] = (".zstd", ZstdCompressorStream, ZstdDecompressorStream)
try:
    import lz4.frame
except ImportError:
    pass
else:
    # The streaming lz4.frame.LZ4FrameCompressor / LZ4FrameDecompressor were noted
    # as possibly not working here ("does not work?"), so the one-shot frame
    # functions are used and the whole payload is buffered.

    class Lz4CompressorStream(SimpleFilterStream):
        """one-shot lz4 frame compression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, lz4.frame.compress)

    class Lz4DecompressorStream(SimpleFilterStream):
        """one-shot lz4 frame decompression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, lz4.frame.decompress)

    stream_map["lz4"] = (".lz4", Lz4CompressorStream, Lz4DecompressorStream)
try:
    import brotli

    class BrotliCompressorStream(Stream):
        """
        compressor stream for .br format

        Feeds each upstream chunk to ``brotli.Compressor.process`` and ends with
        ``finish()``, which emits the remaining data together with the
        end-of-stream marker. The previous ``flush()`` only emitted pending
        output and left the stream unterminated, so one-shot decoders such as
        ``brotli.decompress`` or the brotli CLI rejected the result.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.compr = brotli.Compressor()

        def gen(self):
            for i in self.prev.gen():
                yield self.compr.process(i)
            # finish() (not flush()) terminates the brotli stream
            yield self.compr.finish()

    class BrotliDecompressorStream(Stream):
        """
        decompressor stream for .br format

        Incremental ``brotli.Decompressor.process``. It does not require the
        stream to be terminated, so objects written before the compressor called
        ``finish()`` remain readable.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.decompr = brotli.Decompressor()

        def gen(self):
            for i in self.prev.gen():
                yield self.decompr.process(i)

    stream_map["brotli"] = (".br", BrotliCompressorStream, BrotliDecompressorStream)

except ImportError:
    pass
try:
    import liblzfse
except ImportError:
    pass
else:

    class LzfseCompressorStream(SimpleFilterStream):
        """one-shot LZFSE compression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, liblzfse.compress)

    class LzfseDecompressorStream(SimpleFilterStream):
        """one-shot LZFSE decompression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, liblzfse.decompress)

    stream_map["lzfse"] = (".lzfse", LzfseCompressorStream, LzfseDecompressorStream)
try:
    import snappy  # type: ignore
except ImportError:
    pass
else:

    class SnappyCompressorStream(ComprFlushStream):
        """incremental snappy compression via snappy.StreamCompressor"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, snappy.StreamCompressor())

    class SnappyDecompressorStream(DecompStream):
        """incremental snappy decompression via snappy.StreamDecompressor"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, snappy.StreamDecompressor())

    # Deliberately not registered: the original note says this "does not work?".
    # TODO: verify the snappy round trip before enabling.
    # stream_map["snappy"] = (".snappy", SnappyCompressorStream, SnappyDecompressorStream)
try:
    import lzo  # type: ignore
except ImportError:
    pass
else:

    class LzoCompressorStream(SimpleFilterStream):
        """one-shot LZO compression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, lzo.compress)

    class LzoDecompressorStream(SimpleFilterStream):
        """one-shot LZO decompression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, lzo.decompress)

    stream_map["lzo"] = (".lzo", LzoCompressorStream, LzoDecompressorStream)
try:
    import zpaq  # type: ignore
except ImportError:
    pass
else:

    class ZpaqCompressorStream(SimpleFilterStream):
        """one-shot ZPAQ compression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zpaq.compress)

    class ZpaqDecompressorStream(SimpleFilterStream):
        """one-shot ZPAQ decompression of the whole payload"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zpaq.decompress)

    stream_map["zpaq"] = (".zpaq", ZpaqCompressorStream, ZpaqDecompressorStream)
try:
    import zopfli.gzip
except ImportError:
    pass
else:

    class ZopfliCompressorStream(SimpleFilterStream):
        """one-shot gzip-format compression using zopfli"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zopfli.gzip.compress)

    # Output is gzip-compatible, so the gzip decompressor is reused. The
    # registered suffix is "" (no extension is appended to output names).
    stream_map["zopfli"] = ("", ZopfliCompressorStream, GzipDecompressorStream)
try:
    import zlib_ng.gzip_ng  # type: ignore
except ImportError:
    pass
else:

    class ZlibNgCompressorStream(SimpleFilterStream):
        """one-shot gzip-format compression using zlib-ng"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zlib_ng.gzip_ng.compress)

    class ZlibNgDecompressorStream(SimpleFilterStream):
        """one-shot gzip-format decompression using zlib-ng"""

        def __init__(self, prev_stream):
            super().__init__(prev_stream, zlib_ng.gzip_ng.decompress)

    # registered suffix is "" (no extension is appended to output names)
    stream_map["zlib-ng"] = ("", ZlibNgCompressorStream, ZlibNgDecompressorStream)
def _build_stream_ext(
    modes: dict[str, tuple[str, type[Stream], type[Stream]]],
) -> dict[str, tuple[str, type[Stream], type[Stream]]]:
    """
    Invert a mode registry into extension -> (mode name, compressor, decompressor).

    Only non-empty extensions are indexed, plus ``"" -> "pass"`` for files
    without a suffix. Several modes register the suffix "" ("pass", "zopfli",
    "zlib-ng"). With a last-wins mapping, "" resolved to a gzip decompressor
    whenever zopfli or zlib-ng was installed, so every extension-less
    (uncompressed) input was wrongly gunzipped. The first registration of an
    extension wins here, and "pass" is registered first.
    """
    result: dict[str, tuple[str, type[Stream], type[Stream]]] = {}
    for name, (ext, compressor, decompressor) in modes.items():
        if not ext and name != "pass":
            # an empty suffix carries no format information
            continue
        result.setdefault(ext, (name, compressor, decompressor))
    return result


# extension -> (mode name, compressor class, decompressor class); used to detect
# the format of an input file from its suffix.
stream_ext: dict[str, tuple[str, type[Stream], type[Stream]]] = _build_stream_ext(stream_map)
# every mode accepted by auto_compress_stream
stream_compress_modes = list(stream_map.keys()) + ["decompress", "raw"]
def auto_compress_stream(ifname: pathlib.Path, mode: str, ifp: Optional[Stream] = None) -> tuple[os.PathLike, Stream]:
    """
    Build a stream converting ``ifname`` into compression ``mode``.

    The input format is guessed from the file extension via ``stream_ext``.
    Recognized compressed input is decompressed first and then recompressed
    with ``mode``.

    Args:
        ifname: input path; its extension selects the input decompressor and
            it is the base of the returned output path.
        mode: a key of ``stream_map``, "decompress" (emit uncompressed data) or
            "raw" (pass the input through untouched).
        ifp: optional source stream. When omitted, ``ifname`` is opened in binary
            mode and wrapped in ``FileReadStream``; that file object is not
            closed here.

    Returns:
        ``(output_path, stream)``. For "raw", or when the input already has the
        requested format, ``ifname`` and the source stream are returned as-is.
        A ``mode`` not present in ``stream_map`` yields the decompressed data
        without appending a new extension.
    """
    source: Stream = FileReadStream(ifname.open("br")) if ifp is None else ifp
    if mode == "raw":
        return ifname, source
    stem, suffix = os.path.splitext(str(ifname))
    detected = stream_ext.get(suffix)
    if detected is None:
        # unrecognized suffix: treat input as uncompressed and keep the full name
        stem = str(ifname)
    else:
        in_mode, _, decompressor_cls = detected
        if in_mode == mode:
            # already in the requested format
            return ifname, source
        _log.debug("input mode: %s", in_mode)
        source = decompressor_cls(source)
    if mode == "decompress":
        return pathlib.Path(stem), source
    if mode in stream_map:
        new_suffix, compressor_cls, _ = stream_map[mode]
        source = compressor_cls(source)
        stem += new_suffix
    return pathlib.Path(stem), source