Coverage for log2s3 / main.py: 74%

786 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-20 04:42 +0000

1from logging import getLogger 

2import os 

3import sys 

4import datetime 

5import shlex 

6import subprocess 

7import functools 

8import click 

9import json 

10import pathlib 

11import boto3 

12import io 

13from click.core import UNSET 

14from typing import Union, Generator, Optional 

15from .version import VERSION 

16from .common_stream import Stream, MergeStream 

17from .compr_stream import ( 

18 S3GetStream, 

19 S3PutStream, 

20 auto_compress_stream, 

21 stream_compress_modes, 

22) 

23 

# Prefer the typed S3 client stub when mypy-boto3-s3 is installed; fall back
# to Any so the module still imports without the typing extra.
try:
    from mypy_boto3_s3.client import S3Client as S3ClientType
except ImportError:
    from typing import Any as S3ClientType

# Module-level logger, named after this module.
_log = getLogger(__name__)


# Option-name prefixes whose values are considered secrets and masked by
# arg_mask() before being written to logs.
mask_prefix = ["s3_"]

33 

34 

def arg_mask(d: Union[list, dict]) -> Union[list, dict]:
    """Return a log-safe copy of *d*, hiding values of secret-looking keys.

    Entries whose value is None or UNSET are dropped; values of keys that
    start with any prefix in ``mask_prefix`` are replaced with asterisks;
    nested lists and dicts are masked recursively.  Any other input is
    returned unchanged.
    """
    if isinstance(d, list):
        return [arg_mask(x) for x in d]
    if isinstance(d, dict):
        res = d.copy()
        # BUG FIX: the original iterated prefixes in the OUTER loop and used
        # res.pop(k) without a default, so with more than one mask prefix the
        # second pass raised KeyError on keys already removed.  A single pass
        # with any() over the prefixes is equivalent and safe.
        for k, v in d.items():
            if v is None or v == UNSET:
                res.pop(k, None)
            elif isinstance(v, str) and any(k.startswith(p) for p in mask_prefix):
                res[k] = "*" * len(v)
            elif isinstance(v, (dict, list)):
                res[k] = arg_mask(v)
            elif any(k.startswith(p) for p in mask_prefix):
                # Non-string secret (e.g. number): mask with a fixed string.
                res[k] = "****"
        return res
    return d

52 

53 

@click.group(invoke_without_command=True)
@click.version_option(VERSION)
@click.pass_context
def cli(ctx):
    # Entry point of the CLI: with no subcommand, print the help text.
    if ctx.invoked_subcommand is not None:
        return
    print(ctx.get_help())

60 

61 

def s3_option(func):
    """Decorator: add S3 connection options and call *func* with ``s3`` (boto3
    client) and ``bucket_name`` keyword arguments."""

    @click.option("--s3-access-key", envvar="AWS_ACCESS_KEY_ID", help="AWS Access Key")
    @click.option("--s3-secret-key", envvar="AWS_SECRET_ACCESS_KEY", help="AWS Secret Key")
    @click.option("--s3-region", envvar="AWS_DEFAULT_REGION", help="AWS Region")
    @click.option("--s3-endpoint", envvar="AWS_ENDPOINT_URL_S3", help="AWS Endpoint URL for S3")
    @click.option("--s3-bucket", envvar="AWS_S3_BUCKET", help="AWS S3 Bucket name")
    @click.option("--dotenv/--no-dotenv", default=False, help="load .env for S3 client config")
    @functools.wraps(func)
    def _(
        s3_endpoint,
        s3_access_key,
        s3_secret_key,
        s3_region,
        s3_bucket,
        dotenv,
        **kwargs,
    ):
        if dotenv:
            from dotenv import load_dotenv

            load_dotenv()
        # A .env loaded above may define the bucket when neither the option
        # nor the environment did at option-parse time.
        if not s3_bucket:
            s3_bucket = os.getenv("AWS_S3_BUCKET")
        args = {
            "aws_access_key_id": s3_access_key,
            "aws_secret_access_key": s3_secret_key,
            "region_name": s3_region,
            "endpoint_url": s3_endpoint,
        }
        # Drop unset values so boto3 falls back to its own config chain
        # (env vars, ~/.aws/credentials, instance profiles, ...).
        empty_keys = {k for k, v in args.items() if v is None or v == UNSET}
        for k in empty_keys:
            args.pop(k)
        s3 = boto3.client("s3", **args)
        return func(s3=s3, bucket_name=s3_bucket, **kwargs)

    return _

98 

99 

# Shared ``--compress`` option reused by the file/S3 compression commands.
compress_option = click.option(
    "--compress",
    default="gzip",
    type=click.Choice(stream_compress_modes),
    help="compress type",
    show_default=True,
)

107 

108 

def filetree_option(func):
    """Decorator: add file-tree filter options and call *func* with ``top``
    (as pathlib.Path) and the raw option values bundled in ``config``."""

    @click.option(
        "--top",
        type=click.Path(dir_okay=True, exists=True, file_okay=False),
        required=True,
        help="root directory to find files",
    )
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
    @functools.wraps(func)
    def _(top, older, newer, date, bigger, smaller, glob, dry, **kwargs):
        opts = dict(
            top=top,
            older=older,
            newer=newer,
            date=date,
            bigger=bigger,
            smaller=smaller,
            glob=glob,
            dry=dry,
        )
        return func(top=pathlib.Path(top), config=opts, **kwargs)

    return _

138 

139 

