Coverage for log2s3 / processor.py: 72%

186 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 04:42 +0000

1import os 

2import pathlib 

3import time 

4import shutil 

5from click.core import UNSET 

6from logging import getLogger 

7from typing import Optional, Sequence 

8from abc import ABC, abstractmethod 

9from .compr_stream import auto_compress_stream, FileWriteStream, S3PutStream 

10import pytimeparse 

11import humanfriendly 

12import datetime 

13 

14try: 

15 from mypy_boto3_s3.client import S3Client as S3ClientType 

16except ImportError: 

17 from typing import Any as S3ClientType 

18 

19 

20_log = getLogger(__name__) 

21 

22 

class FileProcessor(ABC):
    """Abstract base for per-file processors driven by :func:`process_walk`.

    ``config`` supplies optional filter criteria (date, size, name); the
    :meth:`check` method applies them and maintains processed/skipped
    counters, while subclasses implement the actual action in
    :meth:`process`.
    """

    def __init__(self, config: Optional[dict] = None):
        """
        Args:
            config: filter/processing options; entries whose value is
                ``None`` are dropped.  (The previous signature used a
                mutable default ``{}``; ``None`` avoids the shared
                mutable-default pitfall while remaining call-compatible.)
        """
        self.config: dict = {k: v for k, v in (config or {}).items() if v is not None}
        self.processed = 0  # number of files accepted by check()
        self.skipped = 0    # number of files rejected by check()

    def _opt(self, key: str):
        """Return the effective config value for *key*.

        ``None`` means "option not given": either the key is absent or it
        holds click's UNSET sentinel.  UNSET is only compared against when
        the key is present, preserving the original short-circuit behavior.
        """
        if key not in self.config:
            return None
        value = self.config[key]
        return None if value == UNSET else value

    def check_date_range(self, mtime: float) -> bool:
        """Return True when *mtime* satisfies the older/newer/date filters."""
        older = self._opt("older")
        if older is not None:
            seconds = pytimeparse.parse(older)
            # file modified more recently than the "older" threshold
            if seconds is not None and mtime > time.time() - seconds:
                return False
        newer = self._opt("newer")
        if newer is not None:
            seconds = pytimeparse.parse(newer)
            if seconds is not None and mtime < time.time() - seconds:
                return False
        date = self._opt("date")
        if date is not None:
            mtime_datetime = datetime.datetime.fromtimestamp(mtime)
            if ".." in date:
                # explicit half-open range "from..to"
                fromdate, todate = [datetime.datetime.fromisoformat(x) for x in date.split("..", 1)]
            else:
                # a single date means that whole calendar day
                fromdate = datetime.datetime.fromisoformat(date)
                todate = fromdate + datetime.timedelta(days=1)
            if not fromdate <= mtime_datetime < todate:
                return False
        return True

    def check_size_range(self, size: int) -> bool:
        """Return True when *size* satisfies the smaller/bigger filters.

        Sizes are human-friendly strings (e.g. "10MB"); ``True`` selects
        binary (1024-based) interpretation.
        """
        smaller = self._opt("smaller")
        if smaller is not None and size > humanfriendly.parse_size(smaller, True):
            return False
        bigger = self._opt("bigger")
        if bigger is not None and size < humanfriendly.parse_size(bigger, True):
            return False
        return True

    def check_name(self, fname: pathlib.Path) -> bool:
        """Return True when *fname* satisfies suffix/prefix/glob/iglob filters."""
        suffix = self._opt("suffix")
        if suffix is not None and not str(fname).endswith(suffix):
            return False
        prefix = self._opt("prefix")
        if prefix is not None and not str(fname).startswith(prefix):
            return False
        glob = self._opt("glob")
        if glob is not None and not fname.match(glob):
            return False
        iglob = self._opt("iglob")
        # case-insensitive match; Path.match(case_sensitive=...) needs py3.12+
        if iglob is not None and not fname.match(iglob, case_sensitive=False):
            return False
        return True

    def check(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Apply all filters to one file and update the counters.

        Args:
            fname: path of the file to test.
            stat: its stat result; when None the file is stat()ed here.
        """
        if stat is None:
            stat = fname.stat()
        res = self.check_date_range(stat.st_mtime) and self.check_size_range(stat.st_size) and self.check_name(fname)
        if res:
            self.processed += 1
        else:
            self.skipped += 1
        return res

    @abstractmethod
    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Act on one file; return True to stop later processors (see process_walk)."""
        raise NotImplementedError()

90 

91 

class DebugProcessor(FileProcessor):
    """Processor that only logs what it sees and never consumes a file."""

    def check(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Run the normal filters, logging the verdict at DEBUG level."""
        result = super().check(fname, stat)
        _log.debug("debug: fname=%s, stat=%s -> %s / %s", fname, stat, result, self.config)
        return result

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Log the file at INFO level; False lets later processors run too."""
        _log.info("debug: fname=%s, stat=%s", fname, stat)
        return False

101 

102 

class ListProcessor(FileProcessor):
    """Processor that collects matching files into ``self.output`` instead of
    acting on them."""

    def __init__(self, config: Optional[dict] = None):
        """
        Args:
            config: filter options, forwarded to FileProcessor.
                (BUG FIX: the original used a mutable default argument
                ``config: dict = {}``; ``None`` is the safe equivalent.)
        """
        super().__init__(config or {})
        # (path, stat) tuples for every file that passed check(), in walk order
        self.output: list[tuple[pathlib.Path, Optional[os.stat_result]]] = []

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Record the file; return False so later processors still see it."""
        self.output.append((fname, stat))
        return False

111 

112 

class DelProcessor(FileProcessor):
    """Processor that deletes matching files, honouring the ``dry`` flag."""

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Delete *fname* (unless dry-run); always consume the file (True)."""
        dry_run = self.config.get("dry", False)
        if dry_run:
            _log.info("(dry) delete fname=%s, stat=%s", fname, stat)
        else:
            _log.info("(wet) delete fname=%s, stat=%s", fname, stat)
            fname.unlink()
        return True

121 

122 

class CompressProcessor(FileProcessor):
    """Compress each matching file in place: write the compressed copy,
    carry over the original's metadata, then delete the original.

    Tracks the total byte counts before and after compression.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.before_total = 0  # sum of original sizes of handled files
        self.after_total = 0   # sum of compressed output sizes

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Compress *fname* with the configured codec (default "gzip").

        Returns:
            True when the file was handled (compressed, or measured in
            dry-run mode); False when the codec leaves the filename
            unchanged, so later processors may still act on the file.
        """
        compressor = self.config.get("compress", "gzip")
        # auto_compress_stream yields the target filename and a lazy stream
        # of compressed chunks (consumed via .gen()).
        newname, data = auto_compress_stream(fname, compressor)
        newpath = pathlib.Path(newname)
        if newpath == fname:
            # Name unchanged -> nothing to do; undo check()'s counter bump.
            _log.debug("unchanged: fname=%s, stat=%s", fname, stat)
            self.skipped += 1
            self.processed -= 1
            return False
        # Common prefix of the old/new names, used to shorten the log line.
        pfx = os.path.commonprefix([fname, newpath])
        if isinstance(stat, os.stat_result):
            before_sz = stat.st_size
        else:
            # no stat available -> report 0 as the "before" size
            before_sz = 0
        if self.config.get("dry", False):
            # Dry-run: drain the stream only to measure the would-be size.
            out_length = sum([len(x) for x in data.gen()])
            self.before_total += before_sz
            self.after_total += out_length
            _log.info(
                "(dry) compress fname=%s{%s->%s}, size=%s->%s",
                pfx,
                str(fname)[len(pfx) :],
                str(newpath)[len(pfx) :],
                before_sz,
                out_length,
            )
        else:
            # Write the compressed data, then measure the result on disk.
            with newpath.open("wb") as ofp:
                wrs = FileWriteStream(data, ofp)
                for _ in wrs.gen():
                    pass
            out_length = newpath.stat().st_size
            self.before_total += before_sz
            self.after_total += out_length
            _log.info(
                "(wet) compress fname=%s{%s->%s}, size=%s->%s",
                pfx,
                str(fname)[len(pfx) :],
                str(newpath)[len(pfx) :],
                before_sz,
                out_length,
            )
            # Preserve timestamps/permissions on the new file, then remove
            # the original (order matters: copystat before unlink).
            shutil.copystat(fname, newpath, follow_symlinks=False)
            fname.unlink()
        return True

174 

175 

class S3Processor(FileProcessor):
    """Upload each matching file to S3, compressing on the fly.

    The object key is the file's path relative to ``top``, prefixed with
    ``s3_prefix``.  Keys listed in ``skip_names`` are assumed to already
    exist and are skipped.
    """

    def __init__(self, config: dict):
        super().__init__(config)
        # boto3 S3 client (typed via mypy_boto3_s3 when that stub is installed)
        self.s3: S3ClientType = config.get("s3")
        self.prefix = config.get("s3_prefix", "")       # key prefix in the bucket
        self.bucket: str = config["s3_bucket"]          # destination bucket (required)
        self.skip_names = config.get("skip_names", [])  # keys already present in S3
        self.top = config["top"]                        # local root; keys are relative to it
        self.uploaded = 0                               # total bytes uploaded so far

    def process(self, fname: pathlib.Path, stat: Optional[os.stat_result]) -> bool:
        """Compress and upload one file.

        Returns:
            True only when the object already exists (consumes the file so
            later processors are skipped); False after an actual or dry-run
            upload, so later processors (e.g. deletion) still run.
        """
        compressor = self.config.get("compress", "gzip")
        newname, data = auto_compress_stream(fname, compressor)
        newpath = pathlib.Path(newname)
        # Key is based on the *compressed* name relative to the walk root.
        base_name = newpath.relative_to(self.top)
        base_from = fname.relative_to(self.top)
        obj_name = self.prefix + str(base_name)
        if obj_name in self.skip_names:
            _log.debug("already exists: %s", obj_name)
            return True
        # Build a compact "{old,new}" rendering for the log message.
        common_name = os.path.commonprefix([str(base_name), str(base_from)])
        rest1 = str(base_from)[len(common_name) :]
        rest2 = str(base_name)[len(common_name) :]
        reststr = ""
        if rest1 != rest2:
            reststr = "{%s,%s}" % (rest1, rest2)
        if isinstance(stat, os.stat_result):
            before_sz = stat.st_size
        else:
            # no stat available -> report 0 as the "before" size
            before_sz = 0
        if self.config.get("dry", False):
            # Dry-run: drain the stream only to measure the would-be size.
            out_length = sum([len(x) for x in data.gen()])
            self.uploaded += out_length
            _log.info(
                "(dry) upload {%s,%s}%s%s (%d->%d)",
                self.top,
                self.prefix,
                common_name,
                reststr,
                before_sz,
                out_length,
            )
        else:
            # Stream the compressed data into the bucket, then ask S3 for
            # the stored size rather than re-measuring locally.
            outstr = S3PutStream(data, self.s3, bucket=self.bucket, key=obj_name)
            for _ in outstr.gen():
                pass
            res = self.s3.head_object(Bucket=self.bucket, Key=obj_name)
            out_length = res.get("ContentLength", 0)
            self.uploaded += out_length
            _log.info(
                "(wet) upload {%s,%s}%s%s (%d->%d)",
                self.top,
                self.prefix,
                common_name,
                reststr,
                before_sz,
                out_length,
            )
        return False

235 

236 

def process_walk(top: pathlib.Path, processors: Sequence[FileProcessor]):
    """Walk *top* recursively and feed every regular file to each processor.

    For each file the processors run in order: a processor is invoked only
    when its check() accepts the file, and a process() call returning True
    "consumes" the file, skipping the remaining processors.

    Args:
        top: root directory to walk.
        processors: processors applied to each file, in order.
    """
    for root, dirs, files in os.walk(top):
        for name in files:
            path = pathlib.Path(root, name)
            # stat the entry itself, not a symlink target
            st = path.stat(follow_symlinks=False)
            for proc in processors:
                chk = proc.check(path, st)
                _log.debug("check %s(%s) -> %s", proc.__class__.__name__, path, chk)
                if not chk:
                    continue
                res = proc.process(path, st)
                # BUG FIX: the original logged `chk` (always True at this
                # point) instead of the process() result `res`.
                _log.debug("process %s(%s) -> %s", proc.__class__.__name__, path, res)
                if res:
                    break