Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import bcrypt
2import secrets
3import time
4from logging import getLogger
5from .proxy import Proxy
7log = getLogger(__name__)
10class LoginProxy(Proxy):
11 unauth_method = ["login", "logout"]
12 token_key = "auth"
13 pwd_store = {
14 "testuser": bcrypt.hashpw(b"testpassword", bcrypt.gensalt(rounds=10, prefix=b'2a')),
15 }
16 token_store = {}
17 token_expire = 3600
18 token_update = True
20 def login(self, params):
21 username = params.get("username", None)
22 password = params.get("password", None)
23 if username is None or password is None:
24 return {"error": "invalid username or password"}
25 pw = self.pwd_store.get(username, None)
26 if pw is None:
27 return {"error": "invalid username or password"}
28 if not bcrypt.checkpw(password.encode("utf-8"), pw):
29 return {"error": "invalid username or password"}
30 # generate token
31 token = secrets.token_urlsafe()
32 self.token_store[token] = (username, time.time())
33 return {
34 self.token_key: token,
35 }
37 def logout(self, params):
38 token = params.get(self.token_key, None)
39 if token in self.token_store:
40 self.token_store.pop(token)
41 return {}
43 def _check_token(self, token):
44 if token in self.token_store:
45 user, ts = self.token_store[token]
46 if ts - time.time() > self.token_expire:
47 self.token_store.pop(token)
48 return None
49 if self.token_update:
50 self.token_store[token] = (user, time.time())
51 return user
52 return None
54 def dispatcher(self, method: str, params: dict = {}):
55 if method in self.unauth_method:
56 return getattr(self, method)(params)
57 username = self._check_token(params.get(self.token_key, None))
58 if username is not None:
59 params.pop(self.token_key)
60 params["username"] = username
61 return super().dispatcher(method, params)
62 return {"error": "not logged in"}