Coverage for log2s3/app.py: 91%

253 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 12:02 +0000

1import datetime 

2import html 

3import io 

4from typing import Any 

5from pathlib import Path 

6from fastapi import APIRouter, HTTPException, Response, Header, Query 

7from fastapi.responses import StreamingResponse 

8from .common_stream import Stream, MergeStream, CatStream 

9from .compr_stream import auto_compress_stream, stream_ext 

10from logging import getLogger 

11 

# Router on which every endpoint of this module is registered.
router = APIRouter()
_log = getLogger(__name__)

# Mutable, module-wide settings; update_config() merges overrides into it.
api_config: dict[str, Any] = {
    # background colors keyed by the value of date.weekday()
    "weekday_colors": {
        5: "lightyellow",  # sat
        6: "lightcyan",  # sun
    },
    # background color used for today's cell in the calendar view
    "today_color": "yellow",
}

# Keys of stream_ext; compared against Path.suffix values elsewhere in this module.
exts = set(stream_ext)

# Shared declaration of the optional "month" query parameter.
month_query = Query(default="", pattern="(^[0-9]{4}|^$)")

23 

24 

def update_config(conf: dict) -> None:
    """Merge ``conf`` into the module-wide ``api_config`` in place.

    The merge is a plain ``dict.update``: top-level keys in ``conf`` replace
    the existing values wholesale.
    """
    api_config.update(conf)

27 

28 

def uri2file(file_path: str) -> Path:
    """Map a request path onto a filesystem path below the working directory.

    Args:
        file_path: path taken from the request, relative to ``working_dir``.

    Returns:
        The resolved path.

    Raises:
        HTTPException: 403 when the resolved path is neither inside the
            working directory tree nor the working directory itself.
    """
    base = Path(api_config.get("working_dir", "."))
    resolved = (base / file_path).resolve()
    base_abs = base.resolve().absolute()

    # Normal case: the target lives somewhere below the working directory.
    if base_abs in resolved.resolve().absolute().parents:
        return resolved
    # The working directory itself is also an acceptable target.
    if resolved.exists() and resolved.samefile(base):
        return resolved

    _log.warning("out of path: wdir=%s, target=%s", base, resolved.resolve())
    raise HTTPException(status_code=403, detail=f"cannot access to {file_path}")

39 

40 

def file2uri(path: Path) -> str:
    """Express ``path`` relative to the resolved working directory.

    ``Path.relative_to`` raises ``ValueError`` when ``path`` is not located
    under the working directory.
    """
    base = Path(api_config.get("working_dir", ".")).resolve()
    relative = path.relative_to(base)
    return str(relative)

44 

45 

def uriescape(uri: str, quote: bool = True) -> str:
    """Prepend the configured ``prefix`` to ``uri`` and HTML-escape the result.

    Args:
        uri: path to link to, relative to the configured prefix.
        quote: forwarded to ``html.escape``; when true, quote characters
            are escaped as well.
    """
    prefix = Path(api_config.get("prefix", "/"))
    joined = str(prefix / uri)
    return html.escape(joined, quote)

48 

49 

@router.get("/config")
def read_config() -> dict:
    """Expose the current module configuration (the live ``api_config`` dict)."""
    current: dict = api_config
    return current

53 

54 

