Coverage for dlabel/compose.py: 74%

166 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 23:32 +0000

1import io 

2import tarfile 

3import fnmatch 

4import docker 

5import yaml 

6from typing import Any 

7from pathlib import Path 

8from logging import getLogger 

9from .util import download_files 

10_log = getLogger(__name__) 

11 

12 

def envlist2map(env: list[str], sep: str = "=") -> dict[str, str]:
    """Turn a list of ``KEY<sep>VALUE`` strings into a dictionary.

    Every entry is cut at the first occurrence of *sep*. Entries that do not
    contain *sep* at all are ignored. When a key occurs more than once, the
    last value wins.
    """
    # partition() yields (head, sep, tail); the middle element is empty
    # exactly when the separator was not found.
    pieces = (entry.partition(sep) for entry in env)
    return {key: value for key, found, value in pieces if found}

20 

21 

22def portmap2compose(pmap: dict) -> list[str | dict]: 

23 res: list[str | dict] = [] 

24 for k, v in pmap.items(): 

25 ctport = k 

26 if ctport.endswith("/tcp") and len(v) == 1: 

27 ctport = k.split("/")[0] 

28 hostip = v[0].get("HostIp") 

29 hostport = v[0].get("HostPort") 

30 if hostip: 

31 res.append(f"{hostip}:{hostport}:{ctport}") 

32 else: 

33 res.append(f"{hostport}:{ctport}") 

34 else: 

35 target, protocol = k.split("/", 1) 

36 res.append({ 

37 "target": int(target), 

38 "published": int(v[0].get("HostPort")), 

39 "protocol": protocol, 

40 "mode": "host", 

41 }) 

42 return res 

43 

44 

def convdict(convmap: dict[str, str], fromdict: dict[str, Any], todict: dict[str, Any]):
    """Copy truthy values from *fromdict* into *todict* under renamed keys.

    *convmap* maps a key of *fromdict* to the key it should get in *todict*.
    Missing or falsy source values (``None``, ``0``, ``""``, empty
    containers, ``False``) are not copied. *todict* is modified in place.
    """
    for src_key, dst_key in convmap.items():
        value = fromdict.get(src_key)
        if not value:
            continue
        todict[dst_key] = value

49 

50 

def convdict_differ(
        convmap: dict[str, str], dict_img: dict[str, Any], dict_ctn: dict[str, Any], todict: dict[str, Any]):
    """Copy container settings that differ from the image into *todict*.

    For each ``source key -> target key`` pair in *convmap*, the value from
    *dict_ctn* is stored in *todict* under the target key when the key is
    present in *dict_ctn* and its value is not equal to the one in
    *dict_img* (a key missing from *dict_img* counts as ``None``).
    *todict* is modified in place.
    """
    for src_key, dst_key in convmap.items():
        if src_key not in dict_ctn:
            continue
        ctn_value = dict_ctn[src_key]
        if dict_img.get(src_key) != ctn_value:
            todict[dst_key] = ctn_value

56 

57 

def copy_files(ctn: docker.models.containers.Container, src: str | Path, dst: str | Path):
    """Copy *src* out of container *ctn* to *dst* on the local filesystem.

    The archive returned by ``ctn.get_archive`` is buffered in memory and
    then extracted:

    * If it holds exactly one regular file, that file is extracted into the
      parent directory of *dst*, under the name stored in the archive.
    * Otherwise *dst* is created (including parents) and the members are
      extracted into it with their first path component removed.

    Both paths go through tarfile's ``data`` extraction filter. The tar file
    and the buffer are closed even when extraction raises.
    """
    def strip_top_dir(member, path):
        # Run the stdlib safety filter first, then drop the leading path
        # component so the archive's contents land directly inside ``path``.
        res = tarfile.data_filter(member, path)
        if res and '/' in res.name:
            _, res.name = res.name.split('/', 1)
        return res or None

    _log.info("copy %s:%s -> %s", ctn.name, src, dst)
    odir = Path(dst)
    chunks, arc = ctn.get_archive(str(src))
    _log.debug("arc=%s", arc)
    with io.BytesIO(b"".join(chunks)) as bio, tarfile.TarFile(fileobj=bio) as tf:
        members = tf.getmembers()
        if len(members) == 1 and members[0].isreg():
            _log.info("single file: %s", members[0])
            tf.extractall(odir.parent, filter='data')
        else:
            odir.mkdir(exist_ok=True, parents=True)
            tf.extractall(odir, filter=strip_top_dir)

85 

86 

