Coverage for volexpcsi/identity.py: 100%

26 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 12:48 +0000

1import grpc 

2import requests 

3from volexport.version import VERSION 

4from volexport.client import VERequest 

5from google.protobuf import wrappers_pb2 

6from logging import getLogger 

7from . import api 

8from .accesslog import servicer_accesslog 

9 

10_log = getLogger(__name__) 

11 

12 

13@servicer_accesslog 

14class VolExpIdentity(api.IdentityServicer): 

15 def __init__(self, config: dict): 

16 self.config = config 

17 self.req = VERequest(config["endpoint"]) 

18 

19 def GetPluginInfo(self, request: api.GetPluginInfoRequest, context: grpc.ServicerContext): 

20 return api.GetPluginInfoResponse(name="volexport", vendor_version=VERSION) 

21 

22 def GetPluginCapabilities(self, request: api.GetPluginCapabilitiesRequest, context: grpc.ServicerContext): 

23 return api.GetPluginCapabilitiesResponse( 

24 capabilities=[ 

25 api.PluginCapability( 

26 service=api.PluginCapability.Service( 

27 type=api.PluginCapability.Service.Type.CONTROLLER_SERVICE, 

28 ) 

29 ), 

30 api.PluginCapability( 

31 volume_expansion=api.PluginCapability.VolumeExpansion( 

32 type=api.PluginCapability.VolumeExpansion.Type.ONLINE, 

33 ) 

34 ), 

35 ] 

36 ) 

37 

38 def Probe(self, request: api.ProbeRequest, context: grpc.ServicerContext): 

39 try: 

40 res = self.req.get("/health") 

41 if res.status_code == requests.codes.ok: 

42 return api.ProbeResponse(ready=wrappers_pb2.BoolValue(value=True)) 

43 except Exception as e: 

44 _log.warning("health check error", exc_info=e) 

45 return api.ProbeResponse(ready=wrappers_pb2.BoolValue(value=False))