Coverage for volexpcsi/controller.py: 87%

146 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 12:48 +0000

1import grpc 

2from logging import getLogger 

3from volexport.client import VERequest 

4from google.protobuf.message import Message 

5from google.protobuf.json_format import MessageToDict 

6from . import api 

7from .accesslog import servicer_accesslog 

8 

9_log = getLogger(__name__) 

10 

11 

@servicer_accesslog
class VolExpControl(api.ControllerServicer):
    """CSI Controller service implemented on top of a volexport REST endpoint.

    Every RPC is translated into HTTP calls issued through ``self.req``.
    Failures are reported by raising Python exceptions (``ValueError``,
    ``FileExistsError``, ``FileNotFoundError``, ``AssertionError`` or whatever
    ``raise_for_status()`` raises on the HTTP response).

    NOTE(review): translating those exceptions into gRPC status codes is
    presumably done by the ``servicer_accesslog`` decorator -- confirm there.
    """

    def __init__(self, config: dict):
        """Keep *config* and build the HTTP client from ``config["endpoint"]``."""
        self.config = config
        self.req = VERequest(config["endpoint"])

    def _validate(self, request: Message) -> None:
        """Reject a request whose ``volume_id`` field exists but is empty.

        Requests that have no ``volume_id`` attribute pass unchanged.

        Raises:
            ValueError: ``request.volume_id`` is present and falsy.
        """
        if hasattr(request, "volume_id") and not request.volume_id:
            raise ValueError("volume id is empty")

    def GetCapacity(self, request: api.GetCapacityRequest, context: grpc.ServicerContext):
        """Return the ``free`` value of ``GET /stats/volume`` as available capacity."""
        res = self.req.get("/stats/volume")
        res.raise_for_status()
        return api.GetCapacityResponse(available_capacity=res.json()["free"])

    def ControllerGetCapabilities(self, request: api.ControllerGetCapabilitiesRequest, context: grpc.ServicerContext):
        """Return the fixed list of controller RPC capabilities of this plugin."""
        rpc = api.ControllerServiceCapability.RPC
        caps: list[api.ControllerServiceCapability.RPC.Type] = [
            rpc.CREATE_DELETE_VOLUME,
            rpc.PUBLISH_UNPUBLISH_VOLUME,
            rpc.LIST_VOLUMES,
            rpc.EXPAND_VOLUME,
            rpc.GET_CAPACITY,
            rpc.GET_VOLUME,
            rpc.PUBLISH_READONLY,
            # not advertised:
            # rpc.MODIFY_VOLUME,
            # rpc.GET_SNAPSHOT,
        ]
        res: list[api.ControllerServiceCapability] = [
            api.ControllerServiceCapability(rpc=api.ControllerServiceCapability.RPC(type=typ)) for typ in caps
        ]
        return api.ControllerGetCapabilitiesResponse(capabilities=res)

    def ListVolumes(self, request: api.ListVolumesRequest, context: grpc.ServicerContext):
        """List volumes from ``GET /volume`` with token based pagination.

        A page token is ``"vol-"`` followed by the name of the first volume of
        the page.  ``max_entries == 0`` means "no limit".

        Raises:
            ValueError: ``max_entries`` is negative.  Without this check the
                first loop iteration would stop immediately and return an
                empty page whose ``next_token`` points at the first volume,
                so a paginating caller would never make progress.
            AssertionError: ``starting_token`` lacks the ``"vol-"`` prefix, or
                names a volume that is not in the current listing (previously
                this produced a silent, empty "last page").
        """
        vols = self.req.get("/volume")
        vols.raise_for_status()
        if request.max_entries < 0:
            raise ValueError(f"invalid max_entries: {request.max_entries}")
        token = request.starting_token
        start_name: str | None = None
        if token:
            if not token.startswith("vol-"):
                raise AssertionError(f"invalid starting token: {token}")
            start_name = token.removeprefix("vol-")
        # Without a token the listing starts at the first volume.
        started = start_name is None
        res: list[api.ListVolumesResponse.Entry] = []
        next_entry: str | None = None
        for vol in vols.json():
            if request.max_entries and request.max_entries <= len(res):
                # Page is full: the current volume becomes the next page start.
                next_entry = vol["name"]
                break
            if not started:
                if vol["name"] != start_name:
                    continue
                started = True
            vent = api.Volume(
                volume_id=vol.get("name"),
                capacity_bytes=vol.get("size"),
            )
            stat = api.ListVolumesResponse.VolumeStatus(
                volume_condition=api.VolumeCondition(abnormal=False),
                published_node_ids=vol.get("addresses", []),
            )
            res.append(api.ListVolumesResponse.Entry(volume=vent, status=stat))
        if not started:
            # The token was well formed but its volume was never seen.
            raise AssertionError(f"invalid starting token: {token}")
        if next_entry:
            return api.ListVolumesResponse(entries=res, next_token="vol-" + next_entry)
        return api.ListVolumesResponse(entries=res)

    def CreateVolume(self, request: api.CreateVolumeRequest, context: grpc.ServicerContext):
        """Create (or re-confirm) the volume ``request.name``.

        If ``GET /volume/<name>`` answers 200 the existing volume is returned,
        provided its size satisfies the requested capacity range.  Otherwise
        the volume is created with ``POST /volume`` and then formatted with
        ``POST /volume/<name>/mkfs``.

        Raises:
            ValueError: the name or ``capacity_range.required_bytes`` is unset.
            FileExistsError: a volume of that name exists but is smaller than
                ``required_bytes`` or larger than ``limit_bytes``.
        """
        self._validate(request)
        if not request.name:
            raise ValueError("no volume name")
        required = request.capacity_range.required_bytes
        limit = request.capacity_range.limit_bytes
        if not required:
            raise ValueError("no capacity specified")
        chk = self.req.get(f"/volume/{request.name}")
        if chk.status_code == 200:
            volsize = chk.json()["size"]
            if volsize < required:
                raise FileExistsError("volume already exists(short)")
            if limit and limit < volsize:
                raise FileExistsError("volume already exists(too large)")
            return api.CreateVolumeResponse(volume=api.Volume(capacity_bytes=volsize, volume_id=request.name))
        res = self.req.post("/volume", json=dict(name=request.name, size=required))
        res.raise_for_status()
        resj = res.json()
        volname = resj["name"]
        # NOTE(review): if mkfs fails the volume stays behind, and a retry takes
        # the "already exists" branch above without formatting -- confirm
        # whether that is acceptable.
        mkfsres = self.req.post(f"/volume/{volname}/mkfs", json=dict())
        mkfsres.raise_for_status()
        return api.CreateVolumeResponse(
            volume=api.Volume(
                capacity_bytes=resj["size"],
                volume_id=resj["name"],
            )
        )

    def DeleteVolume(self, request: api.DeleteVolumeRequest, context: grpc.ServicerContext):
        """Delete ``request.volume_id``; a 404 from the backend counts as success."""
        self._validate(request)
        res = self.req.delete(f"/volume/{request.volume_id}")
        if res.status_code == 404:
            # Already gone: log it and report success.
            _log.info("delete not found: %s", request.volume_id)
            return api.DeleteVolumeResponse()
        res.raise_for_status()
        return api.DeleteVolumeResponse()

    def ControllerPublishVolume(self, request: api.ControllerPublishVolumeRequest, context: grpc.ServicerContext):
        """Export the volume via ``POST /export`` and return the publish context.

        Every key/value pair of the JSON response is copied into
        ``publish_context`` with the value converted to ``str``.

        Raises:
            ValueError: ``node_id`` is empty, ``volume_capability`` is empty,
                or the capability has no mount ``fs_type``.
        """
        self._validate(request)
        if not request.node_id:
            raise ValueError("no node_id")
        if not MessageToDict(request.volume_capability):
            raise ValueError("no capability")
        # access mode check is currently disabled:
        # if request.volume_capability.access_mode not in (api.VolumeCapability.AccessMode.SINGLE_NODE_WRITER,):
        #     raise ValueError("invalid mode")
        if not request.volume_capability.mount.fs_type:
            raise ValueError("invalid type")
        res = self.req.post("/export", json=dict(name=request.volume_id, readonly=request.readonly, acl=None))
        res.raise_for_status()
        ctxt = {k: str(v) for k, v in res.json().items()}
        return api.ControllerPublishVolumeResponse(publish_context=ctxt)

    def ControllerUnpublishVolume(self, request: api.ControllerUnpublishVolumeRequest, context: grpc.ServicerContext):
        """Delete every export target that lists ``request.volume_id``.

        Targets come from ``GET /export?volume=<id>``.  A returned target whose
        ``volumes`` does not contain the id is logged and skipped.
        """
        self._validate(request)
        qres = self.req.get("/export", params=dict(volume=request.volume_id))
        qres.raise_for_status()
        for tgt in qres.json():
            if request.volume_id not in tgt["volumes"]:
                _log.warning("export response: volume_id=%s, tgt=%s", request.volume_id, tgt)
                continue
            tgtname = tgt["targetname"]
            res = self.req.delete(f"/export/{tgtname}")
            res.raise_for_status()
        return api.ControllerUnpublishVolumeResponse()

    def ControllerExpandVolume(self, request: api.ControllerExpandVolumeRequest, context: grpc.ServicerContext):
        """Resize the volume to ``capacity_range.required_bytes``.

        The new size is posted to ``/volume/<id>`` and the backend's reported
        ``size`` is returned with ``node_expansion_required=True``.

        Raises:
            ValueError: ``volume_id`` is empty, or ``required_bytes`` is unset
                (same rule and message as ``CreateVolume``; previously a size
                of 0 was sent to the backend).
        """
        self._validate(request)
        required = request.capacity_range.required_bytes
        if not required:
            raise ValueError("no capacity specified")
        res = self.req.post(f"/volume/{request.volume_id}", json=dict(size=required))
        res.raise_for_status()
        return api.ControllerExpandVolumeResponse(capacity_bytes=res.json()["size"], node_expansion_required=True)

    def ControllerGetVolume(self, request: api.ControllerGetVolumeRequest, context: grpc.ServicerContext):
        """Return size, name and published nodes of ``request.volume_id``.

        Volume data comes from ``GET /volume/<id>``; node ids are collected
        from ``GET /export?volume=<id>``.
        """
        self._validate(request)
        res = self.req.get(f"/volume/{request.volume_id}")
        res.raise_for_status()
        resj = res.json()
        qres = self.req.get("/export", params=dict(volume=request.volume_id))
        qres.raise_for_status()
        nodes = []
        for export in qres.json():
            # NOTE(review): assumes export["connected"] is a mapping whose
            # "address" value is a list of strings -- verify against the
            # volexport export schema.
            nodes.extend(export["connected"]["address"])
        return api.ControllerGetVolumeResponse(
            volume=api.Volume(
                capacity_bytes=resj["size"],
                volume_id=resj["name"],
            ),
            status=api.ControllerGetVolumeResponse.VolumeStatus(
                published_node_ids=nodes, volume_condition=api.VolumeCondition(abnormal=False)
            ),
        )

    def ControllerModifyVolume(self, request: api.ControllerModifyVolumeRequest, context: grpc.ServicerContext):
        """Validate ``volume_id``, then defer to the base servicer implementation."""
        self._validate(request)
        return super().ControllerModifyVolume(request, context)

    def ValidateVolumeCapabilities(self, request: api.ValidateVolumeCapabilitiesRequest, context: grpc.ServicerContext):
        """Confirm the requested capabilities that this plugin supports.

        Only ``SINGLE_NODE_WRITER`` access mode is treated as supported; the
        response's ``confirmed`` carries that subset together with the
        request's ``parameters`` and ``mutable_parameters``.

        NOTE(review): ``confirmed`` is set even when some (or all) requested
        capabilities were filtered out -- check this against the CSI spec.

        Raises:
            ValueError: ``volume_id`` is empty or no capabilities were given.
            FileNotFoundError: ``GET /volume/<id>`` did not answer 200.
        """
        self._validate(request)
        if not request.volume_capabilities:
            raise ValueError("no capabilities")
        res = self.req.get(f"/volume/{request.volume_id}")
        if res.status_code != 200:
            raise FileNotFoundError(f"volume not found: {request.volume_id}")
        supported_mode = [
            api.VolumeCapability.AccessMode.Mode.SINGLE_NODE_WRITER,
        ]
        caps: list[api.VolumeCapability] = [
            cap for cap in request.volume_capabilities if cap.access_mode.mode in supported_mode
        ]
        return api.ValidateVolumeCapabilitiesResponse(
            confirmed=api.ValidateVolumeCapabilitiesResponse.Confirmed(
                volume_capabilities=caps,
                parameters=request.parameters,
                mutable_parameters=request.mutable_parameters,
            ),
        )

    def ListSnapshots(self, request: api.ListSnapshotsRequest, context: grpc.ServicerContext):
        """Defer to the base servicer implementation."""
        return super().ListSnapshots(request, context)

    def CreateSnapshot(self, request: api.CreateSnapshotRequest, context: grpc.ServicerContext):
        """Defer to the base servicer implementation."""
        return super().CreateSnapshot(request, context)

    def DeleteSnapshot(self, request: api.DeleteSnapshotRequest, context: grpc.ServicerContext):
        """Defer to the base servicer implementation."""
        return super().DeleteSnapshot(request, context)

    def GetSnapshot(self, request: api.GetSnapshotRequest, context: grpc.ServicerContext):
        """Defer to the base servicer implementation."""
        return super().GetSnapshot(request, context)