Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/bewit.py: 94%

49 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import binascii 

5 

6from mohawk.util import utc_now 

7from mohawk.base import Resource 

8from mohawk.bewit import get_bewit, check_bewit, parse_bewit, strip_bewit 

9 

10import logging 

11 

12logger = logging.getLogger(__name__) 

13 

14 

class SignedUrl:
    """Create and verify single-use bewit tokens that protect urls.

    Tokens are signed with one shared secret (sha256) and may be consumed
    only once: every bewit passed to `verify` is remembered until it is
    older than `max_expire` seconds.
    """

    # Replay cache mapping parsed bewit -> time.time() at which it was seen.
    # NOTE: this is a class attribute, so it is shared by every SignedUrl
    # instance in the process.
    _used = {}

    def __init__(self, secret, max_expire=120):
        """Create a signing object

        Args:
            secret(str): The encryption secret

        Kwargs:
            max_expire(int): Maximum time a token can live, in seconds
        """
        self._key = secret
        self._max_expire = max_expire

    @property
    def credentials(self):
        """The key/algorithm part of the credentials (without a user id)"""
        return {"key": self._key, "algorithm": "sha256"}

    def expire_used(self):
        """Forget used bewits that were seen more than `max_expire` seconds ago"""
        now = time.time()
        stale = [
            bewit
            for bewit, used_at in self._used.items()
            if now - used_at > self._max_expire
        ]
        for bewit in stale:
            # pop() rather than del: the cache is shared, so another caller
            # may already have removed the entry.
            self._used.pop(bewit, None)

    def sign(self, url, user, ttl=10):
        """Generate a signed url

        Args:
            url(str): The url to sign, appended to `http://localhost`
            user(str): The user making the request

        Kwargs:
            ttl(int): Time for url to be valid in seconds

        Returns:
            bewit(str): The bewit used to protect the url

        Raises:
            AttributeError: If `ttl` is longer than `max_expire`
        """
        if ttl > self._max_expire:
            raise AttributeError(f"ttl is longer than max time of {self._max_expire}s")

        resource = Resource(
            credentials={"id": user, **self.credentials},
            url=f"http://localhost{url}",
            method="GET",
            nonce="",
            # The resource timestamp is the moment the token stops being valid
            timestamp=utc_now() + ttl,
        )
        return get_bewit(resource)

    def lookup_credentials(self, recipient_id):
        """Lookup a users credentials

        Every user currently shares the same key, so this simply attaches
        the id to the common credentials.
        """
        # TODO: should each user have their own key?
        return {"id": recipient_id, **self.credentials}

    def verify(self, protected_url):
        """Verify a signed url

        Every failure path returns the same error dict instead of raising,
        so callers only need to distinguish a user id from a dict.

        Args:
            protected_url(str): The protected url to verify

        Returns:
            id(str): The user if the bewit is valid, otherwise
                `{"error": "Invalid Token"}`
        """
        invalid = {"error": "Invalid Token"}

        self.expire_used()

        try:
            raw_bewit, _ = strip_bewit(protected_url)
            bewit = parse_bewit(raw_bewit)
        except binascii.Error:
            logger.error("Bewit token is not valid base64")
            return invalid
        except Exception:
            # Extracting/parsing can fail in other ways too (no bewit in the
            # url, wrong number of parts, non-ascii payload); treat them all
            # as an invalid token rather than letting them escape.
            logger.exception("Bewit token is missing or malformed")
            return invalid

        if bewit in self._used:
            logger.error("Used bewit token")
            return invalid

        # Recorded before validation, so any token that parses can only be
        # presented once, whether or not it turns out to be valid.
        self._used[bewit] = time.time()

        try:
            valid = check_bewit(
                protected_url, credential_lookup=self.lookup_credentials
            )
        except Exception as e:
            logger.exception("Invalid bewit token: %s", e)
            return invalid

        if not valid:
            # Do not fall through and return None for a rejected token
            logger.error("Bewit token failed validation")
            return invalid

        return bewit.id
116 return bewit.id