Coverage for volexpcsi/identity.py: 100%
26 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
1import grpc
2import requests
3from volexport.version import VERSION
4from volexport.client import VERequest
5from google.protobuf import wrappers_pb2
6from logging import getLogger
7from . import api
8from .accesslog import servicer_accesslog
10_log = getLogger(__name__)
@servicer_accesslog
class VolExpIdentity(api.IdentityServicer):
    """Identity servicer of the volexport plugin.

    Answers plugin-info, plugin-capability and probe requests. The probe is
    answered by querying the ``/health`` path through a ``VERequest`` built
    from the configured endpoint.
    """

    def __init__(self, config: dict):
        """Keep the configuration and build the request helper.

        Args:
            config: Plugin settings. ``config["endpoint"]`` is handed to
                ``VERequest``; a missing key raises ``KeyError``.
        """
        self.config = config
        endpoint = config["endpoint"]
        self.req = VERequest(endpoint)

    def GetPluginInfo(self, request: api.GetPluginInfoRequest, context: grpc.ServicerContext):
        """Return the plugin name ``"volexport"`` and the package ``VERSION``.

        Neither ``request`` nor ``context`` is consulted.
        """
        info = api.GetPluginInfoResponse(name="volexport", vendor_version=VERSION)
        return info

    def GetPluginCapabilities(self, request: api.GetPluginCapabilitiesRequest, context: grpc.ServicerContext):
        """Return the fixed capability list of this plugin.

        Two capabilities are reported, in this order: the
        ``CONTROLLER_SERVICE`` service capability and the ``ONLINE`` volume
        expansion capability. Neither ``request`` nor ``context`` is consulted.
        """
        capability = api.PluginCapability
        controller_service = capability(
            service=capability.Service(
                type=capability.Service.Type.CONTROLLER_SERVICE,
            ),
        )
        online_expansion = capability(
            volume_expansion=capability.VolumeExpansion(
                type=capability.VolumeExpansion.Type.ONLINE,
            ),
        )
        return api.GetPluginCapabilitiesResponse(
            capabilities=[controller_service, online_expansion],
        )

    def Probe(self, request: api.ProbeRequest, context: grpc.ServicerContext):
        """Report readiness based on the ``/health`` path of the endpoint.

        ``ready`` is true only when the health request completes and its
        status code equals ``requests.codes.ok``. Any other status code yields
        ``ready=False``. Any exception raised by the request is logged at
        warning level (message ``"health check error"``) and also yields
        ``ready=False``; it is never propagated to the caller.
        """
        is_ready = False
        try:
            health = self.req.get("/health")
            is_ready = bool(health.status_code == requests.codes.ok)
        except Exception as err:
            # Best-effort probe: a failing health request means "not ready".
            _log.warning("health check error", exc_info=err)
        return api.ProbeResponse(ready=wrappers_pb2.BoolValue(value=is_ready))