Coverage for log2s3 / main.py: 74%
786 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-20 04:42 +0000
1from logging import getLogger
2import os
3import sys
4import datetime
5import shlex
6import subprocess
7import functools
8import click
9import json
10import pathlib
11import boto3
12import io
13from click.core import UNSET
14from typing import Union, Generator, Optional
15from .version import VERSION
16from .common_stream import Stream, MergeStream
17from .compr_stream import (
18 S3GetStream,
19 S3PutStream,
20 auto_compress_stream,
21 stream_compress_modes,
22)
24try:
25 from mypy_boto3_s3.client import S3Client as S3ClientType
26except ImportError:
27 from typing import Any as S3ClientType
29_log = getLogger(__name__)
32mask_prefix = ["s3_"]
def arg_mask(d: Union[list, dict]) -> Union[list, dict]:
    """Return a copy of *d* suitable for logging.

    Drops None/UNSET values, recurses into nested lists/dicts, and masks any
    value whose key starts with one of ``mask_prefix`` (e.g. "s3_secret_key").
    Non-container input is returned unchanged.
    """
    if isinstance(d, list):
        return [arg_mask(x) for x in d]
    if isinstance(d, dict):
        res = {}
        for k, v in d.items():
            if v is None or v == UNSET:
                # BUG FIX: the old code popped from a shared copy once per
                # prefix, which raised KeyError with more than one prefix.
                continue
            if isinstance(v, (dict, list)):
                res[k] = arg_mask(v)
            elif any(k.startswith(p) for p in mask_prefix):
                # mask secrets: same-length stars for strings, fixed stars otherwise
                res[k] = "*" * len(v) if isinstance(v, str) else "****"
            else:
                res[k] = v
        return res
    return d
@click.group(invoke_without_command=True)
@click.version_option(VERSION)
@click.pass_context
def cli(ctx):
    # Top-level command group: show help when invoked without a subcommand.
    if ctx.invoked_subcommand is None:
        # consistency fix: use click.echo like the rest of the file
        click.echo(ctx.get_help())
def s3_option(func):
    """Decorator adding S3 connection options; injects ``s3`` and ``bucket_name``."""
    @click.option("--s3-access-key", envvar="AWS_ACCESS_KEY_ID", help="AWS Access Key")
    @click.option("--s3-secret-key", envvar="AWS_SECRET_ACCESS_KEY", help="AWS Secret Key")
    @click.option("--s3-region", envvar="AWS_DEFAULT_REGION", help="AWS Region")
    @click.option("--s3-endpoint", envvar="AWS_ENDPOINT_URL_S3", help="AWS Endpoint URL for S3")
    @click.option("--s3-bucket", envvar="AWS_S3_BUCKET", help="AWS S3 Bucket name")
    @click.option("--dotenv/--no-dotenv", default=False, help="load .env for S3 client config")
    @functools.wraps(func)
    def _(s3_endpoint, s3_access_key, s3_secret_key, s3_region, s3_bucket, dotenv, **kwargs):
        if dotenv:
            from dotenv import load_dotenv

            load_dotenv()
            # the bucket may only become visible after .env is loaded
            if not s3_bucket:
                s3_bucket = os.getenv("AWS_S3_BUCKET")
        client_kwargs = {
            "aws_access_key_id": s3_access_key,
            "aws_secret_access_key": s3_secret_key,
            "region_name": s3_region,
            "endpoint_url": s3_endpoint,
        }
        # boto3 picks its own defaults for anything left unset
        client_kwargs = {k: v for k, v in client_kwargs.items() if v is not None and v != UNSET}
        s3 = boto3.client("s3", **client_kwargs)
        return func(s3=s3, bucket_name=s3_bucket, **kwargs)

    return _
# shared "--compress" option reused by several subcommands
compress_option = click.option(
    "--compress",
    type=click.Choice(stream_compress_modes),
    default="gzip",
    show_default=True,
    help="compress type",
)
def filetree_option(func):
    """Decorator adding file-tree selection options; injects ``top`` (Path) and ``config`` (dict)."""
    @click.option(
        "--top",
        type=click.Path(dir_okay=True, exists=True, file_okay=False),
        required=True,
        help="root directory to find files",
    )
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
    @functools.wraps(func)
    def _(top, older, newer, date, bigger, smaller, glob, dry, **kwargs):
        # collect the filter options into a processor config dict
        config = dict(
            top=top,
            older=older,
            newer=newer,
            date=date,
            bigger=bigger,
            smaller=smaller,
            glob=glob,
            dry=dry,
        )
        return func(top=pathlib.Path(top), config=config, **kwargs)

    return _
def s3tree_option(func):
    """Decorator adding S3 key selection options; injects ``top`` and ``config``."""
    @click.option("--prefix", default="", help="AWS S3 Object Key Prefix")
    @click.option("--suffix", default="", help="AWS S3 Object Key Suffix")
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @functools.wraps(func)
    def _(prefix, older, newer, date, bigger, smaller, suffix, glob, **kwargs):
        config = dict(
            top=prefix,
            older=older,
            newer=newer,
            date=date,
            bigger=bigger,
            smaller=smaller,
            suffix=suffix,
            glob=glob,
        )
        # an empty prefix is passed through as "" (not Path("."))
        top = pathlib.Path(prefix) if prefix else ""
        return func(top=top, config=config, **kwargs)

    return _