@router.get("/read/{file_path:path}")
def read_file(response: Response, file_path: str, accept_encoding: str = Header("")):
    """Stream a single file, preferring a pre-compressed sibling when possible.

    Lookup order:
      1. ``<file>.br`` / ``<file>.gz`` sent as-is when the client lists the
         matching encoding in ``Accept-Encoding``;
      2. the file itself;
      3. any sibling named ``<file>.<something>``, decompressed on the fly
         through ``auto_compress_stream``.

    Args:
        response: response object injected by FastAPI. Kept for interface
            compatibility; headers are attached to the returned response.
        file_path: request path relative to the working directory.
        accept_encoding: raw ``Accept-Encoding`` request header.

    Raises:
        HTTPException: 403 when the path is outside the working directory or
            exists but is not a regular file; 404 when nothing matches.
    """
    target = uri2file(file_path)
    accepts = [x.strip() for x in accept_encoding.split(",")]
    media_type = api_config.get("content-type", "text/plain")

    # gzip or brotli passthrough case
    passthrough = {
        "br": (".br",),
        "gzip": (".gz",),
    }
    for encoding, suffixes in passthrough.items():
        if encoding not in accepts:
            continue
        for suffix in suffixes:
            candidate = target.with_suffix(target.suffix + suffix)
            if not candidate.is_file():
                continue
            _log.info("compressed %s: %s", encoding, candidate)
            # The header must be set on the response that is actually
            # returned: headers placed on the injected ``response`` are not
            # merged into a Response object returned directly.
            return StreamingResponse(
                content=candidate.open("rb"),
                media_type=media_type,
                headers={"content-encoding": encoding},
            )

    # uncompressed case
    if target.is_file():
        _log.info("raw: %s", target)
        return StreamingResponse(content=target.open("rb"), media_type=media_type)

    # other type case (directory, etc...)
    if target.exists():
        raise HTTPException(status_code=403, detail=f"cannot access to {file_path}")

    # compressed case: decompress the first matching sibling (sorted so the
    # choice is deterministic); a missing parent directory is a plain 404.
    parent = target.parent
    if parent.is_dir():
        sibling_prefix = target.name + "."
        candidates = sorted(
            x
            for x in parent.iterdir()
            if x.is_file() and x.name.startswith(sibling_prefix)
        )
        if candidates:
            chosen = candidates[0]
            _, stream = auto_compress_stream(chosen, "decompress")
            _log.info("auto decompress: %s", chosen)
            return StreamingResponse(content=stream.gen(), media_type=media_type)

    raise HTTPException(status_code=404, detail=f"not found: {file_path}")

97 

98 

def reg_file(res: dict, p: Path):
    """Record ``p`` in ``res`` when its name starts with a ``YYYY-MM-DD`` date.

    ``res`` maps the parent directory (relative to the working directory) to
    a ``{date: link}`` dict. The link is the relative path of ``p`` with a
    suffix listed in ``exts`` stripped. The first file registered for a
    given directory and date wins; later ones are ignored.

    Files whose name does not start with a valid date, or whose paths are
    rejected by ``uri2file``, are skipped silently.
    """
    stem = p.name.split(".")[0]
    try:
        day = datetime.datetime.strptime(stem, "%Y-%m-%d")
    except ValueError:
        return

    link_path = p.with_suffix("") if p.suffix in exts else p
    date_key = day.strftime("%Y-%m-%d")
    dir_key = file2uri(p.parent)
    link = file2uri(link_path)

    try:
        # both the directory key and the link must be in the working_dir tree
        uri2file(dir_key)
        uri2file(link)
    except HTTPException:
        return

    res.setdefault(dir_key, {}).setdefault(date_key, link)

122 

123 

def list_dir(file_path: str, file_prefix: str = "") -> dict[str, dict[str, str]]:
    """Collect dated log files reachable from ``file_path``.

    When ``file_path`` is a directory, that directory is scanned. Otherwise
    every entry of its parent directory whose name starts with the last path
    component is used as a starting point.

    Args:
        file_path: request path relative to the working directory.
        file_prefix: only files found while walking a directory whose name
            starts with this prefix are registered. Starting points that are
            themselves files are registered regardless of the prefix.

    Returns:
        ``{directory: {"YYYY-MM-DD": link}}`` as filled in by ``reg_file``;
        empty when nothing matches, including when the parent directory of
        ``file_path`` does not exist.

    Raises:
        HTTPException: 403 from ``uri2file`` when the path is outside the
            working directory.
    """
    res: dict[str, dict[str, str]] = {}

    target = uri2file(file_path)
    if target.is_dir():
        start_points = [target]
    elif target.parent.is_dir():
        start_points = [
            x for x in target.parent.iterdir() if x.name.startswith(target.name)
        ]
    else:
        # Nothing to iterate; callers treat an empty result as "not found".
        start_points = []

    wanted_suffixes = exts | {".log", ".txt"}
    for entry in start_points:
        if entry.is_file():
            reg_file(res, entry)
        elif entry.is_dir():
            for root, _, filenames in entry.walk():
                for fname in filenames:
                    if Path(fname).suffix not in wanted_suffixes:
                        continue
                    if fname.startswith(file_prefix):
                        reg_file(res, root / fname)
    _log.debug("list_dir: keys=%s", res.keys())
    return res

