Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/authenticator/__init__.py: 83%
65 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4from abc import ABC, abstractmethod
5from flask import request
6import gevent
8from daiquiri.core import CoreBase, CoreResource, marshal
9from daiquiri.core.schema import ErrorSchema
10from daiquiri.core.schema.login import LoginSchema, LoginResponseSchema
11from daiquiri.core.utils import loader
13import logging
15logger = logging.getLogger(__name__)
class LoginResource(CoreResource):
    """Flask resource handling login (POST) and logout (DELETE)"""

    @marshal(
        inp=LoginSchema,
        out=[
            [201, LoginResponseSchema(), "Session created"],
            [401, ErrorSchema(), "Invalid credentials"],
        ],
    )
    def post(self, **kwargs):
        """Login and obtain an authorisation token

        Kwargs:
            username (str): username to validate
            password (str): password to validate
            client (str): optional client name

        Returns:
            A ``(session, 201)`` tuple when the parent authenticator returned a
            session, otherwise an ``(error dict, 401)`` tuple
        """
        username = kwargs["username"]
        password = kwargs["password"]
        client = kwargs.pop("client", None)

        session = self._parent.authenticate(username, password, client)
        if not session:
            return {"error": "Invalid Credentials"}, 401

        return session, 201

    def delete(self):
        """Logout and delete the session

        Returns:
            ``({}, 200)`` when the session handler reports the session as
            removed, ``({}, 400)`` otherwise
        """
        logger.debug("Logout")
        removed = self._session.remove()
        code = 200 if removed else 400
        return {}, code
class Authenticator(CoreBase):
    """Authenticator

    Dynamically load the authenticator type from the config.yml auth_type property, and register
    the login and logout flask resources

    Failed logins are counted per username in ``_fail_log``. Once a username has more than two
    recorded failures, further attempts are rejected without consulting the authentication
    handler until ``check_fail_log`` removes the entry.
    """

    def setup(self):
        """Load the authentication handler, register ``/login`` and start the fail log cleaner"""
        # username -> {"count": number of failed attempts, "last": time.time() of the latest one}
        self._fail_log = {}
        self._auth = loader(
            "daiquiri.core.authenticator",
            "Authenticator",
            self._config["auth_type"],
            self._config,
        )
        self.register_route(LoginResource, "/login")

        gevent.spawn(self.check_fail_log)

    def check_fail_log(self):
        """Periodically lift expired lockouts

        Runs forever (it is spawned as a greenlet from ``setup``). Every 5 seconds it removes
        each username that has more than two failures and whose last failure is older than
        ``5 ** (count - 1)`` seconds.
        """
        # NOTE(review): entries with one or two failures are never removed here, only by a
        # successful login in `authenticate` - confirm whether they should expire as well.
        while True:
            now = time.time()
            expired = [
                username
                for username, fail in self._fail_log.items()
                if fail["count"] > 2 and now > fail["last"] + 5 ** (fail["count"] - 1)
            ]

            for username in expired:
                logger.info(f"Removing {username} from fail log")
                del self._fail_log[username]

            # Use the cooperative sleep: a plain time.sleep() only yields to other greenlets
            # when the time module has been monkey-patched, otherwise it blocks the whole hub.
            gevent.sleep(5)

    def authenticate(self, username, password, client=None):
        """Authenticate

        Authenticate the credentials against the loaded authentication handler, and if valid
        create a session in the session handler

        Args:
            username (str): username to validate
            password (str): password to validate
            client (str): optional client name

        Returns:
            The session if credentials were valid, None if not (or if the username is
            currently locked out after too many failed attempts)

        """
        entry = self._fail_log.get(username)
        if entry is not None and entry["count"] > 2:
            # Locked out: reject without checking the credentials or touching the counter
            logger.warning(f"Login for {username} is blacklisted")
            return None

        if self._auth.authenticate(username, password):
            # A successful login clears any previously recorded failures
            self._fail_log.pop(username, None)

            logger.info(
                f"Successful login from {username} with ip {request.remote_addr}"
            )
            return self._session.create({"username": username, "client": client})

        entry = self._fail_log.setdefault(username, {"count": 0})
        entry["last"] = time.time()
        entry["count"] += 1

        logger.warning(
            f"Failed login attempt from {username} with ip {request.remote_addr}"
        )
        return None
class AuthenticatorInterface(ABC):
    """Authenticator Interface

    Abstract base for authentication handlers: stores the config passed to the
    constructor and requires subclasses to implement ``authenticate``
    """

    def __init__(self, config):
        """Keep a reference to the supplied config on the instance"""
        self._config = config

    @abstractmethod
    def authenticate(self, username, password):
        """Authenticator Method

        Args:
            username (str): username to validate
            password (str): password to validate

        Returns:
            True if the credentials are valid, False if not
        """