def verbose_option(func):
    """Decorator adding ``--verbose/--quiet`` and configuring logging before the command runs."""
    @click.option("--verbose/--quiet", default=None)
    @functools.wraps(func)
    def _(verbose, **kwargs):
        from logging import basicConfig

        # None (neither flag) -> INFO, --quiet -> WARNING, --verbose -> DEBUG
        level = {None: "INFO", False: "WARNING", True: "DEBUG"}[verbose]
        basicConfig(level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s")
        return func(**kwargs)

    return _
@cli.command()
@s3_option
@verbose_option
def s3_make_bucket(s3: S3ClientType, bucket_name: str):
    """make S3 buckets"""
    res = s3.create_bucket(Bucket=bucket_name)
    # show the raw API response for diagnostics
    click.echo("response %s" % (res,))
@cli.command()
@s3_option
@verbose_option
def s3_bucket(s3: S3ClientType, bucket_name: str):
    """list S3 buckets"""
    res = s3.list_buckets()
    _log.debug("response %s", res)
    # one line per bucket: creation date and name
    for bkt in res.get("Buckets", []):
        click.echo(f"{bkt['CreationDate']} {bkt['Name']}")
def allobjs(s3: S3ClientType, bucket_name: str, prefix: str, marker: str = ""):
    """Yield every object under *prefix*, following list_objects pagination.

    The original recursion is expressed as a loop: after each truncated page,
    the last key becomes the next Marker.
    """
    while True:
        res = s3.list_objects(Bucket=bucket_name, Prefix=prefix, Marker=marker)
        contents = res.get("Contents", [])
        yield from contents
        if not (res.get("IsTruncated") and contents):
            return
        marker = contents[-1].get("Key")
        if not marker:
            return
def s3obj2stat(obj: dict) -> os.stat_result:
    """Build a synthetic os.stat_result from an S3 listing entry.

    Size maps to st_size; LastModified (defaulting to "now") fills all three
    timestamps; mode is a fixed 0o644.
    """
    mtime = obj.get("LastModified", datetime.datetime.now()).timestamp()
    size = obj.get("Size", 0)
    # tuple layout: (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    return os.stat_result((0o644, 0, 0, 0, 0, 0, size) + (mtime,) * 3)
def allobjs_conf(s3: S3ClientType, bucket_name: str, prefix: str, config: dict):
    """Yield objects under *prefix* that match the suffix and processor filters."""
    from .processor import DebugProcessor

    _log.debug("allobjs: bucket=%s, prefix=%s, config=%s", bucket_name, prefix, config)
    # DebugProcessor is only used for its filter logic (check), not its action
    checker = DebugProcessor(config)
    suffix = config.get("suffix", "")
    return (
        obj
        for obj in allobjs(s3, bucket_name, prefix)
        if obj["Key"].endswith(suffix) and checker.check(pathlib.Path(obj["Key"]), s3obj2stat(obj))
    )
@cli.command()
@s3_option
@s3tree_option
@verbose_option
def s3_list(s3: S3ClientType, bucket_name: str, config: dict, top: pathlib.Path):
    """list S3 objects"""
    # Path("") stringifies to "."; treat that as "no prefix"
    topstr = "" if str(top) == "." else str(top)
    for obj in allobjs_conf(s3, bucket_name, topstr.lstrip("/"), config):
        click.echo("%s %6d %s" % (obj["LastModified"], obj["Size"], obj["Key"]))
@cli.command()
@click.option("--summary/--no-summary", "-S", default=False, type=bool)
@click.option("--pathsep", default="/", show_default=True)
@s3_option
@s3tree_option
@verbose_option
def s3_du(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    summary: bool,
    pathsep: str,
):
    """show S3 directory usage"""
    # usage: dirname -> [object count, total size]
    usage: dict = {}
    for obj in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config):
        dirname = obj["Key"].rsplit(pathsep, 1)[0]
        entry = usage.setdefault(dirname, [0, 0])
        entry[0] += 1
        entry[1] += obj["Size"]
    if not usage:
        click.echo("(empty result)")
        return
    if summary:
        # roll each leaf directory's totals up into every ancestor directory;
        # iterate over a snapshot so newly-created ancestors are not revisited
        for leaf in list(usage.keys()):
            k = leaf
            while len(k) != 0:
                parts = k.rsplit(pathsep, 1)
                if len(parts) == 1:
                    break
                k = parts[0]
                if k not in usage:
                    usage[k] = [0, 0]
                usage[k][0] += usage[leaf][0]
                usage[k][1] += usage[leaf][1]
    click.echo("%10s %5s %s" % ("size", "cnt", "name"))
    click.echo("----------+-----+-----------------------")
    # largest total size first
    for dirname, (cnt, size) in sorted(usage.items(), key=lambda f: f[1][1], reverse=True):
        click.echo("%10d %5d %s" % (size, cnt, dirname))