def s3tree_option(func):
    """Decorator: add S3-tree filter options and call *func* with ``top``
    (Path for a non-empty prefix, else "") and the options in ``config``."""

    @click.option("--prefix", default="", help="AWS S3 Object Key Prefix")
    @click.option("--suffix", default="", help="AWS S3 Object Key Suffix")
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @functools.wraps(func)
    def _(prefix, older, newer, date, bigger, smaller, suffix, glob, **kwargs):
        opts = dict(
            top=prefix,
            older=older,
            newer=newer,
            date=date,
            bigger=bigger,
            smaller=smaller,
            suffix=suffix,
            glob=glob,
        )
        # An empty prefix is passed through as "" rather than Path(".").
        chosen_top = pathlib.Path(prefix) if prefix else ""
        return func(top=chosen_top, config=opts, **kwargs)

    return _

166 

167 

def verbose_option(func):
    """Decorator: add ``--verbose/--quiet`` and configure root logging.

    Default (flag absent) is INFO; --quiet selects WARNING; --verbose DEBUG.
    """

    @click.option("--verbose/--quiet", default=None)
    @functools.wraps(func)
    def _(verbose, **kwargs):
        from logging import basicConfig

        fmt = "%(asctime)s %(levelname)s %(name)s %(message)s"
        # verbose is None (unset), False (--quiet) or True (--verbose).
        level = {None: "INFO", False: "WARNING", True: "DEBUG"}[verbose]
        basicConfig(level=level, format=fmt)
        return func(**kwargs)

    return _

184 

185 

@cli.command()
@s3_option
@verbose_option
def s3_make_bucket(s3: S3ClientType, bucket_name: str):
    """make S3 buckets"""
    # Create the bucket and show the raw API response.
    response = s3.create_bucket(Bucket=bucket_name)
    click.echo(f"response {response}")

193 

194 

@cli.command()
@s3_option
@verbose_option
def s3_bucket(s3: S3ClientType, bucket_name: str):
    """list S3 buckets"""
    res = s3.list_buckets()
    _log.debug("response %s", res)
    # One line per bucket: creation time then name.
    for bucket in res.get("Buckets", []):
        click.echo("%s %s" % (bucket["CreationDate"], bucket["Name"]))

204 

205 

def allobjs(s3: S3ClientType, bucket_name: str, prefix: str, marker: str = ""):
    """Yield every object under *prefix*, following list_objects pagination."""
    while True:
        res = s3.list_objects(Bucket=bucket_name, Prefix=prefix, Marker=marker)
        contents = res.get("Contents", [])
        yield from contents
        # Stop when the listing is complete or this page was empty.
        if not res.get("IsTruncated") or len(contents) == 0:
            return
        marker = contents[-1].get("Key")
        if not marker:
            # No usable continuation marker; cannot page further.
            return

214 

215 

def s3obj2stat(obj: dict) -> os.stat_result:
    """Map an S3 object dict to a synthetic os.stat_result (mode 0644,
    size from "Size", all three timestamps from "LastModified")."""
    mtime = obj.get("LastModified", datetime.datetime.now()).timestamp()
    size = obj.get("Size", 0)
    return os.stat_result((0o644, 0, 0, 0, 0, 0, size, mtime, mtime, mtime))

220 

221 

def allobjs_conf(s3: S3ClientType, bucket_name: str, prefix: str, config: dict):
    """Yield objects under *prefix* that pass the suffix and filter config."""
    _log.debug("allobjs: bucket=%s, prefix=%s, config=%s", bucket_name, prefix, config)
    from .processor import DebugProcessor

    # DebugProcessor is reused purely for its check() predicate.
    checker = DebugProcessor(config)
    suffix = config.get("suffix", "")
    return (
        obj
        for obj in allobjs(s3, bucket_name, prefix)
        if obj["Key"].endswith(suffix)
        and checker.check(pathlib.Path(obj["Key"]), s3obj2stat(obj))
    )

233 

234 

@cli.command()
@s3_option
@s3tree_option
@verbose_option
def s3_list(s3: S3ClientType, bucket_name: str, config: dict, top: pathlib.Path):
    """list S3 objects"""
    prefix = str(top)
    # Path(".") means "no prefix".
    if prefix == ".":
        prefix = ""
    for obj in allobjs_conf(s3, bucket_name, prefix.lstrip("/"), config):
        click.echo("%s %6d %s" % (obj["LastModified"], obj["Size"], obj["Key"]))

246 

247 

@cli.command()
@click.option("--summary/--no-summary", "-S", default=False, type=bool)
@click.option("--pathsep", default="/", show_default=True)
@s3_option
@s3tree_option
@verbose_option
def s3_du(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    summary: bool,
    pathsep: str,
):
    """show S3 directory usage"""
    # directory name -> [object count, total bytes]
    out: dict = {}
    for i in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config):
        key = i["Key"]
        # Everything before the last separator is the "directory".
        ks = key.rsplit(pathsep, 1)
        dirname = ks[0]
        sz = i["Size"]
        if dirname not in out:
            out[dirname] = [0, 0]
        out[dirname][0] += 1
        out[dirname][1] += sz
    if len(out) == 0:
        click.echo("(empty result)")
        return
    if summary:
        # Propagate each leaf directory's totals into all of its ancestors,
        # one path component at a time.  Iterate over a snapshot of the keys:
        # ancestor entries created during this loop must not be re-walked.
        for korig in list(out.keys()):
            k = korig
            while len(k) != 0:
                k0 = k.rsplit(pathsep, 1)
                if len(k0) == 1:
                    # Reached the top-most component; no more ancestors.
                    break
                k = k0[0]
                if k not in out:
                    out[k] = [0, 0]
                out[k][0] += out[korig][0]
                out[k][1] += out[korig][1]
    click.echo("%10s %5s %s" % ("size", "cnt", "name"))
    click.echo("----------+-----+-----------------------")
    # Largest total size first.
    for k, v in sorted(out.items(), key=lambda f: f[1][1], reverse=True):
        click.echo("%10d %5d %s" % (v[1], v[0], k))

