Coverage for log2s3 / app.py: 91%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 04:42 +0000

1import datetime 

2import html 

3import io 

4from typing import Any 

5from pathlib import Path 

6from fastapi import APIRouter, HTTPException, Response, Header, Query 

7from fastapi.responses import StreamingResponse 

8from .common_stream import Stream, MergeStream, CatStream 

9from .compr_stream import auto_compress_stream, stream_ext 

10from logging import getLogger 

11 

# Router collecting every endpoint declared in this module.
router = APIRouter()
_log = getLogger(__name__)

# Module-wide settings; update_config() merges overrides into this mapping.
api_config: dict[str, Any] = {
    # background colors keyed by the value of date.weekday()
    "weekday_colors": {
        5: "lightyellow",  # sat
        6: "lightcyan",  # sun
    },
    "today_color": "yellow",
}

# Suffix keys taken from stream_ext.
exts = set(stream_ext)

# Shared declaration of the ``month`` query parameter used by the endpoints.
month_query = Query(default="", pattern="(^[0-9]{4}|all|^$)")

23 

24 

def update_config(conf: dict):
    """Merge every entry of *conf* into the module-wide ``api_config``."""
    dict.update(api_config, conf)

27 

28 

def uri2file(file_path: str) -> Path:
    """Map a request path onto a resolved path below the working directory.

    Raises:
        HTTPException: 403 when the resolved path is neither located under
            the configured ``working_dir`` nor the working directory itself.
    """
    working_dir = Path(api_config.get("working_dir", "."))
    target = (working_dir / file_path).resolve()
    base = working_dir.resolve().absolute()
    if base in target.resolve().absolute().parents:
        return target
    # The working directory itself is not among its own parents; allow it.
    if target.exists() and target.samefile(working_dir):
        return target
    _log.warning("out of path: wdir=%s, target=%s", working_dir, target.resolve())
    raise HTTPException(status_code=403, detail=f"cannot access to {file_path}")

37 

38 

def file2uri(path: Path) -> str:
    """Return *path* as a string relative to the resolved working directory."""
    base = Path(api_config.get("working_dir", ".")).resolve()
    relative = path.relative_to(base)
    return str(relative)

42 

43 

def uriescape(uri: str, quote: bool = True) -> str:
    """Join *uri* onto the configured ``prefix`` and HTML-escape the result."""
    prefix = Path(api_config.get("prefix", "/"))
    joined = str(prefix / uri)
    return html.escape(joined, quote)

46 

47 

@router.get("/config")
def read_config() -> dict:
    # Respond with the live settings mapping (not a copy).
    current = api_config
    return current

51 

52 

def _iter_file(path: Path, chunk_size: int = 65536):
    """Yield the bytes of *path* in chunks, closing the file when finished."""
    with path.open("rb") as fp:
        while chunk := fp.read(chunk_size):
            yield chunk


@router.get("/read/{file_path:path}")
def read_file(response: Response, file_path: str, accept_encoding: str = Header("")):
    # Serve one file below the working directory.
    #
    # Order of preference:
    #   1. a pre-compressed sibling (".br" / ".gz") whose encoding the client
    #      listed in Accept-Encoding, passed through unchanged;
    #   2. the file itself;
    #   3. any sibling named "<name>.<something>", decompressed on the fly.
    # Raises 403 for existing non-file targets and 404 when nothing matches.
    #
    # ``response`` is kept for signature compatibility only: headers set on
    # the injected Response are not applied when a Response object is
    # returned directly, so content-encoding is set on the returned response.
    target = uri2file(file_path)
    accepts = [x.strip() for x in accept_encoding.split(",")]
    media_type = api_config.get("content-type", "text/plain")
    # gzip or brotli passthrough case
    passthrough = {
        "br": ".br",
        "gzip": ".gz",
    }
    for encoding, suffix in passthrough.items():
        if encoding not in accepts:
            continue
        candidate = target.with_suffix(target.suffix + suffix)
        if candidate.is_file():
            _log.info("compressed %s: %s", encoding, candidate)
            return StreamingResponse(
                content=_iter_file(candidate),
                media_type=media_type,
                headers={"content-encoding": encoding},
            )
    # uncompressed case
    if target.is_file():
        _log.info("raw %s: %s", accept_encoding, target)
        return StreamingResponse(content=_iter_file(target), media_type=media_type)
    # other type case (directory, etc...)
    if target.exists():
        raise HTTPException(status_code=403, detail=f"cannot access to {file_path}")
    # compressed case: a missing parent directory simply means "not found"
    parent = target.parent
    if parent.is_dir():
        wanted = target.name + "."
        # sorted so the chosen sibling does not depend on directory order
        for p in sorted(parent.iterdir()):
            if p.is_file() and p.name.startswith(wanted):
                _, stream = auto_compress_stream(p, "decompress")
                _log.info("auto decompress %s: %s", accept_encoding, p)
                return StreamingResponse(content=stream.gen(), media_type=media_type)
    raise HTTPException(status_code=404, detail=f"not found: {file_path}")