@cli.command()
@s3_option
@s3tree_option
@verbose_option
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
def s3_delete_by(s3: S3ClientType, bucket_name: str, top: pathlib.Path, config: dict, dry: bool):
    """delete S3 objects"""
    del_keys = [obj["Key"] for obj in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config)]
    if not del_keys:
        _log.info("no object found")
        return
    if dry:
        _log.info("(dry)remove objects: %s", del_keys)
        click.echo(f"(dry)remove {len(del_keys)} objects")
    else:
        _log.info("(wet)remove %s objects", len(del_keys))
        s3.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": k} for k in del_keys]})
@cli.command()
@s3_option
@filetree_option
@verbose_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@click.option("--content/--stat", help="diff content or stat", default=False, show_default=True)
def s3_diff(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    content: bool,
):
    """diff S3 and filetree"""
    from .processor import ListProcessor, process_walk

    # S3 side: path relative to prefix -> object entry
    all_keys = {
        pathlib.Path(x["Key"][len(prefix):]): x for x in allobjs_conf(s3, bucket_name, prefix, config)
    }
    # file side: path relative to top -> stat result
    lp = ListProcessor(config)
    process_walk(top, [lp])
    files = {p.relative_to(top): st for p, st in lp.output if st is not None}
    for k in set(all_keys.keys()) - set(files.keys()):
        click.echo("only-s3: %s: %s" % (k, all_keys[k]))
    for k in set(files.keys()) - set(all_keys.keys()):
        click.echo("only-file: %s: %s" % (k, files[k]))
    for k in set(files.keys()) & set(all_keys.keys()):
        if files[k].st_size != all_keys[k].get("Size"):
            click.echo("size mismatch %s file=%s, obj=%s" % (k, files[k].st_size, all_keys[k]["Size"]))
@cli.command()
@s3_option
@s3tree_option
@verbose_option
@compress_option
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@click.option("--keep/--remove", help="keep old file or delete", default=True, show_default=True)
def s3_compress_tree(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    compress: str,
    dry: bool,
    keep: bool,
):
    """compress S3 objects"""
    topstr = "" if str(top) == "." else str(top)
    for obj in allobjs_conf(s3, bucket_name, topstr.lstrip("/"), config):
        key = obj["Key"]
        rd = S3GetStream(s3, bucket=bucket_name, key=key)
        newname, data = auto_compress_stream(pathlib.Path(key), compress, rd)
        if newname == key:
            # already in the requested format
            _log.debug("do nothing: %s", key)
            continue
        if dry:
            # drain the stream only to measure the would-be size
            new_length = sum(len(chunk) for chunk in data.gen())
            _log.info("(dry) recompress %s -> %s (%s->%s)", key, newname, obj["Size"], new_length)
        else:
            ps = S3PutStream(data, s3, bucket=bucket_name, key=str(newname))
            for _ in ps.gen():
                pass
            res = s3.head_object(Bucket=bucket_name, Key=str(newname))
            _log.info(
                "(wet) recompress %s -> %s (%s->%s)", key, newname, obj["Size"], res["ContentLength"]
            )
            if not keep:
                _log.info("remove old %s (->%s)", key, newname)
                s3.delete_object(Bucket=bucket_name, Key=key)
@cli.command()
@filetree_option
@verbose_option
def filetree_debug(top: pathlib.Path, config: dict):
    """(debug command)"""
    from .processor import DebugProcessor, process_walk

    # walk the tree with a processor that only logs what it would do
    process_walk(top, [DebugProcessor(config)])
@cli.command()
@filetree_option
@verbose_option
def filetree_list(top: pathlib.Path, config: dict):
    """list files"""
    from .processor import ListProcessor, process_walk

    lp = ListProcessor(config)
    process_walk(top, [lp])
    click.echo("%10s %-19s %s %d(+%d) total" % ("size", "mtime", "name", lp.processed, lp.skipped))
    click.echo("----------+-------------------+-----------------------")
    for path, st in lp.output:
        if st is None:
            tmstr, size = "unknown", -1
        else:
            tmstr = datetime.datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            size = st.st_size
        click.echo("%10s %19s %s" % (size, tmstr, path))
@cli.command()
@filetree_option
@compress_option
@verbose_option
def filetree_compress(top: pathlib.Path, config: dict, compress):
    """compress files"""
    from .processor import CompressProcessor, process_walk

    if compress:
        config["compress"] = compress
    cproc = CompressProcessor(config)
    process_walk(top, [cproc])
    # summarize how much space was reclaimed
    _log.info(
        "compressed=%d, skipped=%d, size=%d->%d (%d bytes)",
        cproc.processed,
        cproc.skipped,
        cproc.before_total,
        cproc.after_total,
        cproc.before_total - cproc.after_total,
    )
@cli.command()
@filetree_option
@verbose_option
def filetree_delete(top: pathlib.Path, config: dict):
    """remove files"""
    from .processor import DelProcessor, process_walk

    dproc = DelProcessor(config)
    process_walk(top, [dproc])
    _log.info("removed=%d, skipped=%d", dproc.processed, dproc.skipped)
