Coverage for dlabel/api.py: 64%

119 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 23:32 +0000

1from fastapi import APIRouter, HTTPException 

2from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse 

3import docker 

4import io 

5import jsonpointer 

6import crossplane 

7import tempfile 

8import tarfile 

9import time 

10import json 

11from abc import abstractmethod, ABCMeta 

12from typing import Any 

13from logging import getLogger 

14from .compose import compose 

15from .traefik import traefik_dump, traefik2nginx 

16from .traefik_conf import TraefikConfig 

17from .dockerfile import get_dockerfile 

18 

19_log = getLogger(__name__) 

20 

21 

22class CommonRoute(metaclass=ABCMeta): 

23 def __init__(self, client: docker.DockerClient): 

24 self.client = client 

25 self.router = APIRouter() 

26 kwargs = dict(response_model_exclude_none=True, response_model_exclude_unset=True) 

27 self.router.add_api_route("/", self.getroot, methods=["GET"], **kwargs) 

28 self.router.add_api_route("/{path:path}", self.getsub, methods=["GET"], **kwargs) 

29 

30 @abstractmethod 

31 def getroot(self, **kwargs) -> dict: 

32 raise NotImplementedError("GET /: not implemented") 

33 

34 def subpath(self, obj: dict, path: str) -> Any: 

35 try: 

36 res = jsonpointer.resolve_pointer(obj, "/"+path) 

37 if isinstance(res, (int, str)): 

38 return PlainTextResponse(content=str(res)) 

39 return JSONResponse(content=res) 

40 except jsonpointer.JsonPointerException as e: 

41 raise HTTPException(status_code=404, detail=dict(path=path, message=e.args[0])) 

42 

43 @abstractmethod 

44 def getsub(self, path: str, **kwargs) -> Any: 

45 return self.subpath(self.getroot(**kwargs), path=path) 

46 

47 

48class ComposeRoute(CommonRoute): 

49 def getroot(self, project: str | None = None) -> dict: 

50 try: 

51 g = compose(self.client, project=project, volume=False) 

52 while True: 

53 _ = next(g) 

54 except StopIteration as e: 

55 return e.value 

56 

57 def getsub(self, path: str, project: str | None = None) -> Any: 

58 if path == "_tar": 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true

59 return self.getarchive(project) 

60 return super().getsub(path, project=project) 

61 

62 def getarchive(self, project: str | None = None): 

63 def arc(): 

64 yield "hello" 

65 

66 return StreamingResponse(arc, media_type="application/x-tar") 

67 

68 

69class DockerfileRoute: 

70 def __init__(self, client: docker.DockerClient): 

71 self.client = client 

72 self.router = APIRouter() 

73 kwargs = dict(response_model_exclude_none=True, response_model_exclude_unset=True) 

74 self.router.add_api_route("/", self.getroot, methods=["GET"], **kwargs) 

75 self.router.add_api_route("/{container:path}/Dockerfile", self.get_dockerfile, methods=["GET"], 

76 response_class=PlainTextResponse, **kwargs) 

77 self.router.add_api_route( 

78 "/{container:path}/archive.tar", self.get_archive, methods=["GET"], 

79 response_class=StreamingResponse, 

80 responses={200: {"content": {"application/x-tar": {}}, }}, **kwargs) 

81 

82 def getroot(self) -> list[str]: 

83 # list containers 

84 return [x.name for x in self.client.containers.list()] 

85 

86 def get_dockerfile(self, container: str, ignore: list[str] = [], labels: bool = False) -> Any: 

87 # get dockerfile 

88 ctn = self.client.containers.get(container) 

89 for _, bin in get_dockerfile(ctn, ignore, labels, do_output=False): 

90 return PlainTextResponse(bin.decode()) 

91 

92 def get_archive(self, container: str, ignore: list[str] = [], labels: bool = False): 

93 ctn = self.client.containers.get(container) 

94 

95 def arc(): 

96 ofp = io.BytesIO() 

97 osk = ofp.tell() 

98 tf = tarfile.open(mode="w", fileobj=ofp, format=tarfile.GNU_FORMAT) 

99 for name, bin in get_dockerfile(ctn, ignore, labels, do_output=True): 

100 ti = tarfile.TarInfo(name) 

101 ti.mode = 0o644 

102 ti.mtime = time.time() 

103 ti.size = len(bin) 

104 tf.addfile(ti, io.BytesIO(bin)) 

105 _log.info("addfile %s, size=%s", name, len(bin)) 

106 if osk != ofp.tell(): 

107 ofp.seek(osk) 

108 yield ofp.read() 

109 osk = ofp.tell() 

110 tf.close() 

111 if osk != ofp.tell(): 

112 ofp.seek(osk) 

113 yield ofp.read() 

114 _log.info("finished: %s", container) 

115 

116 return StreamingResponse(arc(), media_type="application/x-tar") 

117 

118 

119class TraefikRoute(CommonRoute): 

120 def getroot(self) -> TraefikConfig: 

121 return traefik_dump(self.client) 

122 

123 def getsub(self, path: str) -> Any: 

124 return self.subpath(self.getroot().to_dict(), path=path) 

125 

126 

127class NginxRoute(CommonRoute): 

128 base_url = "http://localhost/" 

129 

130 def __init__(self, client: docker.DockerClient): 

131 self.client = client 

132 self.router = APIRouter() 

133 self.router.add_api_route("/", self.getroot, methods=["GET"], 

134 response_class=PlainTextResponse) 

135 self.router.add_api_route("/json", self.getplane, methods=["GET"]) 

136 self.router.add_api_route("/json/{path:path}", self.getplanesub, methods=["GET"]) 

137 

138 def getroot(self, ipaddr: bool = True) -> PlainTextResponse: 

139 tmp = io.StringIO() 

140 traefik2nginx(traefik_dump(self.client), tmp, baseconf=None, 

141 server_url=self.base_url, ipaddr=ipaddr) 

142 return PlainTextResponse(tmp.getvalue()) 

143 

144 def getplane(self, ipaddr: bool = True) -> dict: 

145 with tempfile.NamedTemporaryFile("r+") as tf: 

146 traefik2nginx(traefik_dump(self.client), tf, baseconf=None, 

147 server_url=self.base_url, ipaddr=ipaddr) 

148 tf.flush() 

149 res = crossplane.parse(tf.name, combine=True) 

150 return json.loads(json.dumps(res).replace("\""+tf.name+"\"", '"nginx.conf"')) 

151 

152 def getplanesub(self, path: str, ipaddr: bool = True) -> dict: 

153 return self.subpath(self.getplane(ipaddr), path) 

154 

155 def getsub(self, path): 

156 pass