149 

150 

@router.get("/dirs")
def get_dirs(month=month_query) -> list[str]:
    """List the directories under the working directory that hold dated files."""
    listing = list_dir(".", month)
    return list(listing)

154 

155 

@router.get("/list/{file_path:path}")
def list_raw(file_path: str, month=month_query):
    """Return the ``list_dir`` result for ``file_path`` unchanged."""
    listing = list_dir(file_path, month)
    return listing

159 

160 

@router.get("/html1/{file_path:path}")
def html1(file_path: str, month=month_query):
    """Render the listing for ``file_path`` as an HTML page of per-month link lists.

    Each directory gets a box with one ``<li>`` per month, holding a link per
    day. Days are tinted using ``api_config["weekday_colors"]``.

    Raises:
        HTTPException: 404 when the listing is empty.
    """

    def gen(ldir: dict[str, dict[str, str]]):
        # file_path comes straight from the request URL and titles come from
        # directory names: both must be escaped before being written as HTML.
        yield f"<html><title>{html.escape(file_path)}</title><body>"
        weekday_colors = api_config.get("weekday_colors", {})
        for title, files in ldir.items():
            buf = io.StringIO()
            uri = uriescape(f"html1/{title}")
            buf.write(
                '<div style="border: 1px solid black; float: left; margin: 10px; padding: 1em;">'
            )
            buf.write(f'<h2><a href="{uri}">{html.escape(title)}</a></h2><ul>')
            prev_month = None
            for dtstr in sorted(files):
                dt = datetime.datetime.strptime(dtstr, "%Y-%m-%d")
                cur_month = dt.strftime("%Y-%m")
                if prev_month != cur_month:
                    # close the previous month's item before opening a new one
                    if prev_month is not None:
                        buf.write("</li>")
                    buf.write(f"<li>{cur_month}: ")
                    prev_month = cur_month
                link_uri = uriescape(f"read/{files[dtstr]}")
                linkhtml = f'<a href="{link_uri}">{dt.strftime("%d")}</a>'
                color = weekday_colors.get(dt.weekday())
                if color is not None:
                    buf.write(
                        f' <span style="background-color: {color};">{linkhtml}</span>'
                    )
                else:
                    buf.write(f" {linkhtml}")
            buf.write("</li></ul>")
            buf.write("</div>")
            yield buf.getvalue()
        yield "</body></html>"

    ldir = list_dir(file_path, month)
    if len(ldir) == 0:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    return StreamingResponse(content=gen(ldir), media_type="text/html")

200 

201 

