Coverage for volexpcsi/controller.py: 87%
146 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
1import grpc
2from logging import getLogger
3from volexport.client import VERequest
4from google.protobuf.message import Message
5from google.protobuf.json_format import MessageToDict
6from . import api
7from .accesslog import servicer_accesslog
9_log = getLogger(__name__)
@servicer_accesslog
class VolExpControl(api.ControllerServicer):
    """CSI Controller service that forwards each RPC to an HTTP volume API.

    Every RPC is translated into one or more calls on ``self.req`` (a
    ``VERequest``) against the ``/volume``, ``/export`` and ``/stats/volume``
    paths.  Failures are signalled with built-in exceptions (``ValueError``,
    ``FileExistsError``, ``FileNotFoundError``, ``AssertionError``) or with
    whatever ``raise_for_status()`` raises.

    NOTE(review): presumably ``servicer_accesslog`` (or the server layer)
    converts those exceptions into gRPC status codes -- confirm there.
    """

    def __init__(self, config: dict) -> None:
        """Store the configuration and create the HTTP client.

        Args:
            config: Plugin settings; ``config["endpoint"]`` is handed to
                ``VERequest``.

        Raises:
            KeyError: If ``config`` has no ``"endpoint"`` entry.
        """
        self.config = config
        self.req = VERequest(config["endpoint"])

    def _validate(self, request: Message) -> None:
        """Reject a request whose ``volume_id`` field is present but empty.

        Requests without a ``volume_id`` attribute are accepted unchecked.

        Raises:
            ValueError: If ``request.volume_id`` exists and is falsy.
        """
        # getattr() rather than attribute access: ``Message`` itself does not
        # declare ``volume_id``, only some concrete request types do.
        if hasattr(request, "volume_id") and not getattr(request, "volume_id"):
            raise ValueError("volume id is empty")

    def GetCapacity(
        self, request: api.GetCapacityRequest, context: grpc.ServicerContext
    ) -> api.GetCapacityResponse:
        """Report the ``free`` value of ``GET /stats/volume`` as the available capacity.

        The request's own fields are not consulted.
        """
        res = self.req.get("/stats/volume")
        res.raise_for_status()
        return api.GetCapacityResponse(available_capacity=res.json()["free"])

    def ControllerGetCapabilities(
        self, request: api.ControllerGetCapabilitiesRequest, context: grpc.ServicerContext
    ) -> api.ControllerGetCapabilitiesResponse:
        """Return the fixed list of controller RPC capabilities this plugin advertises."""
        rpc = api.ControllerServiceCapability.RPC
        # MODIFY_VOLUME and GET_SNAPSHOT are intentionally not advertised.
        advertised = (
            rpc.CREATE_DELETE_VOLUME,
            rpc.PUBLISH_UNPUBLISH_VOLUME,
            rpc.LIST_VOLUMES,
            rpc.EXPAND_VOLUME,
            rpc.GET_CAPACITY,
            rpc.GET_VOLUME,
            rpc.PUBLISH_READONLY,
        )
        capabilities = [api.ControllerServiceCapability(rpc=rpc(type=typ)) for typ in advertised]
        return api.ControllerGetCapabilitiesResponse(capabilities=capabilities)

    def ListVolumes(
        self, request: api.ListVolumesRequest, context: grpc.ServicerContext
    ) -> api.ListVolumesResponse:
        """List volumes from ``GET /volume`` with token-based paging.

        A page token is ``"vol-"`` followed by the name of the first volume of
        the next page.  When ``starting_token`` is given, volumes are skipped
        until that name is reached; ``max_entries`` (when non-zero) caps the
        number of entries in the page.

        Raises:
            ValueError: If ``max_entries`` is negative.
            AssertionError: If ``starting_token`` does not start with ``"vol-"``.
        """
        vols = self.req.get("/volume")
        vols.raise_for_status()
        if request.max_entries < 0:
            # A negative limit used to satisfy ``max_entries <= len(entries)``
            # immediately, producing an empty page whose next_token pointed at
            # the first volume -- a paging loop that never makes progress.
            raise ValueError(f"invalid max_entries: {request.max_entries}")
        start_name: str | None = None
        if request.starting_token:
            if not request.starting_token.startswith("vol-"):
                raise AssertionError(f"invalid starting token: {request.starting_token}")
            start_name = request.starting_token.removeprefix("vol-")
        # NOTE(review): a well-formed token that matches no volume yields an
        # empty page rather than an error -- confirm this is intended.
        skipping = start_name is not None
        entries: list[api.ListVolumesResponse.Entry] = []
        next_name: str | None = None
        for vol in vols.json():
            if request.max_entries and request.max_entries <= len(entries):
                # Page is full; this volume becomes the start of the next page.
                next_name = vol["name"]
                break
            if skipping:
                if vol["name"] != start_name:
                    continue
                skipping = False
            entries.append(
                api.ListVolumesResponse.Entry(
                    volume=api.Volume(
                        volume_id=vol.get("name"),
                        capacity_bytes=vol.get("size"),
                    ),
                    status=api.ListVolumesResponse.VolumeStatus(
                        volume_condition=api.VolumeCondition(abnormal=False),
                        published_node_ids=vol.get("addresses", []),
                    ),
                )
            )
        if next_name:
            return api.ListVolumesResponse(entries=entries, next_token="vol-" + next_name)
        return api.ListVolumesResponse(entries=entries)

    def CreateVolume(
        self, request: api.CreateVolumeRequest, context: grpc.ServicerContext
    ) -> api.CreateVolumeResponse:
        """Create a volume named ``request.name`` and run mkfs on it.

        If ``GET /volume/<name>`` already answers 200, the existing volume is
        returned as long as its size fits the requested capacity range;
        otherwise the volume is created via ``POST /volume`` and formatted via
        ``POST /volume/<name>/mkfs``.

        Raises:
            ValueError: If the name or ``capacity_range.required_bytes`` is empty.
            FileExistsError: If a volume of that name exists with a size outside
                the requested range.
        """
        self._validate(request)
        if not request.name:
            raise ValueError("no volume name")
        required = request.capacity_range.required_bytes
        limit = request.capacity_range.limit_bytes
        if not required:
            raise ValueError("no capacity specified")
        chk = self.req.get(f"/volume/{request.name}")
        if chk.status_code == 200:
            volsize = chk.json()["size"]
            if volsize < required:
                raise FileExistsError("volume already exists(short)")
            if limit and limit < volsize:
                raise FileExistsError("volume already exists(too large)")
            return api.CreateVolumeResponse(volume=api.Volume(capacity_bytes=volsize, volume_id=request.name))
        res = self.req.post("/volume", json={"name": request.name, "size": required})
        res.raise_for_status()
        resj = res.json()
        volname = resj["name"]
        # NOTE(review): if mkfs fails the volume stays created, and a retry
        # takes the "already exists" branch above without formatting it.
        mkfsres = self.req.post(f"/volume/{volname}/mkfs", json={})
        mkfsres.raise_for_status()
        return api.CreateVolumeResponse(
            volume=api.Volume(
                capacity_bytes=resj["size"],
                volume_id=resj["name"],
            )
        )

    def DeleteVolume(
        self, request: api.DeleteVolumeRequest, context: grpc.ServicerContext
    ) -> api.DeleteVolumeResponse:
        """Delete ``request.volume_id``; a 404 from the backend counts as success.

        Raises:
            ValueError: If ``volume_id`` is empty.
        """
        self._validate(request)
        res = self.req.delete(f"/volume/{request.volume_id}")
        if res.status_code == 404:
            # Already gone: log it and report success.
            _log.info("delete not found: %s", request.volume_id)
        else:
            res.raise_for_status()
        return api.DeleteVolumeResponse()

    def ControllerPublishVolume(
        self, request: api.ControllerPublishVolumeRequest, context: grpc.ServicerContext
    ) -> api.ControllerPublishVolumeResponse:
        """Export the volume via ``POST /export`` and return the result as publish context.

        Every value of the backend's JSON reply is converted with ``str()`` to
        build ``publish_context``.

        Raises:
            ValueError: If ``volume_id`` or ``node_id`` is empty, no volume
                capability is given, or the capability has no mount ``fs_type``.
        """
        self._validate(request)
        # node_id is required but not otherwise used: the export is created with acl=None.
        if not request.node_id:
            raise ValueError("no node_id")
        if not MessageToDict(request.volume_capability):
            raise ValueError("no capability")
        # NOTE: the access mode is not checked here; only the mount fs_type is.
        if not request.volume_capability.mount.fs_type:
            raise ValueError("invalid type")
        res = self.req.post(
            "/export",
            json={"name": request.volume_id, "readonly": request.readonly, "acl": None},
        )
        res.raise_for_status()
        publish_context = {key: str(value) for key, value in res.json().items()}
        return api.ControllerPublishVolumeResponse(publish_context=publish_context)

    def ControllerUnpublishVolume(
        self, request: api.ControllerUnpublishVolumeRequest, context: grpc.ServicerContext
    ) -> api.ControllerUnpublishVolumeResponse:
        """Delete every export target that lists ``request.volume_id``.

        Targets returned by ``GET /export`` whose ``volumes`` do not contain the
        volume are logged and left alone.

        Raises:
            ValueError: If ``volume_id`` is empty.
        """
        self._validate(request)
        qres = self.req.get("/export", params={"volume": request.volume_id})
        qres.raise_for_status()
        for tgt in qres.json():
            if request.volume_id not in tgt["volumes"]:
                _log.warning("export response: volume_id=%s, tgt=%s", request.volume_id, tgt)
                continue
            tgtname = tgt["targetname"]
            res = self.req.delete(f"/export/{tgtname}")
            res.raise_for_status()
        return api.ControllerUnpublishVolumeResponse()

    def ControllerExpandVolume(
        self, request: api.ControllerExpandVolumeRequest, context: grpc.ServicerContext
    ) -> api.ControllerExpandVolumeResponse:
        """Resize the volume to ``capacity_range.required_bytes`` via ``POST /volume/<id>``.

        The response carries the size reported by the backend and always sets
        ``node_expansion_required``.

        Raises:
            ValueError: If ``volume_id`` is empty.
        """
        self._validate(request)
        res = self.req.post(
            f"/volume/{request.volume_id}",
            json={"size": request.capacity_range.required_bytes},
        )
        res.raise_for_status()
        return api.ControllerExpandVolumeResponse(capacity_bytes=res.json()["size"], node_expansion_required=True)

    def ControllerGetVolume(
        self, request: api.ControllerGetVolumeRequest, context: grpc.ServicerContext
    ) -> api.ControllerGetVolumeResponse:
        """Return the volume's size and name plus the addresses connected to its exports.

        Raises:
            ValueError: If ``volume_id`` is empty.
        """
        self._validate(request)
        res = self.req.get(f"/volume/{request.volume_id}")
        res.raise_for_status()
        resj = res.json()
        qres = self.req.get("/export", params={"volume": request.volume_id})
        qres.raise_for_status()
        # presumably each export's ["connected"]["address"] is a list of addresses -- TODO confirm
        nodes = [addr for export in qres.json() for addr in export["connected"]["address"]]
        return api.ControllerGetVolumeResponse(
            volume=api.Volume(
                capacity_bytes=resj["size"],
                volume_id=resj["name"],
            ),
            status=api.ControllerGetVolumeResponse.VolumeStatus(
                published_node_ids=nodes, volume_condition=api.VolumeCondition(abnormal=False)
            ),
        )

    def ControllerModifyVolume(self, request: api.ControllerModifyVolumeRequest, context: grpc.ServicerContext):
        """Check ``volume_id``, then defer to the base servicer's implementation.

        Raises:
            ValueError: If ``volume_id`` is empty.
        """
        self._validate(request)
        return super().ControllerModifyVolume(request, context)

    def ValidateVolumeCapabilities(
        self, request: api.ValidateVolumeCapabilitiesRequest, context: grpc.ServicerContext
    ) -> api.ValidateVolumeCapabilitiesResponse:
        """Confirm the requested capabilities whose access mode is ``SINGLE_NODE_WRITER``.

        Capabilities with any other access mode are left out of the confirmed
        list; ``parameters`` and ``mutable_parameters`` are echoed back.

        Raises:
            ValueError: If ``volume_id`` is empty or no capabilities are given.
            FileNotFoundError: If ``GET /volume/<id>`` does not answer 200.
        """
        self._validate(request)
        if not request.volume_capabilities:
            raise ValueError("no capabilities")
        res = self.req.get(f"/volume/{request.volume_id}")
        if res.status_code != 200:
            raise FileNotFoundError(f"volume not found: {request.volume_id}")
        supported_modes = (api.VolumeCapability.AccessMode.Mode.SINGLE_NODE_WRITER,)
        confirmed_caps = [cap for cap in request.volume_capabilities if cap.access_mode.mode in supported_modes]
        return api.ValidateVolumeCapabilitiesResponse(
            confirmed=api.ValidateVolumeCapabilitiesResponse.Confirmed(
                volume_capabilities=confirmed_caps,
                parameters=request.parameters,
                mutable_parameters=request.mutable_parameters,
            ),
        )

    def ListSnapshots(self, request: api.ListSnapshotsRequest, context: grpc.ServicerContext):
        """Defer to the base servicer; no snapshot logic is implemented here."""
        return super().ListSnapshots(request, context)

    def CreateSnapshot(self, request: api.CreateSnapshotRequest, context: grpc.ServicerContext):
        """Defer to the base servicer; no snapshot logic is implemented here."""
        return super().CreateSnapshot(request, context)

    def DeleteSnapshot(self, request: api.DeleteSnapshotRequest, context: grpc.ServicerContext):
        """Defer to the base servicer; no snapshot logic is implemented here."""
        return super().DeleteSnapshot(request, context)

    def GetSnapshot(self, request: api.GetSnapshotRequest, context: grpc.ServicerContext):
        """Defer to the base servicer; no snapshot logic is implemented here."""
        return super().GetSnapshot(request, context)