Coverage for dlabel/util.py: 64%
107 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 23:32 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 23:32 +0000
1import docker
2import tarfile
3import fnmatch
4import io
5from typing import Collection
6from pathlib import Path
7from logging import getLogger
9_log = getLogger(__name__)
# Special file-mode flags, following Go's io/fs.FileMode layout:
# https://pkg.go.dev/io/fs#ModeDir
# The names are listed from the highest bit (bit 31) downwards, so the flag
# at position i has the value 1 << (31 - i).
modebits = {
    name: 1 << (31 - position)
    for position, name in enumerate(
        (
            "dir",
            "append",
            "exclusive",
            "temporary",
            "symlink",
            "device",
            "namedpipe",
            "socket",
            "setuid",
            "setgid",
            "chardev",
            "sticky",
            "irregular",
        )
    )
}
# Flags marking entries that are not regular files ("symlink" is deliberately
# not part of this set).
nonreg = {
    "dir",
    "device",
    "namedpipe",
    "socket",
    "chardev",
    "irregular",
}
def special_modes(mode: int) -> tuple[set[str], int]:
    """Split a file mode into its special flags and its permission bits.

    Returns a pair: the set of ``modebits`` names whose bit is set in
    *mode*, and the low nine bits of *mode* (``mode & 0o777``).
    """
    flags = {name for name, bit in modebits.items() if mode & bit}
    return flags, mode & 0o777
def download_files(ctn: docker.models.containers.Container, filename: str):
    """Yield the regular files stored under *filename* in a container.

    The archive returned by ``ctn.get_archive(filename)`` is collected in
    memory and then read as a tar stream.

    Yields ``(is_dir, member, data)`` tuples: *is_dir* is true when the
    "dir" flag is set in the stat mode reported for *filename*, *member* is
    the ``tarfile.TarInfo`` of a regular file, and *data* is that file's
    content as bytes. Archive members that are not regular files are skipped.
    """
    chunks, stat = ctn.get_archive(filename)
    flags, _ = special_modes(stat["mode"])
    is_dir = "dir" in flags
    _log.debug("download %s: %s is_dir=%s", filename, stat, is_dir)
    buffer = io.BytesIO(b"".join(chunks))
    with tarfile.open(fileobj=buffer, mode="r|") as tar:
        for member in tar:
            if not member.isfile():
                continue
            _log.debug("extract %s", member.name)
            extracted = tar.extractfile(member)
            if extracted is None:
                continue
            yield is_dir, member, extracted.read()
def get_archives(container: docker.models.containers.Container, names: set[str], ignore: Collection[str],
                 mode: str = "w:gz"):
    """Bundle files downloaded from a container into one in-memory tar archive.

    Args:
        container: container the files are downloaded from.
        names: paths inside the container; they are processed in sorted order.
        ignore: patterns checked with ``is_match``; entries whose archive
            name matches any of them are left out.
        mode: ``tarfile.open`` mode used for the output archive.

    Returns:
        The archive content as bytes, or ``None`` when *names* is empty.
    """
    if not names:
        return None
    ofp = io.BytesIO()
    # The context manager guarantees the archive is closed even when a
    # download or addfile() call raises part-way through.
    with tarfile.open(mode=mode, fileobj=ofp) as outarchive:
        for fn in sorted(names):
            _log.debug("extract: %s", fn)
            for is_dir, tinfo, data in download_files(container, fn):
                if is_dir:
                    # Directory download: member names are joined onto fn.
                    tinfo.name = str(Path(fn) / tinfo.name).lstrip("/")
                else:
                    # Single-file download: the entry is named after fn itself.
                    tinfo.name = fn.lstrip("/")
                if is_match(ignore, tinfo.name):
                    _log.debug("ignore: %s", tinfo.name)
                    continue
                _log.debug("add file: %s (%s bytes) is_dir=%s", tinfo.name, len(data), is_dir)
                outarchive.addfile(tinfo, io.BytesIO(data))
    # The archive must be closed before reading the buffer so that the
    # trailer (and compression footer, if any) has been written.
    return ofp.getvalue()
