Coverage for volexport/main.py: 50%

182 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 12:48 +0000

1import os 

2import click 

3import uvicorn 

4from logging import getLogger 

5from .cli_utils import verbose_option 

6from .version import VERSION 

7 

# Module-level logger, named after this module; used by the commands below.
_log = getLogger(__name__)

9 

10 

@click.version_option(version=VERSION, prog_name="volexport")
@click.group(invoke_without_command=True)
@click.pass_context
def cli(ctx):
    # Root command group.  Because the group is declared with
    # invoke_without_command=True this callback also runs when no subcommand
    # was given; in that case the help text is printed.
    if ctx.invoked_subcommand is not None:
        # A subcommand was selected: nothing to do at group level.
        return
    usage = ctx.get_help()
    click.echo(usage)

17 

18 

@cli.command()
@verbose_option
@click.option("--become-method", help="sudo/doas/runas, etc...")
@click.option("--tgtadm-bin", help="tgtadm command")
@click.option("--tgt-bstype", help="backing store type")
@click.option("--tgt-bsopts", help="bs options")
@click.option("--tgt-bsoflags", help="bs open flags")
@click.option("--lvm-bin", help="lvm command")
@click.option("--nics", multiple=True, help="use interfaces")
@click.option("--iqn-base", help="iSCSI target basename")
@click.option("--vg", help="LVM volume group")
@click.option("--lvm-thinpool", help="LVM thin pool")
@click.option(
    "--hostport",
    default="127.0.0.1:8080",
    envvar="VOLEXP_HOSTPORT",
    help="listen host:port, unix socket: unix://(path)",
    show_default=True,
)
@click.option("--log-config", type=click.Path(), help="uvicorn log config")
@click.option("--cmd-timeout", type=float, envvar="VOLEXP_CMD_TIMEOUT", help="command execution timeout")
@click.option("--check/--skip-check", default=True, help="pre-boot check")
def server(hostport, log_config, check, **kwargs):
    """Run the volexport server."""
    # NOTE: the docstring above is shown by click as the command help, so the
    # detailed documentation lives in comments.
    #
    # hostport:   "host:port", or "unix://<path>" to listen on a unix socket.
    # log_config: passed to uvicorn.run() as log_config; when None the
    #             "uvicorn" logger level is set to INFO instead.
    # check:      run the pre-boot check (volume group and tgtd lookups).
    # kwargs:     every remaining CLI option; each supplied value is exported
    #             as the environment variable VOLEXP_<OPTION NAME>.
    import json
    from urllib.parse import urlparse

    # Export the options before the project modules are imported below
    # (presumably the config modules read VOLEXP_* at import time -- the
    # import order is kept exactly as in the original for that reason).
    for name, value in kwargs.items():
        if value is None or (isinstance(value, tuple) and len(value) == 0):
            # option not given (multiple=True options arrive as an empty tuple)
            continue
        if isinstance(value, tuple):
            text = json.dumps(list(value))
        else:
            # os.environ accepts only str: --cmd-timeout is parsed by click as
            # a float and used to raise TypeError on assignment.
            text = str(value)
        os.environ[f"VOLEXP_{name.upper()}"] = text

    from .api import api
    from .config import config
    from .config2 import config2
    from .lvm2 import VG
    from .tgtd import Tgtd

    _log.debug("config: %s", config)
    if log_config is None:
        getLogger("uvicorn").setLevel("INFO")

    # pre-boot check
    if check:
        if os.getuid() == 0 and config.BECOME_METHOD:
            _log.info("you are already root. disable become_method")
            config.BECOME_METHOD = ""
        # NOTE(review): assert statements are skipped under `python -O`, which
        # would silently disable these checks; kept as-is so the exception
        # type seen by callers does not change.
        assert VG(config2.VG).get() is not None
        assert Tgtd().sys_show() is not None

    # start server
    if "://" not in hostport:
        # a bare "host:port" needs a leading "//" to be parsed as a netloc
        url = urlparse("//" + hostport)
    else:
        url = urlparse(hostport)
    if url.scheme == "unix":
        uvicorn.run(api, uds=url.path, log_config=log_config)
    else:
        uvicorn.run(api, host=url.hostname, port=url.port, log_config=log_config)

83 

84 

