Coverage for log2s3/main.py: 74%

786 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 12:02 +0000

1from logging import getLogger 

2import os 

3import sys 

4import datetime 

5import shlex 

6import subprocess 

7import functools 

8import click 

9import json 

10import pathlib 

11import boto3 

12import io 

13from click.core import UNSET 

14from typing import Union, Generator, Optional 

15from .version import VERSION 

16from .common_stream import Stream, MergeStream 

17from .compr_stream import ( 

18 S3GetStream, 

19 S3PutStream, 

20 auto_compress_stream, 

21 stream_compress_modes, 

22) 

23 

# Prefer the typed boto3 S3 client stub when the mypy_boto3_s3 extra is
# installed; fall back to Any so runtime use works without it.
try:
    from mypy_boto3_s3.client import S3Client as S3ClientType
except ImportError:
    from typing import Any as S3ClientType

28 

# Module-level logger for this CLI module.
_log = getLogger(__name__)


# Option-name prefixes whose values are masked when logged (see arg_mask).
mask_prefix = ["s3_"]

33 

34 

def arg_mask(d: Union[list, dict]) -> Union[list, dict]:
    """Return a copy of *d* safe for logging.

    Recurses into lists and dicts; drops keys whose value is None or click's
    UNSET sentinel, and masks string values of keys starting with any prefix
    in ``mask_prefix`` ("*" per character; non-strings become "****").
    Non-container inputs are returned unchanged.
    """
    if isinstance(d, list):
        return [arg_mask(x) for x in d]
    if isinstance(d, dict):
        res = d.copy()
        for k, v in d.items():
            if v is None or v == UNSET:
                # fix: pop with a default so multiple mask prefixes cannot
                # trigger a KeyError from popping the same key twice
                res.pop(k, None)
            elif isinstance(v, (dict, list)):
                res[k] = arg_mask(v)
            elif any(k.startswith(p) for p in mask_prefix):
                # strings get a same-length mask; other scalar secrets a fixed one
                res[k] = "*" * len(v) if isinstance(v, str) else "****"
        return res
    return d

52 

53 

@click.group(invoke_without_command=True)
@click.version_option(VERSION)
@click.pass_context
def cli(ctx):
    # When invoked without a subcommand, show the group's help instead of
    # silently doing nothing.
    if ctx.invoked_subcommand is not None:
        return
    print(ctx.get_help())

60 

61 

def s3_option(func):
    """Decorator adding S3 connection options.

    Builds a boto3 S3 client from the options/environment and calls *func*
    with ``s3=<client>`` and ``bucket_name=<bucket>`` keyword arguments.
    """
    @click.option("--s3-access-key", envvar="AWS_ACCESS_KEY_ID", help="AWS Access Key")
    @click.option(
        "--s3-secret-key", envvar="AWS_SECRET_ACCESS_KEY", help="AWS Secret Key"
    )
    @click.option("--s3-region", envvar="AWS_DEFAULT_REGION", help="AWS Region")
    @click.option(
        "--s3-endpoint", envvar="AWS_ENDPOINT_URL_S3", help="AWS Endpoint URL for S3"
    )
    @click.option("--s3-bucket", envvar="AWS_S3_BUCKET", help="AWS S3 Bucket name")
    @click.option(
        "--dotenv/--no-dotenv", default=False, help="load .env for S3 client config"
    )
    @functools.wraps(func)
    def wrapped(
        s3_endpoint,
        s3_access_key,
        s3_secret_key,
        s3_region,
        s3_bucket,
        dotenv,
        **kwargs,
    ):
        if dotenv:
            from dotenv import load_dotenv

            load_dotenv()
        # bucket may come from a .env loaded just above
        if not s3_bucket:
            s3_bucket = os.getenv("AWS_S3_BUCKET")
        # only pass explicitly-set values so boto3 falls back to its own defaults
        client_args = {
            k: v
            for k, v in {
                "aws_access_key_id": s3_access_key,
                "aws_secret_access_key": s3_secret_key,
                "region_name": s3_region,
                "endpoint_url": s3_endpoint,
            }.items()
            if v is not None
        }
        s3 = boto3.client("s3", **client_args)
        return func(s3=s3, bucket_name=s3_bucket, **kwargs)

    return wrapped

104 

105 

# Shared "--compress" option (choice of supported stream compression modes,
# default gzip), reused by several commands below.
compress_option = click.option(
    "--compress",
    default="gzip",
    type=click.Choice(stream_compress_modes),
    help="compress type",
    show_default=True,
)

113 

114 

def filetree_option(func):
    """Decorator adding local file-tree filter options.

    Calls *func* with ``top=<pathlib.Path>`` and ``config=<dict>`` where the
    dict carries the raw filter option values.
    """
    @click.option(
        "--top",
        type=click.Path(dir_okay=True, exists=True, file_okay=False),
        required=True,
        help="root directory to find files",
    )
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @click.option(
        "--dry/--wet", help="dry run or wet run", default=False, show_default=True
    )
    @functools.wraps(func)
    def wrapped(top, older, newer, date, bigger, smaller, glob, dry, **kwargs):
        config = dict(
            top=top,
            older=older,
            newer=newer,
            date=date,
            bigger=bigger,
            smaller=smaller,
            glob=glob,
            dry=dry,
        )
        return func(top=pathlib.Path(top), config=config, **kwargs)

    return wrapped

146 

147 