def compose(client: docker.DockerClient, project, volume):  # noqa: C901
    """Generate a compose file from the containers ``client`` lists.

    This is a generator yielding ``(path, content)`` pairs:

    * when *volume* is truthy, one pair per entry returned by
      ``download_files`` for every bind mount whose host path lies under
      the compose project working directory;
    * finally ``(Path("compose.yml"), <YAML encoded as UTF-8 bytes>)``.

    The generator's return value (``StopIteration.value``) is the dictionary
    that was dumped to YAML.

    Args:
        client: Docker client whose ``containers.list()`` is inspected.
        project: optional ``fnmatch`` pattern matched against the
            ``com.docker.compose.project`` label. When set, containers
            without that label or with a non-matching one are skipped.
        volume: when truthy, also yield the contents of project-relative
            bind mounts.

    Settings that merely repeat the image's own configuration (labels,
    environment, declared volumes, command, entrypoint) are left out of the
    generated service definition.
    """
    # Lookup tables are loop-invariant, so build them once.
    convmap_hostconfig = {
        "ExtraHosts": "extra_hosts",
        "CpuShares": "cpu_shares",
        "CpuPeriod": "cpu_period",
        "CpuPercent": "cpu_percent",
        "CpuCount": "cpu_count",
        "CpuQuota": "cpu_quota",
        "CpuRealtimeRuntime": "cpu_rt_runtime",
        "CpuRealtimePeriod": "cpu_rt_period",
        "CpusetCpus": "cpuset",
        "CapAdd": "cap_add",
        "CapDrop": "cap_drop",
        "CgroupParent": "cgroup_parent",
        "GroupAdd": "group_add",
        "Privileged": "privileged",
    }
    convmap_label = {
        "com.docker.compose.depends_on": "depends_on",
    }
    diffcopy_config = {
        "Cmd": "command",
        "Entrypoint": "entrypoint",
    }
    svcs = {}
    vols = {}
    nets: dict[str, Any] = {}
    for ctn in client.containers.list():
        # ``or {}`` / ``or []`` below: a key may be present with a null
        # value, in which case ``.get(key, default)`` would return None.
        config = ctn.attrs.get("Config") or {}
        hostconfig = ctn.attrs.get("HostConfig") or {}
        ctn_labels: dict[str, str] = config.get("Labels") or {}
        proj = ctn_labels.get("com.docker.compose.project")
        wdir = Path(ctn_labels.get("com.docker.compose.project.working_dir", "/"))
        if project and not proj:
            _log.debug("skip: no project: %s", ctn.name)
            continue
        if project and proj and not fnmatch.fnmatch(proj, project):
            _log.debug("skip by project (%s)", proj)
            continue
        name = ctn_labels.get("com.docker.compose.service", ctn.name)
        _log.info("processing %s, service=%s", ctn.name, name)
        img = ctn.image
        imglabel = img.labels or {}
        imgconfig = img.attrs.get("Config") or {}
        # Keep only labels that are neither inherited unchanged from the image
        # nor compose bookkeeping. Built as a new dict so the container's
        # cached attrs are not modified.
        labels = {
            k: v for k, v in ctn_labels.items()
            if not k.startswith("com.docker.compose.")
            and not (k in imglabel and imglabel[k] == v)
        }
        # Same for environment variables: drop those equal to the image's.
        imgenv = envlist2map(imgconfig.get("Env") or [])
        envs = {
            k: v for k, v in envlist2map(config.get("Env") or []).items()
            if imgenv.get(k) != v
        }
        imgvol = imgconfig.get("Volumes") or {}
        cvols = []
        for bind in (hostconfig.get("Binds") or []):
            # "source:destination[:mode]"
            parts = bind.split(":", 2)
            dest = parts[1]
            if dest in imgvol:
                continue
            src = Path(parts[0])
            if src.is_relative_to(wdir):
                srcstr = "./" + str(src.relative_to(wdir))
            else:
                srcstr = str(src)
            mode = parts[2] if len(parts) == 3 else "rw"
            if mode == "rw":
                cvols.append(f"{srcstr}:{dest}")
            else:
                cvols.append(f"{srcstr}:{dest}:{mode}")
            if volume and srcstr.startswith("./"):
                for is_dir, tinfo, data in download_files(ctn, dest):
                    _log.debug("read from volume: src=%s, is_dir=%s, name=%s, %s bytes",
                               srcstr, is_dir, tinfo.name, len(data))
                    yield Path(srcstr) / ".." / tinfo.name, data
            elif volume:
                _log.info("skip copy: %s:%s -> %s", name, dest, srcstr)
        for m in (hostconfig.get("Mounts") or []):
            if m.get("Target") in imgvol:
                continue
            volname = m.get("Source")
            if not volname:
                # e.g. a mount without a source; there is nothing to
                # reference from a "source:target" volume entry.
                _log.debug("skip mount without source: %s", m)
                continue
            # Strip the "<project>_" prefix from the volume name.
            if proj and volname.startswith(proj + "_"):
                volname = volname[len(proj) + 1:]
            if m.get("Type") == "volume":
                vols[volname] = m.get("VolumeOptions", {})
            if m.get("Target"):
                cvols.append(f"{volname}:{m['Target']}")
        nwmode: str | None = None
        cnws = []
        # The project's default network is implicit and needs no entry.
        if not proj or hostconfig.get("NetworkMode") != f"{proj}_default":
            nwmode = hostconfig.get("NetworkMode")
        if isinstance(nwmode, str) and nwmode not in ("host", "none"):
            # Anything but host/none is treated as a named network.
            nets[nwmode] = {}
            cnws.append(nwmode)
            nwmode = None
        svc = {
            "image": config.get("Image"),
        }
        if proj and not ctn.name.startswith(proj + "_"):
            svc["container_name"] = ctn.name
        if nwmode:
            svc["network_mode"] = nwmode
        if cvols:
            svc["volumes"] = cvols
        if cnws:
            svc["networks"] = cnws
        portbindings = hostconfig.get("PortBindings")
        if portbindings:
            svc["ports"] = portmap2compose(portbindings)
        # An empty policy name means "no restart" just like "no" does.
        restart = (hostconfig.get("RestartPolicy") or {}).get("Name")
        if restart and restart != "no":
            svc["restart"] = restart
        if labels:
            svc["labels"] = labels
        if envs:
            svc["environment"] = envs
        convdict(convmap_hostconfig, hostconfig, svc)
        convdict(convmap_label, ctn_labels, svc)
        convdict_differ(diffcopy_config, imgconfig, config, svc)
        svcs[name] = svc
    res = {}
    if svcs:
        res["services"] = svcs
    if vols:
        res["volumes"] = vols
    if nets:
        res["networks"] = nets
    yield Path("compose.yml"), yaml.dump(res, allow_unicode=True, encoding="utf-8", sort_keys=False)
    return res