@cli.command()
@click.argument(
    "files",
    type=click.Path(file_okay=True, dir_okay=True, exists=True, readable=True),
    nargs=-1,
)
@verbose_option
def merge(files: list[click.Path]):
    """merge sorted log files"""
    input_stream: list[Stream] = []

    def _open(path: pathlib.Path):
        # open one file as a decompressing stream and queue it for merging
        _, ch = auto_compress_stream(path, "decompress")
        input_stream.append(ch)

    for fn in files:
        p = pathlib.Path(str(fn))
        if p.is_file():
            _open(p)
        elif p.is_dir():
            for proot, _dirs, pfiles in p.walk():
                for pfn in pfiles:
                    _open(proot / pfn)
        else:
            _log.warning("%s is not a directory or file", p)

    for line in MergeStream(input_stream).text_gen():
        click.echo(line, nl=False)
@cli.command()
@s3_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@filetree_option
@compress_option
@verbose_option
def s3_put_tree(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    compress,
):
    """compress and put log files to S3"""
    from .processor import S3Processor, process_walk

    # pass upload parameters to the processor via its config;
    # objects already present under the prefix are skipped
    config.update(
        s3=s3,
        s3_bucket=bucket_name,
        s3_prefix=prefix,
        skip_names={x["Key"] for x in allobjs(s3, bucket_name, prefix)},
        compress=compress,
    )
    sproc = S3Processor(config)
    process_walk(top, [sproc])
    _log.info(
        "processed=%d, skipped=%d, upload %d bytes",
        sproc.processed,
        sproc.skipped,
        sproc.uploaded,
    )
@cli.command()
@s3_option
@click.option("--key", required=True, help="AWS S3 Object Key")
@click.argument("filename", type=click.Path(file_okay=True, dir_okay=False, exists=True))
@compress_option
@verbose_option
def s3_put1(s3: S3ClientType, bucket_name: str, key: str, filename: str, compress: str):
    """put 1 file to S3"""
    from .compr_stream import S3PutStream, auto_compress_stream

    input_path = pathlib.Path(filename)
    _, st = auto_compress_stream(input_path, compress)
    # keyword arguments for consistency with the other S3PutStream call sites
    ost = S3PutStream(st, s3, bucket=bucket_name, key=key)
    # drain the stream to perform the upload
    for _ in ost.gen():
        pass
def _s3_read_stream(s3: S3ClientType, bucket_name: str, key: str) -> Stream:
    """Open an S3 object as a stream, decompressed according to its extension."""
    raw = S3GetStream(s3, bucket=bucket_name, key=key)
    _, decoded = auto_compress_stream(pathlib.Path(key), "decompress", raw)
    return decoded
