Coverage for log2s3/compr_stream.py: 91%
239 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 12:02 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 12:02 +0000
1import lzma
2import bz2
3import gzip
4import pathlib
5from .common_stream import Stream
6from typing import Optional, Callable
7from logging import getLogger
8import io
9import os
11try:
12 from mypy_boto3_s3.client import S3Client as S3ClientType
13except ImportError:
14 from typing import Any as S3ClientType
16_log = getLogger(__name__)
class FileReadStream(Stream):
    """
    Stream source that pulls fixed-size chunks out of an already-open
    binary file-like object.
    """

    def __init__(
        self, file_like: io.RawIOBase | io.BufferedReader, bufsize=10 * 1024 * 1024
    ):
        # Note: the base-class initializer is not invoked here.
        self.bufsize = bufsize
        self.fd = file_like

    def gen(self):
        """
        Yield chunks of at most ``bufsize`` bytes until read() returns
        None or an empty result.
        """
        while chunk := self.fd.read(self.bufsize):
            _log.debug("read file %d", len(chunk))
            yield chunk
class RawReadStream(Stream):
    """
    Stream source backed by an in-memory bytes object.
    """

    def __init__(self, data: bytes, bufsize=1024 * 1024):
        # Wrap the payload in BytesIO so it can be consumed with read().
        self.bufsize = bufsize
        self.fd = io.BytesIO(data)

    def gen(self):
        """
        Yield the payload in pieces of at most ``bufsize`` bytes.

        Every read is logged, including the final empty one that ends
        the iteration.
        """
        while True:
            chunk = self.fd.read(self.bufsize)
            size = len(chunk)
            _log.debug("read file %d", size)
            if not size:
                return
            yield chunk
class FileWriteStream(Stream):
    """
    Pipeline stage that writes every chunk of the previous stream into a
    file-like object.
    """

    def __init__(
        self,
        prev_stream,
        file_like: io.RawIOBase | io.BufferedWriter,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.bufsize = bufsize  # stored only; gen() does not use it
        self.fd = file_like

    def gen(self):
        """
        Write each upstream chunk and yield whatever write() returned
        for it (not the chunk itself).
        """
        yield from (self.fd.write(chunk) for chunk in self.prev.gen())
class S3GetStream(Stream):
    """
    Stream source that downloads an S3 object chunk by chunk.
    """

    def __init__(
        self, s3_client: S3ClientType, bucket: str, key: str, bufsize=1024 * 1024
    ):
        # get_object is issued at construction time; the response is kept
        # so gen() can iterate over its "Body" later.
        self.bufsize = bufsize
        self.obj = s3_client.get_object(Bucket=bucket, Key=key)

    def gen(self):
        """
        Yield the object body in chunks, passing ``bufsize`` to iter_chunks().
        """
        body = self.obj["Body"]
        for chunk in body.iter_chunks(self.bufsize):
            yield chunk
class S3PutStream(Stream):
    """
    Pipeline stage that uploads the previous stream's data to an S3 object.
    """

    def __init__(
        self,
        prev_stream,
        s3_client: S3ClientType,
        bucket: str,
        key: str,
        bufsize=1024 * 1024,
    ):
        super().__init__(prev_stream)
        self.bufsize = bufsize
        self.key = key
        self.bucket = bucket
        self.client = s3_client
        # init_fp() and eof are not defined in this class; presumably they
        # come from Stream and set up its file-like read interface -- confirm.
        self.init_fp()
        _log.debug("eof is %s", self.eof)

    def gen(self):
        """
        Upload this stream with upload_fileobj, then yield one empty
        bytes object.
        """
        _log.debug("gen: bucket=%s, key=%s", self.bucket, self.key)
        # The stream object itself is handed to boto3 as the file object.
        self.client.upload_fileobj(self, self.bucket, self.key)  # type: ignore
        yield b""
class SimpleFilterStream(Stream):
    """
    Base class that adapts a one-shot ``bytes -> bytes`` function
    (compress or decompress) to the stream interface.
    """

    def __init__(self, prev_stream, filter_fn: Callable[[bytes], bytes]):
        super().__init__(prev_stream)
        self.filter_fn = filter_fn

    def gen(self):
        """
        Read the whole previous stream via read_all(), run the filter
        once, and yield its result as a single item.
        """
        whole_input = self.prev.read_all()
        yield self.filter_fn(whole_input)
class ComprFlushStream(Stream):
    """
    Incremental compression: call compress() for each chunk, then flush()
    once at the end.
    """

    def __init__(self, prev_stream, compressor):
        super().__init__(prev_stream)
        # Any object offering compress(data) and flush().
        self.compr = compressor

    def gen(self):
        """
        Yield the compressor output for each upstream chunk, followed by
        the output of the final flush().
        """
        for chunk in self.prev.gen():
            _log.debug("compress %d", len(chunk))
            packed = self.compr.compress(chunk)
            yield packed
        _log.debug("flush")
        tail = self.compr.flush()
        yield tail
class DecompStream(Stream):
    """
    Incremental decompression: call decompress() for each chunk of the
    previous stream.
    """

    def __init__(self, prev_stream, decompressor):
        super().__init__(prev_stream)
        # Any object offering decompress(data).
        self.decompr = decompressor

    def gen(self):
        """
        Yield the decompressor output for each upstream chunk.
        """
        yield from (self.decompr.decompress(chunk) for chunk in self.prev.gen())
class XzCompressorStream(ComprFlushStream):
    """
    Incremental compressor producing the .xz container format.
    """

    def __init__(self, prev_stream):
        xz_compressor = lzma.LZMACompressor(format=lzma.FORMAT_XZ)
        super().__init__(prev_stream, xz_compressor)
class LzmaCompressorStream(ComprFlushStream):
    """
    Incremental compressor producing the legacy .lzma (FORMAT_ALONE) format.
    """

    def __init__(self, prev_stream):
        lzma_compressor = lzma.LZMACompressor(format=lzma.FORMAT_ALONE)
        super().__init__(prev_stream, lzma_compressor)
class XzDecompressorStream(DecompStream):
    """
    Incremental decompressor for .xz or .lzma input; FORMAT_AUTO lets
    lzma detect which one it is.
    """

    def __init__(self, prev_stream):
        auto_decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO)
        super().__init__(prev_stream, auto_decompressor)


