Coverage for volexpcsi/accesslog.py: 83%
71 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-28 12:48 +0000
1import grpc
2import functools
3import time
4import urllib.parse
5import inspect
6from typing import Callable, Type
7from google.protobuf.message import Message
8from google.protobuf.json_format import MessageToJson
9from logging import getLogger
10from requests.exceptions import HTTPError, Timeout
12_log = getLogger(__name__)
# Ordered table of (exception type(s), log template, gRPC status).  Entries are
# tried top to bottom and the first isinstance() match wins, so the order here
# is the handler precedence.
_FAILURE_TABLE: tuple = (
    (PermissionError, "finish(permission) %s <- %s(%.3f sec): %s", grpc.StatusCode.PERMISSION_DENIED),
    (ValueError, "finish(value) %s <- %s(%.3f sec): %s", grpc.StatusCode.INVALID_ARGUMENT),
    (NotImplementedError, "finish(not implemented) %s <- %s(%.3f sec): %s", grpc.StatusCode.UNIMPLEMENTED),
    (FileExistsError, "finish(exists) %s <- %s(%.3f sec): %s", grpc.StatusCode.ALREADY_EXISTS),
    (FileNotFoundError, "finish(not found) %s <- %s(%.3f sec): %s", grpc.StatusCode.NOT_FOUND),
    ((Timeout, TimeoutError), "finish(timeout) %s <- %s(%.3f sec): %s", grpc.StatusCode.DEADLINE_EXCEEDED),
    (AssertionError, "finish(abort) %s <- %s(%.3f sec): %s", grpc.StatusCode.ABORTED),
)

# HTTP status code carried by an HTTPError -> gRPC status code.
# Anything not listed is reported as UNKNOWN.
_HTTP_STATUS_TO_GRPC: dict[int, grpc.StatusCode] = {
    400: grpc.StatusCode.INVALID_ARGUMENT,
    401: grpc.StatusCode.UNAUTHENTICATED,
    403: grpc.StatusCode.PERMISSION_DENIED,
    404: grpc.StatusCode.NOT_FOUND,
    408: grpc.StatusCode.DEADLINE_EXCEEDED,
    409: grpc.StatusCode.ALREADY_EXISTS,
    429: grpc.StatusCode.RESOURCE_EXHAUSTED,
    499: grpc.StatusCode.CANCELLED,
    500: grpc.StatusCode.INTERNAL,
    501: grpc.StatusCode.UNIMPLEMENTED,
    503: grpc.StatusCode.UNAVAILABLE,
    504: grpc.StatusCode.DEADLINE_EXCEEDED,
}


def _resolve_failure(exc: Exception):
    """Pick the log template, leading log arguments and gRPC status for ``exc``.

    Returns:
        A ``(template, leading_args, code)`` tuple.  ``leading_args`` holds the
        values that fill the template's placeholders *before* the common
        ``client, funcname, elapsed, exception`` arguments (only the HTTP
        template has one: the HTTP status code).
    """
    for exc_types, template, code in _FAILURE_TABLE:
        if isinstance(exc, exc_types):
            return template, (), code
    if isinstance(exc, HTTPError):
        # An HTTPError may carry no response object (response is None); read
        # the status defensively so this handler cannot itself raise
        # AttributeError and skip the log line and the abort.
        status = getattr(getattr(exc, "response", None), "status_code", None)
        code = _HTTP_STATUS_TO_GRPC.get(status, grpc.StatusCode.UNKNOWN)
        return "finish(http %s) %s <- %s(%.3f sec): %s", (status,), code
    return "finish(other error) %s <- %s(%.3f sec): %s", (), grpc.StatusCode.INTERNAL


def accesslog(f: Callable):
    """Decorate a unary servicer method with access logging and error mapping.

    The wrapper logs the peer, the method's qualified name and the request
    (as single-line JSON) before the call, and the response plus elapsed
    seconds after it.  If ``f`` raises an ``Exception``, the failure is logged
    at error level with the traceback and the RPC is terminated through
    ``context.abort`` with a gRPC status chosen from the exception type (or,
    for ``HTTPError``, from the HTTP status code).  The abort details are
    ``"<exception qualname>: <message>"``.

    Args:
        f: Method with the signature ``(self, request, context)``.

    Returns:
        The wrapped method, with ``f``'s metadata preserved.
    """

    def _m2j(msg: Message) -> str:
        # Compact one-line JSON keeping the proto field names as declared.
        return MessageToJson(msg, preserving_proto_field_name=True, indent=None, ensure_ascii=False)

    @functools.wraps(f)
    def _(self, request: Message, context: grpc.ServicerContext):
        client = urllib.parse.unquote(context.peer())
        funcname = f.__qualname__
        _log.info("start %s -> %s: %s", client, funcname, _m2j(request))
        start = time.time()
        try:
            res = f(self, request, context)
            finish = time.time()
            _log.info("finish(OK) %s <- %s(%.3f sec): %s", client, funcname, finish - start, _m2j(res))
            return res
        except Exception as e:
            finish = time.time()
            template, leading_args, code = _resolve_failure(e)
            _log.error(template, *leading_args, client, funcname, finish - start, e, exc_info=e)
            context.abort(code=code, details=f"{type(e).__qualname__}: {e}")

    return _
def servicer_accesslog(cls: Type):
    """Class decorator that wraps a servicer's methods with ``accesslog``.

    The names to wrap are collected from the first base class in the MRO
    (``cls.mro()[1]``): every callable member whose name does not start with
    a double underscore.  Each member of ``cls`` with one of those names is
    replaced by its ``accesslog``-wrapped version.

    Args:
        cls: The class to decorate.

    Returns:
        The same class object, modified in place.
    """
    parent = cls.mro()[1]
    names = {
        member_name
        for member_name, member in inspect.getmembers(parent)
        if callable(member) and not member_name.startswith("__")
    }
    _log.info("decorate names: %s", names)
    for attr, impl in inspect.getmembers(cls):
        if attr not in names:
            continue
        _log.debug("update method: %s", attr)
        setattr(cls, attr, accesslog(impl))
    return cls