Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/session/memory.py: 93%

55 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import uuid 

5 

6from daiquiri.core.session.storage import SessionStorage 

7 

8 

class MemorySessionStorage(SessionStorage):
    """Session storage that keeps every session in a process-local dict.

    The store is a class attribute, so all instances of this class share
    the same sessions. Each session is a dict with the keys ``sessionid``,
    ``token``, ``last_access``, ``operator`` and ``data``.
    """

    # Shared by all instances: maps session id -> session dict.
    _sessions = {}

    def create(self, data=None):
        """Create a new session, store it and return its session dict.

        Args:
            data: Optional mapping stored under the session's ``"data"``
                key. The object is stored as-is (not copied). When omitted,
                a new empty dict is created for this session.

        Returns:
            The newly stored session dict.
        """
        # A fresh dict per call: a shared default dict would be mutated by
        # `update()` and leak data between unrelated sessions.
        if data is None:
            data = {}

        uid = str(uuid.uuid4())
        # `_generate_token` is not defined in this class; presumably it is
        # provided by `SessionStorage` -- confirm against the base class.
        token = self._generate_token(uid)

        session = {
            "sessionid": uid,
            "token": token,
            "last_access": time.time(),
            "operator": False,
            "data": data,
        }
        MemorySessionStorage._sessions[uid] = session
        return session

    def list(self):
        """Return a list of all stored session dicts."""
        return list(MemorySessionStorage._sessions.values())

    def remove(self, sessionid, token=None):
        """Remove a session, identified by its id or by its token.

        Args:
            sessionid: Id of the session to remove. When truthy it takes
                precedence over ``token``.
            token: Token of the session to remove; only consulted when
                ``sessionid`` is falsy.

        Returns:
            The id of the removed session, or ``None`` when nothing was
            removed (no identifier given, or no session has that token).

        Raises:
            KeyError: If a truthy ``sessionid`` is not in the store.
        """
        sessions = MemorySessionStorage._sessions

        if sessionid:
            del sessions[sessionid]
            return sessionid

        if token:
            found = None
            # No early exit: should several sessions carry the same token,
            # the last one encountered is the one removed.
            for sid, session in sessions.items():
                if session["token"] == token:
                    found = sid

            if found is not None:
                del sessions[found]
                return found

        return None

    def exists(self, sessionid):
        """Return True if a session with this id is stored."""
        return sessionid in MemorySessionStorage._sessions

    def get(self, sessionid):
        """Return the session dict for ``sessionid``, or None if unknown."""
        return MemorySessionStorage._sessions.get(sessionid)

    def update(self, sessionid, **kwargs):
        """Merge ``kwargs`` into a session's data and refresh its timestamp.

        Does nothing when ``sessionid`` is not in the store.
        """
        session = MemorySessionStorage._sessions.get(sessionid)
        if session is None:
            return

        for key, value in kwargs.items():
            session["data"][key] = value

        session["last_access"] = time.time()

    def take_control(self, sessionid):
        """Make ``sessionid`` the only session flagged as operator.

        Every other session has its ``operator`` flag cleared. If
        ``sessionid`` is not in the store, all flags end up cleared.
        """
        for sid, session in MemorySessionStorage._sessions.items():
            session["operator"] = sid == sessionid

    def yield_control(self, sessionid):
        """Clear the operator flag of ``sessionid`` if it is set.

        Returns:
            True if the session existed and its flag was cleared, False
            otherwise.
        """
        session = MemorySessionStorage._sessions.get(sessionid)
        if session is not None and session["operator"] is True:
            session["operator"] = False
            return True

        return False

    def has_control(self, sessionid):
        """Return True if ``sessionid`` exists and is flagged as operator."""
        session = MemorySessionStorage._sessions.get(sessionid)
        if session is not None and session["operator"]:
            return True

        return False

    def no_control(self):
        """Return True if no stored session is flagged as operator."""
        return not any(
            session["operator"]
            for session in MemorySessionStorage._sessions.values()
        )