91 

92 

def reg_file(res: dict, p: Path):
    """Record *p* in *res* as ``res[directory][YYYY-MM-DD] = link``.

    Files whose name does not begin with a ``YYYY-MM-DD`` date (the part
    before the first dot) are ignored, as are paths that ``uri2file``
    rejects. A suffix listed in ``exts`` is stripped from the stored link.
    An already registered date is left untouched.
    """
    stem = p.name.split(".")[0]
    try:
        parsed = datetime.datetime.strptime(stem, "%Y-%m-%d")
    except ValueError:
        return
    day_key = parsed.strftime("%Y-%m-%d")
    logical = p.with_suffix("") if p.suffix in exts else p
    dir_key = file2uri(p.parent)
    link = file2uri(logical)
    try:
        # check dir_key and link are in working_dir tree
        uri2file(dir_key)
        uri2file(link)
    except HTTPException:
        return
    res.setdefault(dir_key, {}).setdefault(day_key, link)

116 

117 

def list_dir(file_path: str, file_prefix: str = "") -> dict[str, dict[str, str]]:
    """Collect dated log files reachable from *file_path*.

    When *file_path* is a directory it is walked recursively; otherwise every
    sibling whose name starts with the last path component is examined
    (files are registered directly, directories are walked). While walking,
    only files with a suffix in ``exts``, ``.log`` or ``.txt`` and a name
    starting with *file_prefix* are registered.

    Returns:
        Mapping ``{directory: {"YYYY-MM-DD": link}}`` as filled by
        ``reg_file``; empty when nothing matches or the parent directory
        does not exist.

    Raises:
        HTTPException: propagated from ``uri2file`` for paths outside the
            working directory.
    """
    res: dict[str, dict[str, str]] = {}

    target = uri2file(file_path)
    if target.is_dir():
        targets = [target]
    elif target.parent.is_dir():
        targets = [x for x in target.parent.iterdir() if x.name.startswith(target.name)]
    else:
        # nothing to list; callers turn an empty result into 404
        targets = []

    suffixes = exts | {".log", ".txt"}
    for entry in targets:
        if entry.is_file():
            reg_file(res, entry)
        elif entry.is_dir():
            for root, _, filenames in entry.walk():
                for fname in filenames:
                    if Path(fname).suffix in suffixes and fname.startswith(file_prefix):
                        reg_file(res, root / fname)
    _log.debug("list_dir: keys=%s", res.keys())
    return res

139 

140 

@router.get("/dirs")
def get_dirs(month=month_query) -> list[str]:
    # Directory keys of the listing rooted at the working directory.
    listing = list_dir(".", month)
    return list(listing)

144 

145 

@router.get("/list/{file_path:path}")
def list_raw(file_path: str, month=month_query):
    # Return the raw listing mapping produced by list_dir().
    listing = list_dir(file_path, month)
    return listing

149 

150 

@router.get("/html1/{file_path:path}")
def html1(file_path: str, month=month_query):
    # Render the listing as one box per directory with a bullet per month,
    # each bullet holding day-number links to /read. Responds 404 when the
    # listing is empty. Request- and filesystem-derived text (file_path,
    # directory titles) is HTML-escaped before being embedded.
    def gen(ldir: dict[str, dict[str, str]]):
        """Yield the page piece by piece, one chunk per directory."""
        yield f"<html><title>{html.escape(file_path)}</title><body>"
        for title, files in ldir.items():
            buf = io.StringIO()
            uri = uriescape(f"html1/{title}")
            buf.write('<div style="border: 1px solid black; float: left; margin: 10px; padding: 1em;">')
            buf.write(f'<h2><a href="{uri}">{html.escape(title)}</a></h2><ul>')
            premonth = None
            for dtstr in sorted(files):
                dt = datetime.datetime.strptime(dtstr, "%Y-%m-%d")
                curmonth = dt.strftime("%Y-%m")
                if premonth != curmonth:
                    # close the previous month's bullet before opening a new one
                    if premonth is not None:
                        buf.write("</li>")
                    buf.write(f"<li>{curmonth}: ")
                    premonth = curmonth
                link_uri = uriescape(f"read/{files[dtstr]}")
                linkhtml = f'<a href="{link_uri}">{dt.strftime("%d")}</a>'
                color = api_config.get("weekday_colors", {}).get(dt.weekday())
                if color is not None:
                    buf.write(f' <span style="background-color: {color};">{linkhtml}</span>')
                else:
                    buf.write(f" {linkhtml}")
            # only close a bullet that was actually opened
            if premonth is not None:
                buf.write("</li>")
            buf.write("</ul>")
            buf.write("</div>")
            yield buf.getvalue()
        yield "</body></html>"

    ldir = list_dir(file_path, month)
    if len(ldir) == 0:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    return StreamingResponse(content=gen(ldir), media_type="text/html")