292 

293 

@cli.command()
@s3_option
@s3tree_option
@verbose_option
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
def s3_delete_by(s3: S3ClientType, bucket_name: str, top: pathlib.Path, config: dict, dry: bool):
    """delete S3 objects"""
    matched = [obj["Key"] for obj in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config)]
    if not matched:
        _log.info("no object found")
        return
    if dry:
        _log.info("(dry)remove objects: %s", matched)
        click.echo(f"(dry)remove {len(matched)} objects")
        return
    _log.info("(wet)remove %s objects", len(matched))
    s3.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": x} for x in matched]})

310 

311 

@cli.command()
@s3_option
@filetree_option
@verbose_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@click.option("--content/--stat", help="diff content or stat", default=False, show_default=True)
def s3_diff(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    content: bool,
):
    """diff S3 and filetree"""
    # Object key (with the prefix stripped) -> S3 metadata dict.
    all_keys = {pathlib.Path(x["Key"][len(prefix) :]): x for x in allobjs_conf(s3, bucket_name, prefix, config)}
    from .processor import ListProcessor, process_walk

    # NOTE(review): the --content/--stat flag is accepted but ``content`` is
    # never consulted below — only a size comparison is implemented. Confirm
    # whether content diffing is still planned.
    lp = ListProcessor(config)
    process_walk(top, [lp])
    # Local files keyed relative to *top*; entries with no stat are skipped.
    files = {k.relative_to(top): v for k, v in lp.output if v is not None}
    for k in set(all_keys.keys()) - set(files.keys()):
        click.echo("only-s3: %s: %s" % (k, all_keys[k]))
    for k in set(files.keys()) - set(all_keys.keys()):
        click.echo("only-file: %s: %s" % (k, files[k]))
    for k in set(files.keys()) & set(all_keys.keys()):
        if files[k].st_size != all_keys[k].get("Size"):
            click.echo("size mismatch %s file=%s, obj=%s" % (k, files[k].st_size, all_keys[k]["Size"]))

340 

341 

@cli.command()
@s3_option
@s3tree_option
@verbose_option
@compress_option
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@click.option("--keep/--remove", help="keep old file or delete", default=True, show_default=True)
def s3_compress_tree(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    compress: str,
    dry: bool,
    keep: bool,
):
    """compress S3 objects"""
    topstr = str(top)
    # Path(".") means "no prefix".
    if topstr == ".":
        topstr = ""
    for i in allobjs_conf(s3, bucket_name, topstr.lstrip("/"), config):
        rd = S3GetStream(s3, bucket=bucket_name, key=i["Key"])
        # auto_compress_stream returns the key name in the target encoding
        # plus a lazy re-encoding stream over the downloaded data.
        newname, data = auto_compress_stream(pathlib.Path(i["Key"]), compress, rd)
        if newname == i["Key"]:
            # Already stored in the requested format.
            _log.debug("do nothing: %s", i["Key"])
            continue
        if dry:
            # Drain the stream only to measure the would-be size.
            new_length = sum([len(x) for x in data.gen()])
            _log.info(
                "(dry) recompress %s -> %s (%s->%s)",
                i["Key"],
                newname,
                i["Size"],
                new_length,
            )
        else:
            ps = S3PutStream(data, s3, bucket=bucket_name, key=str(newname))
            # Drain the put-stream generator to drive the upload.
            for _ in ps.gen():
                pass
            res = s3.head_object(Bucket=bucket_name, Key=str(newname))
            _log.info(
                "(wet) recompress %s -> %s (%s->%s)",
                i["Key"],
                newname,
                i["Size"],
                res["ContentLength"],
            )
            # With --remove, delete the original after a successful upload.
            if not keep:
                _log.info("remove old %s (->%s)", i["Key"], newname)
                s3.delete_object(Bucket=bucket_name, Key=i["Key"])

392 

393 

@cli.command()
@filetree_option
@verbose_option
def filetree_debug(top: pathlib.Path, config: dict):
    """(debug command)"""
    from .processor import DebugProcessor, process_walk

    # Walk the tree and let the debug processor report every candidate file.
    process_walk(top, [DebugProcessor(config)])

403 

404 

@cli.command()
@filetree_option
@verbose_option
def filetree_list(top: pathlib.Path, config: dict):
    """list files"""
    from .processor import ListProcessor, process_walk

    lp = ListProcessor(config)
    process_walk(top, [lp])
    # Header includes matched(+skipped) counts.
    click.echo("%10s %-19s %s %d(+%d) total" % ("size", "mtime", "name", lp.processed, lp.skipped))
    click.echo("----------+-------------------+-----------------------")
    for path, st in lp.output:
        if st is None:
            # No stat available: show placeholders.
            stamp, size = "unknown", -1
        else:
            stamp = datetime.datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
            size = st.st_size
        click.echo("%10s %19s %s" % (size, stamp, path))

424 

425 

@cli.command()
@filetree_option
@compress_option
@verbose_option
def filetree_compress(top: pathlib.Path, config: dict, compress):
    """compress files"""
    from .processor import CompressProcessor, process_walk

    # Pass the chosen compression mode through the processor config.
    if compress:
        config["compress"] = compress
    worker = CompressProcessor(config)
    process_walk(top, [worker])
    _log.info(
        "compressed=%d, skipped=%d, size=%d->%d (%d bytes)",
        worker.processed,
        worker.skipped,
        worker.before_total,
        worker.after_total,
        worker.before_total - worker.after_total,
    )