def html2_gen1(uri: str, month: str, files: dict[str, str]) -> str:
    """Render one month as calendar rows (Sunday first) for the html2 table.

    Args:
        uri: already-escaped link target for the month header; the header
            links to ``<uri>?month=<month>``.
        month: month to render, formatted ``YYYY-MM``.
        files: ``{"YYYY-MM-DD": link}``; days present in it are rendered as
            links to ``read/<link>``, other days as plain numbers.

    Returns:
        HTML ``<tr>`` rows: a header row followed by the week rows.
    """
    first = datetime.datetime.strptime(month, "%Y-%m").date()
    today = datetime.date.today()
    weekday_colors = api_config.get("weekday_colors", {})
    one_day = datetime.timedelta(days=1)

    buf = io.StringIO()
    buf.write(
        f'<tr><th colspan="7"><a href="{uri}?month={month}">{month}</a></th></tr>'
    )
    buf.write('<tr align="right">')
    # Column index with Sunday = 0; pad the cells before the first day.
    column = (first.weekday() + 1) % 7
    if column != 0:
        buf.write(f'<td colspan="{column}"></td>')

    day = first
    while day.month == first.month:
        column = (day.weekday() + 1) % 7
        # Start a new row on Sundays, except for the row opened above;
        # otherwise a month starting on Sunday would get an empty row.
        if column == 0 and day != first:
            buf.write('</tr><tr align="right">')
        if day == today:
            color = api_config.get("today_color")
        else:
            color = weekday_colors.get(day.weekday())
        if color is None:
            buf.write("<td>")
        else:
            buf.write(f'<td style="background-color: {color};">')
        dtstr = day.strftime("%Y-%m-%d")
        if dtstr in files:
            link_uri = uriescape(f"read/{files[dtstr]}")
            buf.write(f'<a href="{link_uri}">{day.day}</a>')
        else:
            buf.write(f"{day.day}")
        buf.write("</td>")
        day += one_day

    # ``day`` is now the first day of the next month: pad the rest of the row.
    column = (day.weekday() + 1) % 7
    if column != 0:
        buf.write(f'<td colspan="{7 - column}"></td>')
    buf.write("</tr>")
    return buf.getvalue()

240 

241 

def _weekday_header_cells() -> str:
    """Render the seven weekday ``<th>`` cells, Sunday first, tinted per config."""
    cells = io.StringIO()
    sunday = datetime.date(2000, 1, 2)  # a Sunday, used only to name weekdays
    weekday_colors = api_config.get("weekday_colors", {})
    for offset in range(7):
        wd = sunday + datetime.timedelta(days=offset)
        wdstr = wd.strftime("%a")
        color = weekday_colors.get(wd.weekday())
        if color:
            cells.write(
                f'<th style="background-color: {color};"><code>{wdstr}</code></th>'
            )
        else:
            cells.write(f"<th><code>{wdstr}</code></th>")
    return cells.getvalue()


def html2_gen(ldir: dict[str, dict[str, str]], file_path: str):
    """Yield the calendar-style HTML page for a directory listing.

    The page header is yielded first (also when ``ldir`` is empty), then one
    chunk per directory containing a table with a calendar per month, then
    the closing tags.

    Args:
        ldir: ``{directory: {"YYYY-MM-DD": link}}``.
        file_path: request path, shown (escaped) as the page title.
    """
    thismonth = datetime.date.today().strftime("%Y-%m")
    # file_path comes from the request URL: escape it before emitting HTML.
    yield (
        f"<html><title>{html.escape(file_path)}</title><body>"
        f'<p><a href="?month={thismonth}">this month</a></p>'
    )
    # The weekday header is identical for every table; build it once.
    header_cells = _weekday_header_cells() if ldir else ""
    for title, files in ldir.items():
        uri = uriescape(f"html2/{title}")
        buf = io.StringIO()
        buf.write('<div style="float: left; margin: 1em;">')
        buf.write(f'<h2><a href="{uri}">{html.escape(title)}</a></h2>')
        buf.write('<table border="1" style="border-collapse: collapse"><tr>')
        buf.write(header_cells)
        buf.write("</tr>")
        months = {x.rsplit("-", 1)[0] for x in files}
        for month in sorted(months):
            buf.write(html2_gen1(uri, month, files))
        buf.write("</table></div>")
        yield buf.getvalue()
    yield "</body></html>"

272 

273 

@router.get("/html2/{file_path:path}")
def html2(file_path: str, month=month_query):
    """Serve the calendar-style HTML listing for ``file_path``.

    Raises:
        HTTPException: 404 when the listing is empty.
    """
    listing = list_dir(file_path, month)
    if not listing:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    page = html2_gen(listing, file_path)
    return StreamingResponse(content=page, media_type="text/html")

280 

281 