def s3tree_option(func):
    """Decorator adding S3-object-tree filter options.

    Calls *func* with ``top`` (Path of the prefix, or "" when no prefix was
    given) and ``config=<dict>`` carrying the raw filter option values.
    """
    @click.option("--prefix", default="", help="AWS S3 Object Key Prefix")
    @click.option("--suffix", default="", help="AWS S3 Object Key Suffix")
    @click.option("--older", help="find older file")
    @click.option("--newer", help="find newer file")
    @click.option("--date", help="find date range(YYYY-mm-dd[..YYYY-mm-dd])")
    @click.option("--bigger", help="find bigger file")
    @click.option("--smaller", help="find smaller file")
    @click.option("--glob", help="glob pattern")
    @functools.wraps(func)
    def wrapped(prefix, older, newer, date, bigger, smaller, suffix, glob, **kwargs):
        config = dict(
            top=prefix,
            older=older,
            newer=newer,
            date=date,
            bigger=bigger,
            smaller=smaller,
            suffix=suffix,
            glob=glob,
        )
        # empty prefix -> plain "" top so callers can str()/lstrip it safely
        top = pathlib.Path(prefix) if prefix else ""
        return func(top=top, config=config, **kwargs)

    return wrapped

174 

175 

def verbose_option(func):
    """Decorator adding --verbose/--quiet and configuring root logging."""
    @click.option("--verbose/--quiet", default=None)
    @functools.wraps(func)
    def wrapped(verbose, **kwargs):
        from logging import basicConfig

        fmt = "%(asctime)s %(levelname)s %(name)s %(message)s"
        # neither flag -> INFO, --quiet -> WARNING, --verbose -> DEBUG
        level = {None: "INFO", False: "WARNING", True: "DEBUG"}[verbose]
        basicConfig(level=level, format=fmt)
        return func(**kwargs)

    return wrapped

192 

193 

@cli.command()
@s3_option
@verbose_option
def s3_make_bucket(s3: S3ClientType, bucket_name: str):
    """make S3 buckets"""
    response = s3.create_bucket(Bucket=bucket_name)
    click.echo(f"response {response}")

201 

202 

@cli.command()
@s3_option
@verbose_option
def s3_bucket(s3: S3ClientType, bucket_name: str):
    """list S3 buckets"""
    res = s3.list_buckets()
    _log.debug("response %s", res)
    buckets = res.get("Buckets", [])
    for bucket in buckets:
        click.echo("%s %s" % (bucket["CreationDate"], bucket["Name"]))

212 

213 

def allobjs(s3: "S3ClientType", bucket_name: str, prefix: str, marker: str = ""):
    """Yield every object listing entry in *bucket_name* under *prefix*.

    Follows ``list_objects`` Marker-based pagination iteratively.  The
    original implementation recursed once per page, which could exhaust the
    interpreter stack on buckets with very many pages; a loop is equivalent
    and unbounded.  *marker* lets callers resume after a given key.
    """
    while True:
        res = s3.list_objects(Bucket=bucket_name, Prefix=prefix, Marker=marker)
        contents = res.get("Contents", [])
        yield from contents
        # stop when the listing is complete or yields nothing to continue from
        if not res.get("IsTruncated") or not contents:
            return
        marker = contents[-1].get("Key")
        if not marker:
            return

222 

223 

def s3obj2stat(obj: dict) -> os.stat_result:
    """Build a minimal os.stat_result from an S3 listing entry.

    Uses the object's Size and LastModified (falling back to 0 and "now")
    so S3 objects can pass through the same filters as local files.
    """
    mtime = obj.get("LastModified", datetime.datetime.now()).timestamp()
    size = obj.get("Size", 0)
    # fields: mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
    return os.stat_result((0o644, 0, 0, 0, 0, 0, size, mtime, mtime, mtime))

228 

229 

def allobjs_conf(s3: S3ClientType, bucket_name: str, prefix: str, config: dict):
    """Iterate objects under *prefix* filtered by suffix and the config rules.

    DebugProcessor.check applies the age/size/glob filters from *config* to a
    synthetic stat built from each listing entry.
    """
    _log.debug("allobjs: bucket=%s, prefix=%s, config=%s", bucket_name, prefix, config)
    from .processor import DebugProcessor

    checker = DebugProcessor(config)
    suffix = config.get("suffix", "")

    def _match(obj) -> bool:
        if not obj["Key"].endswith(suffix):
            return False
        return checker.check(pathlib.Path(obj["Key"]), s3obj2stat(obj))

    return filter(_match, allobjs(s3, bucket_name, prefix))

242 

243 

@cli.command()
@s3_option
@s3tree_option
@verbose_option
def s3_list(s3: S3ClientType, bucket_name: str, config: dict, top: pathlib.Path):
    """list S3 objects"""
    prefix = str(top)
    # Path("") stringifies to "."; treat that as "no prefix"
    if prefix == ".":
        prefix = ""
    for obj in allobjs_conf(s3, bucket_name, prefix.lstrip("/"), config):
        click.echo("%s %6d %s" % (obj["LastModified"], obj["Size"], obj["Key"]))

255 

256 

@cli.command()
@click.option("--summary/--no-summary", "-S", default=False, type=bool)
@click.option("--pathsep", default="/", show_default=True)
@s3_option
@s3tree_option
@verbose_option
def s3_du(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    summary: bool,
    pathsep: str,
):
    """show S3 directory usage"""
    # out: directory name -> [object count, total size]
    out = {}
    for i in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config):
        key = i["Key"]
        ks = key.rsplit(pathsep, 1)
        dirname = ks[0]
        sz = i["Size"]
        if dirname not in out:
            out[dirname] = [0, 0]
        out[dirname][0] += 1
        out[dirname][1] += sz
    if len(out) == 0:
        click.echo("(empty result)")
        return
    if summary:
        # With --summary, add each directory's count/size into every ancestor
        # directory (walking up one pathsep component at a time).
        for korig in list(out.keys()):
            k = korig
            while len(k) != 0:
                k0 = k.rsplit(pathsep, 1)
                if len(k0) == 1:
                    # no separator left: reached the top level
                    break
                k = k0[0]
                if k not in out:
                    out[k] = [0, 0]
                out[k][0] += out[korig][0]
                out[k][1] += out[korig][1]
    click.echo("%10s %5s %s" % ("size", "cnt", "name"))
    click.echo("----------+-----+-----------------------")
    # largest total size first
    for k, v in sorted(out.items(), key=lambda f: f[1][1], reverse=True):
        click.echo("%10d %5d %s" % (v[1], v[0], k))

