Coverage for log2s3 / app.py: 91%
268 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
1import datetime
2import html
3import io
4from typing import Any
5from pathlib import Path
6from fastapi import APIRouter, HTTPException, Response, Header, Query
7from fastapi.responses import StreamingResponse
8from .common_stream import Stream, MergeStream, CatStream
9from .compr_stream import auto_compress_stream, stream_ext
10from logging import getLogger
# Router that collects every endpoint declared in this module.
router = APIRouter()
_log = getLogger(__name__)

# Mutable, module-wide settings; update_config() merges overrides into it.
api_config: dict[str, Any] = dict(
    weekday_colors={
        5: "lightyellow",  # sat
        6: "lightcyan",  # sun
    },
    today_color="yellow",
)

# Keys of stream_ext; compared against Path.suffix values elsewhere in this module.
exts = set(stream_ext)

# Shared declaration of the "month" query parameter used by the listing endpoints.
month_query = Query(default="", pattern="(^[0-9]{4}|all|^$)")
def update_config(conf: dict):
    """Merge every entry of ``conf`` into the module-level ``api_config``.

    Existing keys are overwritten; keys absent from ``conf`` are kept.
    """
    for key, value in conf.items():
        api_config[key] = value
def uri2file(file_path: str) -> Path:
    """Map a request path onto a resolved filesystem path below the working dir.

    Args:
        file_path: path taken from the request, relative to ``working_dir``.

    Returns:
        The resolved absolute path.

    Raises:
        HTTPException: 403 when the resolved path is neither inside the
            working directory nor the working directory itself.
    """
    base = Path(api_config.get("working_dir", "."))
    resolved = (base / file_path).resolve()
    is_below_base = base.resolve().absolute() in resolved.resolve().absolute().parents
    # The working directory itself is not one of its own parents, so it is
    # accepted through the samefile() check instead.
    if is_below_base or (resolved.exists() and resolved.samefile(base)):
        return resolved
    _log.warning("out of path: wdir=%s, target=%s", base, resolved.resolve())
    raise HTTPException(status_code=403, detail=f"cannot access to {file_path}")
def file2uri(path: Path) -> str:
    """Return ``path`` expressed relative to the resolved working directory.

    Raises:
        ValueError: propagated from ``Path.relative_to`` when ``path`` is not
            located below the working directory.
    """
    root = Path(api_config.get("working_dir", ".")).resolve()
    relative = path.relative_to(root)
    return str(relative)
def uriescape(uri: str, quote: bool = True) -> str:
    """Prepend the configured URL prefix to ``uri`` and HTML-escape the result.

    Args:
        uri: path relative to the API prefix.
        quote: forwarded to ``html.escape``; also escape quote characters.
    """
    prefix = Path(api_config.get("prefix", "/"))
    joined = prefix / uri
    return html.escape(str(joined), quote)
@router.get("/config")
def read_config() -> dict:
    """Expose the live module configuration (the ``api_config`` dict itself)."""
    current = api_config
    return current