186 

187 

def html2_gen1(uri: str, month: str, files: dict[str, str]) -> str:
    """Render one month as calendar table rows (Sunday in the first column).

    Args:
        uri: already-escaped link target used for the month header.
        month: month to render, formatted ``YYYY-MM``.
        files: mapping of ``YYYY-MM-DD`` to the link of that day's file;
            days present in it become links to ``read/<link>``.

    Returns:
        HTML consisting of a header row followed by the week rows.

    Raises:
        ValueError: if *month* is not in ``YYYY-MM`` form.
    """
    first = datetime.datetime.strptime(month, "%Y-%m").date()
    today = datetime.date.today()
    buf = io.StringIO()
    buf.write(f'<tr><th colspan="7"><a href="{uri}?month={month}">{month}</a></th></tr>')
    # column index of the first day, counting from Sunday = 0
    wday = (first.weekday() + 1) % 7
    buf.write('<tr align="right">')
    if wday != 0:
        buf.write(f'<td colspan="{wday}"></td>')
    # 32 steps always run past the end of the month, so the loop ends by break
    for i in range(32):
        cdt = first + datetime.timedelta(days=i)
        wday = (cdt.weekday() + 1) % 7
        if cdt.month != first.month:
            # pad the remainder of the last week; the row is closed below
            if wday != 0:
                buf.write(f'<td colspan="{7 - wday}"></td>')
            break
        # start a new row on Sundays, except for the row opened above
        if wday == 0 and i != 0:
            buf.write('</tr><tr align="right">')
        dtstr = cdt.strftime("%Y-%m-%d")
        if cdt == today:
            color = api_config.get("today_color")
        else:
            color = api_config.get("weekday_colors", {}).get(cdt.weekday())
        if color is None:
            buf.write("<td>")
        else:
            buf.write(f'<td style="background-color: {color};">')
        if dtstr in files:
            link_uri = uriescape(f"read/{files[dtstr]}")
            buf.write(f'<a href="{link_uri}">{cdt.day}</a>')
        else:
            buf.write(f"{cdt.day}")
        buf.write("</td>")
    buf.write("</tr>")
    return buf.getvalue()

224 

225 

def html2_gen(ldir: dict[str, dict[str, str]], file_path: str, current_month: str):
    """Yield an HTML calendar page for the listing *ldir*.

    The first chunk holds the page header and a navigation bar (previous,
    current and next month, "all time", "this month"); it is emitted even
    when *ldir* is empty. One chunk per directory follows, each containing a
    calendar table built with ``html2_gen1``, then the closing tags.

    Args:
        ldir: mapping ``{directory: {"YYYY-MM-DD": link}}``.
        file_path: requested path, shown (HTML-escaped) as the page title.
        current_month: ``YYYY-MM`` used as the navigation anchor; ``""``,
            ``"all"`` or an unparsable value falls back to the current month.
    """
    fmt = "%Y-%m"
    if current_month in ("", "all"):
        current_month = datetime.date.today().strftime(fmt)
    try:
        # datetime.strptime(...).date() also works where date.strptime is absent
        today = datetime.datetime.strptime(current_month, fmt).date()
    except ValueError:
        today = datetime.date.today().replace(day=1)
    # ``today`` is the first of the month: one day back / 31 days forward
    # always land in the neighbouring months.
    lastm = (today - datetime.timedelta(days=1)).strftime(fmt)
    thism = today.strftime(fmt)
    nextm = (today + datetime.timedelta(days=31)).strftime(fmt)

    head = io.StringIO()
    head.write(f"<html><title>{html.escape(file_path)}</title><body>")
    head.write("<div>")
    head.write(f'<a href="?month={lastm}">{lastm}</a>')
    head.write(f' | <a href="?month={thism}">{thism}</a>')
    head.write(f' | <a href="?month={nextm}">{nextm}</a>')
    head.write(' | <a href="?month=all">all time</a>')
    head.write(' | <a href="./">this month</a>')
    head.write("</div>")
    yield head.getvalue()

    for title, files in ldir.items():
        weekday_colors = api_config.get("weekday_colors", {})
        buf = io.StringIO()
        uri = uriescape(f"html2/{title}")
        buf.write('<div style="float: left; margin: 1em;">')
        buf.write(f'<h2><a href="{uri}">{html.escape(title)}</a></h2>')
        buf.write('<table border="1" style="border-collapse: collapse"><tr>')
        # 2000-01-02 is a Sunday, so the header row runs Sun..Sat
        sunday = datetime.date(2000, 1, 2)
        for i in range(7):
            wd = sunday + datetime.timedelta(days=i)
            wdstr = wd.strftime("%a")
            color = weekday_colors.get(wd.weekday())
            if color:
                buf.write(f'<th style="background-color: {color};"><code>{wdstr}</code></th>')
            else:
                buf.write(f"<th><code>{wdstr}</code></th>")
        buf.write("</tr>")
        months = {x.rsplit("-", 1)[0] for x in files}
        for month in sorted(months):
            buf.write(html2_gen1(uri, month, files))
        buf.write("</table></div>")
        yield buf.getvalue()
    yield "</body></html>"

