Coverage for volexpcsi/node.py: 68%

142 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 12:48 +0000

1import grpc 

2import subprocess 

3import shlex 

4from pathlib import Path 

5from logging import getLogger 

6from google.protobuf.message import Message 

7from google.protobuf.json_format import MessageToDict 

8from volexport.client import VERequest 

9from . import api 

10from .accesslog import servicer_accesslog 

11 

12_log = getLogger(__name__) 

13 

14 

@servicer_accesslog
class VolExpNode(api.NodeServicer):
    """Node-side servicer: attaches exported volumes over iSCSI and mounts them.

    Volumes are attached by driving the ``iscsiadm`` command line tool and are
    mounted by filesystem label, using the first 16 characters of the volume
    id as the label. Privileged commands are optionally prefixed with the
    configured ``become_method`` (for example ``sudo`` or ``su``).
    """

    def __init__(self, config: dict):
        """Store the configuration and create the export-server client.

        Args:
            config: must contain ``endpoint``; ``become_method`` is optional.
                ``nodeid`` is read later by :meth:`NodeGetInfo`.
        """
        self.config = config
        self.req = VERequest(config["endpoint"])
        self.become_method: str | None = config.get("become_method")

    def _validate(self, request: Message):
        """Reject requests whose ``volume_id`` or ``target_path`` is empty.

        Fields the request type does not have are ignored.

        Raises:
            ValueError: a present field is empty.
        """
        # A tuple (not a set) keeps the check order, and therefore the
        # reported field, deterministic when several fields are empty.
        for name in ("volume_id", "target_path"):
            if hasattr(request, name) and not getattr(request, name):
                raise ValueError(f"empty {name}")

    def runcmd(self, cmd: list[str], root: bool = True):
        """Run a command and return its ``CompletedProcess``.

        Args:
            cmd: argument vector; never modified.
            root: when true and a ``become_method`` is configured, the command
                is wrapped with it (``su -c '<cmd>'`` for ``su``, otherwise
                the method's words are prepended). The values ``false`` and
                ``none`` (any case) disable wrapping.

        Raises:
            subprocess.CalledProcessError: the command exited non-zero.
            subprocess.TimeoutExpired: the command ran longer than 10 seconds.
        """
        _log.info("run %s, root=%s", cmd, root)
        if root and self.become_method:
            if self.become_method == "su":
                cmd = ["su", "-c", shlex.join(cmd)]
            elif self.become_method.lower() not in ("false", "none"):
                # Build a new list so the caller's argument is left untouched.
                cmd = [*shlex.split(self.become_method), *cmd]
        res = subprocess.run(
            cmd,
            capture_output=True,
            encoding="utf-8",
            timeout=10.0,
            stdin=subprocess.DEVNULL,
            start_new_session=True,
        )
        _log.info("returncode=%s, stdout=%s, stderr=%s", res.returncode, repr(res.stdout), repr(res.stderr))
        res.check_returncode()
        return res

    def iscsiadm(self, **kwargs):
        """Run ``iscsiadm`` as root with options built from keyword arguments.

        One-letter keys become ``-k``, longer keys become ``--key``. A value of
        ``None`` produces a bare flag; any other value is passed as the
        option's argument (converted to ``str``).
        """
        arg = ["iscsiadm"]
        for key, value in kwargs.items():
            arg.append(f"-{key}" if len(key) == 1 else f"--{key}")
            if value is not None:
                arg.append(str(value))
        return self.runcmd(arg, root=True)

    def NodeGetInfo(self, request: api.NodeGetInfoRequest, context: grpc.ServicerContext):
        """Return the node id taken from the configuration."""
        return api.NodeGetInfoResponse(node_id=self.config["nodeid"])

    def NodeStageVolume(self, request: api.NodeStageVolumeRequest, context: grpc.ServicerContext):
        """Discover the iSCSI target, set CHAP credentials and log in.

        The target name and credentials come from ``request.publish_context``
        (keys ``targetname``, ``user``, ``passwd``); the portal is the first
        address returned by the export server's ``/address`` endpoint.

        Raises:
            ValueError: staging path, capability or target name is missing.
            IndexError: the export server returned no address.
            subprocess.CalledProcessError: an ``iscsiadm`` call failed, except
                a login failure with exit status 15 and "already present" in
                stderr, which is treated as already logged in.
        """
        self._validate(request)
        if not request.staging_target_path:
            raise ValueError("no staging target path")
        if not MessageToDict(request.volume_capability):
            raise ValueError("no capability")
        # attach iscsi
        targetname = request.publish_context.get("targetname")
        if not targetname:
            # Without a name every "-T" below would be emitted without a value.
            raise ValueError("no targetname in publish context")
        username = request.publish_context.get("user")
        password = request.publish_context.get("passwd")
        res = self.req.get("/address")
        res.raise_for_status()
        addrs = res.json()
        if not addrs:
            # Same exception type as indexing the empty list, clearer message.
            raise IndexError("no portal address returned by export server")
        self.iscsiadm(m="discovery", t="st", p=addrs[0])
        self.iscsiadm(m="node", T=targetname, o="update", n="node.session.auth.authmethod", v="CHAP")
        self.iscsiadm(m="node", T=targetname, o="update", n="node.session.auth.username", v=username)
        # NOTE(review): the password is passed on the command line and is
        # written to the log by runcmd(); consider redacting it.
        self.iscsiadm(m="node", T=targetname, o="update", n="node.session.auth.password", v=password)
        try:
            self.iscsiadm(m="node", T=targetname, l=None)
        except subprocess.CalledProcessError as e:
            if e.returncode != 15 or "already present" not in e.stderr:
                raise
            _log.info("already logged in: %s", targetname)
        return api.NodeStageVolumeResponse()

    def _logout_portal(self, output: str) -> str | None:
        """Extract the portal from the first ``... successful.`` logout line.

        The portal is the last whitespace-separated word inside the square
        brackets of that line. Returns ``None`` when no such line exists or the
        bracketed part is empty.
        """
        for line in output.splitlines():
            if line.endswith("successful."):
                inner = line.split("[", 1)[-1].split("]", 1)[0]
                words = inner.rsplit(maxsplit=1)
                return words[-1] if words else None
        return None

    def NodeUnstageVolume(self, request: api.NodeUnstageVolumeRequest, context: grpc.ServicerContext):
        """Log out of every target exporting the volume and drop its discovery record.

        Raises:
            ValueError: the staging path is missing.
            subprocess.CalledProcessError: an ``iscsiadm`` call failed, except
                a logout failure with exit status 21 and "No matching sessions
                found" in stderr, which is treated as already logged out.
        """
        self._validate(request)
        if not request.staging_target_path:
            raise ValueError("no staging target path")
        # detach iscsi
        res = self.req.get("/export", params=dict(volume=request.volume_id))
        res.raise_for_status()
        for tgt in res.json():
            if request.volume_id not in tgt["volumes"]:
                _log.warning("export response: volume_id=%s, tgt=%s", request.volume_id, tgt)
                continue
            targetname = tgt.get("targetname")
            portal = None
            try:
                cmdres = self.iscsiadm(m="node", T=targetname, u=None)
            except subprocess.CalledProcessError as e:
                if e.returncode != 21 or "No matching sessions found" not in e.stderr:
                    raise
                _log.info("already logout? %s", targetname)
            else:
                portal = self._logout_portal(cmdres.stdout)
            if portal:
                self.iscsiadm(m="discoverydb", t="st", p=portal, o="delete")
        return api.NodeUnstageVolumeResponse()

    def NodePublishVolume(self, request: api.NodePublishVolumeRequest, context: grpc.ServicerContext):
        """Create the target directory if needed and mount the volume by label.

        Raises:
            ValueError: the capability is missing.
            subprocess.CalledProcessError: ``mount`` failed, except exit status
                32 with "already mounted" in stderr, which is treated as success.
        """
        self._validate(request)
        if not MessageToDict(request.volume_capability):
            raise ValueError("no capability")
        # mount device
        p = Path(request.target_path)
        if not p.exists():
            # exist_ok guards against the path appearing between check and mkdir.
            p.mkdir(exist_ok=True)
        try:
            self.runcmd(["mount", "-L", request.volume_id[:16], request.target_path], root=True)
        except subprocess.CalledProcessError as e:
            if e.returncode != 32 or "already mounted" not in e.stderr:
                raise
            _log.info("already mounted: %s", request.volume_id)
        return api.NodePublishVolumeResponse()

    def NodeUnpublishVolume(self, request: api.NodeUnpublishVolumeRequest, context: grpc.ServicerContext):
        """Unmount the target path and remove its directory.

        Raises:
            FileNotFoundError: the target path is not a mount point.
            subprocess.CalledProcessError: ``umount`` failed.
        """
        self._validate(request)
        p = Path(request.target_path)
        if not p.is_mount():
            # NOTE(review): the CSI spec asks for this call to be idempotent;
            # confirm whether an unmounted path should succeed instead.
            raise FileNotFoundError(f"target path is not mounted: {request.target_path}")
        # umount device
        self.runcmd(["umount", request.target_path], root=True)
        p.rmdir()
        return api.NodeUnpublishVolumeResponse()

    def NodeExpandVolume(self, request: api.NodeExpandVolumeRequest, context: grpc.ServicerContext):
        """Rescan the iSCSI session and grow the filesystem online.

        Returns an empty response without doing anything when the export
        server lists no target for the volume.

        Raises:
            ValueError: the volume path is missing.
            FileNotFoundError: no block device carries the volume's label.
            subprocess.CalledProcessError: rescan or ``resize2fs`` failed.
        """
        self._validate(request)
        if not request.volume_path:
            raise ValueError("no volume path")
        res = self.req.get("/export", params=dict(volume=request.volume_id))
        res.raise_for_status()
        for tgt in res.json():
            if request.volume_id not in tgt["volumes"]:
                _log.warning("export response: volume_id=%s, tgt=%s", request.volume_id, tgt)
                continue
            targetname = tgt.get("targetname")
            break
        else:
            _log.warning("volume not exported: %s", request.volume_id)
            return api.NodeExpandVolumeResponse()
        try:
            res = self.runcmd(["blkid", "-L", request.volume_id[:16]])
        except subprocess.CalledProcessError as e:
            raise FileNotFoundError(f"volume not found: {request.volume_id}") from e
        devname = res.stdout.strip()
        # rescan iscsi
        self.iscsiadm(m="node", T=targetname, R=None)
        # online resize
        self.runcmd(["resize2fs", devname], root=True)
        return api.NodeExpandVolumeResponse()

    def NodeGetCapabilities(self, request: api.NodeGetCapabilitiesRequest, context: grpc.ServicerContext):
        """Advertise the STAGE_UNSTAGE_VOLUME and EXPAND_VOLUME capabilities."""
        caps: list[api.NodeServiceCapability.RPC.Type] = [
            api.NodeServiceCapability.RPC.Type.STAGE_UNSTAGE_VOLUME,
            api.NodeServiceCapability.RPC.Type.EXPAND_VOLUME,
        ]
        return api.NodeGetCapabilitiesResponse(
            capabilities=[api.NodeServiceCapability(rpc=api.NodeServiceCapability.RPC(type=x)) for x in caps]
        )

    def NodeGetVolumeStats(self, request: api.NodeGetVolumeStatsRequest, context: grpc.ServicerContext):
        """Validate the request and return an empty stats response.

        No usage figures are collected yet.
        """
        self._validate(request)
        # df
        return api.NodeGetVolumeStatsResponse()

181 # df 

182 return api.NodeGetVolumeStatsResponse()