Coverage for log2s3/main.py: 74% (786 statements; coverage.py v7.11.0, created at 2025-10-29 12:02 +0000)
from logging import getLogger
import os
import sys
import datetime
import shlex
import subprocess
import functools
import click
import json
import pathlib
import boto3
import io
from click.core import UNSET
from typing import Union, Generator, Optional
from .version import VERSION
from .common_stream import Stream, MergeStream
from .compr_stream import (
    S3GetStream,
    S3PutStream,
    auto_compress_stream,
    stream_compress_modes,
)

try:
    from mypy_boto3_s3.client import S3Client as S3ClientType
except ImportError:
    from typing import Any as S3ClientType

_log = getLogger(__name__)


mask_prefix = ["s3_"]


def arg_mask(d: Union[list, dict]) -> Union[list, dict]:
    if isinstance(d, list):
        return [arg_mask(x) for x in d]
    if isinstance(d, dict):
        res = d.copy()
        for p in mask_prefix:
            for k, v in d.items():
                if v is None or v == UNSET:
                    # pop with a default: the key is already gone on a later prefix pass
                    res.pop(k, None)
                elif isinstance(v, str) and k.startswith(p):
                    res[k] = "*" * len(v)
                elif isinstance(v, (dict, list)):
                    res[k] = arg_mask(v)
                elif k.startswith(p):
                    res[k] = "****"
        return res
    return d
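
# A quick hypothetical example of the masking above:
#   arg_mask({"s3_secret_key": "abc", "top": "/var/log"})
#   -> {"s3_secret_key": "***", "top": "/var/log"}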


@click.group(invoke_without_command=True)
@click.version_option(VERSION)
@click.pass_context
def cli(ctx):
    if ctx.invoked_subcommand is None:
        print(ctx.get_help())


def s3_option(func):
    @click.option("--s3-access-key", envvar="AWS_ACCESS_KEY_ID", help="AWS Access Key")
    @click.option(
        "--s3-secret-key", envvar="AWS_SECRET_ACCESS_KEY", help="AWS Secret Key"
    )
    @click.option("--s3-region", envvar="AWS_DEFAULT_REGION", help="AWS Region")
    @click.option(
        "--s3-endpoint", envvar="AWS_ENDPOINT_URL_S3", help="AWS Endpoint URL for S3"
    )
    @click.option("--s3-bucket", envvar="AWS_S3_BUCKET", help="AWS S3 Bucket name")
    @click.option(
        "--dotenv/--no-dotenv", default=False, help="load .env for S3 client config"
    )
    @functools.wraps(func)
    def _(
        s3_endpoint,
        s3_access_key,
        s3_secret_key,
        s3_region,
        s3_bucket,
        dotenv,
        **kwargs,
    ):
        if dotenv:
            from dotenv import load_dotenv

            load_dotenv()
        if not s3_bucket:
            s3_bucket = os.getenv("AWS_S3_BUCKET")
        args = {
            "aws_access_key_id": s3_access_key,
            "aws_secret_access_key": s3_secret_key,
            "region_name": s3_region,
            "endpoint_url": s3_endpoint,
        }
        empty_keys = {k for k, v in args.items() if v is None}
        for k in empty_keys:
            args.pop(k)
        s3 = boto3.client("s3", **args)
        return func(s3=s3, bucket_name=s3_bucket, **kwargs)

    return _
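
# Hypothetical invocation (assuming click's default dash-separated command
# names): credentials come from the flags above or the AWS_* environment
# variables; unset options are dropped so boto3 can fall back to its own
# credential chain.
#   AWS_ENDPOINT_URL_S3=http://localhost:9000 log2s3 s3-bucket --s3-region us-east-1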


compress_option = click.option(
    "--compress",
    default="gzip",
    type=click.Choice(stream_compress_modes),
    help="compress type",
    show_default=True,
)


def filetree_option(func):
    @click.option(
        "--top",
        type=click.Path(dir_okay=True, exists=True, file_okay=False),
        required=True,
        help="root directory to find files",
    )
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @click.option(
        "--dry/--wet", help="dry run or wet run", default=False, show_default=True
    )
    @functools.wraps(func)
    def _(top, older, newer, date, bigger, smaller, glob, dry, **kwargs):
        config = {
            "top": top,
            "older": older,
            "newer": newer,
            "date": date,
            "bigger": bigger,
            "smaller": smaller,
            "glob": glob,
            "dry": dry,
        }
        return func(top=pathlib.Path(top), config=config, **kwargs)

    return _


def s3tree_option(func):
    @click.option("--prefix", default="", help="AWS S3 Object Key Prefix")
    @click.option("--suffix", default="", help="AWS S3 Object Key Suffix")
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @functools.wraps(func)
    def _(prefix, older, newer, date, bigger, smaller, suffix, glob, **kwargs):
        config = {
            "top": prefix,
            "older": older,
            "newer": newer,
            "date": date,
            "bigger": bigger,
            "smaller": smaller,
            "suffix": suffix,
            "glob": glob,
        }
        if prefix:
            return func(top=pathlib.Path(prefix), config=config, **kwargs)
        return func(top="", config=config, **kwargs)

    return _


def verbose_option(func):
    @click.option("--verbose/--quiet", default=None)
    @functools.wraps(func)
    def _(verbose, **kwargs):
        from logging import basicConfig

        fmt = "%(asctime)s %(levelname)s %(name)s %(message)s"
        if verbose is None:
            basicConfig(level="INFO", format=fmt)
        elif verbose is False:
            basicConfig(level="WARNING", format=fmt)
        else:
            basicConfig(level="DEBUG", format=fmt)
        return func(**kwargs)

    return _
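
# Resulting log levels: INFO when neither flag is given, DEBUG with
# --verbose, WARNING with --quiet.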


@cli.command()
@s3_option
@verbose_option
def s3_make_bucket(s3: S3ClientType, bucket_name: str):
    """make S3 buckets"""
    res = s3.create_bucket(Bucket=bucket_name)
    click.echo(f"response {res}")


@cli.command()
@s3_option
@verbose_option
def s3_bucket(s3: S3ClientType, bucket_name: str):
    """list S3 buckets"""
    res = s3.list_buckets()
    _log.debug("response %s", res)
    for bkt in res.get("Buckets", []):
        click.echo("%s %s" % (bkt["CreationDate"], bkt["Name"]))


def allobjs(s3: S3ClientType, bucket_name: str, prefix: str, marker: str = ""):
    # page through list_objects results: while the listing is truncated,
    # recurse with the last key seen as the next Marker
    res = s3.list_objects(Bucket=bucket_name, Prefix=prefix, Marker=marker)
    ct = res.get("Contents", [])
    yield from ct
    if res.get("IsTruncated") and len(ct) != 0:
        mk = ct[-1].get("Key")
        if mk:
            yield from allobjs(s3, bucket_name=bucket_name, prefix=prefix, marker=mk)


def s3obj2stat(obj: dict) -> os.stat_result:
    # synthesize a stat_result from S3 metadata; field order is
    # (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
    ts = obj.get("LastModified", datetime.datetime.now()).timestamp()
    sz = obj.get("Size", 0)
    return os.stat_result((0o644, 0, 0, 0, 0, 0, sz, ts, ts, ts))


def allobjs_conf(s3: S3ClientType, bucket_name: str, prefix: str, config: dict):
    _log.debug("allobjs: bucket=%s, prefix=%s, config=%s", bucket_name, prefix, config)
    from .processor import DebugProcessor

    dummy = DebugProcessor(config)
    suffix = config.get("suffix", "")
    objs = allobjs(s3, bucket_name, prefix)
    return filter(
        lambda x: x["Key"].endswith(suffix)
        and dummy.check(pathlib.Path(x["Key"]), s3obj2stat(x)),
        objs,
    )


@cli.command()
@s3_option
@s3tree_option
@verbose_option
def s3_list(s3: S3ClientType, bucket_name: str, config: dict, top: pathlib.Path):
    """list S3 objects"""
    topstr = str(top)
    if topstr == ".":
        topstr = ""
    for i in allobjs_conf(s3, bucket_name, topstr.lstrip("/"), config):
        click.echo("%s %6d %s" % (i["LastModified"], i["Size"], i["Key"]))


@cli.command()
@click.option("--summary/--no-summary", "-S", default=False, type=bool)
@click.option("--pathsep", default="/", show_default=True)
@s3_option
@s3tree_option
@verbose_option
def s3_du(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    summary: bool,
    pathsep: str,
):
    """show S3 directory usage"""
    out = {}
    for i in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config):
        key = i["Key"]
        ks = key.rsplit(pathsep, 1)
        dirname = ks[0]
        sz = i["Size"]
        if dirname not in out:
            out[dirname] = [0, 0]
        out[dirname][0] += 1
        out[dirname][1] += sz
    if len(out) == 0:
        click.echo("(empty result)")
        return
    if summary:
        # propagate each leaf directory's count/size up to every ancestor
        for korig in list(out.keys()):
            k = korig
            while len(k) != 0:
                k0 = k.rsplit(pathsep, 1)
                if len(k0) == 1:
                    break
                k = k0[0]
                if k not in out:
                    out[k] = [0, 0]
                out[k][0] += out[korig][0]
                out[k][1] += out[korig][1]
    click.echo("%10s %5s %s" % ("size", "cnt", "name"))
    click.echo("----------+-----+-----------------------")
    for k, v in sorted(out.items(), key=lambda f: f[1][1], reverse=True):
        click.echo("%10d %5d %s" % (v[1], v[0], k))
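
# Hypothetical s3-du --summary output for two objects, a/b/x.gz (10 bytes)
# and a/c/y.gz (20 bytes):
#         size   cnt name
#   ----------+-----+-----------------------
#           30     2 a
#           20     1 a/c
#           10     1 a/b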


@cli.command()
@s3_option
@s3tree_option
@verbose_option
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
def s3_delete_by(
    s3: S3ClientType, bucket_name: str, top: pathlib.Path, config: dict, dry: bool
):
    """delete S3 objects"""
    del_keys = [
        x["Key"] for x in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config)
    ]
    if len(del_keys) == 0:
        _log.info("no object found")
    elif dry:
        _log.info("(dry)remove objects: %s", del_keys)
        click.echo(f"(dry)remove {len(del_keys)} objects")
    else:
        _log.info("(wet)remove %s objects", len(del_keys))
        s3.delete_objects(
            Bucket=bucket_name, Delete={"Objects": [{"Key": x} for x in del_keys]}
        )


@cli.command()
@s3_option
@filetree_option
@verbose_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@click.option(
    "--content/--stat", help="diff content or stat", default=False, show_default=True
)
def s3_diff(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    content: bool,
):
    """diff S3 and filetree"""
    all_keys = {
        pathlib.Path(x["Key"][len(prefix) :]): x
        for x in allobjs_conf(s3, bucket_name, prefix, config)
    }
    from .processor import ListProcessor, process_walk

    lp = ListProcessor(config)
    process_walk(top, [lp])
    files = {k.relative_to(top): v for k, v in lp.output if v is not None}
    for k in set(all_keys.keys()) - set(files.keys()):
        click.echo("only-s3: %s: %s" % (k, all_keys[k]))
    for k in set(files.keys()) - set(all_keys.keys()):
        click.echo("only-file: %s: %s" % (k, files[k]))
    for k in set(files.keys()) & set(all_keys.keys()):
        if files[k].st_size != all_keys[k].get("Size"):
            click.echo(
                "size mismatch %s file=%s, obj=%s"
                % (k, files[k].st_size, all_keys[k]["Size"])
            )


@cli.command()
@s3_option
@s3tree_option
@verbose_option
@compress_option
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@click.option(
    "--keep/--remove", help="keep old file or delete", default=True, show_default=True
)
def s3_compress_tree(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    compress: str,
    dry: bool,
    keep: bool,
):
    """compress S3 objects"""
    topstr = str(top)
    if topstr == ".":
        topstr = ""
    for i in allobjs_conf(s3, bucket_name, topstr.lstrip("/"), config):
        rd = S3GetStream(s3, bucket=bucket_name, key=i["Key"])
        newname, data = auto_compress_stream(pathlib.Path(i["Key"]), compress, rd)
        if newname == i["Key"]:
            _log.debug("do nothing: %s", i["Key"])
            continue
        if dry:
            new_length = sum([len(x) for x in data.gen()])
            _log.info(
                "(dry) recompress %s -> %s (%s->%s)",
                i["Key"],
                newname,
                i["Size"],
                new_length,
            )
        else:
            ps = S3PutStream(data, s3, bucket=bucket_name, key=str(newname))
            for _ in ps.gen():
                pass
            res = s3.head_object(Bucket=bucket_name, Key=str(newname))
            _log.info(
                "(wet) recompress %s -> %s (%s->%s)",
                i["Key"],
                newname,
                i["Size"],
                res["ContentLength"],
            )
            if not keep:
                _log.info("remove old %s (->%s)", i["Key"], newname)
                s3.delete_object(Bucket=bucket_name, Key=i["Key"])


@cli.command()
@filetree_option
@verbose_option
def filetree_debug(top: pathlib.Path, config: dict):
    """(debug command)"""
    from .processor import DebugProcessor, process_walk

    proc = [DebugProcessor(config)]
    process_walk(top, proc)


@cli.command()
@filetree_option
@verbose_option
def filetree_list(top: pathlib.Path, config: dict):
    """list files"""
    from .processor import ListProcessor, process_walk

    lp = ListProcessor(config)
    process_walk(top, [lp])
    click.echo(
        "%10s %-19s %s %d(+%d) total"
        % ("size", "mtime", "name", lp.processed, lp.skipped)
    )
    click.echo("----------+-------------------+-----------------------")
    for p, st in lp.output:
        if st is None:
            tmstr = "unknown"
            sz = -1
        else:
            tmstr = datetime.datetime.fromtimestamp(st.st_mtime).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            sz = st.st_size
        click.echo("%10s %19s %s" % (sz, tmstr, p))


@cli.command()
@filetree_option
@compress_option
@verbose_option
def filetree_compress(top: pathlib.Path, config: dict, compress):
    """compress files"""
    if compress:
        config["compress"] = compress
    from .processor import CompressProcessor, process_walk

    cproc = CompressProcessor(config)
    proc = [cproc]
    process_walk(top, proc)
    _log.info(
        "compressed=%d, skipped=%d, size=%d->%d (%d bytes)",
        cproc.processed,
        cproc.skipped,
        cproc.before_total,
        cproc.after_total,
        cproc.before_total - cproc.after_total,
    )


@cli.command()
@filetree_option
@verbose_option
def filetree_delete(top: pathlib.Path, config: dict):
    """remove files"""
    from .processor import DelProcessor, process_walk

    proc = [DelProcessor(config)]
    process_walk(top, proc)
    _log.info("removed=%d, skipped=%d", proc[0].processed, proc[0].skipped)


@cli.command()
@click.argument(
    "files",
    type=click.Path(file_okay=True, dir_okay=True, exists=True, readable=True),
    nargs=-1,
)
@verbose_option
def merge(files: list[click.Path]):
    """merge sorted log files"""
    input_stream: list[Stream] = []
    for fn in files:
        p = pathlib.Path(str(fn))
        if p.is_file():
            _, ch = auto_compress_stream(p, "decompress")
            input_stream.append(ch)
        elif p.is_dir():
            # pathlib.Path.walk requires Python 3.12+
            for proot, _, pfiles in p.walk():
                for pfn in pfiles:
                    _, ch = auto_compress_stream(proot / pfn, "decompress")
                    input_stream.append(ch)
        else:
            _log.warning("%s is not a directory or file", p)
    for i in MergeStream(input_stream).text_gen():
        click.echo(i, nl=False)
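
# MergeStream presumably interleaves the already-sorted inputs line by line
# (a k-way merge); each input file must itself be sorted for the merged
# output to be ordered.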


@cli.command()
@s3_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@filetree_option
@compress_option
@verbose_option
def s3_put_tree(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    compress,
):
    """compress and put log files to S3"""
    config["s3"] = s3
    config["s3_bucket"] = bucket_name
    config["s3_prefix"] = prefix
    config["skip_names"] = {x["Key"] for x in allobjs(s3, bucket_name, prefix)}
    config["compress"] = compress
    from .processor import S3Processor, process_walk

    proc = [S3Processor(config)]
    process_walk(top, proc)
    _log.info(
        "processed=%d, skipped=%d, upload %d bytes",
        proc[0].processed,
        proc[0].skipped,
        proc[0].uploaded,
    )


@cli.command()
@s3_option
@click.option("--key", required=True, help="AWS S3 Object Key")
@click.argument(
    "filename", type=click.Path(file_okay=True, dir_okay=False, exists=True)
)
@compress_option
@verbose_option
def s3_put1(s3: S3ClientType, bucket_name: str, key: str, filename: str, compress: str):
    """put 1 file to S3"""
    from .compr_stream import S3PutStream, auto_compress_stream

    input_path = pathlib.Path(filename)
    _, st = auto_compress_stream(input_path, compress)
    ost = S3PutStream(st, s3, bucket_name, key)
    for _ in ost.gen():
        pass


def _s3_read_stream(s3: S3ClientType, bucket_name: str, key: str) -> Stream:
    res = S3GetStream(s3, bucket=bucket_name, key=key)
    _, res = auto_compress_stream(pathlib.Path(key), "decompress", res)
    return res
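
# The helper above chains S3GetStream into auto_compress_stream("decompress"),
# so e.g. a hypothetical object "app.log.gz" is fetched and gunzipped on the
# fly as the stream is consumed.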


@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_cat(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """concatenate compressed objects"""
    for key in keys:
        for d in _s3_read_stream(s3, bucket_name, key).gen():
            sys.stdout.buffer.write(d)


def _data_via_pager(input: Stream):
    pager_bin = os.getenv("LOG2S3_PAGER", os.getenv("PAGER", "less"))
    proc = subprocess.Popen(shlex.split(pager_bin), stdin=subprocess.PIPE)
    assert proc.stdin is not None  # for type check
    for d in input.gen():
        proc.stdin.write(d)
    proc.communicate()


@cli.command()
@s3_option
@click.argument("key")
@verbose_option
def s3_less(s3: S3ClientType, bucket_name: str, key: str):
    """view compressed object"""
    _data_via_pager(_s3_read_stream(s3, bucket_name, key))


@cli.command()
@s3_option
@click.argument("key")
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@verbose_option
def s3_vi(s3: S3ClientType, bucket_name: str, key: str, dry):
    """edit compressed object and overwrite"""
    bindata = _s3_read_stream(s3, bucket_name, key).read_all().decode("utf-8")
    from .compr_stream import stream_ext, RawReadStream

    _, ext = os.path.splitext(key)
    if ext in stream_ext:

        def compress_fn(f: bytes) -> bytes:
            compress_st = stream_ext[ext][1]
            return compress_st(RawReadStream(f)).read_all()

    else:

        def compress_fn(f: bytes) -> bytes:
            return f

    newdata = click.edit(text=bindata)
    if newdata is not None and newdata != bindata:
        wr = compress_fn(newdata.encode("utf-8"))
        if dry:
            _log.info(
                "(dry) changed. write back to %s (%d->%d(%d))",
                key,
                len(bindata),
                len(newdata),
                len(wr),
            )
        else:
            _log.info(
                "(wet) changed. write back to %s (%d->%d(%d))",
                key,
                len(bindata),
                len(newdata),
                len(wr),
            )
            s3.put_object(Body=wr, Bucket=bucket_name, Key=key)
    else:
        _log.info("not changed")


@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_merge(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """merge sorted log objects"""
    input_stream: list[Stream] = []
    for key in keys:
        input_stream.append(_s3_read_stream(s3, bucket_name, key))

    for i in MergeStream(input_stream).text_gen():
        click.echo(i, nl=False)


@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@verbose_option
def s3_del(s3: S3ClientType, bucket_name: str, keys: list[str], dry):
    """delete objects"""
    for k in keys:
        res = s3.head_object(Bucket=bucket_name, Key=k)
        click.echo("%s %s %s" % (res["LastModified"], res["ContentLength"], k))
    if dry:
        _log.info("(dry) delete %s keys", len(keys))
    else:
        _log.info("(wet) delete %s keys", len(keys))
        s3.delete_objects(
            Bucket=bucket_name, Delete={"Objects": [{"Key": x} for x in keys]}
        )


@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_head(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """show object metadata"""
    for k in keys:
        res = s3.head_object(Bucket=bucket_name, Key=k)
        click.echo(f"{k} = {res}")


@cli.command()
@s3_option
@verbose_option
@click.option("--cleanup/--no-cleanup", default=False)
def s3_list_parts(s3: S3ClientType, bucket_name: str, cleanup):
    """list in-progress multipart uploads"""
    res = s3.list_multipart_uploads(Bucket=bucket_name)
    if len(res.get("Uploads", [])) == 0:
        click.echo("(no in-progress multipart uploads found)")
    for upl in res.get("Uploads", []):
        click.echo("%s %s %s" % (upl["Initiated"], upl["UploadId"], upl["Key"]))
        if cleanup:
            _log.info("cleanup %s/%s", upl["Key"], upl["UploadId"])
            s3.abort_multipart_upload(
                Bucket=bucket_name, Key=upl["Key"], UploadId=upl["UploadId"]
            )


@cli.command("cat")
@click.argument(
    "files",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
    nargs=-1,
)
@verbose_option
def cat_file(files: list[click.Path]):
    """concatenate compressed files"""
    for fn in files:
        _, data = auto_compress_stream(pathlib.Path(str(fn)), "decompress")
        for d in data.gen():
            sys.stdout.buffer.write(d)


@cli.command("less")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@verbose_option
def view_file(filename: str):
    """view compressed file"""
    _, data = auto_compress_stream(pathlib.Path(filename), "decompress")
    _data_via_pager(data)


@cli.command("vi")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@verbose_option
def edit_file(filename: str, dry):
    """edit compressed file and overwrite"""
    from .compr_stream import stream_ext, RawReadStream

    fname = pathlib.Path(filename)
    _, data = auto_compress_stream(fname, "decompress")
    _, ext = os.path.splitext(fname)
    if ext in stream_ext:

        def compress_fn(f: bytes) -> bytes:
            compress_st = stream_ext[ext][1]
            return compress_st(RawReadStream(f)).read_all()

    else:

        def compress_fn(f: bytes) -> bytes:
            return f

    bindata = data.read_all().decode("utf-8")
    newdata = click.edit(text=bindata)
    if newdata is not None and newdata != bindata:
        if dry:
            _log.info("(dry) changed. write back to %s", fname)
        else:
            _log.info("(wet) changed. write back to %s", fname)
            # mode, timestamp will be changed
            fname.write_bytes(compress_fn(newdata.encode("utf-8")))
    else:
        _log.info("not changed")


@cli.command()
@click.argument("file")
@click.option(
    "--compress",
    default=None,
    type=click.Choice(list(set(stream_compress_modes) - {"raw", "decompress"})),
    help="compress type (default: all)",
    multiple=True,
)
def compress_benchmark(compress, file):
    """benchmark compress algorithms

    \b
    outputs:
      mode: mode string
      rate: compression rate (compressed size/original size)
      compress: compress throughput (original bytes/sec)
      decompress: decompress throughput (original bytes/sec)
    """
    import csv
    import timeit
    from .compr_stream import stream_map, stream_compress_modes, RawReadStream, Stream

    if not compress:
        compress = stream_compress_modes
    raw_data = pathlib.Path(file).read_bytes()

    def bench_comp(input_data: bytes):
        s1 = RawReadStream(input_data)
        s2 = comp_st(s1)
        for _ in s2.gen():
            pass

    def bench_decomp(input_data: bytes):
        s1 = RawReadStream(input_data)
        s2 = decomp_st(s1)
        for _ in s2.gen():
            pass

    wr = csv.writer(sys.stdout)
    wr.writerow(["mode", "rate", "compress", "decompress"])
    for c in compress:
        mode = c
        m = stream_map.get(mode)
        if not m:
            # report the unknown mode and skip it
            click.echo(f"no such compress mode: {mode}", err=True)
            continue
        comp_st: type[Stream] = m[1]
        decomp_st: type[Stream] = m[2]
        compressed_data = comp_st(RawReadStream(raw_data)).read_all()
        assert raw_data == decomp_st(RawReadStream(compressed_data)).read_all()
        isz = len(raw_data)
        csz = len(compressed_data)
        rate = csz / isz
        cnum, csec = timeit.Timer(
            stmt="bench(input_data)",
            globals={"input_data": raw_data, "comp_st": comp_st, "bench": bench_comp},
        ).autorange()
        dnum, dsec = timeit.Timer(
            stmt="bench(input_data)",
            globals={
                "input_data": compressed_data,
                "decomp_st": decomp_st,
                "bench": bench_decomp,
            },
        ).autorange()
        wr.writerow(
            [str(x) for x in [mode, rate, isz * cnum / csec, isz * dnum / dsec]]
        )
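
# Hypothetical output (mode names, rates and throughputs depend entirely on
# the available stream modes, the input file and the machine):
#   mode,rate,compress,decompress
#   gzip,0.31,120000000.0,450000000.0
#   zstd,0.28,510000000.0,980000000.0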


@cli.command()
@verbose_option
@click.option(
    "--format",
    type=click.Choice(["combined", "common", "debug"]),
    default="combined",
    show_default=True,
)
@click.option(
    "--nth", type=int, default=1, show_default=True, help="parse from n-th '{'"
)
@click.argument(
    "file",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, allow_dash=True),
)
def traefik_json_convert(file, nth, format):
    """
    convert traefik access-log(json) to other format

    \b
    traefik --accesslog=true --accesslog.format=json \\
        --accesslog.fields.defaultmode=keep \\
        --accesslog.fields.headers.defaultmode=keep
    """
    from .compr_stream import FileReadStream, auto_compress_stream
    from collections import defaultdict

    common = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
    )
    combined = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
        ' "%(request_Referer)s" "%(request_User-Agent)s"'
    )
    dateformat = "%d/%b/%Y:%H:%M:%S %z"
    format_map = {
        "combined": combined,
        "common": common,
        "debug": "%s",
    }
    if file in (None, "-"):
        fp = sys.stdin.buffer
        path = pathlib.Path("-")
        ist = FileReadStream(fp)
    else:
        path = pathlib.Path(file)
        ist = None
    _, ofp = auto_compress_stream(path, "decompress", ist)
    fmt = format_map.get(format, combined)
    for line in ofp.text_gen():
        n = line.split("{", nth)
        jsonstr = "{" + n[-1]
        try:
            jsdata: dict = json.loads(jsonstr)
            if "time" in jsdata:
                ts = datetime.datetime.fromisoformat(
                    jsdata.get("time", "")
                ).astimezone()
                jsdata["httptime"] = ts.strftime(dateformat)
            click.echo(fmt % defaultdict(lambda: "-", **jsdata))
        except json.JSONDecodeError:
            _log.exception("parse error: %s", jsonstr)
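
# Hypothetical example: a JSON access-log line such as
#   {"ClientHost": "10.0.0.1", "RequestMethod": "GET", "RequestPath": "/",
#    "RequestProtocol": "HTTP/1.1", "DownstreamStatus": 200,
#    "DownstreamContentSize": 512, "time": "2025-10-29T12:00:00+00:00"}
# renders in the combined format roughly as (missing fields become "-"):
#   10.0.0.1 - - [29/Oct/2025:12:00:00 +0000] "GET / HTTP/1.1" 200 512 "-" "-"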


def do_ible1(name: str, fn: click.Command, args: dict, dry: bool):
    _log.debug("name=%s, fn=%s, args=%s, dry=%s", name, fn, arg_mask(args), dry)
    _log.info("start %s", name)
    if dry:
        _log.info("run(dry): %s %s", fn, arg_mask(args))
    else:
        _log.info("run(wet): %s %s", fn, arg_mask(args))
        assert fn.callback is not None
        fn.callback(**args)
    _log.info("end %s", name)


def convert_ible(data: Union[list[dict], dict]) -> list[dict]:
    if isinstance(data, dict):
        d = []
        _log.debug("convert %s", arg_mask(data))
        for k, v in data.items():
            name = v.pop("name", k)
            allow_fail = v.pop("allow-fail", None)
            ent = {"name": name, k: v}
            if allow_fail is not None:
                ent["allow-fail"] = allow_fail
            d.append(ent)
        _log.debug("converted: %s", arg_mask(d))
        data = d
    return data
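
# Hypothetical conversion of the mapping form into the list form:
#   {"filetree-compress": {"name": "compress logs", "top": "/var/log"}}
#   -> [{"name": "compress logs", "filetree-compress": {"top": "/var/log"}}]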


def arg2arg(fn: click.Command, args: dict, baseparam: dict) -> dict:
    # fill defaults from envvars/option defaults, then overlay playbook args
    # and shared "params" values
    params = {}
    for opt in fn.params:
        if isinstance(opt.envvar, list) and len(opt.envvar) != 0:
            for ev in opt.envvar:
                if os.getenv(ev):
                    params[opt.name] = os.getenv(ev)
                    break
        elif isinstance(opt.envvar, str) and opt.envvar and os.getenv(opt.envvar):
            params[opt.name] = os.getenv(opt.envvar)
        elif opt.default or not opt.required:
            params[opt.name] = opt.default
    pnames = [x.name for x in fn.params]
    for name in pnames:
        if name in args:
            if args[name] != UNSET:
                params[name] = args[name]
        elif name in baseparam:
            params[name] = baseparam[name]
    _log.debug("arg=%s, base=%s, result=%s", args, baseparam, params)
    return params


def ible_gen(
    data: list[dict],
) -> Generator[tuple[str, click.Command, dict, dict], None, None]:
    _log.debug("input: %s", arg_mask(data))
    if not isinstance(data, list):
        raise Exception(f"invalid list style: {type(data)}")
    baseparam = {}
    for v in data:
        _log.debug("exec %s", arg_mask(v))
        if not isinstance(v, dict):
            raise Exception(f"invalid dict style: {type(v)}")
        kw: set[str] = v.keys() - {"name", "allow-fail"}
        if len(kw) != 1:
            raise Exception(f"invalid command style: {kw}")
        cmd: str = kw.pop()
        args: dict = v[cmd]
        name: str = v.get("name", cmd)
        if not isinstance(args, dict):
            raise Exception(f"invalid args: {args}")
        if cmd == "params":
            # "params" is not a command: it updates defaults shared by later entries
            baseparam.update(args)
            continue
        if cmd not in cli.commands:
            raise Exception(f"invalid command: {cmd} / {cli.commands.keys()}")
        if cmd.startswith("ible"):
            raise Exception(f"unsupported command: {cmd}")
        fn = cli.commands[cmd]
        yield name, fn, arg2arg(fn, args, baseparam), v


def do_ible(data: list[dict], dry: bool):
    for name, fn, args, v in ible_gen(data):
        try:
            do_ible1(name, fn, args, dry)
        except Exception as e:
            if not v.get("allow-fail"):
                _log.exception("error occurred. stop")
                raise
            _log.info("failed. continue: %s", e)


def gen_sh(file: str) -> Generator[tuple[Optional[str], list[str]], None, None]:
    with open(file, "r") as fp:
        name = None
        for line in fp:
            line = line.lstrip().rstrip("\n")
            if line.startswith("#"):
                name = line.lstrip()[1:].strip()
                _log.debug("name: %s", name)
                continue
            tokens = list(shlex.shlex(line, punctuation_chars=True))
            while len(tokens) != 0 and tokens[-1] == "\\":
                try:
                    line += next(fp)
                except StopIteration:
                    break
                tokens = list(shlex.shlex(line, punctuation_chars=True))
            if len(tokens) == 0:
                continue
            yield name, tokens
            name = None


def sh_line2arg(cmdop: click.Command, tokens: list[str]) -> tuple[dict, bool]:
    args: dict[str, Union[str, bool]] = {}
    allow_fail = False
    if tokens[-2:] == ["||", "true"]:
        allow_fail = True
        tokens = tokens[:-2]
        _log.debug("allow fail: %s", tokens)
    tks = iter(tokens)
    for token in tks:
        if token.startswith("#"):
            # comment
            break
        for opt in cmdop.params:
            assert isinstance(opt.name, str)
            if token in opt.opts:
                _log.debug("option %s", opt)
                if getattr(opt, "is_flag", False):
                    _log.debug("true: %s", opt)
                    args[opt.name] = True
                else:
                    val = next(tks)
                    _log.debug("args: %s", arg_mask({opt.name: val}))
                    args[opt.name] = val
                break
            elif token in opt.secondary_opts:
                _log.debug("false: %s", opt)
                args[opt.name] = False
                break
        else:
            raise Exception(f"no such option: {token}")
    return args, allow_fail


def read_sh(file: str) -> list[dict]:
    res: list[dict] = []
    for name, tokens in gen_sh(file):
        if len(tokens) <= 1:
            continue
        if tokens[0] != "log2s3":
            continue
        _log.debug("tokens: %s", tokens[1:])
        cmd = tokens[1]
        if cmd not in cli.commands:
            raise Exception(f"no such subcommand: {cmd}")
        cmdop = cli.commands[cmd]
        args, allow_fail = sh_line2arg(cmdop, tokens[2:])
        _log.debug("result args: %s", arg_mask(args))
        ent = {
            "name": name or cmd,
            cmd: args,
        }
        if allow_fail:
            ent["allow-fail"] = True
        res.append(ent)
        name = None
    return res
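
# Hypothetical round trip: the shell-playbook lines
#   # compress logs
#   log2s3 filetree-compress --top /var/log || true
# are read back as
#   {"name": "compress logs", "filetree-compress": {"top": "/var/log"},
#    "allow-fail": True}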


def try_read(file: str) -> Union[list[dict], dict]:
    # accept several playbook formats: TOML, then YAML, then JSON, then shell script
    try:
        import tomllib

        with open(file, "rb") as fp:
            return tomllib.load(fp)
    except ValueError as e:
        _log.debug("toml error", exc_info=e)
    try:
        import yaml

        with open(file, "rb") as fp:
            return yaml.safe_load(fp)
    except (yaml.error.YAMLError, ImportError) as e:
        _log.debug("yaml error", exc_info=e)
    try:
        import json

        with open(file, "rb") as fp:
            return json.load(fp)
    except ValueError as e:
        _log.debug("json error", exc_info=e)
    try:
        return read_sh(file)
    except Exception as e:
        _log.debug("sh error", exc_info=e)
    raise Exception(f"cannot load {file}: unknown filetype")


@cli.command()
@verbose_option
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@click.argument(
    "file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True)
)
def ible_playbook(file, dry):
    """do log2s3-ible playbook"""
    do_ible(convert_ible(try_read(file)), dry)


def sh_dump(data, output):
    click.echo("#! /bin/sh", file=output)
    click.echo("set -eu", file=output)
    click.echo("", file=output)
    for name, fn, args, v in ible_gen(data):
        allow_fail = v.get("allow-fail")
        for comment in io.StringIO(name):
            click.echo(f"# {comment.rstrip()}", file=output)
        fnname: str = fn.name or name
        options: list[str] = [fnname]
        for k, val in sorted(args.items()):
            opt = [x for x in fn.params if x.name == k][0]
            if opt.default == val:
                continue
            if isinstance(val, bool):
                if val:
                    options.append(opt.opts[0])
                else:
                    options.append(opt.secondary_opts[0])
            elif val is not None:
                options.append(opt.opts[0])
                options.append(str(val))
        optstr = shlex.join(options)
        suffixstr = ""
        if allow_fail:
            suffixstr = " || true"
        click.echo(f"log2s3 {optstr}{suffixstr}", file=output)
        click.echo("", file=output)
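
# Hypothetical sh_dump output for a single allow-fail entry:
#   #! /bin/sh
#   set -eu
#
#   # compress logs
#   log2s3 filetree-compress --top /var/log || true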


@cli.command()
@verbose_option
@click.option("--format", type=click.Choice(["yaml", "json", "sh"]))
@click.argument(
    "file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True)
)
@click.option("--output", type=click.File("w"), default="-")
def ible_convert(file, format, output):
    """convert log2s3-ible playbook"""
    data: list[dict] = convert_ible(try_read(file))
    if format == "yaml":
        import yaml

        yaml.dump(data, stream=output, allow_unicode=True)
    elif format == "json":
        import json

        json.dump(data, fp=output, ensure_ascii=False, indent=2)
    elif format == "sh":
        sh_dump(data, output)
    else:
        raise Exception(f"unknown format: {format}")


@cli.command()
@click.argument("args", nargs=-1)
def sh(args):
    """execute /bin/sh"""
    subprocess.run(["sh", *args])


@cli.command()
@click.argument("args", nargs=-1)
def bash(args):
    """execute bash"""
    subprocess.run(["bash", *args])


@cli.command()
@verbose_option
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option("--port", type=int, default=8000, show_default=True)
@click.option("--root", default=".", show_default=True)
@click.option("--prefix", default="")
@click.option("--debug/--no-debug", default=False, show_default=True)
def serve(prefix, root, host, port, debug):
    """start viewer"""
    from .app import update_config, router
    import uvicorn
    from fastapi import FastAPI

    update_config(
        {
            "working_dir": root,
        }
    )
    if prefix:
        update_config(
            {
                "prefix": prefix,
            }
        )
    app = FastAPI(debug=debug)
    app.include_router(router, prefix=prefix)
    uvicorn.run(app, host=host, port=port, log_config=None)


if __name__ == "__main__":
    cli()