301 

302 

@cli.command()
@s3_option
@s3tree_option
@verbose_option
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
def s3_delete_by(
    s3: S3ClientType, bucket_name: str, top: pathlib.Path, config: dict, dry: bool
):
    """delete S3 objects"""
    del_keys = [
        x["Key"] for x in allobjs_conf(s3, bucket_name, str(top).lstrip("/"), config)
    ]
    if len(del_keys) == 0:
        _log.info("no object found")
    elif dry:
        _log.info("(dry)remove objects: %s", del_keys)
        click.echo(f"(dry)remove {len(del_keys)} objects")
    else:
        _log.info("(wet)remove %s objects", len(del_keys))
        # fix: the DeleteObjects API accepts at most 1000 keys per request;
        # the original sent all keys in one call and would fail on large sets.
        for i in range(0, len(del_keys), 1000):
            batch = del_keys[i : i + 1000]
            s3.delete_objects(
                Bucket=bucket_name, Delete={"Objects": [{"Key": x} for x in batch]}
            )

327 

328 

@cli.command()
@s3_option
@filetree_option
@verbose_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@click.option(
    "--content/--stat", help="diff content or stat", default=False, show_default=True
)
def s3_diff(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    content: bool,
):
    """diff S3 and filetree"""
    # S3 side: key relative to prefix -> listing entry
    all_keys = {
        pathlib.Path(x["Key"][len(prefix) :]): x
        for x in allobjs_conf(s3, bucket_name, prefix, config)
    }
    from .processor import ListProcessor, process_walk

    # local side: path relative to top -> stat result (None stats skipped)
    lp = ListProcessor(config)
    process_walk(top, [lp])
    files = {k.relative_to(top): v for k, v in lp.output if v is not None}
    for k in set(all_keys.keys()) - set(files.keys()):
        click.echo("only-s3: %s: %s" % (k, all_keys[k]))
    for k in set(files.keys()) - set(all_keys.keys()):
        click.echo("only-file: %s: %s" % (k, files[k]))
    # NOTE(review): the --content flag is accepted but unused below — only
    # sizes are compared; content comparison appears unimplemented.
    for k in set(files.keys()) & set(all_keys.keys()):
        if files[k].st_size != all_keys[k].get("Size"):
            click.echo(
                "size mismatch %s file=%s, obj=%s"
                % (k, files[k].st_size, all_keys[k]["Size"])
            )

365 

366 

@cli.command()
@s3_option
@s3tree_option
@verbose_option
@compress_option
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@click.option(
    "--keep/--remove", help="keep old file or delete", default=True, show_default=True
)
def s3_compress_tree(
    s3: S3ClientType,
    bucket_name: str,
    config: dict,
    top: pathlib.Path,
    compress: str,
    dry: bool,
    keep: bool,
):
    """compress S3 objects"""
    topstr = str(top)
    # Path("") stringifies to "."; treat that as "no prefix"
    if topstr == ".":
        topstr = ""
    for i in allobjs_conf(s3, bucket_name, topstr.lstrip("/"), config):
        rd = S3GetStream(s3, bucket=bucket_name, key=i["Key"])
        # auto_compress_stream returns the target key name and a re-compressing
        # stream; an unchanged name means the object already has this encoding.
        newname, data = auto_compress_stream(pathlib.Path(i["Key"]), compress, rd)
        if newname == i["Key"]:
            _log.debug("do nothing: %s", i["Key"])
            continue
        if dry:
            # measure the would-be size without uploading anything
            new_length = sum([len(x) for x in data.gen()])
            _log.info(
                "(dry) recompress %s -> %s (%s->%s)",
                i["Key"],
                newname,
                i["Size"],
                new_length,
            )
        else:
            ps = S3PutStream(data, s3, bucket=bucket_name, key=str(newname))
            # drain the stream; S3PutStream uploads as a side effect
            for _ in ps.gen():
                pass
            res = s3.head_object(Bucket=bucket_name, Key=str(newname))
            _log.info(
                "(wet) recompress %s -> %s (%s->%s)",
                i["Key"],
                newname,
                i["Size"],
                res["ContentLength"],
            )
            # --remove deletes the source object; runs on the wet path only
            # so a dry run never modifies the bucket
            if not keep:
                _log.info("remove old %s (->%s)", i["Key"], newname)
                s3.delete_object(Bucket=bucket_name, Key=i["Key"])

421 

422 

@cli.command()
@filetree_option
@verbose_option
def filetree_debug(top: pathlib.Path, config: dict):
    """(debug command)"""
    from .processor import DebugProcessor, process_walk

    process_walk(top, [DebugProcessor(config)])

432 

433 

@cli.command()
@filetree_option
@verbose_option
def filetree_list(top: pathlib.Path, config: dict):
    """list files"""
    from .processor import ListProcessor, process_walk

    lp = ListProcessor(config)
    process_walk(top, [lp])
    click.echo(
        "%10s %-19s %s %d(+%d) total"
        % ("size", "mtime", "name", lp.processed, lp.skipped)
    )
    click.echo("----------+-------------------+-----------------------")
    for path, st in lp.output:
        if st is not None:
            mtime_str = datetime.datetime.fromtimestamp(st.st_mtime).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            size = st.st_size
        else:
            # stat could not be read for this entry
            mtime_str = "unknown"
            size = -1
        click.echo("%10s %19s %s" % (size, mtime_str, path))

458 

459 

