Coverage for dlabel/compose.py: 74%
166 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 23:32 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 23:32 +0000
1import io
2import tarfile
3import fnmatch
4import docker
5import yaml
6from typing import Any
7from pathlib import Path
8from logging import getLogger
9from .util import download_files
10_log = getLogger(__name__)
13def envlist2map(env: list[str], sep: str = "=") -> dict[str, str]:
14 res = {}
15 for i in env:
16 kv = i.split(sep, 1)
17 if len(kv) == 2: 17 ↛ 15line 17 didn't jump to line 15 because the condition on line 17 was always true
18 res[kv[0]] = kv[1]
19 return res
22def portmap2compose(pmap: dict) -> list[str | dict]:
23 res: list[str | dict] = []
24 for k, v in pmap.items():
25 ctport = k
26 if ctport.endswith("/tcp") and len(v) == 1:
27 ctport = k.split("/")[0]
28 hostip = v[0].get("HostIp")
29 hostport = v[0].get("HostPort")
30 if hostip:
31 res.append(f"{hostip}:{hostport}:{ctport}")
32 else:
33 res.append(f"{hostport}:{ctport}")
34 else:
35 target, protocol = k.split("/", 1)
36 res.append({
37 "target": int(target),
38 "published": int(v[0].get("HostPort")),
39 "protocol": protocol,
40 "mode": "host",
41 })
42 return res
45def convdict(convmap: dict[str, str], fromdict: dict[str, Any], todict: dict[str, Any]):
46 for k, v in convmap.items():
47 if fromdict.get(k): 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 todict[v] = fromdict.get(k)
51def convdict_differ(
52 convmap: dict[str, str], dict_img: dict[str, Any], dict_ctn: dict[str, Any], todict: dict[str, Any]):
53 for k, v in convmap.items():
54 if k in dict_ctn and dict_img.get(k) != dict_ctn.get(k): 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true
55 todict[v] = dict_ctn[k]
58def copy_files(ctn: docker.models.containers.Container, src: str | Path, dst: str | Path):
59 def tfilter(member, path):
60 res = tarfile.data_filter(member, path)
61 if res:
62 if '/' in res.name:
63 _, res.name = res.name.split('/', 1)
64 return res
65 return None
67 _log.info("copy %s:%s -> %s", ctn.name, src, dst)
68 odir = Path(dst)
69 bin, arc = ctn.get_archive(str(src))
70 _log.debug("arc=%s", arc)
71 bio = io.BytesIO()
72 for x in bin:
73 bio.write(x)
74 bio.seek(0)
75 tf = tarfile.TarFile(fileobj=bio)
76 members = tf.getmembers()
77 if len(members) == 1 and members[0].isreg():
78 _log.info("single file: %s", members[0])
79 tf.extractall(odir.parent, filter='data')
80 else:
81 odir.mkdir(exist_ok=True, parents=True)
82 tf.extractall(odir, filter=tfilter)
83 tf.close()
84 bio.close()
87def compose(client: docker.DockerClient, project, volume): # noqa: C901
88 """generate docker-compose.yml from running containers"""
89 svcs = {}
90 vols = {}
91 nets: dict[str, Any] = {}
92 for ctn in client.containers.list():
93 config = ctn.attrs.get("Config", {})
94 hostconfig = ctn.attrs.get("HostConfig", {})
95 labels: dict[str, str] = config.get("Labels", {})
96 proj = labels.get("com.docker.compose.project")
97 wdir = Path(labels.get("com.docker.compose.project.working_dir", "/"))
98 if project and not proj: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true
99 _log.debug("skip: no project: %s", ctn.name)
100 continue
101 if project and proj and not fnmatch.fnmatch(proj, project):
102 _log.debug("skip by project (%s)", proj)
103 continue
104 name = labels.get("com.docker.compose.service", ctn.name)
105 _log.info("processing %s, service=%s", ctn.name, name)
106 img = ctn.image
107 imglabel = img.labels
108 imgconfig = img.attrs.get("Config", {})
109 for k, v in imglabel.items():
110 if labels.get(k) == v:
111 labels.pop(k)
112 labels = {k: v for k, v in labels.items() if not k.startswith("com.docker.compose.")}
113 envs = envlist2map(config.get("Env", []))
114 imgenv = envlist2map(imgconfig.get("Env", []))
115 for k, v in imgenv.items():
116 if envs.get(k) == v:
117 envs.pop(k)
118 imgvol = imgconfig.get("Volumes", {})
119 cvols = []
120 for i in (hostconfig.get("Binds") or []):
121 v = i.split(":", 2)
122 if imgvol and v[1] in imgvol: 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true
123 continue
124 src = Path(v[0])
125 dest = v[1]
126 if src.is_relative_to(wdir):
127 srcstr = "./" + str(src.relative_to(wdir))
128 else:
129 srcstr = str(src)
130 if len(v) == 2 or v[2] == "rw":
131 cvols.append(f"{srcstr}:{dest}")
132 elif len(v) == 3: 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true
133 cvols.append(f"{srcstr}:{dest}:{v[2]}")
134 if volume and srcstr.startswith("./"): 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 for is_dir, tinfo, bin in download_files(ctn, dest):
136 _log.debug("read from volume: src=%s, is_dir=%s, name=%s, %s bytes",
137 srcstr, is_dir, tinfo.name, len(bin))
138 yield Path(srcstr) / ".." / tinfo.name, bin
139 elif volume: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true
140 _log.info("skip copy: %s:%s -> %s", name, dest, srcstr)
141 for m in hostconfig.get("Mounts", []):
142 if imgvol and m.get("Target") in imgvol: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true
143 continue
144 volname = m.get("Source")
145 if proj and volname.startswith(proj + "_"): 145 ↛ 147line 145 didn't jump to line 147 because the condition on line 145 was always true
146 volname = volname[len(proj) + 1:]
147 if m.get("Type") == "volume": 147 ↛ 149line 147 didn't jump to line 149 because the condition on line 147 was always true
148 vols[volname] = m.get("VolumeOptions", {})
149 if m.get("Target"): 149 ↛ 141line 149 didn't jump to line 141 because the condition on line 149 was always true
150 cvols.append(f"{volname}:{m['Target']}")
151 nwmode: str | None = None
152 cnws = []
153 if not proj or hostconfig.get("NetworkMode") != f"{proj}_default": 153 ↛ 155line 153 didn't jump to line 155 because the condition on line 153 was always true
154 nwmode = hostconfig.get("NetworkMode")
155 if isinstance(nwmode, str) and nwmode not in ("host", "none"): 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true
156 nets[nwmode] = {}
157 cnws.append(nwmode)
158 nwmode = None
159 svc = {
160 "image": config.get("Image"),
161 }
162 if proj and not ctn.name.startswith(proj + "_"):
163 svc["container_name"] = ctn.name
164 if nwmode: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 svc["network_mode"] = nwmode
166 if cvols:
167 svc["volumes"] = cvols
168 if cnws: 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 svc["networks"] = cnws
170 if hostconfig.get("PortBindings"):
171 svc["ports"] = portmap2compose(hostconfig.get("PortBindings", {}))
172 if hostconfig.get("RestartPolicy", {}).get("Name") not in ("no", None):
173 svc["restart"] = hostconfig.get("RestartPolicy", {}).get("Name")
174 if labels:
175 svc["labels"] = labels
176 if envs:
177 svc["environment"] = envs
178 convmap_hostconfig = {
179 "ExtraHosts": "extra_hosts",
180 "CpuShares": "cpu_shares",
181 "CpuPeriod": "cpu_period",
182 "CpuPercent": "cpu_percent",
183 "CpuCount": "cpu_count",
184 "CpuQuota": "cpu_quota",
185 "CpuRealtimeRuntime": "cpu_rt_runtime",
186 "CpuRealtimePeriod": "cpu_rt_period",
187 "CpusetCpus": "cpuset",
188 "CapAdd": "cap_add",
189 "CapDrop": "cap_drop",
190 "CgroupParent": "cgroup_parent",
191 "GroupAdd": "group_add",
192 "Privileged": "privileged",
193 }
194 convmap_label = {
195 "com.docker.compose.depends_on": "depends_on",
196 }
197 convdict(convmap_hostconfig, hostconfig, svc)
198 convdict(convmap_label, config.get("Labels", {}), svc)
199 diffcopy_config = {
200 "Cmd": "command",
201 "Entrypoint": "entrypoint",
202 }
203 convdict_differ(diffcopy_config, imgconfig, config, svc)
204 svcs[name] = svc
205 res = {}
206 if svcs:
207 res["services"] = svcs
208 if vols:
209 res["volumes"] = vols
210 if nets: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true
211 res["networks"] = nets
212 yield Path("compose.yml"), yaml.dump(res, allow_unicode=True, encoding="utf-8", sort_keys=False)
213 return res