def find_target(p: Path, accepts: list[str]) -> Path:
    """Pick the on-disk file that should be read to serve ``p``.

    Preference order:
      1. ``<p>.gz`` when ``"gzip"`` is in ``accepts``;
      2. ``p`` itself;
      3. ``<p>.br`` when ``"br"`` is in ``accepts``;
      4. the first (by sorted path) regular file named ``<p.name>.<something>``
         in the same directory.

    Args:
        p: path of the logical (uncompressed) file.
        accepts: encodings the caller can take, e.g. ``["gzip", "br"]``.

    Raises:
        HTTPException: 404 when no candidate exists, including when the
            parent directory of ``p`` is missing.
    """
    # gzip pass through
    if "gzip" in accepts:
        gz = p.with_suffix(p.suffix + ".gz")
        if gz.is_file():
            return gz
    # raw pass through
    if p.is_file():
        return p
    # others; require a regular file, consistent with the .gz check
    if "br" in accepts:
        br = p.with_suffix(p.suffix + ".br")
        if br.is_file():
            return br
    # compressed case
    parent = p.parent
    if parent.is_dir():
        sibling_prefix = p.name + "."
        # sorted so the choice does not depend on directory iteration order
        candidates = sorted(
            x
            for x in parent.iterdir()
            if x.is_file() and x.name.startswith(sibling_prefix)
        )
        if candidates:
            return candidates[0]
    raise HTTPException(status_code=404, detail=f"not found: {p}")

301 

302 

def get_streams(
    files: dict[str, dict[str, str]], accepts: list[str]
) -> tuple[list[Stream], dict]:
    """Open one stream per listed file, ordered by date and then by file name.

    Args:
        files: ``{directory: {"YYYY-MM-DD": link}}`` as produced by ``list_dir``.
        accepts: encodings the client accepts. ``"gzip"`` selects mode
            ``"gzip"``; otherwise ``"br"`` selects ``"brotli"`` when ``.br``
            is a key of ``stream_ext``; otherwise mode is ``"decompress"``.

    Returns:
        ``(streams, headers)``: the streams returned by
        ``auto_compress_stream`` and the response headers matching the
        selected mode.
    """
    # Regroup links by date so that directories are interleaved day by day.
    by_date: dict[str, list[str]] = {}
    for per_dir in files.values():
        for date_key, fname in per_dir.items():
            by_date.setdefault(date_key, []).append(fname)

    output_list: list[Path] = []
    for date_key in sorted(by_date):
        for fname in sorted(by_date[date_key]):
            output_list.append(find_target(uri2file(fname), accepts))

    if "gzip" in accepts:
        mode, hdrs = "gzip", {"content-encoding": "gzip"}
    elif "br" in accepts and ".br" in stream_ext:
        mode, hdrs = "brotli", {"content-encoding": "br"}
    else:
        mode, hdrs = "decompress", {}

    _log.debug("streams: %s files, mode=%s, hdrs=%s", len(output_list), mode, hdrs)
    opened = [auto_compress_stream(path, mode) for path in output_list]
    return [pair[1] for pair in opened], hdrs

328 

329 

@router.get("/cat/{file_path:path}")
def cat_file(file_path: str, month=month_query):
    """Stream every file listed under ``file_path`` through ``CatStream``.

    Streams are opened with an empty accept list, i.e. in "decompress" mode.

    Raises:
        HTTPException: 404 when the listing is empty.
    """
    listing = list_dir(file_path, month)
    if not listing:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    media_type = api_config.get("content-type", "text/plain")
    streams, hdrs = get_streams(listing, [])
    # daily sort
    body = CatStream(streams).gen()
    return StreamingResponse(content=body, media_type=media_type, headers=hdrs)

341 

342 

@router.get("/merge/{file_path:path}")
def merge_file(file_path: str, month=month_query):
    """Stream every file listed under ``file_path`` through ``MergeStream``.

    Raises:
        HTTPException: 404 when the listing is empty.
    """
    listing = list_dir(file_path, month)
    if not listing:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    media_type = api_config.get("content-type", "text/plain")
    # cannot do passthrough compression, hence the empty accept list
    streams, hdrs = get_streams(listing, [])
    # daily sort
    body = MergeStream(streams).gen()
    return StreamingResponse(content=body, media_type=media_type, headers=hdrs)