@cli.command()
@click.option("--format", type=click.Choice(["yaml", "json"]), default="yaml", show_default=True)
def apispec(format):
    """Generate OpenAPI specification for the volexport API."""
    import sys

    # Placeholder settings are exported before the API module is imported
    # (presumably so the import succeeds without a real configuration).
    os.environ.update({"VOLEXP_VG": "dummy", "VOLEXP_NICS": "[]"})
    from .api import api

    # Serializers are imported lazily so only the selected one is loaded.
    if format == "json":
        import json

        json.dump(api.openapi(), fp=sys.stdout)
        return
    if format == "yaml":
        import yaml

        yaml.dump(api.openapi(), stream=sys.stdout)

103 

104 

@cli.command()
@verbose_option
@click.option("--endpoint", required=True, help="volexport endpoint", envvar="VOLEXP_ENDPOINT")
@click.option("--node-id", required=True, help="node id", envvar="VOLEXP_NODE_ID")
@click.option(
    "--hostport",
    default="127.0.0.1:9999",
    show_default=True,
    help="listen host:port, unix socket: unix://(path)",
    envvar="VOLEXP_HOSTPORT",
)
@click.option("--private-key", type=click.File("rb"), help="private key .pem file for TLS")
@click.option("--cert", type=click.File("rb"), help="certificate .pem file for TLS")
@click.option("--rootcert", type=click.File("rb"), help="ca cert for TLS/mTLS")
@click.option("--use-mtls/--no-mtls", default=False, show_default=True, help="use client auth")
@click.option("--max-workers", type=int, help="# of workers", envvar="VOLEXP_MAX_WORKERS")
def csiserver(hostport, endpoint, node_id, private_key, cert, rootcert, use_mtls, max_workers):
    """Run the CSI driver service"""
    # NOTE: the docstring above is shown by click as the command help, so the
    # detailed documentation lives in comments.
    #
    # hostport:    listen address handed to boot_server().
    # endpoint / node_id / max_workers: copied into the config dict given to
    #              boot_server().
    # private_key / cert: PEM sources; TLS credentials are built only when
    #              both are supplied.
    # rootcert:    optional CA certificate passed to grpc as root certificates.
    # use_mtls:    passed to grpc as require_client_auth.
    from pathlib import Path
    from volexpcsi.server import boot_server

    def _pem_bytes(source) -> bytes:
        # click.File hands over an already-open file object, not a path, so
        # Path(source) raised TypeError.  Read from the handle; a plain path is
        # still accepted for callers that invoke the function directly.
        if hasattr(source, "read"):
            data = source.read()
            return data.encode() if isinstance(data, str) else data
        return Path(source).read_bytes()

    _log.info("starting server: %s", hostport)
    conf = dict(
        endpoint=endpoint,
        nodeid=node_id,
        max_workers=max_workers,
        become_method="sudo",
    )
    if private_key and cert:
        import grpc

        pkey = _pem_bytes(private_key)
        chain = _pem_bytes(cert)
        root = _pem_bytes(rootcert) if rootcert else None
        cred = grpc.ssl_server_credentials(
            [(pkey, chain)],
            root,
            require_client_auth=use_mtls,
        )
        port, srv = boot_server(hostport=hostport, config=conf, cred=cred)
    else:
        if private_key or cert:
            # Only half of the key pair was given: the server still starts
            # without TLS as before, but no longer does so silently.
            _log.warning("TLS disabled: both --private-key and --cert are required")
        port, srv = boot_server(hostport=hostport, config=conf)
    _log.info("server started: port=%s", port)
    # renamed from `exit`, which shadowed the builtin
    timed_out = srv.wait_for_termination()
    _log.info("server finished: timeout=%s", timed_out)

152 

153 