@cli.command()
@filetree_option
@compress_option
@verbose_option
def filetree_compress(top: pathlib.Path, config: dict, compress):
    """compress files"""
    from .processor import CompressProcessor, process_walk

    if compress:
        config["compress"] = compress
    cproc = CompressProcessor(config)
    process_walk(top, [cproc])
    _log.info(
        "compressed=%d, skipped=%d, size=%d->%d (%d bytes)",
        cproc.processed,
        cproc.skipped,
        cproc.before_total,
        cproc.after_total,
        cproc.before_total - cproc.after_total,
    )

481 

482 

@cli.command()
@filetree_option
@verbose_option
def filetree_delete(top: pathlib.Path, config: dict):
    """remove files"""
    from .processor import DelProcessor, process_walk

    dproc = DelProcessor(config)
    process_walk(top, [dproc])
    _log.info("removed=%d, skipped=%d", dproc.processed, dproc.skipped)

493 

494 

@cli.command()
@click.argument(
    "files",
    type=click.Path(file_okay=True, dir_okay=True, exists=True, readable=True),
    nargs=-1,
)
@verbose_option
def merge(files: list[click.Path]):
    """merge sorted log files"""
    streams: list[Stream] = []

    def _open(path: pathlib.Path):
        # decompress transparently based on the file extension
        _, ch = auto_compress_stream(path, "decompress")
        streams.append(ch)

    for fn in files:
        p = pathlib.Path(str(fn))
        if p.is_file():
            _open(p)
        elif p.is_dir():
            # NOTE: pathlib.Path.walk requires Python 3.12+
            for proot, _dirs, pfiles in p.walk():
                for pfn in pfiles:
                    _open(proot / pfn)
        else:
            _log.warning("%s is not a directory or file", p)

    for line in MergeStream(streams).text_gen():
        click.echo(line, nl=False)

520 

521 

@cli.command()
@s3_option
@click.option("--prefix", default="", help="AWS S3 Object Prefix")
@filetree_option
@compress_option
@verbose_option
def s3_put_tree(
    s3: S3ClientType,
    bucket_name: str,
    prefix: str,
    top: pathlib.Path,
    config: dict,
    compress,
):
    """compress and put log files to S3"""
    from .processor import S3Processor, process_walk

    # skip_names: keys already present under the prefix are not re-uploaded
    config.update(
        s3=s3,
        s3_bucket=bucket_name,
        s3_prefix=prefix,
        skip_names={x["Key"] for x in allobjs(s3, bucket_name, prefix)},
        compress=compress,
    )
    sproc = S3Processor(config)
    process_walk(top, [sproc])
    _log.info(
        "processed=%d, skipped=%d, upload %d bytes",
        sproc.processed,
        sproc.skipped,
        sproc.uploaded,
    )

552 

553 

@cli.command()
@s3_option
@click.option("--key", required=True, help="AWS S3 Object Key")
@click.argument(
    "filename", type=click.Path(file_okay=True, dir_okay=False, exists=True)
)
@compress_option
@verbose_option
def s3_put1(s3: S3ClientType, bucket_name: str, key: str, filename: str, compress: str):
    """put 1 file to S3"""
    from .compr_stream import S3PutStream, auto_compress_stream

    _, st = auto_compress_stream(pathlib.Path(filename), compress)
    # drain the stream; S3PutStream uploads as a side effect of generation
    for _chunk in S3PutStream(st, s3, bucket_name, key).gen():
        pass

571 

572 

def _s3_read_stream(s3: S3ClientType, bucket_name: str, key: str) -> Stream:
    """Open object *key* and wrap it in a decompressing stream (by extension)."""
    raw = S3GetStream(s3, bucket=bucket_name, key=key)
    _, stream = auto_compress_stream(pathlib.Path(key), "decompress", raw)
    return stream

577 