447 

448 

@cli.command()
@filetree_option
@verbose_option
def filetree_delete(top: pathlib.Path, config: dict):
    """remove files"""
    from .processor import DelProcessor, process_walk

    remover = DelProcessor(config)
    process_walk(top, [remover])
    _log.info("removed=%d, skipped=%d", remover.processed, remover.skipped)

459 

460 

@cli.command()
@click.argument(
    "files",
    type=click.Path(file_okay=True, dir_okay=True, exists=True, readable=True),
    nargs=-1,
)
@verbose_option
def merge(files: list[click.Path]):
    """merge sorted log files"""
    streams: list[Stream] = []

    def _open(path: pathlib.Path) -> None:
        # Decompress transparently based on the file extension.
        _, st = auto_compress_stream(path, "decompress")
        streams.append(st)

    for fn in files:
        path = pathlib.Path(str(fn))
        if path.is_file():
            _open(path)
        elif path.is_dir():
            # Recurse into directories and merge every file found.
            for root, _dirs, names in path.walk():
                for name in names:
                    _open(root / name)
        else:
            _log.warning("%s is not a directory or file", path)

    for line in MergeStream(streams).text_gen():
        click.echo(line, nl=False)

486 

487 

@cli.command()
@s3_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@filetree_option
@compress_option
@verbose_option
def s3_put_tree(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    compress,
):
    """compress and put log files to S3"""
    # Keys already present are skipped so repeated runs only upload new files.
    existing = {obj["Key"] for obj in allobjs(s3, bucket_name, prefix)}
    # Hand the S3 connection and upload policy to the processor via config.
    config.update(
        s3=s3,
        s3_bucket=bucket_name,
        s3_prefix=prefix,
        skip_names=existing,
        compress=compress,
    )
    from .processor import S3Processor, process_walk

    uploader = S3Processor(config)
    process_walk(top, [uploader])
    _log.info(
        "processed=%d, skipped=%d, upload %d bytes",
        uploader.processed,
        uploader.skipped,
        uploader.uploaded,
    )

518 

519 

@cli.command()
@s3_option
@click.option("--key", required=True, help="AWS S3 Object Key")
@click.argument("filename", type=click.Path(file_okay=True, dir_okay=False, exists=True))
@compress_option
@verbose_option
def s3_put1(s3: S3ClientType, bucket_name: str, key: str, filename: str, compress: str):
    """put 1 file to S3"""
    from .compr_stream import S3PutStream, auto_compress_stream

    source = pathlib.Path(filename)
    _, stream = auto_compress_stream(source, compress)
    sink = S3PutStream(stream, s3, bucket_name, key)
    # Drain the put-stream generator to drive the upload.
    for _ in sink.gen():
        pass

535 

536 

def _s3_read_stream(s3: S3ClientType, bucket_name: str, key: str) -> Stream:
    """Return a stream over the object's content, decompressed by key extension."""
    raw = S3GetStream(s3, bucket=bucket_name, key=key)
    _, decoded = auto_compress_stream(pathlib.Path(key), "decompress", raw)
    return decoded

541 