@router.get("/read/{file_path:path}")
def read_file(response: Response, file_path: str, accept_encoding: str = Header("")):
    """Stream a single file located below the working directory.

    Resolution order:

    1. Passthrough: when the client accepts ``br``/``gzip`` and a sibling
       ``<file>.br``/``<file>.gz`` exists, its bytes are sent unchanged with a
       matching ``content-encoding`` header.
    2. The plain file itself.
    3. Anything else that exists (directory, ...) is refused with 403.
    4. Any sibling named ``<file>.<something>`` is decompressed on the fly.

    Args:
        response: injected by FastAPI; kept for interface compatibility.
        file_path: request path relative to the working directory.
        accept_encoding: raw ``Accept-Encoding`` request header.

    Raises:
        HTTPException: 403 for an inaccessible target, 404 when nothing matches.
    """

    def stream_and_close(fp, chunk_size: int = 65536):
        # The response only iterates over its body, so close the handle
        # explicitly once streaming ends (or the generator is discarded).
        try:
            while chunk := fp.read(chunk_size):
                yield chunk
        finally:
            fp.close()

    target = uri2file(file_path)
    accepts = [x.strip() for x in accept_encoding.split(",")]
    media_type = api_config.get("content-type", "text/plain")

    # gzip or brotli passthrough case
    passthrough = {
        "br": (".br",),
        "gzip": (".gz",),
    }
    for encoding, suffixes in passthrough.items():
        if encoding not in accepts:
            continue
        for suffix in suffixes:
            candidate = target.with_suffix(target.suffix + suffix)
            if candidate.is_file():
                _log.info("compressed %s: %s", encoding, candidate)
                # The header must live on the response object that is actually
                # returned; headers set on the injected ``response`` are not
                # applied to a Response returned directly.
                return StreamingResponse(
                    content=stream_and_close(candidate.open("rb")),
                    media_type=media_type,
                    headers={"content-encoding": encoding},
                )

    # uncompressed case
    if target.is_file():
        _log.info("raw: %s", target)
        return StreamingResponse(content=stream_and_close(target.open("rb")), media_type=media_type)

    # other type case (directory, etc...)
    if target.exists():
        raise HTTPException(status_code=403, detail=f"cannot access to {file_path}")

    # compressed case: a missing parent directory simply means "not found"
    parent = target.parent
    if parent.is_dir():
        wanted_prefix = target.name + "."
        # sorted() makes the choice independent of directory listing order
        candidates = sorted(x for x in parent.iterdir() if x.is_file() and x.name.startswith(wanted_prefix))
        if candidates:
            chosen = candidates[0]
            _, stream = auto_compress_stream(chosen, "decompress")
            _log.info("auto decompress: %s", chosen)
            return StreamingResponse(content=stream.gen(), media_type=media_type)
    raise HTTPException(status_code=404, detail=f"not found: {file_path}")
def reg_file(res: dict, p: Path):
    """Record the dated file ``p`` in ``res`` as ``res[directory][YYYY-MM-DD] = link``.

    The part of the file name before the first ``.`` must parse as
    ``%Y-%m-%d``; other files are ignored. When the suffix is one of ``exts``
    it is dropped from the stored link. The first file registered for a given
    directory/day wins; later ones are ignored.

    Args:
        res: mapping updated in place.
        p: path of the candidate file.
    """
    date_part = p.name.split(".")[0]
    try:
        day = datetime.datetime.strptime(date_part, "%Y-%m-%d")
    except ValueError:
        return

    link_path = p.with_suffix("") if p.suffix in exts else p
    dir_key = file2uri(p.parent)
    link = file2uri(link_path)
    try:
        # both the directory and the link must be inside the working_dir tree
        uri2file(dir_key)
        uri2file(link)
    except HTTPException:
        return

    res.setdefault(dir_key, {}).setdefault(day.strftime("%Y-%m-%d"), link)
def list_dir(file_path: str, file_prefix: str = "") -> dict[str, dict[str, str]]:
    """Collect dated log files reachable from ``file_path``.

    When ``file_path`` is a directory it is walked recursively. Otherwise
    every entry of its parent directory whose name starts with the last path
    component is used as a starting point (files are registered directly,
    directories are walked).

    Args:
        file_path: request path relative to the working directory.
        file_prefix: only files found while walking a directory whose name
            starts with this prefix are registered.

    Returns:
        ``{directory: {"YYYY-MM-DD": link}}`` as filled in by ``reg_file``;
        empty when nothing matches, including when the parent directory does
        not exist.

    Raises:
        HTTPException: 403 propagated from ``uri2file``.
    """
    res: dict[str, dict[str, str]] = {}

    target = uri2file(file_path)
    if target.is_dir():
        starts = [target]
    elif target.parent.is_dir():
        starts = [x for x in target.parent.iterdir() if x.name.startswith(target.name)]
    else:
        # iterdir() would raise on a missing parent; report "nothing found"
        starts = []

    wanted_suffixes = exts | {".log", ".txt"}
    for entry in starts:
        if entry.is_file():
            reg_file(res, entry)
        elif entry.is_dir():
            for root, _, filenames in entry.walk():
                for fname in filenames:
                    if Path(fname).suffix in wanted_suffixes and fname.startswith(file_prefix):
                        reg_file(res, root / fname)
    _log.debug("list_dir: keys=%s", res.keys())
    return res
