Coverage for volexpcsi/accesslog.py: 83%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-28 12:48 +0000

1import grpc 

2import functools 

3import time 

4import urllib.parse 

5import inspect 

6from typing import Callable, Type 

7from google.protobuf.message import Message 

8from google.protobuf.json_format import MessageToJson 

9from logging import getLogger 

10from requests.exceptions import HTTPError, Timeout 

11 

12_log = getLogger(__name__) 

13 

14 

def accesslog(f: Callable):
    """Decorate a unary gRPC servicer method with access logging and error mapping.

    The wrapper logs the request when the call starts and the response (with
    the elapsed time) when it finishes.  Any ``Exception`` raised by ``f`` is
    logged with its traceback and turned into ``context.abort(...)`` with a
    matching gRPC status code:

    ==========================  ==================================
    exception                   gRPC status
    ==========================  ==================================
    PermissionError             PERMISSION_DENIED
    ValueError                  INVALID_ARGUMENT
    NotImplementedError         UNIMPLEMENTED
    FileExistsError             ALREADY_EXISTS
    FileNotFoundError           NOT_FOUND
    Timeout / TimeoutError      DEADLINE_EXCEEDED
    AssertionError              ABORTED
    HTTPError                   mapped from the HTTP status code,
                                UNKNOWN when unmapped or missing
    any other Exception         INTERNAL
    ==========================  ==================================

    Args:
        f: servicer method called as ``f(self, request, context)``.

    Returns:
        The wrapped method, carrying ``f``'s metadata via ``functools.wraps``.
    """

    def _m2j(msg: Message) -> str:
        # Single-line JSON that keeps the proto field names and non-ASCII text as-is.
        return MessageToJson(msg, preserving_proto_field_name=True, indent=None, ensure_ascii=False)

    # (exception type(s), log format, gRPC status).  Checked in order and the
    # first match wins, mirroring the order of a chain of ``except`` clauses.
    failures = (
        (PermissionError, "finish(permission) %s <- %s(%.3f sec): %s", grpc.StatusCode.PERMISSION_DENIED),
        (ValueError, "finish(value) %s <- %s(%.3f sec): %s", grpc.StatusCode.INVALID_ARGUMENT),
        (NotImplementedError, "finish(not implemented) %s <- %s(%.3f sec): %s", grpc.StatusCode.UNIMPLEMENTED),
        (FileExistsError, "finish(exists) %s <- %s(%.3f sec): %s", grpc.StatusCode.ALREADY_EXISTS),
        (FileNotFoundError, "finish(not found) %s <- %s(%.3f sec): %s", grpc.StatusCode.NOT_FOUND),
        ((Timeout, TimeoutError), "finish(timeout) %s <- %s(%.3f sec): %s", grpc.StatusCode.DEADLINE_EXCEEDED),
        (AssertionError, "finish(abort) %s <- %s(%.3f sec): %s", grpc.StatusCode.ABORTED),
    )

    # HTTP status code -> gRPC status, built once per decorated method.
    http_codes: dict[int, grpc.StatusCode] = {
        400: grpc.StatusCode.INVALID_ARGUMENT,
        401: grpc.StatusCode.UNAUTHENTICATED,
        403: grpc.StatusCode.PERMISSION_DENIED,
        404: grpc.StatusCode.NOT_FOUND,
        408: grpc.StatusCode.DEADLINE_EXCEEDED,
        409: grpc.StatusCode.ALREADY_EXISTS,
        429: grpc.StatusCode.RESOURCE_EXHAUSTED,
        499: grpc.StatusCode.CANCELLED,
        500: grpc.StatusCode.INTERNAL,
        501: grpc.StatusCode.UNIMPLEMENTED,
        503: grpc.StatusCode.UNAVAILABLE,
        504: grpc.StatusCode.DEADLINE_EXCEEDED,
    }

    @functools.wraps(f)
    def _(self, request: Message, context: grpc.ServicerContext):
        client = urllib.parse.unquote(context.peer())
        funcname = f.__qualname__
        _log.info("start %s -> %s: %s", client, funcname, _m2j(request))
        # Monotonic clock: the elapsed time must not be affected by wall-clock adjustments.
        start = time.monotonic()
        try:
            res = f(self, request, context)
            finish = time.monotonic()
            _log.info("finish(OK) %s <- %s(%.3f sec): %s", client, funcname, finish - start, _m2j(res))
            return res
        except Exception as e:
            # NOTE(review): if ``f`` calls context.abort() itself, grpc signals that by
            # raising an exception, which would presumably reach the fallback below and
            # overwrite the handler's status with INTERNAL -- confirm whether handlers do that.
            elapsed = time.monotonic() - start
            details = f"{type(e).__qualname__}: {e}"

            for exc_types, fmt, code in failures:
                if isinstance(e, exc_types):
                    _log.error(fmt, client, funcname, elapsed, e, exc_info=e)
                    context.abort(code=code, details=details)
                    return None  # only reached if abort() does not raise

            if isinstance(e, HTTPError):
                # ``response`` is None when the error was raised without one.  Use getattr
                # rather than a truthiness test: a requests Response is falsy for 4xx/5xx.
                status = getattr(e.response, "status_code", None)
                _log.error(
                    "finish(http %s) %s <- %s(%.3f sec): %s",
                    status,
                    client,
                    funcname,
                    elapsed,
                    e,
                    exc_info=e,
                )
                context.abort(code=http_codes.get(status, grpc.StatusCode.UNKNOWN), details=details)
                return None

            _log.error("finish(other error) %s <- %s(%.3f sec): %s", client, funcname, elapsed, e, exc_info=e)
            context.abort(code=grpc.StatusCode.INTERNAL, details=details)
            return None

    return _

95 

96 

def servicer_accesslog(cls: Type):
    """Class decorator that wraps selected methods of ``cls`` with ``accesslog``.

    The names to wrap are taken from the first base class in the MRO
    (``cls.mro()[1]``): every callable member whose name does not start with
    a double underscore.  Each attribute of ``cls`` carrying one of those
    names is replaced, in place, by its ``accesslog``-wrapped version.

    Args:
        cls: the class to decorate.  NOTE(review): presumably a servicer whose
            first base is the generated gRPC servicer class -- confirm.

    Returns:
        The same ``cls`` object, modified in place.
    """
    parent = cls.mro()[1]

    # Collect the names of the parent's callable, non-dunder members.
    names = set()
    for attr, member in inspect.getmembers(parent):
        if attr.startswith("__"):
            continue
        if callable(member):
            names.add(attr)
    _log.info("decorate names: %s", names)

    # getmembers() returns a list built up front, so rebinding attributes while looping is safe.
    for attr, member in inspect.getmembers(cls):
        if attr not in names:
            continue
        _log.debug("update method: %s", attr)
        setattr(cls, attr, accesslog(member))
    return cls