@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_cat(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """concatinate compressed objects"""
    # write the decompressed bytes of each object to stdout, in order
    for key in keys:
        stream = _s3_read_stream(s3, bucket_name, key)
        for chunk in stream.gen():
            sys.stdout.buffer.write(chunk)
def _data_via_pager(input: Stream):
    """Pipe the stream's chunks through the user's pager.

    The pager command comes from LOG2S3_PAGER, then PAGER, defaulting to
    ``less``.
    """
    pager_bin = os.getenv("LOG2S3_PAGER", os.getenv("PAGER", "less"))
    proc = subprocess.Popen(shlex.split(pager_bin), stdin=subprocess.PIPE)
    assert proc.stdin is not None  # for type check
    try:
        for d in input.gen():
            proc.stdin.write(d)
    except BrokenPipeError:
        # BUG FIX: the pager may exit before consuming all input
        # (e.g. the user quits less early); that is not an error.
        _log.debug("pager closed its input early")
    proc.communicate()
@cli.command()
@s3_option
@click.argument("key")
@verbose_option
def s3_less(s3: S3ClientType, bucket_name: str, key: str):
    """view compressed object"""
    stream = _s3_read_stream(s3, bucket_name, key)
    _data_via_pager(stream)
@cli.command()
@s3_option
@click.argument("key")
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@verbose_option
def s3_vi(s3: S3ClientType, bucket_name: str, key: str, dry):
    """edit compressed object and overwrite"""
    from .compr_stream import stream_ext, RawReadStream

    bindata = _s3_read_stream(s3, bucket_name, key).read_all().decode("utf-8")
    _, ext = os.path.splitext(key)
    if ext in stream_ext:

        def compress_fn(f: bytes) -> bytes:
            # re-compress with the codec matching the object's extension
            return stream_ext[ext][1](RawReadStream(f)).read_all()

    else:

        def compress_fn(f: bytes) -> bytes:
            # unknown extension: store as-is
            return f

    newdata = click.edit(text=bindata)
    if newdata is None or newdata == bindata:
        _log.info("not changed")
        return
    wr = compress_fn(newdata.encode("utf-8"))
    mode = "dry" if dry else "wet"
    _log.info(
        "(%s) changed. write back to %s (%d->%d(%d))",
        mode,
        key,
        len(bindata),
        len(newdata),
        len(wr),
    )
    if not dry:
        s3.put_object(Body=wr, Bucket=bucket_name, Key=key)
@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_merge(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """merge sorted log objects"""
    input_stream: list[Stream] = [_s3_read_stream(s3, bucket_name, key) for key in keys]
    for line in MergeStream(input_stream).text_gen():
        click.echo(line, nl=False)
@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@verbose_option
def s3_del(s3: S3ClientType, bucket_name: str, keys: list[str], dry):
    """delete objects"""
    # show what is about to be deleted
    for key in keys:
        res = s3.head_object(Bucket=bucket_name, Key=key)
        click.echo("%s %s %s" % (res["LastModified"], res["ContentLength"], key))
    mode = "dry" if dry else "wet"
    _log.info("(%s) delete %s keys", mode, len(keys))
    if not dry:
        s3.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": k} for k in keys]})
@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_head(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """show object metadata"""
    # BUG FIX: the help text previously said "delete objects" (copied from
    # s3_del); this command only performs HEAD requests.
    for k in keys:
        res = s3.head_object(Bucket=bucket_name, Key=k)
        click.echo(f"{k} = {res}")
@cli.command()
@s3_option
@verbose_option
@click.option("--cleanup/--no-cleanup", default=False)
def s3_list_parts(s3: S3ClientType, bucket_name: str, cleanup):
    """list in-progress multipart upload"""
    res = s3.list_multipart_uploads(Bucket=bucket_name)
    uploads = res.get("Uploads", [])
    if not uploads:
        click.echo("(no in-progress multipart uploads found)")
    for upl in uploads:
        click.echo("%s %s %s" % (upl["Initiated"], upl["UploadId"], upl["Key"]))
        if cleanup:
            _log.info("cleanup %s/%s", upl["Key"], upl["UploadId"])
            s3.abort_multipart_upload(Bucket=bucket_name, Key=upl["Key"], UploadId=upl["UploadId"])
@cli.command("cat")
@click.argument(
    "files",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
    nargs=-1,
)
@verbose_option
def cat_file(files: list[click.Path]):
    """concatinate compressed files"""
    # decompress each file (by extension) and stream it to stdout
    for fn in files:
        _, stream = auto_compress_stream(pathlib.Path(str(fn)), "decompress")
        for chunk in stream.gen():
            sys.stdout.buffer.write(chunk)
@cli.command("less")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@verbose_option
def view_file(filename: str):
    """view compressed file"""
    _, stream = auto_compress_stream(pathlib.Path(filename), "decompress")
    _data_via_pager(stream)
@cli.command("vi")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@verbose_option
def edit_file(filename: str, dry):
    """edit compressed file and overwrite"""
    from .compr_stream import stream_ext, RawReadStream

    fname = pathlib.Path(filename)
    _, data = auto_compress_stream(fname, "decompress")
    _, ext = os.path.splitext(fname)
    if ext in stream_ext:

        def compress_fn(f: bytes) -> bytes:
            # re-compress with the codec matching the file's extension
            return stream_ext[ext][1](RawReadStream(f)).read_all()

    else:

        def compress_fn(f: bytes) -> bytes:
            # unknown extension: store as-is
            return f

    bindata = data.read_all().decode("utf-8")
    newdata = click.edit(text=bindata)
    if newdata is None or newdata == bindata:
        _log.info("not changed")
        return
    mode = "dry" if dry else "wet"
    _log.info("(%s) changed. write back to %s", mode, fname)
    if not dry:
        # mode, timestamp will be changed
        fname.write_bytes(compress_fn(newdata.encode("utf-8")))
@cli.command()
@click.argument("file")
@click.option(
    "--compress",
    default=None,
    type=click.Choice(list(set(stream_compress_modes) - {"raw", "decompress"})),
    help="compress type (default: all)",
    multiple=True,
)
def compress_benchmark(compress, file):
    """benchmark compress algorithm

    \b
    outputs:
      mode: mode string
      rate: compression rate (compressed size/original size)
      compress: compress throuput (original bytes/sec)
      decompress: decompress throuput (original bytes/sec)
    """
    import csv
    import timeit

    from .compr_stream import stream_map, stream_compress_modes, RawReadStream, Stream

    if not compress:
        compress = stream_compress_modes
    raw_data = pathlib.Path(file).read_bytes()

    def bench_comp(input_data: bytes):
        # consume the compression stream end-to-end
        s2 = comp_st(RawReadStream(input_data))
        for _ in s2.gen():
            pass

    def bench_decomp(input_data: bytes):
        # consume the decompression stream end-to-end
        s2 = decomp_st(RawReadStream(input_data))
        for _ in s2.gen():
            pass

    wr = csv.writer(sys.stdout)
    wr.writerow(["mode", "rate", "compress", "decompress"])
    for mode in compress:
        m = stream_map.get(mode)
        if not m:
            # BUG FIX: click.Abort was instantiated but never raised, so the
            # message was silently discarded; report the bad mode and skip it.
            click.echo(f"no such compress mode: {mode}", err=True)
            continue
        comp_st: type[Stream] = m[1]
        decomp_st: type[Stream] = m[2]
        compressed_data = comp_st(RawReadStream(raw_data)).read_all()
        # round-trip sanity check before timing
        assert raw_data == decomp_st(RawReadStream(compressed_data)).read_all()
        isz = len(raw_data)
        csz = len(compressed_data)
        rate = csz / isz
        cnum, csec = timeit.Timer(
            stmt="bench(input_data)",
            globals={"input_data": raw_data, "comp_st": comp_st, "bench": bench_comp},
        ).autorange()
        dnum, dsec = timeit.Timer(
            stmt="bench(input_data)",
            globals={
                "input_data": compressed_data,
                "decomp_st": decomp_st,
                "bench": bench_decomp,
            },
        ).autorange()
        wr.writerow([str(x) for x in [mode, rate, isz * cnum / csec, isz * dnum / dsec]])
@cli.command()
@verbose_option
@click.option(
    "--format",
    type=click.Choice(["combined", "common", "debug"]),
    default="combined",
    show_default=True,
)
@click.option("--nth", type=int, default=1, show_default=True, help="parse from n-th '{'")
@click.argument("file", type=click.Path(exists=True, file_okay=True, dir_okay=False))
def traefik_json_convert(file, nth, format):
    """
    convert traefik access-log(json) to other format

    \b
    traefik --accesslog=true --accesslog.format=json \\
        --accesslog.fields.defaultmode=keep \\
        --accesslog.fields.headers.defaultmode=keep
    """
    from collections import defaultdict

    from .compr_stream import FileReadStream, auto_compress_stream

    # apache "common" log format, filled from the traefik JSON fields
    common = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
    )
    # "combined" = common + referer and user-agent
    combined = common + ' "%(request_Referer)s" "%(request_User-Agent)s"'
    dateformat = "%d/%b/%Y:%H:%M:%S %z"
    format_map = {"combined": combined, "common": common, "debug": "%s"}
    if file in (None, "-"):
        path = pathlib.Path("-")
        ist = FileReadStream(sys.stdin.buffer)
    else:
        path = pathlib.Path(file)
        ist = None
    _, ofp = auto_compress_stream(path, "decompress", ist)
    fmt = format_map.get(format, combined)
    for line in ofp.text_gen():
        # skip everything before the nth "{" (e.g. container-log prefixes)
        jsonstr = "{" + line.split("{", nth)[-1]
        try:
            jsdata: dict = json.loads(jsonstr)
            if "time" in jsdata:
                ts = datetime.datetime.fromisoformat(jsdata.get("time", "")).astimezone()
                jsdata["httptime"] = ts.strftime(dateformat)
            # defaultdict fills missing fields with "-"
            click.echo(fmt % defaultdict(lambda: "-", **jsdata))
        except json.JSONDecodeError:
            _log.exception("parse error: %s", jsonstr)
def do_ible1(name: str, fn: click.Command, args: dict, dry: bool):
    """Run one playbook step: invoke the command's callback unless dry."""
    _log.debug("name=%s, fn=%s, args=%s, dry=%s", name, fn, arg_mask(args), dry)
    _log.info("start %s", name)
    mode = "dry" if dry else "wet"
    _log.info("run(%s): %s %s", mode, fn, arg_mask(args))
    if not dry:
        assert fn.callback is not None
        fn.callback(**args)
    _log.info("end %s", name)
def convert_ible(data: Union[list[dict], dict]) -> list[dict]:
    """Normalize the dict form of a playbook into the canonical list form.

    A list is returned unchanged; a dict becomes a list of
    {"name": ..., <cmd>: <args>} entries, propagating "allow-fail".
    """
    if not isinstance(data, dict):
        return data
    _log.debug("convert %s", arg_mask(data))
    converted = []
    for cmd, body in data.items():
        name = body.pop("name", cmd)
        allow_fail = body.pop("allow-fail", None)
        ent = {"name": name, cmd: body}
        if allow_fail is not None:
            ent["allow-fail"] = allow_fail
        converted.append(ent)
    _log.debug("converted: %s", arg_mask(converted))
    return converted
def arg2arg(fn: click.Command, args: dict, baseparam: dict) -> dict:
    """Build kwargs for *fn*'s callback.

    Precedence, lowest to highest: option default (or envvar value when set),
    then baseparam, then args. UNSET values in args are ignored.
    """
    params = {}
    for opt in fn.params:
        if isinstance(opt.envvar, list) and len(opt.envvar) != 0:
            # first matching environment variable wins
            for ev in opt.envvar:
                if os.getenv(ev):
                    params[opt.name] = os.getenv(ev)
                    break
        elif isinstance(opt.envvar, str) and opt.envvar and os.getenv(opt.envvar):
            params[opt.name] = os.getenv(opt.envvar)
        elif opt.default or not opt.required:
            params[opt.name] = opt.default
    for pname in (x.name for x in fn.params):
        if pname in args:
            if args[pname] != UNSET:
                params[pname] = args[pname]
        elif pname in baseparam:
            params[pname] = baseparam[pname]
    _log.debug("arg=%s, base=%s, result=%s", args, baseparam, params)
    return params
def ible_gen(
    data: list[dict],
) -> Generator[tuple[str, click.Command, dict, dict], None, None]:
    """Validate playbook entries and yield (name, command, kwargs, raw-entry)."""
    _log.debug("input: %s", arg_mask(data))
    if not isinstance(data, list):
        raise Exception(f"invalid list style: {type(data)}")
    baseparam: dict = {}
    for entry in data:
        _log.debug("exec %s", arg_mask(entry))
        if not isinstance(entry, dict):
            raise Exception(f"invalid dict style: {type(entry)}")
        # exactly one key besides the metadata keys names the command
        keywords: set = entry.keys() - {"name", "allow-fail"}
        if len(keywords) != 1:
            raise Exception(f"invalid command style: {keywords}")
        cmd: str = keywords.pop()
        args: dict = entry[cmd]
        name: str = entry.get("name", cmd)
        if not isinstance(args, dict):
            raise Exception(f"invalid args: {args}")
        if cmd == "params":
            # "params" entries only update the shared defaults
            baseparam.update(args)
            continue
        if cmd not in cli.commands:
            raise Exception(f"invalid command: {cmd} / {cli.commands.keys()}")
        if cmd.startswith("ible"):
            # no recursive playbook execution
            raise Exception(f"unsupported command: {cmd}")
        fn = cli.commands[cmd]
        yield name, fn, arg2arg(fn, args, baseparam), entry
def do_ible(data: list[dict], dry: bool):
    """Run each playbook step, honoring per-entry allow-fail."""
    for name, fn, args, entry in ible_gen(data):
        try:
            do_ible1(name, fn, args, dry)
        except Exception as e:
            if entry.get("allow-fail"):
                _log.info("failed. continue: %s", e)
            else:
                _log.exception("error occured. stop")
                raise
def gen_sh(file: str) -> Generator[tuple[Optional[str], list[str]], None, None]:
    """Yield (name, tokens) for each non-empty command line of a shell script.

    A full-line comment names the command that follows it; backslash
    continuations are folded into one logical line before tokenizing.
    """
    with open(file, "r") as fp:
        name = None
        for line in fp:
            line = line.lstrip().rstrip("\n")
            if line.startswith("#"):
                # remember the comment text as the next command's name
                name = line.lstrip()[1:].strip()
                _log.debug("name: %s", name)
                continue
            tokens = list(shlex.shlex(line, punctuation_chars=True))
            while tokens and tokens[-1] == "\\":
                # join continuation lines and re-tokenize
                try:
                    line += next(fp)
                except StopIteration:
                    break
                tokens = list(shlex.shlex(line, punctuation_chars=True))
            if not tokens:
                continue
            yield name, tokens
            name = None
def sh_line2arg(cmdop: click.Command, tokens: list[str]) -> tuple[dict, bool]:
    """Map shell tokens onto *cmdop*'s options; return (args, allow_fail).

    Note: option values are consumed from the same iterator as the option
    names, so the for/else structure below is order-sensitive.
    """
    args: dict[str, Union[str, bool]] = {}
    allow_fail = False
    if tokens[-2:] == ["||", "true"]:
        # a trailing "|| true" marks the step as allowed to fail
        allow_fail = True
        tokens = tokens[:-2]
        _log.debug("allow fail: %s", tokens)
    tks = iter(tokens)
    for token in tks:
        if token.startswith("#"):
            # comment: ignore the rest of the line
            break
        for opt in cmdop.params:
            assert isinstance(opt.name, str)
            if token in opt.opts:
                _log.debug("option %s", opt)
                if getattr(opt, "is_flag", False):
                    _log.debug("true: %s", opt)
                    args[opt.name] = True
                else:
                    # non-flag option: the next token is its value
                    val = next(tks)
                    _log.debug("args: %s", arg_mask({opt.name: val}))
                    args[opt.name] = val
                break
            elif token in opt.secondary_opts:
                # e.g. "--no-foo" side of a flag pair
                _log.debug("false: %s", opt)
                args[opt.name] = False
                break
        else:
            raise Exception(f"no such option: {token}")
    return args, allow_fail
1016def read_sh(file: str) -> list[dict]:
1017 res: list[dict] = []
1018 for name, tokens in gen_sh(file):
1019 if len(tokens) <= 1: 1019 ↛ 1020line 1019 didn't jump to line 1020 because the condition on line 1019 was never true
1020 continue
1021 if tokens[0] != "log2s3":
1022 continue
1023 _log.debug("tokens: %s", tokens[1:])
1024 cmd = tokens[1]
1025 if cmd not in cli.commands: 1025 ↛ 1026line 1025 didn't jump to line 1026 because the condition on line 1025 was never true
1026 raise Exception(f"no such subcommand: {cmd}")
1027 cmdop = cli.commands[cmd]
1028 args, allow_fail = sh_line2arg(cmdop, tokens[2:])
1029 _log.debug("result args: %s", arg_mask(args))
1030 ent = {
1031 "name": name or cmd,
1032 cmd: args,
1033 }
1034 if allow_fail:
1035 ent["allow-fail"] = True
1036 res.append(ent)
1037 name = None
1038 return res
1041def try_read(file: str) -> Union[list[dict], dict]:
1042 try:
1043 import tomllib
1045 with open(file, "rb") as fp:
1046 return tomllib.load(fp)
1047 except ValueError as e:
1048 _log.debug("toml error", exc_info=e)
1049 try:
1050 import yaml
1052 with open(file, "rb") as fp:
1053 return yaml.safe_load(fp)
1054 except (yaml.error.YAMLError, ImportError) as e:
1055 _log.debug("yaml error", exc_info=e)
1056 try:
1057 import json
1059 with open(file, "rb") as fp:
1060 return json.load(fp)
1061 except ValueError as e:
1062 _log.debug("json error", exc_info=e)
1063 try:
1064 return read_sh(file)
1065 except Exception as e:
1066 _log.debug("sh error", exc_info=e)
1067 raise Exception(f"cannot load {file}: unknown filetype")
1070@cli.command()
1071@verbose_option
1072@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
1073@click.argument("file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True))
1074def ible_playbook(file, dry):
1075 """do log2s3-ible playbook"""
1076 do_ible(convert_ible(try_read(file)), dry)
1079def sh_dump(data, output):
1080 click.echo("#! /bin/sh", file=output)
1081 click.echo("set -eu", file=output)
1082 click.echo("", file=output)
1083 for name, fn, args, v in ible_gen(data):
1084 allow_fail = v.get("allow-fail")
1085 for comment in io.StringIO(name):
1086 click.echo(f"# {comment.rstrip()}", file=output)
1087 fnname: str = fn.name or name
1088 options: list[str] = [fnname]
1089 for k, v in sorted(args.items()):
1090 opt = [x for x in fn.params if x.name == k][0]
1091 if opt.default == v:
1092 continue
1093 if isinstance(v, bool):
1094 if v: 1094 ↛ 1097line 1094 didn't jump to line 1097 because the condition on line 1094 was always true
1095 options.append(opt.opts[0])
1096 else:
1097 options.append(opt.secondary_opts[0])
1098 elif v is not None: 1098 ↛ 1089line 1098 didn't jump to line 1089 because the condition on line 1098 was always true
1099 options.append(opt.opts[0])
1100 options.append(v)
1101 optstr = shlex.join(options)
1102 suffixstr = ""
1103 if allow_fail:
1104 suffixstr = " || true"
1105 click.echo(f"log2s3 {optstr}{suffixstr}", file=output)
1106 click.echo("", file=output)
1109@cli.command()
1110@verbose_option
1111@click.option("--format", type=click.Choice(["yaml", "json", "sh"]))
1112@click.argument("file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True))
1113@click.option("--output", type=click.File("w"), default="-")
1114def ible_convert(file, format, output):
1115 """convert log2s3-ible playbook"""
1116 data: list[dict] = convert_ible(try_read(file))
1117 if format == "yaml": 1117 ↛ 1118line 1117 didn't jump to line 1118 because the condition on line 1117 was never true
1118 import yaml
1120 yaml.dump(data, stream=output, allow_unicode=True)
1121 elif format == "json":
1122 import json
1124 json.dump(data, fp=output, ensure_ascii=False, indent=2)
1125 elif format == "sh": 1125 ↛ 1128line 1125 didn't jump to line 1128 because the condition on line 1125 was always true
1126 sh_dump(data, output)
1127 else:
1128 raise Exception(f"unknown format: {format}")
1131@cli.command()
1132@click.argument("args", nargs=-1)
1133def sh(args):
1134 """execute /bin/sh"""
1135 subprocess.run(["sh", *args])
1138@cli.command()
1139@click.argument("args", nargs=-1)
1140def bash(args):
1141 """execute bash"""
1142 subprocess.run(["bash", *args])
1145@cli.command()
1146@verbose_option
1147@click.option("--host", default="0.0.0.0", show_default=True)
1148@click.option("--port", type=int, default=8000, show_default=True)
1149@click.option("--root", default=".", show_default=True)
1150@click.option("--prefix", default="")
1151@click.option("--debug/--no-debug", default=False, show_default=True)
1152def serve(prefix, root, host, port, debug):
1153 """start viewer"""
1154 from .app import update_config, router
1155 import uvicorn
1156 from fastapi import FastAPI
1158 update_config(
1159 {
1160 "working_dir": root,
1161 }
1162 )
1163 if prefix:
1164 update_config(
1165 {
1166 "prefix": prefix,
1167 }
1168 )
1169 app = FastAPI(debug=debug)
1170 app.include_router(router, prefix=prefix)
1171 uvicorn.run(app, host=host, port=port, log_config=None)
1174if __name__ == "__main__": 1174 ↛ 1175line 1174 didn't jump to line 1175 because the condition on line 1174 was never true
1175 cli()