@router.get("/dirs")
def get_dirs(month=month_query) -> list[str]:
    """List the directories below the working dir that hold matching dated files."""
    listing = list_dir(".", month)
    return list(listing)
@router.get("/list/{file_path:path}")
def list_raw(file_path: str, month=month_query):
    """Return the ``list_dir`` mapping for ``file_path`` unchanged."""
    listing = list_dir(file_path, month)
    return listing
@router.get("/html1/{file_path:path}")
def html1(file_path: str, month=month_query):
    """Render the listing for ``file_path`` as one HTML box per directory.

    Inside each box the days are grouped into one list item per month; every
    day links to the matching ``read/`` URL and is tinted according to
    ``api_config["weekday_colors"]``.

    Raises:
        HTTPException: 404 when the listing is empty.
    """

    def gen(ldir: dict[str, dict[str, str]]):
        # file_path comes straight from the URL: escape it before embedding
        yield f"<html><title>{html.escape(file_path)}</title><body>"
        weekday_colors = api_config.get("weekday_colors", {})
        for title, files in ldir.items():
            buf = io.StringIO()
            uri = uriescape(f"html1/{title}")
            buf.write('<div style="border: 1px solid black; float: left; margin: 10px; padding: 1em;">')
            # directory names are filesystem data, escape them as well
            buf.write(f'<h2><a href="{uri}">{html.escape(title)}</a></h2><ul>')
            premonth = None
            for dtstr in sorted(files):
                dt = datetime.datetime.strptime(dtstr, "%Y-%m-%d")
                year_month = dt.strftime("%Y-%m")
                if premonth != year_month:
                    # close the previous month's item before opening a new one
                    if premonth is not None:
                        buf.write("</li>")
                    buf.write(f"<li>{year_month}: ")
                    premonth = year_month
                day_uri = uriescape(f"read/{files[dtstr]}")
                linkhtml = f'<a href="{day_uri}">{dt.strftime("%d")}</a>'
                color = weekday_colors.get(dt.weekday())
                if color is not None:
                    buf.write(f' <span style="background-color: {color};">{linkhtml}</span>')
                else:
                    buf.write(f" {linkhtml}")
            buf.write("</li></ul>")
            buf.write("</div>")
            yield buf.getvalue()
        yield "</body></html>"

    ldir = list_dir(file_path, month)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    return StreamingResponse(content=gen(ldir), media_type="text/html")
def html2_gen1(uri: str, month: str, files: dict[str, str]) -> str:
    """Render one month as calendar rows (Sunday first) for the html2 table.

    Args:
        uri: already-escaped link of the directory page; used for the month header.
        month: month to render, formatted ``%Y-%m``.
        files: ``{"YYYY-MM-DD": link}``; days present here become links.

    Returns:
        HTML consisting of a header row followed by the week rows.

    Raises:
        ValueError: when ``month`` does not match ``%Y-%m``.
    """
    first = datetime.datetime.strptime(month, "%Y-%m").date()
    today = datetime.date.today()
    weekday_colors = api_config.get("weekday_colors", {})

    buf = io.StringIO()
    buf.write(f'<tr><th colspan="7"><a href="{uri}?month={month}">{month}</a></th></tr>')
    buf.write('<tr align="right">')
    # column index with Sunday == 0; pad the cells before the 1st of the month
    lead = (first.weekday() + 1) % 7
    if lead != 0:
        buf.write(f'<td colspan="{lead}"></td>')
    # 32 steps always reach the following month, which ends the loop
    for offset in range(32):
        cdt = first + datetime.timedelta(days=offset)
        col = (cdt.weekday() + 1) % 7
        if cdt.month != first.month:
            # pad the remainder of the last week; the row is closed below
            if col != 0:
                buf.write(f'<td colspan="{7 - col}"></td>')
            break
        # start a new row on Sundays, except for the row already opened above
        if col == 0 and offset > 0:
            buf.write('</tr><tr align="right">')
        if cdt == today:
            color = api_config.get("today_color")
        else:
            color = weekday_colors.get(cdt.weekday())
        if color is None:
            buf.write("<td>")
        else:
            buf.write(f'<td style="background-color: {color};">')
        dtstr = cdt.strftime("%Y-%m-%d")
        if dtstr in files:
            day_uri = uriescape(f"read/{files[dtstr]}")
            buf.write(f'<a href="{day_uri}">{cdt.day}</a>')
        else:
            buf.write(f"{cdt.day}")
        buf.write("</td>")
    # single closing tag for the last row
    buf.write("</tr>")
    return buf.getvalue()
