Coverage for log2s3 / processor.py: 72%
186 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
1import os
2import pathlib
3import time
4import shutil
5from click.core import UNSET
6from logging import getLogger
7from typing import Optional, Sequence
8from abc import ABC, abstractmethod
9from .compr_stream import auto_compress_stream, FileWriteStream, S3PutStream
10import pytimeparse
11import humanfriendly
12import datetime
14try:
15 from mypy_boto3_s3.client import S3Client as S3ClientType
16except ImportError:
17 from typing import Any as S3ClientType
20_log = getLogger(__name__)
class FileProcessor(ABC):
    """Base class for file-walk processors.

    Filters candidate files by mtime, size and name criteria taken from
    *config*, counting accepted/rejected files; subclasses implement
    :meth:`process` to act on the files that pass :meth:`check`.
    """

    def __init__(self, config: Optional[dict] = None):
        """Store the filter configuration.

        Args:
            config: option mapping; ``None`` is treated as empty (fixes the
                mutable-default-argument anti-pattern of ``config={}``).
                Entries whose value is ``None`` are dropped, so a key being
                present in ``self.config`` means the option was supplied.
        """
        config = config if config is not None else {}
        self.config = {k: v for k, v in config.items() if v is not None}
        self.processed = 0  # files accepted by check()
        self.skipped = 0    # files rejected by check()

    def _option(self, key: str):
        """Return ``config[key]`` when present and not click's UNSET sentinel, else None.

        The short-circuit mirrors the original inline pattern: UNSET is only
        compared when the key exists.
        """
        if key in self.config and self.config[key] != UNSET:
            return self.config[key]
        return None

    def check_date_range(self, mtime: float) -> bool:
        """Return True when *mtime* (epoch seconds) passes the older/newer/date filters.

        ``older``/``newer`` are pytimeparse spans relative to now; ``date`` is
        either an ISO date (one-day window) or ``from..to`` (half-open range).
        An unparseable span (pytimeparse returns None) disables that filter.
        """
        older = self._option("older")
        if older is not None:
            span = pytimeparse.parse(older)
            if span is not None and mtime > time.time() - span:
                return False
        newer = self._option("newer")
        if newer is not None:
            span = pytimeparse.parse(newer)
            if span is not None and mtime < time.time() - span:
                return False
        date = self._option("date")
        if date is not None:
            mtime_datetime = datetime.datetime.fromtimestamp(mtime)
            if ".." in date:
                fromdate, todate = [datetime.datetime.fromisoformat(x) for x in date.split("..", 1)]
            else:
                fromdate = datetime.datetime.fromisoformat(date)
                todate = fromdate + datetime.timedelta(days=1)
            # half-open interval: fromdate <= mtime < todate
            if not fromdate <= mtime_datetime < todate:
                return False
        return True

    def check_size_range(self, size: int) -> bool:
        """Return True when *size* (bytes) passes the smaller/bigger filters.

        Sizes are humanfriendly strings (e.g. "10KB"); the ``True`` argument
        selects binary (1024-based) units.
        """
        smaller = self._option("smaller")
        if smaller is not None and size > humanfriendly.parse_size(smaller, True):
            return False
        bigger = self._option("bigger")
        if bigger is not None and size < humanfriendly.parse_size(bigger, True):
            return False
        return True

    def check_name(self, fname: pathlib.Path) -> bool:
        """Return True when *fname* passes the suffix/prefix/glob/iglob filters."""
        suffix = self._option("suffix")
        if suffix is not None and not str(fname).endswith(suffix):
            return False
        prefix = self._option("prefix")
        if prefix is not None and not str(fname).startswith(prefix):
            return False
        glob = self._option("glob")
        if glob is not None and not fname.match(glob):
            return False
        iglob = self._option("iglob")
        # NOTE: PurePath.match(case_sensitive=...) requires Python 3.12+
        if iglob is not None and not fname.match(iglob, case_sensitive=False):
            return False
        return True

    def check(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Run all filters against *fname*, updating the processed/skipped tallies.

        Args:
            fname: path to test.
            stat: pre-fetched stat result; when None the file is stat()ed here.
        """
        if stat is None:
            stat = fname.stat()
        res = self.check_date_range(stat.st_mtime) and self.check_size_range(stat.st_size) and self.check_name(fname)
        if res:
            self.processed += 1
        else:
            self.skipped += 1
        return res

    @abstractmethod
    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Act on an accepted file; return True to claim it (stop later processors)."""
        raise NotImplementedError()
class DebugProcessor(FileProcessor):
    """Processor that only logs what it sees and never claims a file."""

    def check(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Apply the normal filters, logging the verdict together with the config."""
        verdict = super().check(fname, stat)
        _log.debug("debug: fname=%s, stat=%s -> %s / %s", fname, stat, verdict, self.config)
        return verdict

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Log the accepted file; return False so later processors still run."""
        _log.info("debug: fname=%s, stat=%s", fname, stat)
        return False
class ListProcessor(FileProcessor):
    """Processor that collects matching files instead of acting on them."""

    def __init__(self, config: Optional[dict] = None):
        """Initialize with the same filter *config* as FileProcessor.

        Args:
            config: option mapping; ``None`` means empty (avoids the
                mutable-default-argument anti-pattern of ``config={}``).
        """
        super().__init__(config if config is not None else {})
        # (path, stat) pairs for every file that passed check()
        self.output: list[tuple[pathlib.Path, Optional[os.stat_result]]] = []

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Record the file; return False so later processors still run."""
        self.output.append((fname, stat))
        return False
class DelProcessor(FileProcessor):
    """Processor that deletes matching files, honouring the ``dry`` flag."""

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Delete *fname* (or just log in dry mode); return True to claim the file."""
        dry_run = self.config.get("dry", False)
        if dry_run:
            _log.info("(dry) delete fname=%s, stat=%s", fname, stat)
        else:
            _log.info("(wet) delete fname=%s, stat=%s", fname, stat)
            fname.unlink()
        return True
class CompressProcessor(FileProcessor):
    """Processor that re-compresses matching files and tracks byte savings."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.before_total = 0  # cumulative size of originals (bytes)
        self.after_total = 0   # cumulative size of compressed output (bytes)

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Compress *fname* with the configured codec and remove the original.

        In dry mode the compressed stream is only measured; otherwise the
        output file is written, timestamps are copied over, and the source
        file is deleted. Returns True when the file was handled.
        """
        codec = self.config.get("compress", "gzip")
        newname, stream = auto_compress_stream(fname, codec)
        target = pathlib.Path(newname)
        if target == fname:
            # Nothing to do — compensate for check() having counted this file.
            _log.debug("unchanged: fname=%s, stat=%s", fname, stat)
            self.skipped += 1
            self.processed -= 1
            return False
        shared = os.path.commonprefix([fname, target])
        # fall back to 0 when no stat result was handed in
        before_sz = stat.st_size if isinstance(stat, os.stat_result) else 0
        if self.config.get("dry", False):
            # Drain the stream purely to learn the would-be output size.
            out_length = sum(len(chunk) for chunk in stream.gen())
            self.before_total += before_sz
            self.after_total += out_length
            _log.info(
                "(dry) compress fname=%s{%s->%s}, size=%s->%s",
                shared,
                str(fname)[len(shared):],
                str(target)[len(shared):],
                before_sz,
                out_length,
            )
        else:
            with target.open("wb") as ofp:
                for _ in FileWriteStream(stream, ofp).gen():
                    pass
            out_length = target.stat().st_size
            self.before_total += before_sz
            self.after_total += out_length
            _log.info(
                "(wet) compress fname=%s{%s->%s}, size=%s->%s",
                shared,
                str(fname)[len(shared):],
                str(target)[len(shared):],
                before_sz,
                out_length,
            )
            # Preserve mtime/permissions on the new file before dropping the source.
            shutil.copystat(fname, target, follow_symlinks=False)
            fname.unlink()
        return True
class S3Processor(FileProcessor):
    """Processor that compresses matching files and uploads them to S3."""

    def __init__(self, config):
        """Set up the uploader.

        Required config keys: ``s3_bucket``, ``top``; optional: ``s3``
        (boto3 client), ``s3_prefix``, ``skip_names`` (object keys that
        already exist and should not be re-uploaded).
        """
        super().__init__(config)
        # NOTE(review): config.get("s3") may be None when omitted; process()
        # would then fail on the first upload — confirm callers always pass a client.
        self.s3: S3ClientType = config.get("s3")
        self.prefix = config.get("s3_prefix", "")
        self.bucket: str = config["s3_bucket"]
        self.skip_names = config.get("skip_names", [])
        self.top = config["top"]
        self.uploaded = 0  # total bytes uploaded (or measured, in dry mode)

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Compress *fname* and upload it under ``prefix + path-relative-to-top``.

        Returns True (claiming the file) when the object already exists in
        ``skip_names``; returns False after an upload so later processors
        can still act on the local file.
        """
        compressor = self.config.get("compress", "gzip")
        newname, data = auto_compress_stream(fname, compressor)
        newpath = pathlib.Path(newname)
        base_name = newpath.relative_to(self.top)
        base_from = fname.relative_to(self.top)
        obj_name = self.prefix + str(base_name)
        if obj_name in self.skip_names:
            _log.debug("already exists: %s", obj_name)
            # Consistency fix: mirror CompressProcessor's accounting — a file
            # needing no work counts as skipped, undoing check()'s increment.
            self.skipped += 1
            self.processed -= 1
            return True
        # Build a compact "{old,new}" suffix for the log line when compression
        # changes the file name.
        common_name = os.path.commonprefix([str(base_name), str(base_from)])
        rest1 = str(base_from)[len(common_name):]
        rest2 = str(base_name)[len(common_name):]
        reststr = ""
        if rest1 != rest2:
            reststr = "{%s,%s}" % (rest1, rest2)
        # fall back to 0 when no stat result was handed in
        before_sz = stat.st_size if isinstance(stat, os.stat_result) else 0
        if self.config.get("dry", False):
            # Drain the stream purely to learn the would-be upload size.
            out_length = sum(len(x) for x in data.gen())
            self.uploaded += out_length
            _log.info(
                "(dry) upload {%s,%s}%s%s (%d->%d)",
                self.top,
                self.prefix,
                common_name,
                reststr,
                before_sz,
                out_length,
            )
        else:
            outstr = S3PutStream(data, self.s3, bucket=self.bucket, key=obj_name)
            for _ in outstr.gen():
                pass
            # Ask S3 for the stored size rather than counting bytes locally.
            res = self.s3.head_object(Bucket=self.bucket, Key=obj_name)
            out_length = res.get("ContentLength", 0)
            self.uploaded += out_length
            _log.info(
                "(wet) upload {%s,%s}%s%s (%d->%d)",
                self.top,
                self.prefix,
                common_name,
                reststr,
                before_sz,
                out_length,
            )
        return False
def process_walk(top: pathlib.Path, processors: Sequence[FileProcessor]):
    """Walk *top* and feed every regular file through *processors* in order.

    For each file, each processor's check() filters it; when accepted,
    process() acts on it and may return True to claim the file, which stops
    the remaining processors for that file.

    Args:
        top: directory tree to walk.
        processors: processors applied in the given order.
    """
    for root, dirs, files in os.walk(top):
        for f in files:
            p = pathlib.Path(root, f)
            # stat the entry itself (symlinks are not followed)
            st = p.stat(follow_symlinks=False)
            for proc in processors:
                chk = proc.check(p, st)
                _log.debug("check %s(%s) -> %s", proc.__class__.__name__, p, chk)
                if chk:
                    res = proc.process(p, st)
                    # bugfix: log the process() result (previously logged chk)
                    _log.debug("process %s(%s) -> %s", proc.__class__.__name__, p, res)
                    if res:
                        break