578 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_cat(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """concatinate compressed objects"""
    for key in keys:
        stream = _s3_read_stream(s3, bucket_name, key)
        for chunk in stream.gen():
            sys.stdout.buffer.write(chunk)

588 

589 

590def _data_via_pager(input: Stream): 

591 pager_bin = os.getenv("LOG2S3_PAGER", os.getenv("PAGER", "less")) 

592 proc = subprocess.Popen(shlex.split(pager_bin), stdin=subprocess.PIPE) 

593 assert proc.stdin is not None # for type check 

594 for d in input.gen(): 

595 proc.stdin.write(d) 

596 proc.communicate() 

597 

598 

@cli.command()
@s3_option
@click.argument("key")
@verbose_option
def s3_less(s3: S3ClientType, bucket_name: str, key: str):
    """view compressed object"""
    stream = _s3_read_stream(s3, bucket_name, key)
    _data_via_pager(stream)

606 

607 

@cli.command()
@s3_option
@click.argument("key")
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@verbose_option
def s3_vi(s3: S3ClientType, bucket_name: str, key: str, dry):
    """edit compressed object and overwrite"""
    # download and decompress the object into an editable text buffer
    bindata = _s3_read_stream(s3, bucket_name, key).read_all().decode("utf-8")
    from .compr_stream import stream_ext, RawReadStream

    _, ext = os.path.splitext(key)
    if ext in stream_ext:

        def compress_fn(f: bytes) -> bytes:
            # re-compress with the codec implied by the key's extension
            compress_st = stream_ext[ext][1]
            return compress_st(RawReadStream(f)).read_all()
    else:

        def compress_fn(f: bytes) -> bytes:
            # unknown extension: store the edited bytes as-is
            return f

    # opens $EDITOR; returns None when the user aborts without saving
    newdata = click.edit(text=bindata)
    if newdata is not None and newdata != bindata:
        wr = compress_fn(newdata.encode("utf-8"))
        if dry:
            _log.info(
                "(dry) changed. write back to %s (%d->%d(%d))",
                key,
                len(bindata),
                len(newdata),
                len(wr),
            )
        else:
            _log.info(
                "(wet) changed. write back to %s (%d->%d(%d))",
                key,
                len(bindata),
                len(newdata),
                len(wr),
            )
            s3.put_object(Body=wr, Bucket=bucket_name, Key=key)
    else:
        _log.info("not changed")

653 

654 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_merge(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """merge sorted log objects"""
    streams: list[Stream] = [_s3_read_stream(s3, bucket_name, k) for k in keys]
    for line in MergeStream(streams).text_gen():
        click.echo(line, nl=False)

667 

668 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@verbose_option
def s3_del(s3: S3ClientType, bucket_name: str, keys: list[str], dry):
    """delete objects"""
    # show what would be (or is about to be) deleted
    for k in keys:
        res = s3.head_object(Bucket=bucket_name, Key=k)
        click.echo("%s %s %s" % (res["LastModified"], res["ContentLength"], k))
    if dry:
        _log.info("(dry) delete %s keys", len(keys))
        return
    _log.info("(wet) delete %s keys", len(keys))
    s3.delete_objects(
        Bucket=bucket_name, Delete={"Objects": [{"Key": x} for x in keys]}
    )

688 

689 

@cli.command()
@s3_option
@click.argument("keys", nargs=-1)
@verbose_option
def s3_head(s3: S3ClientType, bucket_name: str, keys: list[str]):
    """show object metadata"""
    # fix: the docstring previously read "delete objects" (copy-paste from
    # s3_del) which also appeared as the CLI help text; this command only
    # reads metadata via HeadObject and never modifies the bucket.
    for k in keys:
        res = s3.head_object(Bucket=bucket_name, Key=k)
        click.echo(f"{k} = {res}")

699 

700 

@cli.command()
@s3_option
@verbose_option
@click.option("--cleanup/--no-cleanup", default=False)
def s3_list_parts(s3: S3ClientType, bucket_name: str, cleanup):
    """list in-progress multipart upload"""
    res = s3.list_multipart_uploads(Bucket=bucket_name)
    uploads = res.get("Uploads", [])
    if not uploads:
        click.echo("(no in-progress multipart uploads found)")
    for upl in uploads:
        click.echo("%s %s %s" % (upl["Initiated"], upl["UploadId"], upl["Key"]))
        if cleanup:
            # --cleanup aborts each listed upload, freeing its stored parts
            _log.info("cleanup %s/%s", upl["Key"], upl["UploadId"])
            s3.abort_multipart_upload(
                Bucket=bucket_name, Key=upl["Key"], UploadId=upl["UploadId"]
            )

717 

718 

@cli.command("cat")
@click.argument(
    "files",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
    nargs=-1,
)
@verbose_option
def cat_file(files: list[click.Path]):
    """concatinate compressed files"""
    write = sys.stdout.buffer.write
    for fn in files:
        _, stream = auto_compress_stream(pathlib.Path(str(fn)), "decompress")
        for chunk in stream.gen():
            write(chunk)

732 

733 

@cli.command("less")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@verbose_option
def view_file(filename: str):
    """view compressed file"""
    _, stream = auto_compress_stream(pathlib.Path(filename), "decompress")
    _data_via_pager(stream)

744 

745 

@cli.command("vi")
@click.argument(
    "filename",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@verbose_option
def edit_file(filename: str, dry):
    """edit compressed file and overwrite"""
    from .compr_stream import stream_ext, RawReadStream

    fname = pathlib.Path(filename)
    _, data = auto_compress_stream(fname, "decompress")
    _, ext = os.path.splitext(fname)
    if ext in stream_ext:

        def compress_fn(f: bytes) -> bytes:
            # re-compress with the codec implied by the filename extension
            compress_st = stream_ext[ext][1]
            return compress_st(RawReadStream(f)).read_all()
    else:

        def compress_fn(f: bytes) -> bytes:
            # unknown extension: write the edited bytes back unmodified
            return f

    # opens $EDITOR; returns None when the user aborts without saving
    bindata = data.read_all().decode("utf-8")
    newdata = click.edit(text=bindata)
    if newdata is not None and newdata != bindata:
        if dry:
            _log.info("(dry) changed. write back to %s", fname)
        else:
            _log.info("(wet) changed. write back to %s", fname)
            # mode, timestamp will be changed
            fname.write_bytes(compress_fn(newdata.encode("utf-8")))
    else:
        _log.info("not changed")

783 

784 

@cli.command()
@click.argument("file")
@click.option(
    "--compress",
    default=None,
    type=click.Choice(list(set(stream_compress_modes) - {"raw", "decompress"})),
    help="compress type (default: all)",
    multiple=True,
)
def compress_benchmark(compress, file):
    """benchmark compress algorithm

    \b
    outputs:
      mode: mode string
      rate: compression rate (compressed size/original size)
      compress: compress throuput (original bytes/sec)
      decompress: decompress throuput (original bytes/sec)
    """
    import csv
    import timeit
    from .compr_stream import stream_map, stream_compress_modes, RawReadStream, Stream

    if not compress:
        # no --compress given: benchmark every known mode
        compress = stream_compress_modes
    raw_data = pathlib.Path(file).read_bytes()

    def bench_comp(input_data: bytes):
        # one full compression pass over input_data (comp_st from closure)
        s1 = RawReadStream(input_data)
        s2 = comp_st(s1)
        for _ in s2.gen():
            pass

    def bench_decomp(input_data: bytes):
        # one full decompression pass over input_data (decomp_st from closure)
        s1 = RawReadStream(input_data)
        s2 = decomp_st(s1)
        for _ in s2.gen():
            pass

    wr = csv.writer(sys.stdout)
    wr.writerow(["mode", "rate", "compress", "decompress"])
    for mode in compress:
        m = stream_map.get(mode)
        if not m:
            # fix: the original built click.Abort(...) without raising it,
            # making the check a silent no-op; report the problem, then skip.
            click.echo(f"no such compress mode: {mode}", err=True)
            continue
        comp_st: type[Stream] = m[1]
        decomp_st: type[Stream] = m[2]
        # sanity check: the round trip must reproduce the input exactly
        compressed_data = comp_st(RawReadStream(raw_data)).read_all()
        assert raw_data == decomp_st(RawReadStream(compressed_data)).read_all()
        isz = len(raw_data)
        csz = len(compressed_data)
        rate = csz / isz
        cnum, csec = timeit.Timer(
            stmt="bench(input_data)",
            globals={"input_data": raw_data, "comp_st": comp_st, "bench": bench_comp},
        ).autorange()
        dnum, dsec = timeit.Timer(
            stmt="bench(input_data)",
            globals={
                "input_data": compressed_data,
                "decomp_st": decomp_st,
                "bench": bench_decomp,
            },
        ).autorange()
        # throughput is original (uncompressed) bytes processed per second
        wr.writerow(
            [str(x) for x in [mode, rate, isz * cnum / csec, isz * dnum / dsec]]
        )

854 

855 

@cli.command()
@verbose_option
@click.option(
    "--format",
    type=click.Choice(["combined", "common", "debug"]),
    default="combined",
    show_default=True,
)
@click.option(
    "--nth", type=int, default=1, show_default=True, help="parse from n-th '{'"
)
@click.argument("file", type=click.Path(exists=True, file_okay=True, dir_okay=False))
def traefik_json_convert(file, nth, format):
    """
    convert traefik access-log(json) to other format

    \b
    traefik --accesslog=true --accesslog.format=json \\
      --accesslog.fields.defaultmode=keep \\
      --accesslog.fields.headers.defaultmode=keep
    """
    from .compr_stream import FileReadStream, auto_compress_stream
    from collections import defaultdict

    # Apache-style common/combined formats, rendered with %-formatting
    # against the parsed JSON record.
    common = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
    )
    combined = (
        "%(ClientHost)s - %(ClientUsername)s [%(httptime)s]"
        ' "%(RequestMethod)s %(RequestPath)s %(RequestProtocol)s"'
        " %(DownstreamStatus)d %(DownstreamContentSize)s"
        ' "%(request_Referer)s" "%(request_User-Agent)s"'
    )
    dateformat = "%d/%b/%Y:%H:%M:%S %z"
    format_map = {
        "combined": combined,
        "common": common,
        "debug": "%s",
    }
    if file in (None, "-"):
        # "-" (or no file) reads from stdin
        fp = sys.stdin.buffer
        path = pathlib.Path("-")
        ist = FileReadStream(fp)
    else:
        path = pathlib.Path(file)
        ist = None
    _, ofp = auto_compress_stream(path, "decompress", ist)
    fmt = format_map.get(format, combined)
    for line in ofp.text_gen():
        # skip any non-JSON prefix: keep everything from the nth '{' onward
        n = line.split("{", nth)
        jsonstr = "{" + n[-1]
        try:
            jsdata: dict = json.loads(jsonstr)
            if "time" in jsdata:
                # convert the ISO timestamp to local time in HTTP log format
                ts = datetime.datetime.fromisoformat(
                    jsdata.get("time", "")
                ).astimezone()
                jsdata["httptime"] = ts.strftime(dateformat)
            # defaultdict substitutes "-" for missing fields, as log formats do
            click.echo(fmt % defaultdict(lambda: "-", **jsdata))
        except json.JSONDecodeError:
            _log.exception("parse error: %s", jsonstr)

919 

920 

def do_ible1(name: str, fn: click.Command, args: dict, dry: bool):
    """Run one playbook step: log start/end, invoke the callback on wet runs."""
    _log.debug("name=%s, fn=%s, args=%s, dry=%s", name, fn, arg_mask(args), dry)
    _log.info("start %s", name)
    if not dry:
        _log.info("run(wet): %s %s", fn, arg_mask(args))
        assert fn.callback is not None
        fn.callback(**args)
    else:
        # dry run: log what would be executed, but do not call it
        _log.info("run(dry): %s %s", fn, arg_mask(args))
    _log.info("end %s", name)

931 

932 

def convert_ible(data: Union[list[dict], dict]) -> list[dict]:
    """Normalize mapping-style playbook data to the canonical list form.

    A dict {cmd: opts, ...} becomes [{"name": ..., cmd: opts, ...}, ...];
    "name" and "allow-fail" are lifted out of each opts dict.  List input is
    returned unchanged.  NOTE: the pops mutate the caller's nested dicts.
    """
    if not isinstance(data, dict):
        return data
    _log.debug("convert %s", arg_mask(data))
    converted = []
    for key, body in data.items():
        entry = {"name": body.pop("name", key), key: body}
        allow_fail = body.pop("allow-fail", None)
        if allow_fail is not None:
            entry["allow-fail"] = allow_fail
        converted.append(entry)
    _log.debug("converted: %s", arg_mask(converted))
    return converted

947 

948 

def arg2arg(fn: click.Command, args: dict, baseparam: dict) -> dict:
    """Resolve the effective keyword arguments for command *fn*.

    Precedence (lowest to highest): option defaults, environment variables
    named by each option's envvar, *baseparam* (shared playbook params), and
    finally *args* from the step itself (UNSET values are ignored).
    """
    params = {}
    for opt in fn.params:
        env = opt.envvar
        if isinstance(env, list) and len(env) != 0:
            # first set environment variable wins
            for ev in env:
                val = os.getenv(ev)
                if val:
                    params[opt.name] = val
                    break
        elif isinstance(env, str) and env and os.getenv(env):
            params[opt.name] = os.getenv(env)
        elif opt.default or not opt.required:
            params[opt.name] = opt.default
    for pname in (x.name for x in fn.params):
        if pname in args:
            if args[pname] != UNSET:
                params[pname] = args[pname]
        elif pname in baseparam:
            params[pname] = baseparam[pname]
    _log.debug("arg=%s, base=%s, result=%s", args, baseparam, params)
    return params

970 

971 

def ible_gen(
    data: list[dict],
) -> Generator[tuple[str, click.Command, dict, dict], None, None]:
    """Validate playbook entries, yielding (name, command, resolved args, raw entry).

    Entries with command "params" update the shared defaults instead of being
    yielded; unknown or ible* commands raise.
    """
    _log.debug("input: %s", arg_mask(data))
    if not isinstance(data, list):
        raise Exception(f"invalid list style: {type(data)}")
    baseparam = {}
    for v in data:
        _log.debug("exec %s", arg_mask(v))
        if not isinstance(v, dict):
            raise Exception(f"invalid dict style: {type(v)}")
        # exactly one key besides the metadata keys names the command
        kw: set[str] = v.keys() - {"name", "allow-fail"}
        if len(kw) != 1:
            raise Exception(f"invalid command style: {kw}")
        cmd: str = kw.pop()
        args: dict = v[cmd]
        name: str = v.get("name", cmd)
        if not isinstance(args, dict):
            raise Exception(f"invalid args: {args}")
        if cmd == "params":
            # "params" blocks feed shared defaults for subsequent steps
            baseparam.update(args)
            continue
        if cmd not in cli.commands:
            raise Exception(f"invalid command: {cmd} / {cli.commands.keys()}")
        if cmd.startswith("ible"):
            # refuse recursive playbook invocation
            raise Exception(f"unsupported command: {cmd}")
        fn = cli.commands[cmd]
        yield name, fn, arg2arg(fn, args, baseparam), v

1000 

1001 

def do_ible(data: list[dict], dry: bool):
    """Execute every entry of a normalized playbook.

    Entries marked ``allow-fail`` log the error and continue; otherwise the
    first failure is logged with traceback and re-raised, stopping the run.
    """
    for name, fn, args, v in ible_gen(data):
        try:
            do_ible1(name, fn, args, dry)
        except Exception as e:
            if not v.get("allow-fail"):
                # fixed typo: "occured" -> "occurred"
                _log.exception("error occurred. stop")
                raise
            _log.info("failed. continue: %s", e)

1011 

1012 

def gen_sh(file: str) -> Generator[tuple[Optional[str], list[str]], None, None]:
    """Yield ``(name, tokens)`` for each command line of a shell script.

    A ``#`` comment line sets *name* for the next command; blank lines are
    skipped.  A line whose last token is a backslash is joined with the
    following physical line and re-tokenized.
    """
    with open(file, "r") as fp:
        name = None  # name from the most recent comment line, if any
        for line in fp:
            line = line.lstrip().rstrip("\n")
            if line.startswith("#"):
                # comment line: remember its text as the next command's name
                name = line.lstrip()[1:].strip()
                _log.debug("name: %s", name)
                continue
            tokens = list(shlex.shlex(line, punctuation_chars=True))
            while len(tokens) != 0 and tokens[-1] == "\\":
                # line continuation: consume the next physical line directly
                # from fp (advancing the outer loop) and re-tokenize.
                # NOTE(review): the trailing backslash is left inside `line`,
                # so it remains a mid-stream token after joining — confirm
                # this is the intended behavior.
                try:
                    line += next(fp)
                except StopIteration:
                    break
                tokens = list(shlex.shlex(line, punctuation_chars=True))
            if len(tokens) == 0:
                continue
            yield name, tokens
            name = None  # a name applies to exactly one command

1033 

1034 

def sh_line2arg(cmdop: click.Command, tokens: list[str]) -> tuple[dict, bool]:
    """Translate shell-style option tokens into a click argument dict.

    Returns ``(args, allow_fail)``; *allow_fail* is True when the token
    stream ends with ``|| true``.  Raises on tokens that match no option
    of *cmdop*.
    """
    args: dict[str, Union[str, bool]] = {}
    allow_fail = False
    if tokens[-2:] == ["||", "true"]:
        allow_fail = True
        tokens = tokens[:-2]
        _log.debug("allow fail: %s", tokens)
    it = iter(tokens)
    for tok in it:
        if tok.startswith("#"):
            # the remainder of the line is a comment
            break
        matched = False
        for opt in cmdop.params:
            assert isinstance(opt.name, str)
            if tok in opt.opts:
                _log.debug("option %s", opt)
                if getattr(opt, "is_flag", False):
                    _log.debug("true: %s", opt)
                    args[opt.name] = True
                else:
                    # non-flag option consumes the next token as its value
                    value = next(it)
                    _log.debug("args: %s", arg_mask({opt.name: value}))
                    args[opt.name] = value
                matched = True
                break
            if tok in opt.secondary_opts:
                # e.g. --no-xxx form of a flag
                _log.debug("false: %s", opt)
                args[opt.name] = False
                matched = True
                break
        if not matched:
            raise Exception(f"no such option: {tok}")
    return args, allow_fail

1066 

1067 

def read_sh(file: str) -> list[dict]:
    """Parse a shell script of ``log2s3 ...`` lines into playbook entries.

    Lines that do not invoke ``log2s3`` (or have no subcommand) are ignored.
    ``|| true`` on a line marks its entry as ``allow-fail``.
    """
    res: list[dict] = []
    for name, tokens in gen_sh(file):
        if len(tokens) <= 1:
            continue
        if tokens[0] != "log2s3":
            continue
        _log.debug("tokens: %s", tokens[1:])
        cmd = tokens[1]
        if cmd not in cli.commands:
            raise Exception(f"no such subcommand: {cmd}")
        cmdop = cli.commands[cmd]
        args, allow_fail = sh_line2arg(cmdop, tokens[2:])
        _log.debug("result args: %s", arg_mask(args))
        ent = {
            "name": name or cmd,
            cmd: args,
        }
        if allow_fail:
            ent["allow-fail"] = True
        res.append(ent)
        # (removed dead `name = None`: the for-loop rebinds name each pass)
    return res

1091 

1092 

def try_read(file: str) -> Union[list[dict], dict]:
    """Load *file* as TOML, YAML, JSON or a log2s3 shell script, in order.

    The first parser that accepts the file wins; each failure is logged at
    debug level.  Raises when no parser accepts the file.
    """
    try:
        import tomllib

        with open(file, "rb") as fp:
            return tomllib.load(fp)
    # also catch ImportError: tomllib requires Python >= 3.11
    except (ValueError, ImportError) as e:
        _log.debug("toml error", exc_info=e)
    try:
        import yaml
    except ImportError as e:
        # handled separately: referencing yaml.error.YAMLError in the
        # except clause would raise NameError when yaml is not installed
        _log.debug("yaml error", exc_info=e)
    else:
        try:
            with open(file, "rb") as fp:
                return yaml.safe_load(fp)
        except yaml.error.YAMLError as e:
            _log.debug("yaml error", exc_info=e)
    try:
        # json is imported at module level
        with open(file, "rb") as fp:
            return json.load(fp)
    except ValueError as e:
        _log.debug("json error", exc_info=e)
    try:
        return read_sh(file)
    except Exception as e:
        _log.debug("sh error", exc_info=e)
    raise Exception(f"cannot load {file}: unknown filetype")

1120 

1121 

@cli.command()
@verbose_option
@click.option(
    "--dry/--wet", help="dry run or wet run", default=False, show_default=True
)
@click.argument(
    "file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True)
)
def ible_playbook(file, dry):
    """do log2s3-ible playbook"""
    # load (toml/yaml/json/sh), normalize to the list form, then execute
    do_ible(convert_ible(try_read(file)), dry)

1133 

1134 

def sh_dump(data, output):
    """Write a normalized playbook as an executable /bin/sh script to *output*.

    Each entry becomes a ``log2s3 <cmd> <options>`` line preceded by its
    name as comment line(s); ``allow-fail`` entries get ``|| true``.
    """
    click.echo("#! /bin/sh", file=output)
    click.echo("set -eu", file=output)
    click.echo("", file=output)
    # renamed loop variable (was `v`) so the inner loop no longer shadows it
    for name, fn, args, entry in ible_gen(data):
        allow_fail = entry.get("allow-fail")
        # emit the (possibly multi-line) entry name as comment lines
        for comment in io.StringIO(name):
            click.echo(f"# {comment.rstrip()}", file=output)
        fnname: str = fn.name or name
        options: list[str] = [fnname]
        for key, value in sorted(args.items()):
            opt = [x for x in fn.params if x.name == key][0]
            if opt.default == value:
                # options at their default need not be spelled out
                continue
            if isinstance(value, bool):
                if value:
                    options.append(opt.opts[0])
                else:
                    options.append(opt.secondary_opts[0])
            elif value is not None:
                options.append(opt.opts[0])
                # shlex.join requires str; coerce non-string values (e.g. int)
                options.append(str(value))
        optstr = shlex.join(options)
        suffixstr = ""
        if allow_fail:
            suffixstr = " || true"
        click.echo(f"log2s3 {optstr}{suffixstr}", file=output)
        click.echo("", file=output)

1163 

1164 

@cli.command()
@verbose_option
@click.option("--format", type=click.Choice(["yaml", "json", "sh"]))
@click.argument(
    "file", type=click.Path(file_okay=True, dir_okay=False, readable=True, exists=True)
)
@click.option("--output", type=click.File("w"), default="-")
def ible_convert(file, format, output):
    """convert log2s3-ible playbook"""
    # normalize the input (toml/yaml/json/sh) to the canonical list form
    data: list[dict] = convert_ible(try_read(file))
    if format == "yaml":
        import yaml

        yaml.dump(data, stream=output, allow_unicode=True)
    elif format == "json":
        import json

        json.dump(data, fp=output, ensure_ascii=False, indent=2)
    elif format == "sh":
        sh_dump(data, output)
    else:
        # --format has no default, so omitting it ends up here
        raise Exception(f"unknown format: {format}")

1187 

1188 

@cli.command()
@click.argument("args", nargs=-1)
def sh(args):
    """execute /bin/sh"""
    # pass the remaining CLI arguments straight through to sh
    command = ["sh", *args]
    subprocess.run(command)

1194 

1195 

@cli.command()
@click.argument("args", nargs=-1)
def bash(args):
    """execute bash"""
    # pass the remaining CLI arguments straight through to bash
    command = ["bash", *args]
    subprocess.run(command)

1201 

1202 

@cli.command()
@verbose_option
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option("--port", type=int, default=8000, show_default=True)
@click.option("--root", default=".", show_default=True)
@click.option("--prefix", default="")
@click.option("--debug/--no-debug", default=False, show_default=True)
def serve(prefix, root, host, port, debug):
    """start viewer"""
    # heavy imports kept local so the CLI starts fast for other subcommands
    from .app import update_config, router
    import uvicorn
    from fastapi import FastAPI

    # point the viewer app at the directory to browse
    update_config(
        {
            "working_dir": root,
        }
    )
    if prefix:
        # mount under a URL prefix (e.g. when behind a reverse proxy)
        update_config(
            {
                "prefix": prefix,
            }
        )
    app = FastAPI(debug=debug)
    app.include_router(router, prefix=prefix)
    # log_config=None: keep this application's own logging configuration
    uvicorn.run(app, host=host, port=port, log_config=None)

1230 

1231 

if __name__ == "__main__":
    # allow running the module directly as a script
    cli()