@cli.command()
@verbose_option
@click.option("--vg", help="LVM volume group")
@click.option("--untag/--tag", default=False, help="remove/add tags to volumes")
def tag_volume(untag, **kwargs):
    """Add (or with --untag, remove) volname.* tags on logical volumes."""
    # untag:  True removes an existing "volname.<name>" tag from each volume;
    #         False adds "volname.<lv_name>" to volumes that have no such tag.
    # kwargs: remaining CLI options; each supplied value is exported as the
    #         environment variable VOLEXP_<OPTION NAME> before the project
    #         modules are imported.
    for name, value in kwargs.items():
        if value is None:
            continue
        os.environ[f"VOLEXP_{name.upper()}"] = value
    os.environ["VOLEXP_NICS"] = "[]"
    from .config2 import config2
    from .lvm2 import LV
    from .util import runcmd

    data = LV(config2.VG).getlist()

    for vol in data:
        lvmname: str = vol["lv_name"]
        # first "volname.*" tag of the volume, with the prefix stripped
        volname: str | None = next(
            (
                tag.removeprefix("volname.")
                for tag in vol.get("lv_tags", "").split(",")
                if tag.startswith("volname.")
            ),
            None,
        )
        if untag and volname:
            _log.info("remove tag: %s (%s)", lvmname, volname)
            runcmd(["lvchange", "--deltag", f"volname.{volname}", f"{config2.VG}/{lvmname}"], root=True)
        elif not untag and not volname:
            _log.info("set tag: %s", lvmname)
            runcmd(["lvchange", "--addtag", f"volname.{lvmname}", f"{config2.VG}/{lvmname}"], root=True)

184 

185 

@cli.command()
@verbose_option
@click.option("--vg", help="LVM volume group")
@click.option("--format", type=click.Choice(["yaml", "json"]), default="yaml", show_default=True)
@click.option("--output", type=click.File("w"), default="-", help="output file")
def list_volume(output, format, **kwargs):
    """Dump the volume list of the volume group as YAML or JSON."""
    # output: writable file object opened by click ("-" means stdout).
    # format: "yaml" or "json"; selects the serializer.
    # kwargs: remaining CLI options; each supplied value is exported as the
    #         environment variable VOLEXP_<OPTION NAME> before the project
    #         modules are imported.
    for name, value in kwargs.items():
        if value is None:
            continue
        os.environ[f"VOLEXP_{name.upper()}"] = value
    os.environ["VOLEXP_NICS"] = "[]"
    from .config2 import config2
    from .lvm2 import LV

    data = LV(config2.VG).volume_list()

    # serializers are imported lazily so only the selected one is loaded
    if format == "yaml":
        import yaml

        yaml.dump(data, stream=output)
    elif format == "json":
        import json

        json.dump(data, fp=output)

211 

212 

@cli.command()
@verbose_option
@click.option("--format", type=click.Choice(["yaml", "json"]), default="yaml", show_default=True)
@click.option("--output", type=click.File("w"), default="-", help="output file")
def list_vg(output, format, **kwargs):
    """Dump the VG list as YAML or JSON."""
    # output: writable file object opened by click ("-" means stdout).
    # format: "yaml" or "json"; selects the serializer.
    # kwargs: accepted for any extra values the option decorators pass; unused.
    #
    # Placeholder settings are exported before the project module is imported
    # (presumably so the import succeeds without a real configuration).
    os.environ["VOLEXP_VG"] = "dummy"
    os.environ["VOLEXP_NICS"] = "[]"
    from .lvm2 import VG

    data = VG("dummy").getlist()

    # serializers are imported lazily so only the selected one is loaded
    if format == "yaml":
        import yaml

        yaml.dump(data, stream=output)
    elif format == "json":
        import json

        json.dump(data, fp=output)

232 

233 

@cli.command()
@verbose_option
@click.option("--format", type=click.Choice(["yaml", "json"]), default="yaml", show_default=True)
@click.option("--output", type=click.File("w"), default="-", help="output file")
def list_pv(output, format, **kwargs):
    """Dump the PV list as YAML or JSON."""
    # output: writable file object opened by click ("-" means stdout).
    # format: "yaml" or "json"; selects the serializer.
    # kwargs: accepted for any extra values the option decorators pass; unused.
    #
    # Placeholder settings are exported before the project module is imported
    # (presumably so the import succeeds without a real configuration).
    os.environ["VOLEXP_VG"] = "dummy"
    os.environ["VOLEXP_NICS"] = "[]"
    from .lvm2 import PV

    data = PV("dummy").getlist()

    # serializers are imported lazily so only the selected one is loaded
    if format == "yaml":
        import yaml

        yaml.dump(data, stream=output)
    elif format == "json":
        import json

        json.dump(data, fp=output)

253 

254 

# Script entry point: invoke the click command group when this module is
# executed directly rather than imported.
if __name__ == "__main__":
    cli()