def is_match(patterns: Collection[str], target: str) -> bool:
    """Return True when *target* matches at least one fnmatch pattern.

    An empty *patterns* collection never matches.
    """
    return any(fnmatch.fnmatch(target, pattern) for pattern in patterns)
def is_already(prev: set[str], target: str) -> bool:
    """Return True when some path in *prev* is a strict ancestor of *target*.

    A path equal to *target* itself does not count as an ancestor.
    """
    return any(Path(candidate) in Path(target).parents for candidate in prev)
def do_kind0(modified: set[str], path: str, link: dict[str, str],
             container: docker.models.containers.Container):  # modified
    """Classify a path reported as modified (diff kind 0).

    The path's stat is fetched from the container. Paths carrying any
    ``nonreg`` flag are only logged; symlinks with a non-empty
    ``linkTarget`` are recorded in *link*; every other path is added to
    *modified*.
    """
    _, stats = container.get_archive(path)
    _log.debug("stats %s: %s", path, stats)
    flags = special_modes(stats["mode"])[0]
    if flags & nonreg:
        _log.debug("skip: %s %s", path, flags)
        return
    if "symlink" in flags and stats["linkTarget"]:
        link[path] = stats["linkTarget"]
        return
    modified.add(path)
def do_kind1(added: set[str], path: str, link: dict[str, str],
             container: docker.models.containers.Container):  # added
    """Classify a path reported as added (diff kind 1).

    A path is skipped without querying the container when one of its
    ancestors is already in *added*. Otherwise its stat is fetched: paths
    carrying a ``nonreg`` flag other than "dir" are only logged; symlinks
    with a non-empty ``linkTarget`` are recorded in *link*; every other
    path (directories included) is added to *added*.
    """
    if is_already(added, path):
        _log.debug("skip(parent-exists): %s", path)
        return
    _, stats = container.get_archive(path)
    _log.debug("stats %s: %s", path, stats)
    flags = special_modes(stats["mode"])[0]
    if flags & (nonreg - {"dir"}):
        _log.debug("skip: %s %s", path, flags)
        return
    if "symlink" in flags and stats["linkTarget"]:
        link[path] = stats["linkTarget"]
        return
    added.add(path)
def do_kind2(deleted: set[str], path: str):  # deleted
    """Record a path reported as deleted (diff kind 2).

    The path is added to *deleted* unless one of its ancestors is already
    in that set, in which case it is only logged.
    """
    if is_already(deleted, path):
        _log.debug("skip(parent-exists): %s", path)
        return
    deleted.add(path)
def get_volumes(container: docker.models.containers.Container) -> set[str]:
    """Return the ``Destination`` of every entry in the container's ``Mounts`` attribute."""
    destinations: set[str] = set()
    for mount in container.attrs['Mounts']:
        destinations.add(mount['Destination'])
    return destinations
def get_diff(
    container: docker.models.containers.Container,
    ignore: Collection[str],
) -> tuple[set[str], set[str], set[str], dict[str, str]]:
    """Sort the entries of ``container.diff()`` into change categories.

    Entries whose path matches one of the *ignore* patterns are dropped.
    The remaining ones are dispatched on their ``Kind``: 0 is handled as
    modified, 1 as added and 2 as deleted; any other kind is left out.

    Returns:
        ``(deleted, added, modified, link)``, where the first three are
        sets of paths and *link* maps symlink paths to their targets.
    """
    deleted: set[str] = set()
    added: set[str] = set()
    modified: set[str] = set()
    link: dict[str, str] = {}
    for entry in container.diff():
        path, kind = entry["Path"], entry["Kind"]
        if is_match(ignore, path):
            _log.debug("ignore: %s", path)
            continue
        if kind == 0:  # modified
            do_kind0(modified, path, link, container)
        elif kind == 1:  # added
            do_kind1(added, path, link, container)
        elif kind == 2:  # deleted
            do_kind2(deleted, path)
    for label, collected in (
        ("deleted", deleted),
        ("added", added),
        ("modified", modified),
        ("link", link),
    ):
        _log.debug(label + ": %s", collected)
    return deleted, added, modified, link