Coverage for dlabel/util.py: 64%

107 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 23:32 +0000

1import docker 

2import tarfile 

3import fnmatch 

4import io 

5from typing import Collection 

6from pathlib import Path 

7from logging import getLogger 

8 

9_log = getLogger(__name__) 

10 

11# https://pkg.go.dev/io/fs#ModeDir 

12modebits = { 

13 "dir": 1 << 31, 

14 "append": 1 << 30, 

15 "exclusive": 1 << 29, 

16 "temporary": 1 << 28, 

17 "symlink": 1 << 27, 

18 "device": 1 << 26, 

19 "namedpipe": 1 << 25, 

20 "socket": 1 << 24, 

21 "setuid": 1 << 23, 

22 "setgid": 1 << 22, 

23 "chardev": 1 << 21, 

24 "sticky": 1 << 20, 

25 "irregular": 1 << 19, 

26} 

27nonreg = {"dir", "device", "namedpipe", "socket", "chardev", "irregular"} 

28 

29 

30def special_modes(mode: int) -> tuple[set[str], int]: 

31 res: set[str] = set() 

32 for k, v in modebits.items(): 

33 if (mode & v) != 0: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true

34 res.add(k) 

35 return res, (mode & 0o777) 

36 

37 

38def download_files(ctn: docker.models.containers.Container, filename: str): 

39 bins, stat = ctn.get_archive(filename) 

40 is_dir = "dir" in special_modes(stat["mode"])[0] 

41 _log.debug("download %s: %s is_dir=%s", filename, stat, is_dir) 

42 fp = io.BytesIO() 

43 for chunk in bins: 

44 fp.write(chunk) 

45 fp.seek(0) 

46 with tarfile.open(fileobj=fp, mode="r|") as tar: 

47 for member in tar: 

48 if member.isfile(): 48 ↛ 47line 48 didn't jump to line 47 because the condition on line 48 was always true

49 _log.debug("extract %s", member.name) 

50 tf = tar.extractfile(member) 

51 if tf is not None: 51 ↛ 47line 51 didn't jump to line 47 because the condition on line 51 was always true

52 yield is_dir, member, tf.read() 

53 

54 

55def get_archives(container: docker.models.containers.Container, names: set[str], ignore: Collection[str], 

56 mode: str = "w:gz"): 

57 if not names: 

58 return 

59 ofp = io.BytesIO() 

60 outarchive = tarfile.open(mode=mode, fileobj=ofp) 

61 for fn in sorted(names): 

62 _log.debug("extract: %s", fn) 

63 for is_dir, tinfo, bin in download_files(container, fn): 

64 if is_dir: 

65 tinfo.name = str(Path(fn) / tinfo.name).lstrip("/") 

66 else: 

67 tinfo.name = fn.lstrip("/") 

68 if is_match(ignore, tinfo.name): 

69 _log.debug("ignore: %s", tinfo.name) 

70 continue 

71 _log.debug("add file: %s (%s bytes) is_dir=%s", tinfo.name, len(bin), is_dir) 

72 outarchive.addfile(tinfo, io.BytesIO(bin)) 

73 outarchive.close() 

74 return ofp.getvalue() 

75 

76 

77def is_match(patterns: Collection[str], target: str) -> bool: 

78 for p in patterns: 78 ↛ 79line 78 didn't jump to line 79 because the loop on line 78 never started

79 if fnmatch.fnmatch(target, p): 

80 return True 

81 return False 

82 

83 

84def is_already(prev: set[str], target: str) -> bool: 

85 for p in prev: 85 ↛ 86line 85 didn't jump to line 86 because the loop on line 85 never started

86 if Path(p) in Path(target).parents: 

87 return True 

88 return False 

89 

90 

91def do_kind0(modified: set[str], path: str, link: dict[str, str], 

92 container: docker.models.containers.Container): # modified 

93 _, stats = container.get_archive(path) 

94 _log.debug("stats %s: %s", path, stats) 

95 special, _ = special_modes(stats["mode"]) 

96 if nonreg & special: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true

97 _log.debug("skip: %s %s", path, special) 

98 elif "symlink" in special and stats["linkTarget"]: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true

99 link[path] = stats["linkTarget"] 

100 else: 

101 modified.add(path) 

102 

103 

104def do_kind1(added: set[str], path: str, link: dict[str, str], 

105 container: docker.models.containers.Container): # added 

106 if is_already(added, path): 106 ↛ 107line 106 didn't jump to line 107 because the condition on line 106 was never true

107 _log.debug("skip(parent-exists): %s", path) 

108 else: 

109 _, stats = container.get_archive(path) 

110 _log.debug("stats %s: %s", path, stats) 

111 special, _ = special_modes(stats["mode"]) 

112 if (nonreg-{"dir"}) & special: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 _log.debug("skip: %s %s", path, special) 

114 elif "symlink" in special and stats["linkTarget"]: 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 link[path] = stats["linkTarget"] 

116 else: 

117 added.add(path) 

118 

119 

120def do_kind2(deleted: set[str], path: str): # deleted 

121 if is_already(deleted, path): 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 _log.debug("skip(parent-exists): %s", path) 

123 else: 

124 deleted.add(path) 

125 

126 

127def get_volumes(container: docker.models.containers.Container) -> set[str]: 

128 return {x['Destination'] for x in container.attrs['Mounts']} 

129 

130 

131def get_diff(container: docker.models.containers.Container, ignore: Collection[str]) -> \ 

132 tuple[set[str], set[str], set[str], dict[str, str]]: 

133 deleted: set[str] = set() 

134 added: set[str] = set() 

135 modified: set[str] = set() 

136 link: dict[str, str] = {} 

137 for pathkind in container.diff(): 

138 path = pathkind["Path"] 

139 kind = pathkind["Kind"] 

140 if is_match(ignore, path): 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true

141 _log.debug("ignore: %s", path) 

142 continue 

143 if kind == 2: # deleted 

144 do_kind2(deleted, path) 

145 elif kind == 1: # added 

146 do_kind1(added, path, link, container) 

147 elif kind == 0: # modified 147 ↛ 137line 147 didn't jump to line 137 because the condition on line 147 was always true

148 do_kind0(modified, path, link, container) 

149 _log.debug("deleted: %s", deleted) 

150 _log.debug("added: %s", added) 

151 _log.debug("modified: %s", modified) 

152 _log.debug("link: %s", link) 

153 return deleted, added, modified, link