def html2_gen(ldir: dict[str, dict[str, str]], file_path: str, current_month: str):
    """Yield the html2 calendar page chunk by chunk.

    The first chunk carries the page header, the month navigation bar and the
    first directory's calendar; every further directory is its own chunk and
    the closing tags form the last chunk.

    Args:
        ldir: ``{directory: {"YYYY-MM-DD": link}}`` as produced by ``list_dir``.
        file_path: request path, shown (escaped) as the page title.
        current_month: month the navigation is centred on (``%Y-%m``, longer
            prefixes are cut to that length). ``""``, ``"all"`` or an
            unparsable value fall back to the current month.
    """
    fmt = "%Y-%m"
    buf = io.StringIO()
    # file_path comes from the URL: escape it before embedding
    buf.write(f"<html><title>{html.escape(file_path)}</title><body>")

    anchor = None
    if current_month not in ("", "all"):
        try:
            # datetime.datetime.strptime works on every supported Python
            # version, unlike date.strptime
            anchor = datetime.datetime.strptime(current_month[:7], fmt).date()
        except ValueError:
            anchor = None
    if anchor is None:
        anchor = datetime.date.today().replace(day=1)
    # anchor is the 1st of a month: -1 day lands in the previous month and
    # +31 days always lands in the next one
    lastm = anchor - datetime.timedelta(days=1)
    nextm = anchor + datetime.timedelta(days=31)

    buf.write("<div>")
    buf.write(f'<a href="?month={lastm.strftime(fmt)}">{lastm.strftime(fmt)}</a>')
    buf.write(f' | <a href="?month={anchor.strftime(fmt)}">{anchor.strftime(fmt)}</a>')
    buf.write(f' | <a href="?month={nextm.strftime(fmt)}">{nextm.strftime(fmt)}</a>')
    buf.write(' | <a href="?month=all">all time</a>')
    buf.write(' | <a href="./">this month</a>')
    buf.write("</div>")

    for title, files in ldir.items():
        uri = uriescape(f"html2/{title}")
        buf.write('<div style="float: left; margin: 1em;">')
        buf.write(f'<h2><a href="{uri}">{html.escape(title)}</a></h2>')
        buf.write('<table border="1" style="border-collapse: collapse"><tr>')
        # 2000-01-02 is a Sunday, so the header row runs Sun..Sat
        sunday = datetime.date(2000, 1, 2)
        for i in range(7):
            wd = sunday + datetime.timedelta(days=i)
            wdstr = wd.strftime("%a")
            color = api_config.get("weekday_colors", {}).get(wd.weekday())
            if color:
                buf.write(f'<th style="background-color: {color};"><code>{wdstr}</code></th>')
            else:
                buf.write(f"<th><code>{wdstr}</code></th>")
        buf.write("</tr>")
        months = {day.rsplit("-", 1)[0] for day in files}
        for month in sorted(months):
            buf.write(html2_gen1(uri, month, files))
        buf.write("</table></div>")
        yield buf.getvalue()
        # reuse the buffer for the next directory
        buf.truncate(0)
        buf.seek(0)
    yield "</body></html>"
@router.get("/html2/{file_path:path}")
def html2(file_path: str, month=month_query):
    """Render the listing for ``file_path`` as per-directory calendar tables.

    An empty ``month`` selects the current month; ``"all"`` removes the
    filter altogether.

    Raises:
        HTTPException: 404 when the listing is empty.
    """
    if month == "all":
        selected = ""
    elif month:
        selected = month
    else:
        selected = datetime.date.today().strftime("%Y-%m")

    ldir = list_dir(file_path, selected)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    page = html2_gen(ldir, file_path, selected)
    return StreamingResponse(content=page, media_type="text/html")
