Coverage for dlabel/api.py: 64%
119 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 23:32 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 23:32 +0000
1from fastapi import APIRouter, HTTPException
2from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
3import docker
4import io
5import jsonpointer
6import crossplane
7import tempfile
8import tarfile
9import time
10import json
11from abc import abstractmethod, ABCMeta
12from typing import Any
13from logging import getLogger
14from .compose import compose
15from .traefik import traefik_dump, traefik2nginx
16from .traefik_conf import TraefikConfig
17from .dockerfile import get_dockerfile
19_log = getLogger(__name__)
class CommonRoute(metaclass=ABCMeta):
    """Base class for read-only routes exposing a document and its sub-paths.

    The constructor creates an ``APIRouter`` in ``self.router`` and registers
    ``GET /`` (served by :meth:`getroot`) and ``GET /{path}`` (served by
    :meth:`getsub`). Both methods are abstract, so subclasses must define them.
    """

    def __init__(self, client: docker.DockerClient):
        """Keep the Docker client and register the two GET routes."""
        self.client = client
        self.router = APIRouter()
        kwargs = dict(response_model_exclude_none=True, response_model_exclude_unset=True)
        self.router.add_api_route("/", self.getroot, methods=["GET"], **kwargs)
        self.router.add_api_route("/{path:path}", self.getsub, methods=["GET"], **kwargs)

    @abstractmethod
    def getroot(self, **kwargs) -> dict:
        """Return the whole document served at ``GET /``."""
        raise NotImplementedError("GET /: not implemented")

    def subpath(self, obj: dict, path: str) -> Any:
        """Resolve *path* inside *obj* as a JSON pointer and wrap the result.

        Args:
            obj: Document to look into.
            path: Pointer without its leading ``/`` (it is prepended here).

        Returns:
            A ``PlainTextResponse`` for ``int``/``str`` values, otherwise a
            ``JSONResponse`` with the resolved value.

        Raises:
            HTTPException: 404 when the pointer cannot be resolved.
        """
        # Only the pointer lookup can raise JsonPointerException, so keep the
        # try body limited to it.
        try:
            res = jsonpointer.resolve_pointer(obj, "/" + path)
        except jsonpointer.JsonPointerException as e:
            # Fall back to str(e) so an exception without args cannot turn the
            # 404 into an IndexError.
            message = e.args[0] if e.args else str(e)
            raise HTTPException(
                status_code=404, detail=dict(path=path, message=message)) from e
        if isinstance(res, (int, str)):
            return PlainTextResponse(content=str(res))
        return JSONResponse(content=res)

    @abstractmethod
    def getsub(self, path: str, **kwargs) -> Any:
        """Serve ``GET /{path}``; the default resolves *path* in :meth:`getroot`.

        Abstract, but subclasses may delegate here via ``super().getsub(...)``.
        """
        return self.subpath(self.getroot(**kwargs), path=path)
class ComposeRoute(CommonRoute):
    """Routes serving the document produced by ``compose`` for a project."""

    def getroot(self, project: str | None = None) -> dict:
        """Run ``compose`` to completion and return its generator return value.

        Args:
            project: Forwarded to ``compose`` as ``project``.
        """
        try:
            g = compose(self.client, project=project, volume=False)
            # Yielded items are discarded; only the value carried by
            # StopIteration (the generator's ``return``) is wanted.
            while True:
                _ = next(g)
        except StopIteration as e:
            return e.value

    def getsub(self, path: str, project: str | None = None) -> Any:
        """Serve a sub-path; the special path ``_tar`` returns the archive."""
        if path == "_tar":
            return self.getarchive(project)
        return super().getsub(path, project=project)

    def getarchive(self, project: str | None = None):
        """Return a streaming response with media type ``application/x-tar``.

        The stream is currently a placeholder that yields only ``"hello"``;
        *project* is not used yet.
        """
        def arc():
            yield "hello"

        # Pass the generator iterator, not the function: the response body
        # must be iterable, and a bare function object is not.
        return StreamingResponse(arc(), media_type="application/x-tar")