# The same auto-detecting class serves as the .lzma decompressor.
LzmaDecompressorStream = XzDecompressorStream
class Bz2CompressorStream(ComprFlushStream):
    """
    Incremental compressor producing the .bz2 format.
    """

    def __init__(self, prev_stream):
        bz2_compressor = bz2.BZ2Compressor()
        super().__init__(prev_stream, bz2_compressor)
class Bz2DecompressorStream(DecompStream):
    """
    Incremental decompressor for the .bz2 format.
    """

    def __init__(self, prev_stream):
        bz2_decompressor = bz2.BZ2Decompressor()
        super().__init__(prev_stream, bz2_decompressor)
class GzipCompressorStream(SimpleFilterStream):
    """
    One-shot .gz compressor built on gzip.compress.
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, filter_fn=gzip.compress)
class GzipDecompressorStream(SimpleFilterStream):
    """
    One-shot .gz decompressor built on gzip.decompress.
    """

    def __init__(self, prev_stream):
        super().__init__(prev_stream, filter_fn=gzip.decompress)
# Registry of compression modes:
#   mode name -> (file extension, compressor class, decompressor class)
# The optional-dependency blocks below add further entries when their
# packages can be imported.
stream_map: dict[str, tuple[str, type[Stream], type[Stream]]] = {
    # "pass" uses the plain Stream base class in both directions and adds
    # no extension.
    "pass": ("", Stream, Stream),
    "gzip": (".gz", GzipCompressorStream, GzipDecompressorStream),
    "bzip2": (".bz2", Bz2CompressorStream, Bz2DecompressorStream),
    "xz": (".xz", XzCompressorStream, XzDecompressorStream),
    "lzma": (".lzma", LzmaCompressorStream, LzmaDecompressorStream),
}
try:
    # Prefer the ``compression.zstd`` module; fall back to a top-level
    # ``zstd`` module when that import fails.
    try:
        from compression import zstd
    except ImportError:
        import zstd

    class ZstdCompressorStream(SimpleFilterStream):
        """
        One-shot zstd compressor built on zstd.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zstd.compress)

    class ZstdDecompressorStream(SimpleFilterStream):
        """
        One-shot zstd decompressor built on zstd.decompress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zstd.decompress)

    stream_map.update(
        zstd=(".zstd", ZstdCompressorStream, ZstdDecompressorStream)
    )

except ImportError:
    # Neither zstd module is importable: the "zstd" mode is not registered.
    pass
try:
    import lz4.frame

    # lz4.frame.LZ4FrameCompressor does not work?
    class Lz4CompressorStream(SimpleFilterStream):
        """
        One-shot lz4 frame compressor built on lz4.frame.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lz4.frame.compress)

    # lz4.frame.LZ4FrameDecompressor does not work?
    class Lz4DecompressorStream(SimpleFilterStream):
        """
        One-shot lz4 frame decompressor built on lz4.frame.decompress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lz4.frame.decompress)

    stream_map.update(lz4=(".lz4", Lz4CompressorStream, Lz4DecompressorStream))

except ImportError:
    # lz4 is not installed: the "lz4" mode is not registered.
    pass
try:
    import brotli

    class BrotliCompressorStream(Stream):
        """
        Incremental brotli compressor.

        Each upstream chunk goes through ``Compressor.process``; the stream
        is then terminated with ``Compressor.finish``.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.compr = brotli.Compressor()

        def gen(self):
            """
            Yield compressed output per upstream chunk, then the final block.
            """
            for i in self.prev.gen():
                yield self.compr.process(i)
            # finish() (not flush()) is required to complete the stream:
            # flush() only emits pending output and leaves the brotli stream
            # unterminated, which one-shot decoders reject as truncated.
            yield self.compr.finish()

    class BrotliDecompressorStream(Stream):
        """
        Incremental brotli decompressor: each upstream chunk goes through
        ``Decompressor.process``.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream)
            self.decompr = brotli.Decompressor()

        def gen(self):
            """
            Yield decompressed output per upstream chunk.
            """
            for i in self.prev.gen():
                yield self.decompr.process(i)

    stream_map["brotli"] = (".br", BrotliCompressorStream, BrotliDecompressorStream)

except ImportError:
    # brotli is not installed: the "brotli" mode is not registered.
    pass
try:
    import liblzfse

    class LzfseCompressorStream(SimpleFilterStream):
        """
        One-shot lzfse compressor built on liblzfse.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=liblzfse.compress)

    class LzfseDecompressorStream(SimpleFilterStream):
        """
        One-shot lzfse decompressor built on liblzfse.decompress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=liblzfse.decompress)

    stream_map.update(
        lzfse=(".lzfse", LzfseCompressorStream, LzfseDecompressorStream)
    )

except ImportError:
    # liblzfse is not installed: the "lzfse" mode is not registered.
    pass
try:
    import snappy  # type: ignore

    class SnappyCompressorStream(ComprFlushStream):
        """
        Incremental snappy compressor wrapping snappy.StreamCompressor.
        """

        def __init__(self, prev_stream):
            snappy_compressor = snappy.StreamCompressor()
            super().__init__(prev_stream, snappy_compressor)

    class SnappyDecompressorStream(DecompStream):
        """
        Incremental snappy decompressor wrapping snappy.StreamDecompressor.
        """

        def __init__(self, prev_stream):
            snappy_decompressor = snappy.StreamDecompressor()
            super().__init__(prev_stream, snappy_decompressor)

    # The classes are defined but deliberately left out of stream_map.
    # does not work?
    # stream_map["snappy"] = (".snappy", SnappyCompressorStream, SnappyDecompressorStream)
except ImportError:
    pass
try:
    import lzo  # type: ignore

    class LzoCompressorStream(SimpleFilterStream):
        """
        One-shot lzo compressor built on lzo.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lzo.compress)

    class LzoDecompressorStream(SimpleFilterStream):
        """
        One-shot lzo decompressor built on lzo.decompress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=lzo.decompress)

    stream_map.update(lzo=(".lzo", LzoCompressorStream, LzoDecompressorStream))
except ImportError:
    # lzo is not installed: the "lzo" mode is not registered.
    pass
try:
    import zpaq  # type: ignore

    class ZpaqCompressorStream(SimpleFilterStream):
        """
        One-shot zpaq compressor built on zpaq.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zpaq.compress)

    class ZpaqDecompressorStream(SimpleFilterStream):
        """
        One-shot zpaq decompressor built on zpaq.decompress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zpaq.decompress)

    stream_map.update(zpaq=(".zpaq", ZpaqCompressorStream, ZpaqDecompressorStream))

except ImportError:
    # zpaq is not installed: the "zpaq" mode is not registered.
    pass
try:
    import zopfli.gzip

    class ZopfliCompressorStream(SimpleFilterStream):
        """
        One-shot compressor built on zopfli.gzip.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zopfli.gzip.compress)

    # Registered with an empty extension, and paired with the regular
    # GzipDecompressorStream for the reverse direction.
    stream_map.update(zopfli=("", ZopfliCompressorStream, GzipDecompressorStream))
