Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/bewit.py: 94%
49 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import binascii
6from mohawk.util import utc_now
7from mohawk.base import Resource
8from mohawk.bewit import get_bewit, check_bewit, parse_bewit, strip_bewit
10import logging
12logger = logging.getLogger(__name__)
class SignedUrl:
    """Create and verify single-use bewit tokens that protect urls.

    Every token is signed with the same secret using the ``sha256``
    algorithm, and ``verify`` accepts a given bewit at most once.
    """

    # Bewits already presented to ``verify`` mapped to the time they were
    # first seen. This is a class attribute that is only ever mutated in
    # place, so the replay cache is shared by every instance.
    _used = {}

    def __init__(self, secret, max_expire=120):
        """Create a signing object

        Args:
            secret(str): The secret used as the signing key

        Kwargs:
            max_expire(int): Maximum time a token can live, in seconds
        """
        self._key = secret
        self._max_expire = max_expire

    @property
    def credentials(self):
        """The key and algorithm used for every token (without a user id)"""
        return {"key": self._key, "algorithm": "sha256"}

    def expire_used(self):
        """Forget used bewits first seen more than ``max_expire`` seconds ago"""
        now = time.time()
        # Collect the keys first: the dict cannot be resized while iterating.
        stale = [
            bewit
            for bewit, seen in self._used.items()
            if now - seen > self._max_expire
        ]
        for bewit in stale:
            del self._used[bewit]

    def sign(self, url, user, ttl=10):
        """Generate a signed url

        Args:
            url(str): The url to sign, appended to ``http://localhost``
                before signing
            user(str): The user making the request

        Kwargs:
            ttl(int): Time for url to be valid in seconds

        Returns:
            bewit(str): The bewit used to protect the url

        Raises:
            AttributeError: If ``ttl`` is longer than ``max_expire``
        """
        if ttl > self._max_expire:
            raise AttributeError(f"ttl is longer than max time of {self._max_expire}s")

        resource = Resource(
            credentials={"id": user, **self.credentials},
            url=f"http://localhost{url}",
            method="GET",
            nonce="",
            timestamp=utc_now() + ttl,
        )
        return get_bewit(resource)

    def lookup_credentials(self, recipient_id):
        """Lookup a users credentials

        Every user currently shares the same key and algorithm, so the
        returned dict is ``credentials`` with ``id`` set to ``recipient_id``.
        """
        # TODO: should each user have their own key?
        return {"id": recipient_id, **self.credentials}

    def verify(self, protected_url):
        """Verify a signed url

        Args:
            protected_url(str): The protected url to verify

        Returns:
            id(str): The user if the bewit is valid, otherwise the dict
                ``{"error": "Invalid Token"}``
        """
        self.expire_used()

        # The url comes from the request, so any failure to extract or decode
        # the bewit is reported as an invalid token instead of propagating.
        try:
            raw_bewit, _ = strip_bewit(protected_url)
            bewit = parse_bewit(raw_bewit)
        except binascii.Error:
            logger.error("Bewit token is not valid base64")
            return {"error": "Invalid Token"}
        except Exception as e:
            logger.exception("Malformed bewit token: %s", e)
            return {"error": "Invalid Token"}

        if bewit in self._used:
            logger.error("Used bewit token")
            return {"error": "Invalid Token"}

        # Recorded before validation, so a bewit is consumed by its first
        # presentation whether or not it then validates.
        self._used[bewit] = time.time()

        try:
            valid = check_bewit(
                protected_url, credential_lookup=self.lookup_credentials
            )
        except Exception as e:
            logger.exception("Invalid bewit token: %s", e)
            return {"error": "Invalid Token"}

        # NOTE(review): check_bewit appears to raise on every failure; a falsy
        # result is still rejected explicitly rather than returning None.
        if not valid:
            return {"error": "Invalid Token"}
        return bewit.id