542 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_cat(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """concatinate compressed objects"""
    out = sys.stdout.buffer
    for key in keys:
        # Stream each decompressed object straight to stdout.
        out.writelines(_s3_read_stream(s3, bucket_name, key).gen())

552 

553 

def _data_via_pager(input: Stream):
    """Pipe the stream's content into the user's pager ($LOG2S3_PAGER, $PAGER, or less)."""
    command = shlex.split(os.getenv("LOG2S3_PAGER", os.getenv("PAGER", "less")))
    pager = subprocess.Popen(command, stdin=subprocess.PIPE)
    assert pager.stdin is not None  # for type check
    for chunk in input.gen():
        pager.stdin.write(chunk)
    # Close stdin and wait for the pager to exit.
    pager.communicate()

561 

562 

@cli.command()
@s3_option
@click.argument("key")
@verbose_option
def s3_less(s3: S3ClientType, bucket_name: str, key: str):
    """view compressed object"""
    stream = _s3_read_stream(s3, bucket_name, key)
    _data_via_pager(stream)

570 

571 

@cli.command()
@s3_option
@click.argument("key")
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@verbose_option
def s3_vi(s3: S3ClientType, bucket_name: str, key: str, dry):
    """edit compressed object and overwrite"""
    # Download and decompress the whole object; assumes UTF-8 text content.
    bindata = _s3_read_stream(s3, bucket_name, key).read_all().decode("utf-8")
    from .compr_stream import stream_ext, RawReadStream

    _, ext = os.path.splitext(key)
    if ext in stream_ext:
        # Known compressed extension: re-encode the edited text with the
        # matching compressor before writing back.
        def compress_fn(f: bytes) -> bytes:
            compress_st = stream_ext[ext][1]
            return compress_st(RawReadStream(f)).read_all()
    else:
        # Unknown extension: store the edited bytes as-is.
        def compress_fn(f: bytes) -> bytes:
            return f

    # Open the user's $EDITOR; returns None when the user aborts.
    newdata = click.edit(text=bindata)
    if newdata is not None and newdata != bindata:
        wr = compress_fn(newdata.encode("utf-8"))
        if dry:
            _log.info(
                "(dry) changed. write back to %s (%d->%d(%d))",
                key,
                len(bindata),
                len(newdata),
                len(wr),
            )
        else:
            _log.info(
                "(wet) changed. write back to %s (%d->%d(%d))",
                key,
                len(bindata),
                len(newdata),
                len(wr),
            )
            s3.put_object(Body=wr, Bucket=bucket_name, Key=key)
    else:
        _log.info("not changed")

615 

616 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_merge(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """merge sorted log objects"""
    # Open one decompressed stream per key, then merge in sorted order.
    streams: list[Stream] = [_s3_read_stream(s3, bucket_name, key) for key in keys]
    for line in MergeStream(streams).text_gen():
        click.echo(line, nl=False)

629 

630 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@verbose_option
def s3_del(s3: S3ClientType, bucket_name: str, keys: list[str], dry):
    """delete objects"""
    # Show metadata for every key first so the operator sees what goes away.
    for key in keys:
        meta = s3.head_object(Bucket=bucket_name, Key=key)
        click.echo("%s %s %s" % (meta["LastModified"], meta["ContentLength"], key))
    if dry:
        _log.info("(dry) delete %s keys", len(keys))
        return
    _log.info("(wet) delete %s keys", len(keys))
    s3.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": x} for x in keys]})

646 

647 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_head(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """show object metadata"""
    # BUG FIX: the help text previously read "delete objects" (copy-paste
    # from s3_del); this command only reads head metadata, never deletes.
    for k in keys:
        res = s3.head_object(Bucket=bucket_name, Key=k)
        click.echo(f"{k} = {res}")

657 

658 

@cli.command()
@s3_option
@verbose_option
@click.option("--cleanup/--no-cleanup", default=False)
def s3_list_parts(s3: S3ClientType, bucket_name: str, cleanup):
    """list in-progress multipart upload"""
    res = s3.list_multipart_uploads(Bucket=bucket_name)
    uploads = res.get("Uploads", [])
    if not uploads:
        click.echo("(no in-progress multipart uploads found)")
    for upl in uploads:
        click.echo("%s %s %s" % (upl["Initiated"], upl["UploadId"], upl["Key"]))
        if cleanup:
            # --cleanup aborts each dangling upload after listing it.
            _log.info("cleanup %s/%s", upl["Key"], upl["UploadId"])
            s3.abort_multipart_upload(Bucket=bucket_name, Key=upl["Key"], UploadId=upl["UploadId"])

673 

674 

@cli.command("cat")
@click.argument(
    "files",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
    nargs=-1,
)
@verbose_option
def cat_file(files: list[click.Path]):
    """concatinate compressed files"""
    out = sys.stdout.buffer
    for fn in files:
        # Decompress by extension and stream the content to stdout.
        _, stream = auto_compress_stream(pathlib.Path(str(fn)), "decompress")
        out.writelines(stream.gen())

688 

689 

@cli.command("less")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@verbose_option
def view_file(filename: str):
    """view compressed file"""
    # Decompress by extension and hand the stream to the pager.
    _, stream = auto_compress_stream(pathlib.Path(filename), "decompress")
    _data_via_pager(stream)

700 

701 

@cli.command("vi")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@verbose_option
def edit_file(filename: str, dry):
    """edit compressed file and overwrite"""
    from .compr_stream import stream_ext, RawReadStream

    fname = pathlib.Path(filename)
    # Decompress by extension; assumes UTF-8 text content.
    _, data = auto_compress_stream(fname, "decompress")
    _, ext = os.path.splitext(fname)
    if ext in stream_ext:
        # Known compressed extension: re-encode the edited text with the
        # matching compressor before writing back.
        def compress_fn(f: bytes) -> bytes:
            compress_st = stream_ext[ext][1]
            return compress_st(RawReadStream(f)).read_all()
    else:
        # Unknown extension: write the edited bytes back unmodified.
        def compress_fn(f: bytes) -> bytes:
            return f

    bindata = data.read_all().decode("utf-8")
    # Open the user's $EDITOR; returns None when the user aborts.
    newdata = click.edit(text=bindata)
    if newdata is not None and newdata != bindata:
        if dry:
            _log.info("(dry) changed. write back to %s", fname)
        else:
            _log.info("(wet) changed. write back to %s", fname)
            # mode, timestamp will be changed
            fname.write_bytes(compress_fn(newdata.encode("utf-8")))
    else:
        _log.info("not changed")

737 

738 

@cli.command()
@click.argument("file")
@click.option(
    "--compress",
    default=None,
    type=click.Choice(list(set(stream_compress_modes) - {"raw", "decompress"})),
    help="compress type (default: all)",
    multiple=True,
)
def compress_benchmark(compress, file):
    """benchmark compress algorithm

    \b
    outputs:
      mode: mode string
      rate: compression rate (compressed size/original size)
      compress: compress throuput (original bytes/sec)
      decompress: decompress throuput (original bytes/sec)
    """
    import csv
    import timeit
    from .compr_stream import stream_map, stream_compress_modes, RawReadStream, Stream

    if not compress:
        # No mode given: benchmark every known mode.
        compress = stream_compress_modes
    raw_data = pathlib.Path(file).read_bytes()

    # Drive a compressing stream over the input (closure over comp_st).
    def bench_comp(input_data: bytes):
        s1 = RawReadStream(input_data)
        s2 = comp_st(s1)
        for _ in s2.gen():
            pass

    # Drive a decompressing stream over the input (closure over decomp_st).
    def bench_decomp(input_data: bytes):
        s1 = RawReadStream(input_data)
        s2 = decomp_st(s1)
        for _ in s2.gen():
            pass

    wr = csv.writer(sys.stdout)
    wr.writerow(["mode", "rate", "compress", "decompress"])
    for mode in compress:
        m = stream_map.get(mode)
        if not m:
            # BUG FIX: click.Abort(...) was previously constructed but never
            # raised, so an unknown mode was skipped silently.  Report the
            # message and abort for real.
            click.echo(f"no such compress mode: {mode}", err=True)
            raise click.Abort()
        comp_st: type[Stream] = m[1]
        decomp_st: type[Stream] = m[2]
        # Round-trip once up front to verify correctness and measure sizes.
        compressed_data = comp_st(RawReadStream(raw_data)).read_all()
        assert raw_data == decomp_st(RawReadStream(compressed_data)).read_all()
        isz = len(raw_data)
        csz = len(compressed_data)
        rate = csz / isz
        cnum, csec = timeit.Timer(
            stmt="bench(input_data)",
            globals={"input_data": raw_data, "comp_st": comp_st, "bench": bench_comp},
        ).autorange()
        dnum, dsec = timeit.Timer(
            stmt="bench(input_data)",
            globals={
                "input_data": compressed_data,
                "decomp_st": decomp_st,
                "bench": bench_decomp,
            },
        ).autorange()
        # Throughput is measured in original (uncompressed) bytes per second.
        wr.writerow([str(x) for x in [mode, rate, isz * cnum / csec, isz * dnum / dsec]])

806 

807 

@cli.command()
@verbose_option
@click.option(
    "--format",
    type=click.Choice(["combined", "common", "debug"]),
    default="combined",
    show_default=True,
)
@click.option("--nth", type=int, default=1, show_default=True, help="parse from n-th '{'")
@click.argument("file", type=click.Path(exists=True, file_okay=True, dir_okay=False))
def traefik_json_convert(file, nth, format):
    """
    convert traefik access-log(json) to other format

    \b
    traefik --accesslog=true --accesslog.format=json \\
      --accesslog.fields.defaultmode=keep \\
      --accesslog.fields.headers.defaultmode=keep
    """
    from .compr_stream import FileReadStream, auto_compress_stream
    from collections import defaultdict

    # Apache "common" log format, filled from the traefik JSON fields.
    common = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
    )
    # Apache "combined" format: common plus referer and user agent.
    combined = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
        ' "%(request_Referer)s" "%(request_User-Agent)s"'
    )
    dateformat = "%d/%b/%Y:%H:%M:%S %z"
    format_map = {
        "combined": combined,
        "common": common,
        "debug": "%s",
    }
    # NOTE(review): click.Path(exists=True) likely rejects "-", so this
    # stdin branch may be unreachable from the CLI — confirm intent.
    if file in (None, "-"):
        fp = sys.stdin.buffer
        path = pathlib.Path("-")
        ist = FileReadStream(fp)
    else:
        path = pathlib.Path(file)
        ist = None
    _, ofp = auto_compress_stream(path, "decompress", ist)
    fmt = format_map.get(format, combined)
    for line in ofp.text_gen():
        # Skip any prefix before the nth "{" (e.g. syslog headers), then
        # treat the rest of the line as a JSON document.
        n = line.split("{", nth)
        jsonstr = "{" + n[-1]
        try:
            jsdata: dict = json.loads(jsonstr)
            if "time" in jsdata:
                # Convert the ISO timestamp to local time in HTTP log format.
                ts = datetime.datetime.fromisoformat(jsdata.get("time", "")).astimezone()
                jsdata["httptime"] = ts.strftime(dateformat)
            # defaultdict supplies "-" for any field missing from the record.
            click.echo(fmt % defaultdict(lambda: "-", **jsdata))
        except json.JSONDecodeError:
            _log.exception("parse error: %s", jsonstr)

867 

868 

def do_ible1(name: str, fn: click.Command, args: dict, dry: bool):
    """Run one playbook step: call *fn*'s callback with *args* unless dry."""
    _log.debug("name=%s, fn=%s, args=%s, dry=%s", name, fn, arg_mask(args), dry)
    _log.info("start %s", name)
    if not dry:
        _log.info("run(wet): %s %s", fn, arg_mask(args))
        assert fn.callback is not None
        fn.callback(**args)
    else:
        _log.info("run(dry): %s %s", fn, arg_mask(args))
    _log.info("end %s", name)

879 

880 

def convert_ible(data: Union[list[dict], dict]) -> list[dict]:
    """Normalize a mapping-style playbook into the canonical list form.

    A dict {cmd: opts, ...} becomes [{"name": ..., cmd: opts}, ...]; "name"
    and "allow-fail" are lifted out of the per-command options.  A list is
    returned unchanged.
    """
    if not isinstance(data, dict):
        return data
    _log.debug("convert %s", arg_mask(data))
    converted = []
    for key, body in data.items():
        # Pop control keys out of the option dict before embedding it.
        step_name = body.pop("name", key)
        allow_fail = body.pop("allow-fail", None)
        entry = {"name": step_name, key: body}
        if allow_fail is not None:
            entry["allow-fail"] = allow_fail
        converted.append(entry)
    _log.debug("converted: %s", arg_mask(converted))
    return converted

895 

896 

def arg2arg(fn: click.Command, args: dict, baseparam: dict) -> dict:
    """Build keyword arguments for invoking *fn*'s callback directly.

    Precedence, lowest to highest: click option default / environment
    variable, then *baseparam* (shared "params" step values), then *args*
    (step-local values; an explicit UNSET leaves the lower layer in place).
    """
    params = {}
    for opt in fn.params:
        # Seed from the option's declared environment variable(s)...
        if isinstance(opt.envvar, list) and len(opt.envvar) != 0:
            for ev in opt.envvar:
                if os.getenv(ev):
                    params[opt.name] = os.getenv(ev)
                    break
        elif isinstance(opt.envvar, str) and opt.envvar and os.getenv(opt.envvar):
            params[opt.name] = os.getenv(opt.envvar)
        elif opt.default or not opt.required:
            # ...otherwise fall back to the declared default.
            params[opt.name] = opt.default
    pnames = [x.name for x in fn.params]
    for name in pnames:
        if name in args:
            # Step-local value wins unless it is explicitly UNSET.
            if args[name] != UNSET:
                params[name] = args[name]
        elif name in baseparam:
            params[name] = baseparam[name]
    _log.debug("arg=%s, base=%s, result=%s", args, baseparam, params)
    return params

918 

919 

def ible_gen(
    data: list[dict],
) -> Generator[tuple[str, click.Command, dict, dict], None, None]:
    """Validate playbook entries and yield (name, command, kwargs, raw entry).

    Each entry must be a dict with exactly one key besides "name" and
    "allow-fail", naming a CLI subcommand.  A "params" entry updates the
    shared defaults instead of being yielded.

    Raises:
        Exception: on malformed entries, unknown commands, or nested
            ible* commands.
    """
    _log.debug("input: %s", arg_mask(data))
    if not isinstance(data, list):
        raise Exception(f"invalid list style: {type(data)}")
    baseparam = {}
    for v in data:
        _log.debug("exec %s", arg_mask(v))
        if not isinstance(v, dict):
            raise Exception(f"invalid dict style: {type(v)}")
        # The single remaining key is the subcommand name.
        kw: set[str] = v.keys() - {"name", "allow-fail"}
        if len(kw) != 1:
            raise Exception(f"invalid command style: {kw}")
        cmd: str = kw.pop()
        args: dict = v[cmd]
        name: str = v.get("name", cmd)
        if not isinstance(args, dict):
            raise Exception(f"invalid args: {args}")
        if cmd == "params":
            # Accumulate shared parameters for the following steps.
            baseparam.update(args)
            continue
        if cmd not in cli.commands:
            raise Exception(f"invalid command: {cmd} / {cli.commands.keys()}")
        if cmd.startswith("ible"):
            # Guard against recursive playbook execution.
            raise Exception(f"unsupported command: {cmd}")
        fn = cli.commands[cmd]
        yield name, fn, arg2arg(fn, args, baseparam), v

948 

949 

def do_ible(data: list[dict], dry: bool):
    """Execute every playbook step, honoring per-step allow-fail flags."""
    for step_name, fn, step_args, entry in ible_gen(data):
        try:
            do_ible1(step_name, fn, step_args, dry)
        except Exception as exc:
            if not entry.get("allow-fail"):
                _log.exception("error occured. stop")
                raise
            _log.info("failed. continue: %s", exc)

959 

960 

def gen_sh(file: str) -> Generator[tuple[Optional[str], list[str]], None, None]:
    """Tokenize a shell script, yielding (name, tokens) per command line.

    A ``#`` comment line names the following command; backslash
    continuations are folded into one logical line before tokenizing.
    """

    def tokenize(text: str) -> list[str]:
        # punctuation_chars=True keeps shell operators as discrete tokens
        return list(shlex.shlex(text, punctuation_chars=True))

    with open(file, "r") as fp:
        pending_name = None
        for raw in fp:
            line = raw.lstrip().rstrip("\n")
            if line.startswith("#"):
                pending_name = line[1:].strip()
                _log.debug("name: %s", pending_name)
                continue
            tokens = tokenize(line)
            while tokens and tokens[-1] == "\\":
                try:
                    line += next(fp)
                except StopIteration:
                    break
                tokens = tokenize(line)
            if not tokens:
                continue
            yield pending_name, tokens
            pending_name = None

981 

982 

def sh_line2arg(cmdop: click.Command, tokens: list[str]) -> tuple[dict, bool]:
    """Map shell tokens onto *cmdop*'s options.

    Returns the parsed option dict and whether the line ended with
    ``|| true`` (allow-fail). Raises on a token matching no option.
    """
    args: dict[str, Union[str, bool]] = {}
    allow_fail = False
    if tokens[-2:] == ["||", "true"]:
        allow_fail = True
        tokens = tokens[:-2]
        _log.debug("allow fail: %s", tokens)
    it = iter(tokens)
    for token in it:
        if token.startswith("#"):
            # rest of the line is a comment
            break
        for opt in cmdop.params:
            assert isinstance(opt.name, str)
            if token in opt.opts:
                _log.debug("option %s", opt)
                if getattr(opt, "is_flag", False):
                    _log.debug("true: %s", opt)
                    args[opt.name] = True
                else:
                    # non-flag option consumes the following token as its value
                    value = next(it)
                    _log.debug("args: %s", arg_mask({opt.name: value}))
                    args[opt.name] = value
                break
            if token in opt.secondary_opts:
                _log.debug("false: %s", opt)
                args[opt.name] = False
                break
        else:
            raise Exception(f"no such option: {token}")
    return args, allow_fail

1014 

1015 

def read_sh(file: str) -> list[dict]:
    """Parse a shell script of ``log2s3`` invocations into playbook entries.

    Lines whose first token is not ``log2s3`` are skipped; the comment
    preceding an invocation (from gen_sh) supplies the entry name, and a
    trailing ``|| true`` marks the entry as allow-fail.
    """
    res: list[dict] = []
    for name, tokens in gen_sh(file):
        if len(tokens) <= 1:
            continue
        if tokens[0] != "log2s3":
            # unrelated shell command; ignore
            continue
        _log.debug("tokens: %s", tokens[1:])
        cmd = tokens[1]
        if cmd not in cli.commands:
            raise Exception(f"no such subcommand: {cmd}")
        cmdop = cli.commands[cmd]
        args, allow_fail = sh_line2arg(cmdop, tokens[2:])
        _log.debug("result args: %s", arg_mask(args))
        ent = {
            "name": name or cmd,
            cmd: args,
        }
        if allow_fail:
            ent["allow-fail"] = True
        res.append(ent)
        # NOTE: the original reassigned the loop variable `name = None` here,
        # which had no effect (overwritten on the next iteration); removed.
    return res

1039 

1040 

1041def try_read(file: str) -> Union[list[dict], dict]: 

1042 try: 

1043 import tomllib 

1044 

1045 with open(file, "rb") as fp: 

1046 return tomllib.load(fp) 

1047 except ValueError as e: 

1048 _log.debug("toml error", exc_info=e) 

1049 try: 

1050 import yaml 

1051 

1052 with open(file, "rb") as fp: 

1053 return yaml.safe_load(fp) 

1054 except (yaml.error.YAMLError, ImportError) as e: 

1055 _log.debug("yaml error", exc_info=e) 

1056 try: 

1057 import json 

1058 

1059 with open(file, "rb") as fp: 

1060 return json.load(fp) 

1061 except ValueError as e: 

1062 _log.debug("json error", exc_info=e) 

1063 try: 

1064 return read_sh(file) 

1065 except Exception as e: 

1066 _log.debug("sh error", exc_info=e) 

1067 raise Exception(f"cannot load {file}: unknown filetype") 

1068 

1069 

@cli.command()
@verbose_option
@click.option("--dry/--wet", help="dry run or wet run", default=False, show_default=True)
@click.argument("file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True))
def ible_playbook(file, dry):
    """do log2s3-ible playbook"""
    # load + normalize the playbook, then run each step
    playbook = convert_ible(try_read(file))
    do_ible(playbook, dry)

1077 

1078 

def sh_dump(data, output):
    """Render a playbook back into an executable shell script on *output*.

    Options equal to their declared default are omitted; entries marked
    ``allow-fail`` get an ``|| true`` suffix.
    """
    click.echo("#! /bin/sh", file=output)
    click.echo("set -eu", file=output)
    click.echo("", file=output)
    for name, fn, args, entry in ible_gen(data):
        # NOTE: the original reused `v` for both the raw entry and the inner
        # option-value loop variable; renamed to avoid the shadowing.
        allow_fail = entry.get("allow-fail")
        # a multi-line name becomes multiple comment lines
        for comment in io.StringIO(name):
            click.echo(f"# {comment.rstrip()}", file=output)
        fnname: str = fn.name or name
        options: list[str] = [fnname]
        for key, value in sorted(args.items()):
            opt = [x for x in fn.params if x.name == key][0]
            if opt.default == value:
                continue
            if isinstance(value, bool):
                if value:
                    options.append(opt.opts[0])
                else:
                    options.append(opt.secondary_opts[0])
            elif value is not None:
                options.append(opt.opts[0])
                # shlex.join requires str; option values may be int etc.
                options.append(str(value))
        optstr = shlex.join(options)
        suffixstr = ""
        if allow_fail:
            suffixstr = " || true"
        click.echo(f"log2s3 {optstr}{suffixstr}", file=output)
        click.echo("", file=output)

1107 

1108 

@cli.command()
@verbose_option
@click.option("--format", type=click.Choice(["yaml", "json", "sh"]))
@click.argument("file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True))
@click.option("--output", type=click.File("w"), default="-")
def ible_convert(file, format, output):
    """convert log2s3-ible playbook"""
    data: list[dict] = convert_ible(try_read(file))
    if format == "json":
        import json

        json.dump(data, fp=output, ensure_ascii=False, indent=2)
    elif format == "yaml":
        import yaml

        yaml.dump(data, stream=output, allow_unicode=True)
    elif format == "sh":
        sh_dump(data, output)
    else:
        raise Exception(f"unknown format: {format}")

1129 

1130 

@cli.command()
@click.argument("args", nargs=-1)
def sh(args):
    """execute /bin/sh"""
    # pass extra CLI arguments straight through to the shell
    subprocess.run(["sh"] + list(args))

1136 

1137 

@cli.command()
@click.argument("args", nargs=-1)
def bash(args):
    """execute bash"""
    # pass extra CLI arguments straight through to bash
    subprocess.run(["bash"] + list(args))

1143 

1144 

@cli.command()
@verbose_option
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option("--port", type=int, default=8000, show_default=True)
@click.option("--root", default=".", show_default=True)
@click.option("--prefix", default="")
@click.option("--debug/--no-debug", default=False, show_default=True)
def serve(prefix, root, host, port, debug):
    """start viewer"""
    # heavy web deps are imported lazily so the CLI stays fast without them
    from .app import update_config, router
    import uvicorn
    from fastapi import FastAPI

    update_config({"working_dir": root})
    if prefix:
        update_config({"prefix": prefix})
    application = FastAPI(debug=debug)
    application.include_router(router, prefix=prefix)
    uvicorn.run(application, host=host, port=port, log_config=None)

1172 

1173 

# Allow running this module directly (e.g. `python -m log2s3.main`).
if __name__ == "__main__":
    cli()