def find_target(p: Path, accepts: list[str]) -> Path:
    """Pick the on-disk file that should back the logical path ``p``.

    Preference order: ``<p>.gz`` when gzip is accepted, ``p`` itself,
    ``<p>.br`` when br is accepted, then the first (by sorted path) regular
    file named ``<p.name>.<something>`` in the same directory.

    Args:
        p: logical path of the file.
        accepts: encodings that may be passed through unchanged.

    Raises:
        HTTPException: 404 when no candidate exists.
    """
    # gzip pass through
    if "gzip" in accepts:
        gz = p.with_suffix(p.suffix + ".gz")
        if gz.is_file():
            return gz
    # raw pass through
    if p.is_file():
        return p
    # brotli pass through; only regular files qualify, as in the other branches
    if "br" in accepts:
        br = p.with_suffix(p.suffix + ".br")
        if br.is_file():
            return br
    # compressed case; a missing parent directory means "not found"
    if p.parent.is_dir():
        wanted_prefix = p.name + "."
        # sorted() makes the choice independent of directory listing order
        candidates = sorted(x for x in p.parent.iterdir() if x.is_file() and x.name.startswith(wanted_prefix))
        if candidates:
            return candidates[0]
    raise HTTPException(status_code=404, detail=f"not found: {p}")
def get_streams(files: dict[str, dict[str, str]], accepts: list[str]) -> tuple[list[Stream], dict]:
    """Open one stream per listed file, ordered by day and then by link name.

    Args:
        files: ``{directory: {"YYYY-MM-DD": link}}`` as produced by ``list_dir``.
        accepts: encodings accepted by the client; selects the output mode.

    Returns:
        ``(streams, headers)`` where ``headers`` carries ``content-encoding``
        when the output mode is gzip or brotli and is empty otherwise.

    Raises:
        HTTPException: propagated from ``uri2file`` / ``find_target``.
    """
    links_by_day: dict[str, list[str]] = {}
    for per_dir in files.values():
        for day, link in per_dir.items():
            links_by_day.setdefault(day, []).append(link)

    output_list: list[Path] = [
        find_target(uri2file(link), accepts)
        for day in sorted(links_by_day)
        for link in sorted(links_by_day[day])
    ]

    if "gzip" in accepts:
        mode, hdrs = "gzip", {"content-encoding": "gzip"}
    elif "br" in accepts and ".br" in stream_ext:
        mode, hdrs = "brotli", {"content-encoding": "br"}
    else:
        mode, hdrs = "decompress", {}
    _log.debug("streams: %s files, mode=%s, hdrs=%s", len(output_list), mode, hdrs)

    streams = []
    for path in output_list:
        # auto_compress_stream returns a pair; only the stream is needed
        streams.append(auto_compress_stream(path, mode)[1])
    return streams, hdrs
@router.get("/cat/{file_path:path}")
def cat_file(file_path: str, month=month_query):
    """Stream every matching file through ``CatStream``, in daily order.

    Raises:
        HTTPException: 404 when the listing is empty.
    """
    ldir = list_dir(file_path, month)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    # an empty accept list requests decompressed output
    streams, hdrs = get_streams(ldir, [])
    body = CatStream(streams).gen()
    return StreamingResponse(
        content=body,
        media_type=api_config.get("content-type", "text/plain"),
        headers=hdrs,
    )
@router.get("/merge/{file_path:path}")
def merge_file(file_path: str, month=month_query):
    """Stream every matching file through ``MergeStream``.

    Raises:
        HTTPException: 404 when the listing is empty.
    """
    ldir = list_dir(file_path, month)
    if not ldir:
        raise HTTPException(status_code=404, detail=f"not found: {file_path}")
    # cannot do passthrough compression, hence the empty accept list
    streams, hdrs = get_streams(ldir, [])
    body = MergeStream(streams).gen()
    return StreamingResponse(
        content=body,
        media_type=api_config.get("content-type", "text/plain"),
        headers=hdrs,
    )