Coverage for volexpcsi/node.py: 68%
142 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
1import grpc
2import subprocess
3import shlex
4from pathlib import Path
5from logging import getLogger
6from google.protobuf.message import Message
7from google.protobuf.json_format import MessageToDict
8from volexport.client import VERequest
9from . import api
10from .accesslog import servicer_accesslog
12_log = getLogger(__name__)
@servicer_accesslog
class VolExpNode(api.NodeServicer):
    """CSI Node servicer that attaches exported volumes over iSCSI and mounts them.

    Target and address information is fetched through a ``VERequest`` client;
    host-side work is done by running ``iscsiadm``, ``mount``, ``umount``,
    ``blkid`` and ``resize2fs`` as subprocesses.
    """

    def __init__(self, config: dict):
        """Store the configuration and create the API client.

        Args:
            config: Settings mapping. ``endpoint`` is required here, ``nodeid``
                is read by ``NodeGetInfo`` and ``become_method`` is optional.
        """
        self.config = config
        self.req = VERequest(config["endpoint"])
        # "su", any other command prefix to prepend, or "false"/"none"/empty
        # to run commands unchanged -- interpreted in runcmd().
        self.become_method: str | None = config.get("become_method")

    def _validate(self, request: Message):
        """Reject a request whose ``volume_id`` or ``target_path`` is empty.

        Fields that the request type does not define are skipped.

        Raises:
            ValueError: If a checked field exists on the request but is falsy.
        """
        # A tuple (rather than a set) keeps the check order -- and therefore
        # the error reported when both fields are empty -- deterministic.
        for field in ("volume_id", "target_path"):
            if hasattr(request, field) and not getattr(request, field):
                raise ValueError(f"empty {field}")

    def runcmd(
        self,
        cmd: list[str],
        root: bool = True,
        *,
        timeout: float = 10.0,
        redact: tuple[str, ...] = (),
    ):
        """Run a command and return its completed process.

        Args:
            cmd: Program and arguments. The list is never modified.
            root: Apply ``become_method`` (when configured) to the command.
            timeout: Seconds to wait before ``subprocess.TimeoutExpired``.
            redact: Argument values shown as ``***`` in the log line.

        Returns:
            The ``subprocess.CompletedProcess`` with text stdout and stderr.

        Raises:
            subprocess.CalledProcessError: On a non-zero exit status.
            subprocess.TimeoutExpired: If the command runs longer than ``timeout``.
        """
        shown = ["***" if arg in redact else arg for arg in cmd] if redact else cmd
        _log.info("run %s, root=%s", shown, root)
        if root and self.become_method:
            if self.become_method == "su":
                cmd = ["su", "-c", shlex.join(cmd)]
            elif self.become_method.lower() not in ("false", "none"):
                # Build a new list instead of splicing into the caller's list.
                cmd = [*shlex.split(self.become_method), *cmd]
        res = subprocess.run(
            cmd,
            capture_output=True,
            encoding="utf-8",
            timeout=timeout,
            stdin=subprocess.DEVNULL,
            start_new_session=True,
        )
        _log.info("returncode=%s, stdout=%r, stderr=%r", res.returncode, res.stdout, res.stderr)
        res.check_returncode()
        return res

    def iscsiadm(self, **kwargs):
        """Run ``iscsiadm`` with options built from the keyword arguments.

        A one-letter key becomes a short option (``m="node"`` -> ``-m node``),
        a longer key becomes a long option (``--key``). A ``None`` value emits
        the option without an argument. Option order follows keyword order.

        Returns:
            The completed process from ``runcmd``.
        """
        args = ["iscsiadm"]
        for key, value in kwargs.items():
            args.append(f"-{key}" if len(key) == 1 else f"--{key}")
            if value is not None:
                args.append(value)
        # Keep secrets out of the log: when the setting being updated is a
        # password, mask its value in the logged command line.
        setting = kwargs.get("n", kwargs.get("name"))
        secret = kwargs.get("v", kwargs.get("value"))
        hide = isinstance(setting, str) and "password" in setting and isinstance(secret, str)
        return self.runcmd(args, root=True, redact=(secret,) if hide else ())

    def NodeGetInfo(
        self, request: api.NodeGetInfoRequest, context: grpc.ServicerContext
    ) -> api.NodeGetInfoResponse:
        """Return the node id taken from ``config["nodeid"]``."""
        return api.NodeGetInfoResponse(node_id=self.config["nodeid"])

    def NodeStageVolume(
        self, request: api.NodeStageVolumeRequest, context: grpc.ServicerContext
    ) -> api.NodeStageVolumeResponse:
        """Discover the target, configure CHAP credentials and log in.

        The target name and credentials come from ``request.publish_context``
        (keys ``targetname``, ``user``, ``passwd``); the portal is the first
        entry of the ``/address`` response.

        Raises:
            ValueError: If the staging path, capability or any of the three
                publish-context entries is missing.
            subprocess.CalledProcessError: If an ``iscsiadm`` call fails for a
                reason other than the session already existing.
        """
        self._validate(request)
        if not request.staging_target_path:
            raise ValueError("no staging target path")
        if not MessageToDict(request.volume_capability):
            raise ValueError("no capability")
        # attach iscsi
        ctx = request.publish_context
        targetname = ctx.get("targetname")
        username = ctx.get("user")
        password = ctx.get("passwd")
        # A missing value would otherwise be passed on as a value-less option
        # and only fail later inside iscsiadm with an unhelpful message.
        missing = [
            key
            for key, value in (("targetname", targetname), ("user", username), ("passwd", password))
            if not value
        ]
        if missing:
            raise ValueError(f"missing publish_context: {', '.join(missing)}")
        res = self.req.get("/address")
        res.raise_for_status()  # same handling as the /export calls
        addrs = res.json()
        self.iscsiadm(m="discovery", t="st", p=addrs[0])
        self.iscsiadm(m="node", T=targetname, o="update", n="node.session.auth.authmethod", v="CHAP")
        self.iscsiadm(m="node", T=targetname, o="update", n="node.session.auth.username", v=username)
        self.iscsiadm(m="node", T=targetname, o="update", n="node.session.auth.password", v=password)
        try:
            self.iscsiadm(m="node", T=targetname, l=None)
        except subprocess.CalledProcessError as e:
            # Exit status 15 with "already present" is treated as success so
            # that staging the same volume twice does not fail.
            if not (e.returncode == 15 and "already present" in e.stderr):
                raise
            _log.info("already logged in: %s", targetname)
        return api.NodeStageVolumeResponse()

    def NodeUnstageVolume(
        self, request: api.NodeUnstageVolumeRequest, context: grpc.ServicerContext
    ) -> api.NodeUnstageVolumeResponse:
        """Log out of every target exporting the volume and drop its discovery record.

        Raises:
            ValueError: If the staging path is missing.
            subprocess.CalledProcessError: If logout fails for a reason other
                than no matching session being found.
        """
        self._validate(request)
        if not request.staging_target_path:
            raise ValueError("no staging target path")
        # detach iscsi
        res = self.req.get("/export", params=dict(volume=request.volume_id))
        res.raise_for_status()
        for tgt in res.json():
            if request.volume_id not in tgt["volumes"]:
                _log.warning("export response: volume_id=%s, tgt=%s", request.volume_id, tgt)
                continue
            targetname = tgt.get("targetname")
            portal = None
            try:
                cmdres = self.iscsiadm(m="node", T=targetname, u=None)
            except subprocess.CalledProcessError as e:
                # Exit status 21 with this message means there was no session
                # to close; treat it as already logged out.
                if not (e.returncode == 21 and "No matching sessions found" in e.stderr):
                    raise
                _log.info("already logout? %s", targetname)
            else:
                # Take the last whitespace-separated token inside the [...] of
                # the first "... successful." line as the portal.
                for line in cmdres.stdout.splitlines():
                    if line.endswith("successful."):
                        portal = line.split("[", 1)[-1].split("]", 1)[0].rsplit(maxsplit=1)[-1]
                        break
            if portal:
                self.iscsiadm(m="discoverydb", t="st", p=portal, o="delete")
        return api.NodeUnstageVolumeResponse()

    def NodePublishVolume(
        self, request: api.NodePublishVolumeRequest, context: grpc.ServicerContext
    ) -> api.NodePublishVolumeResponse:
        """Mount the volume, looked up by filesystem label, on ``target_path``.

        The label is the first 16 characters of ``volume_id``. The target
        directory is created when it does not exist.

        Raises:
            ValueError: If the capability is missing.
            subprocess.CalledProcessError: If ``mount`` fails for a reason
                other than the volume already being mounted.
        """
        self._validate(request)
        if not MessageToDict(request.volume_capability):
            raise ValueError("no capability")
        # mount device
        p = Path(request.target_path)
        if not p.exists():
            # exist_ok covers a directory created between the check and here.
            p.mkdir(exist_ok=True)
        try:
            self.runcmd(["mount", "-L", request.volume_id[:16], request.target_path], root=True)
        except subprocess.CalledProcessError as e:
            if not (e.returncode == 32 and "already mounted" in e.stderr):
                raise
            _log.info("already mounted: %s", request.volume_id)
        return api.NodePublishVolumeResponse()

    def NodeUnpublishVolume(
        self, request: api.NodeUnpublishVolumeRequest, context: grpc.ServicerContext
    ) -> api.NodeUnpublishVolumeResponse:
        """Unmount ``target_path`` and remove the now-empty directory.

        Raises:
            FileNotFoundError: If ``target_path`` is not a mount point.
            subprocess.CalledProcessError: If ``umount`` fails.
        """
        self._validate(request)
        p = Path(request.target_path)
        if not p.is_mount():
            raise FileNotFoundError(f"target path is not mounted: {request.target_path}")
        # umount device
        self.runcmd(["umount", request.target_path], root=True)
        p.rmdir()
        return api.NodeUnpublishVolumeResponse()

    def NodeExpandVolume(
        self, request: api.NodeExpandVolumeRequest, context: grpc.ServicerContext
    ) -> api.NodeExpandVolumeResponse:
        """Rescan the iSCSI session and grow the filesystem online.

        Returns an empty response without doing anything when no export
        contains the volume.

        Raises:
            ValueError: If ``volume_path`` is missing.
            FileNotFoundError: If no block device carries the volume's label.
            subprocess.CalledProcessError: If the rescan or resize fails.
        """
        self._validate(request)
        if not request.volume_path:
            raise ValueError("no volume path")
        res = self.req.get("/export", params=dict(volume=request.volume_id))
        res.raise_for_status()
        for tgt in res.json():
            if request.volume_id not in tgt["volumes"]:
                _log.warning("export response: volume_id=%s, tgt=%s", request.volume_id, tgt)
                continue
            targetname = tgt.get("targetname")
            break
        else:
            _log.warning("volume not exported: %s", request.volume_id)
            return api.NodeExpandVolumeResponse()
        try:
            res = self.runcmd(["blkid", "-L", request.volume_id[:16]])
        except subprocess.CalledProcessError as err:
            # Keep the blkid failure attached as the cause for debugging.
            raise FileNotFoundError(f"volume not found: {request.volume_id}") from err
        devname = res.stdout.strip()
        # rescan iscsi
        self.iscsiadm(m="node", T=targetname, R=None)
        # online resize
        self.runcmd(["resize2fs", devname], root=True)
        return api.NodeExpandVolumeResponse()

    def NodeGetCapabilities(
        self, request: api.NodeGetCapabilitiesRequest, context: grpc.ServicerContext
    ) -> api.NodeGetCapabilitiesResponse:
        """Advertise the STAGE_UNSTAGE_VOLUME and EXPAND_VOLUME capabilities."""
        caps: list[api.NodeServiceCapability.RPC.Type] = [
            api.NodeServiceCapability.RPC.Type.STAGE_UNSTAGE_VOLUME,
            api.NodeServiceCapability.RPC.Type.EXPAND_VOLUME,
        ]
        return api.NodeGetCapabilitiesResponse(
            capabilities=[api.NodeServiceCapability(rpc=api.NodeServiceCapability.RPC(type=x)) for x in caps]
        )

    def NodeGetVolumeStats(
        self, request: api.NodeGetVolumeStatsRequest, context: grpc.ServicerContext
    ) -> api.NodeGetVolumeStatsResponse:
        """Validate the request and return an empty stats response.

        No usage figures are collected yet.
        """
        self._validate(request)
        # df
        return api.NodeGetVolumeStatsResponse()