class DockerfileRoute:
    """Routes that list containers and render a Dockerfile/archive for one.

    Registers ``GET /``, ``GET /{container}/Dockerfile`` and
    ``GET /{container}/archive.tar`` on ``self.router``.
    """

    # NOTE(review): FastAPI's documentation says list-typed parameters are
    # read from the request body unless declared with Query(); confirm whether
    # ``ignore`` below is meant to be a query parameter.

    def __init__(self, client: docker.DockerClient):
        """Keep the Docker client and register the three GET routes."""
        self.client = client
        self.router = APIRouter()
        kwargs = dict(response_model_exclude_none=True, response_model_exclude_unset=True)
        self.router.add_api_route("/", self.getroot, methods=["GET"], **kwargs)
        self.router.add_api_route(
            "/{container:path}/Dockerfile", self.get_dockerfile, methods=["GET"],
            response_class=PlainTextResponse, **kwargs)
        self.router.add_api_route(
            "/{container:path}/archive.tar", self.get_archive, methods=["GET"],
            response_class=StreamingResponse,
            responses={200: {"content": {"application/x-tar": {}}, }}, **kwargs)

    def _get_container(self, container: str):
        """Look up *container*, mapping an unknown name to HTTP 404."""
        try:
            return self.client.containers.get(container)
        except docker.errors.NotFound as e:
            # Same 404 detail shape as CommonRoute.subpath uses for bad paths.
            raise HTTPException(
                status_code=404,
                detail=dict(container=container, message=str(e))) from e

    def getroot(self) -> list[str]:
        """Return the names of the containers listed by the Docker client."""
        return [x.name for x in self.client.containers.list()]

    def get_dockerfile(self, container: str, ignore: list[str] = [], labels: bool = False) -> Any:
        """Return the first item generated by ``get_dockerfile`` as plain text.

        Args:
            container: Name or id passed to the Docker client.
            ignore: Forwarded unchanged to ``get_dockerfile``.
            labels: Forwarded unchanged to ``get_dockerfile``.

        Raises:
            HTTPException: 404 when the container does not exist.
        """
        ctn = self._get_container(container)
        for _, content in get_dockerfile(ctn, ignore, labels, do_output=False):
            # Only the first generated item is served.
            return PlainTextResponse(content.decode())
        # Nothing was generated: keep the original fall-through result.
        return None

    def get_archive(self, container: str, ignore: list[str] = [], labels: bool = False):
        """Stream a GNU-format tar of every item generated for the container.

        Args:
            container: Name or id passed to the Docker client.
            ignore: Forwarded unchanged to ``get_dockerfile``.
            labels: Forwarded unchanged to ``get_dockerfile``.

        Raises:
            HTTPException: 404 when the container does not exist.
        """
        # Resolve the container before streaming starts so a missing one is
        # reported as a normal 404 response.
        ctn = self._get_container(container)

        def arc():
            ofp = io.BytesIO()
            osk = ofp.tell()  # offset up to which bytes were already yielded
            tf = tarfile.open(mode="w", fileobj=ofp, format=tarfile.GNU_FORMAT)
            try:
                for name, content in get_dockerfile(ctn, ignore, labels, do_output=True):
                    ti = tarfile.TarInfo(name)
                    ti.mode = 0o644
                    ti.mtime = time.time()
                    ti.size = len(content)
                    tf.addfile(ti, io.BytesIO(content))
                    _log.info("addfile %s, size=%s", name, len(content))
                    if osk != ofp.tell():
                        # Emit only what was appended since the last yield;
                        # read() leaves the position at the end for the
                        # next write.
                        ofp.seek(osk)
                        yield ofp.read()
                        osk = ofp.tell()
            finally:
                # Close even when generation fails or the consumer stops
                # early; on success this writes the end-of-archive blocks.
                tf.close()
            if osk != ofp.tell():
                ofp.seek(osk)
                yield ofp.read()
            _log.info("finished: %s", container)

        return StreamingResponse(arc(), media_type="application/x-tar")
class TraefikRoute(CommonRoute):
    """Routes serving the configuration returned by ``traefik_dump``."""

    def getroot(self) -> TraefikConfig:
        """Return the configuration dumped for the Docker client."""
        config = traefik_dump(self.client)
        return config

    def getsub(self, path: str) -> Any:
        """Resolve *path* inside the dict form of the dumped configuration."""
        document = self.getroot().to_dict()
        return self.subpath(document, path=path)
class NginxRoute(CommonRoute):
    """Routes serving an nginx configuration converted from the traefik dump.

    ``GET /`` returns the configuration text, ``GET /json`` its crossplane
    parse, and ``GET /json/{path}`` a sub-path of that parse.
    """

    base_url = "http://localhost/"

    def __init__(self, client: docker.DockerClient):
        """Keep the Docker client and register the nginx-specific routes.

        The base-class routes are not registered here; this class sets up its
        own router instead of calling the parent constructor.
        """
        self.client = client
        self.router = APIRouter()
        self.router.add_api_route("/", self.getroot, methods=["GET"],
                                  response_class=PlainTextResponse)
        self.router.add_api_route("/json", self.getplane, methods=["GET"])
        self.router.add_api_route("/json/{path:path}", self.getplanesub, methods=["GET"])

    def getroot(self, ipaddr: bool = True) -> PlainTextResponse:
        """Return the generated nginx configuration as plain text.

        Args:
            ipaddr: Forwarded to ``traefik2nginx``.
        """
        tmp = io.StringIO()
        traefik2nginx(traefik_dump(self.client), tmp, baseconf=None,
                      server_url=self.base_url, ipaddr=ipaddr)
        return PlainTextResponse(tmp.getvalue())

    def getplane(self, ipaddr: bool = True) -> dict:
        """Return the crossplane parse of the generated configuration.

        The configuration is written to a named temporary file so crossplane
        can read it by path. Occurrences of that path as a JSON string value
        are then replaced with ``"nginx.conf"``.

        Args:
            ipaddr: Forwarded to ``traefik2nginx``.
        """
        with tempfile.NamedTemporaryFile("r+") as tf:
            traefik2nginx(traefik_dump(self.client), tf, baseconf=None,
                          server_url=self.base_url, ipaddr=ipaddr)
            # Flush so the parser sees the full content when it opens the path.
            tf.flush()
            res = crossplane.parse(tf.name, combine=True)
            # Encode the search string with json.dumps so it matches what the
            # dump contains even if the path has characters JSON escapes.
            needle = json.dumps(tf.name)
            return json.loads(json.dumps(res).replace(needle, '"nginx.conf"'))

    def getplanesub(self, path: str, ipaddr: bool = True) -> dict:
        """Resolve *path* inside the crossplane parse of the configuration."""
        return self.subpath(self.getplane(ipaddr), path)

    def getsub(self, path):
        """Do nothing; defined only because the base class declares it abstract.

        It is not registered on this class's router.
        """
        pass