265 

266 

@router.get("/html2/{file_path:path}")
def html2(file_path: str, month=month_query):
    # Calendar view of the listing: an empty month means the current month,
    # "all" removes the filename prefix filter. Responds 404 when nothing
    # is listed.
    if month == "all":
        prefix = ""
    elif not month:
        prefix = datetime.date.today().strftime("%Y-%m")
    else:
        prefix = month
    ldir = list_dir(file_path, prefix)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    page = html2_gen(ldir, file_path, prefix)
    return StreamingResponse(content=page, media_type="text/html")

277 

278 

def find_target(p: Path, accepts: list[str]) -> Path:
    """Pick the on-disk file that should back the logical path *p*.

    Preference order: ``<p>.gz`` when ``"gzip"`` is accepted, *p* itself,
    ``<p>.br`` when ``"br"`` is accepted, then the first (by sorted path)
    regular file named ``<p.name>.<anything>`` in the same directory.

    Raises:
        HTTPException: 404 when no candidate exists, including when the
            parent directory is missing.
    """
    # gzip pass through
    if "gzip" in accepts:
        gz = p.with_suffix(p.suffix + ".gz")
        if gz.is_file():
            return gz
    # raw pass through
    if p.is_file():
        return p
    # others
    if "br" in accepts:
        br = p.with_suffix(p.suffix + ".br")
        # is_file() like every other branch: a directory cannot be streamed
        if br.is_file():
            return br
    # compressed case
    parent = p.parent
    if parent.is_dir():
        wanted = p.name + "."
        # sorted so the result does not depend on directory iteration order
        candidates = sorted(x for x in parent.iterdir() if x.is_file() and x.name.startswith(wanted))
        if candidates:
            return candidates[0]
    raise HTTPException(status_code=404, detail=f"not found: {p}")

296 

297 

def get_streams(files: dict[str, dict[str, str]], accepts: list[str]) -> tuple[list[Stream], dict]:
    """Open one stream per listed file, ordered by date and then by link.

    Args:
        files: mapping ``{directory: {"YYYY-MM-DD": link}}``.
        accepts: encodings the caller can take; ``"gzip"`` (or ``"br"`` when
            ``".br"`` is a key of ``stream_ext``) selects that output mode,
            otherwise streams are opened in ``"decompress"`` mode.

    Returns:
        The list of streams and the response headers implied by the mode.
    """
    by_day: dict[str, list[str]] = {}
    for per_dir in files.values():
        for day, link in per_dir.items():
            by_day.setdefault(day, []).append(link)

    output_list: list[Path] = [
        find_target(uri2file(fname), accepts)
        for day in sorted(by_day)
        for fname in sorted(by_day[day])
    ]

    if "gzip" in accepts:
        mode, hdrs = "gzip", {"content-encoding": "gzip"}
    elif "br" in accepts and ".br" in stream_ext:
        mode, hdrs = "brotli", {"content-encoding": "br"}
    else:
        mode, hdrs = "decompress", {}
    _log.debug("streams: %s files, mode=%s, hdrs=%s", len(output_list), mode, hdrs)

    opened = [auto_compress_stream(path, mode) for path in output_list]
    return [item[1] for item in opened], hdrs

321 

322 

@router.get("/cat/{file_path:path}")
def cat_file(file_path: str, month=month_query):
    # Stream every listed file through CatStream; 404 when nothing is listed.
    ldir = list_dir(file_path, month)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    media_type = api_config.get("content-type", "text/plain")
    # an empty accept list makes get_streams open the files in decompress mode
    streams, hdrs = get_streams(ldir, [])
    # daily sort
    body = CatStream(streams).gen()
    return StreamingResponse(content=body, media_type=media_type, headers=hdrs)

332 

333 

@router.get("/merge/{file_path:path}")
def merge_file(file_path: str, month=month_query):
    # Stream every listed file through MergeStream; 404 when nothing is listed.
    ldir = list_dir(file_path, month)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    media_type = api_config.get("content-type", "text/plain")
    # cannot do passthrough compression
    streams, hdrs = get_streams(ldir, [])
    # daily sort
    body = MergeStream(streams).gen()
    return StreamingResponse(content=body, media_type=media_type, headers=hdrs)