except ImportError:
    # zopfli is not installed: the "zopfli" mode is not registered.
    pass
try:
    import zlib_ng.gzip_ng  # type: ignore

    class ZlibNgCompressorStream(SimpleFilterStream):
        """
        One-shot compressor built on zlib_ng.gzip_ng.compress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zlib_ng.gzip_ng.compress)

    class ZlibNgDecompressorStream(SimpleFilterStream):
        """
        One-shot decompressor built on zlib_ng.gzip_ng.decompress.
        """

        def __init__(self, prev_stream):
            super().__init__(prev_stream, filter_fn=zlib_ng.gzip_ng.decompress)

    # Registered with an empty extension; the key contains a hyphen, so it
    # is passed as a dict rather than a keyword argument.
    stream_map.update(
        {"zlib-ng": ("", ZlibNgCompressorStream, ZlibNgDecompressorStream)}
    )
except ImportError:
    # zlib_ng is not installed: the "zlib-ng" mode is not registered.
    pass
# Reverse lookup built from stream_map:
#   file extension -> (mode name, compressor class, decompressor class)
#
# Several modes register the same extension ("pass", "zopfli" and "zlib-ng"
# all use ""). The first registration wins, so "" always resolves to "pass".
# With last-wins, installing zopfli or zlib-ng would make every
# extension-less input file look gzip-compressed and route it through a
# gzip decompressor.
stream_ext: dict[str, tuple[str, type[Stream], type[Stream]]] = {}
for _mode, (_ext, _compressor, _decompressor) in stream_map.items():
    stream_ext.setdefault(_ext, (_mode, _compressor, _decompressor))

# Every mode name accepted by auto_compress_stream: the registered
# compression modes plus the two special modes.
stream_compress_modes = list(stream_map.keys()) + ["decompress", "raw"]
398def auto_compress_stream(
399 ifname: pathlib.Path, mode: str, ifp: Optional[Stream] = None
400) -> tuple[os.PathLike, Stream]:
401 if ifp is None:
402 ifp = FileReadStream(ifname.open("br"))
403 if mode == "raw":
404 return ifname, ifp
405 base, ext = os.path.splitext(str(ifname))
406 # decompress
407 res: Stream = ifp
408 if ext in stream_ext:
409 imode, _, dst = stream_ext[ext]
410 if imode == mode:
411 return ifname, res
412 _log.debug("input mode: %s", imode)
413 res = dst(res)
414 else:
415 base = str(ifname)
416 if mode == "decompress":
417 return pathlib.Path(base), res
418 # compress
419 if mode in stream_map:
420 ext, cst, _ = stream_map[mode]
421 res = cst(res)
422 base = base + ext
423 return pathlib.Path(base), res