Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/session/__init__.py: 65%
333 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4from __future__ import annotations
5import gevent
6import time
7from functools import partial
9from flask import request, g
10from flask_socketio import disconnect, ConnectionRefusedError
11import jwt
12from marshmallow import fields
14from daiquiri.core.logging import log
15from daiquiri.core import (
16 CoreBase,
17 CoreResource,
18 marshal,
19 no_blsession_required,
20 require_staff,
21)
22from daiquiri.core.schema.session import SessionListSchema, SignSchema
23from daiquiri.core.schema.metadata import paginated
24from daiquiri.core.schema import ErrorSchema, MessageSchema
25from daiquiri.core.bewit import SignedUrl
27# from daiquiri.core.user import User
28from daiquiri.core.utils import loader
30import logging
32logger = logging.getLogger(__name__)
class SessionListResource(CoreResource):
    # REST endpoint exposing every session known to the parent Session handler.

    @marshal(
        out=[
            [200, paginated(SessionListSchema), "List of sessions"],
            [401, ErrorSchema(), "Invalid session"],
        ]
    )
    def get(self):
        """
        Get a list of current sessions
        """
        # The parent handler already returns the paginated
        # ``{"total": ..., "rows": ...}`` structure.
        listing = self._parent.sessions()
        return listing, 200
class SessionResource(CoreResource):
    # REST endpoint describing the session that issued the request.

    @no_blsession_required
    @marshal(
        out=[
            [200, SessionListSchema(), "A single session"],
            [401, ErrorSchema(), "Invalid session"],
            [404, ErrorSchema(), "Session not found"],
        ]
    )
    def get(self):
        """
        Get the current session
        """
        current = self._parent.session(g.sessionid)
        # Guard clause: the handler returns a falsy value for unknown sessions.
        if not current:
            return {"error": "No such session"}, 404

        return current, 200
class RequestControlResource(CoreResource):
    # REST endpoint used by the current session to take (POST) or give up
    # (DELETE) control.

    @marshal(
        out=[
            [200, MessageSchema(), "Control granted"],
            [400, ErrorSchema(), "Could not get control"],
        ]
    )
    def post(self):
        """
        Request control for the current session
        """
        granted = self._parent.request_control(g.sessionid)
        if not granted:
            # Also reached when the request is merely pending the operator's
            # response: `request_control` only returns a truthy value when
            # control was granted immediately.
            return {"error": "Cant take control"}, 400

        return {"message": "Session now has control"}, 200

    # The response descriptions below used to be copied verbatim from `post`
    # ("Control granted" / "Could not get control"), which mis-documented the
    # DELETE endpoint; they now describe yielding control.
    @marshal(
        out=[
            [200, MessageSchema(), "Control yielded"],
            [400, ErrorSchema(), "Could not yield control"],
        ]
    )
    def delete(self):
        """
        Yield control for the current session
        """
        yielded = self._parent.yield_control(g.sessionid)
        if not yielded:
            return {"error": "Could not yield control"}, 400

        return {"message": "Session yielded control"}, 200
class RespondRequestControlResource(CoreResource):
    # REST endpoint through which a session answers a pending control request.

    @marshal(
        inp={"grant": fields.Bool(required=True)},
        out=[
            [200, MessageSchema(), "Response acknowledged"],
            [400, ErrorSchema(), "Could not acknowledge control request response"],
        ],
    )
    def post(self, **kwargs):
        """
        Respond to a request for control in the current session
        """
        grant = kwargs["grant"]
        acknowledged = self._parent.respond_request_control(grant)
        if not acknowledged:
            return {"error": "Could not acknowledge response"}, 400

        return {"message": "Response acknowledged"}, 200
class SignResource(CoreResource):
    # REST endpoint that signs a url on behalf of the current session.

    @no_blsession_required
    @marshal(
        inp=SignSchema,
        out=[
            [200, SignSchema(), "Url signed"],
            [400, ErrorSchema(), "Could not sign Url"],
        ],
    )
    def post(self, **kwargs):
        """Create a signed url

        This allows a resource to be accessed temporarily a single time without
        an access token. Can be used to download files, etc
        """
        url = kwargs["url"]
        token = self._parent.sign(url)
        if not token:
            return {"error": "Could not sign url"}, 400

        return {"url": url, "bewit": token}, 200
class MirrorResource(CoreResource):
    # Staff-only REST endpoint that makes the current session mirror another.

    @require_staff
    @marshal(
        inp={
            "sessionid": fields.Str(
                allow_none=True, metadata={"description": "The sessionid to mirror"}
            )
        },
        out=[
            [200, MessageSchema(), "Session mirroring enabled"],
            [400, ErrorSchema(), "Could not mirror session"],
        ],
    )
    def post(self, **kwargs):
        """Request mirroring of a session"""
        enabled = self._parent.request_mirror(**kwargs)
        if not enabled:
            return {"error": "Could not mirror session"}, 400

        return {"message": "Mirroring enabled"}, 200
class Session(CoreBase):
    """Session Handler

    Loads the session storage mechanism dynamically from the config.yaml
    session_type, registers the session REST resources and the SocketIO
    connect / disconnect / persistence handlers, and arbitrates which session
    is in control.
    """

    _namespace = "session"

    _require_session = True
    _require_blsession = True

    # sessionid currently waiting for the operator to answer a control request
    _pending_control_request = None
    # Greenlet counting down the pending control request (None until the
    # first request is made)
    _greenlet = None

    # Per-session UI state used by the redux-persist SocketIO handlers.
    # NOTE(review): this is a class-level dict and therefore shared between
    # every instance of Session; kept as is because only one handler appears
    # to be created — confirm before instantiating more than one.
    _ui_state = {}

    def __init__(self, *args, **kwargs):
        CoreBase.__init__(self, *args, **kwargs)
        # sessionid -> time.time() at which the disconnect timeout was
        # (re)started
        self._session_start_timeout: dict[str, float] = {}

    def set_metadata(self, metadata):
        """Store the metadata handler used to resolve users and blsessions"""
        self._metadata = metadata

    def setup(self):
        """Create the url signer and storage, then register routes and events"""
        self._sign = SignedUrl(self._config["url_secret"])

        self._session = self
        self._storage = loader(
            "daiquiri.core.session", "SessionStorage", self._config["session_type"]
        )
        self._storage.set_generate_token(self._generate_token)

        self.register_route(SessionListResource, "")
        self.register_route(SessionResource, "/current")
        self.register_route(RequestControlResource, "/current/control")
        self.register_route(RespondRequestControlResource, "/current/control/respond")
        self.register_route(SignResource, "/sign")
        self.register_route(MirrorResource, "/mirror")

        for namespace in [
            "app",
            "metadata",
            "persist",
            "queue",
            "session",
            "stomp",
        ]:
            self.register_namespace(namespace)

        self.setup_persistence()

        # Handlers for the default namespace are stored under the key "sio_"
        self.on("connect", namespace=None)(partial(self._on_socket_connect, ""))
        self.on("disconnect", namespace=None)(partial(self._on_socket_disconnect, ""))

    def register_namespace(self, namespace):
        """Register the connect / disconnect handlers for a SocketIO namespace

        Args:
            namespace (str): The namespace name, without the leading slash
        """
        self.on("connect", namespace=f"/{namespace}")(
            partial(self._on_socket_connect, namespace)
        )
        self.on("disconnect", namespace=f"/{namespace}")(
            partial(self._on_socket_disconnect, namespace)
        )

    def _on_socket_connect(self, namespace, *args, **kwargs):
        """Authenticate SocketIO Connection

        Makes sure a token is present on the socketio request and checks its
        validity in the current session list. On success the SocketIO sid is
        stored in the session under ``sio_<namespace>`` and any running
        disconnect timeout for the session is cancelled.

        Returns:
            True if the session is valid

        Raises:
            ConnectionRefusedError: If the token is missing or invalid, or the
                session does not exist
        """
        if "token" in request.args:
            sessionid = self._verify_token(request.args["token"])
            # `_verify_token` returns an error dict rather than a str on failure
            if isinstance(sessionid, str):
                if self._storage.exists(sessionid):
                    self._storage.update(sessionid, **{f"sio_{namespace}": request.sid})
                    self._stop_session_timeout(sessionid)
                    logger.debug(f"Got token for socketio login {sessionid}")
                    return True
                else:
                    logger.debug(
                        f"Invalid session for {sessionid}, token: {request.args['token']}"
                    )
            else:
                logger.debug("Invalid token")

        raise ConnectionRefusedError({"error": "Invalid Session"})

    def _on_socket_disconnect(self, namespace, *args, **kwargs):
        """SocketIO disconnect event

        Used to start session timeout
        """
        sessionid = self._session_from_sio(namespace)
        if sessionid is None:
            # The socket does not belong to any known session (it may already
            # have been removed): do not start a timeout keyed on None.
            logger.debug("SocketIO disconnect for unknown session, ignoring")
            return

        self._start_session_timeout(sessionid)

    def _start_session_timeout(self, sessionid: str):
        """Start session timeout

        Record the time of the disconnect and, if no timeout is already
        running for this session, spawn the greenlet that will eventually
        remove it (see `_session_timeout_process` for the durations).

        Args:
            sessionid(str): The sessionid
        """
        new_timeout = sessionid not in self._session_start_timeout
        # Always refresh the timestamp so a later disconnect restarts the clock
        self._session_start_timeout[sessionid] = time.time()

        if new_timeout:
            logger.info(f"Starting session timeout {sessionid}")
            gevent.spawn(self._session_timeout_process, sessionid)

    def _session_timeout_process(self, sessionid: str, sleep=5, timeout=30):
        """Session timeout process

        This greenlet polls every `sleep` seconds; once `timeout` seconds have
        passed since the timeout was last (re)started without it being
        cancelled, the session is removed.

        Args:
            sessionid(str): The sessionid
            sleep(int): Time to sleep between checks, in seconds
            timeout(int): Time in seconds after which to timeout
        """
        while True:
            # gevent.sleep yields to other greenlets even when the standard
            # library has not been monkey patched
            gevent.sleep(sleep)
            start_timeout = self._session_start_timeout.get(sessionid, None)
            if start_timeout is None:
                # timeout was cancelled
                return

            if time.time() >= start_timeout + timeout:
                del self._session_start_timeout[sessionid]
                break

        if not self._storage.exists(sessionid):
            return

        self._storage.remove(sessionid)
        # The persisted UI state of a removed session can no longer be reached
        self._ui_state.pop(sessionid, None)
        log.get("user").info("Session disconnected", type="session")
        self.emit(
            "message",
            {"type": "disconnect", "sessionid": sessionid},
        )

        # Tell every session that was mirroring the removed one
        for session in self._storage.list():
            data = session["data"]
            if data.get("mirror") != sessionid:
                continue

            room = data.get("sio_session")
            if room:
                self.emit(
                    "message",
                    {"type": "mirror_disconnect", "sessionid": sessionid},
                    room=room,
                )

            # NOTE(review): this only clears the key on the dict returned by
            # `list()`; whether that is the stored object depends on the
            # storage backend — confirm.
            data.pop("mirror", None)

    def _stop_session_timeout(self, sessionid):
        """Cancel a session timeout

        Args:
            sessionid(str): The sessionid
        """
        logger.info(f"Cancelling session timeout {sessionid}")
        # Removing the entry makes the polling greenlet exit on its next check
        self._session_start_timeout.pop(sessionid, None)

    def sign(self, url):
        """Create a signed url

        Args:
            url(str): The url to sign

        Returns:
            bewit(str): The bewit token, or None if signing failed
        """
        bewit = self._sign.sign(url, g.sessionid)
        if bewit:
            return bewit

    def create(self, data):
        """Create a new session

        Delegates to the session storage engine to create the session and emit
        a connect event

        Args:
            data (dict): Dictionary of session parameters to save

        Returns:
            The session dict created by the storage engine
        """
        session = self._storage.create(data)
        username = data.get("username", None)
        log.get("user").info(f"New session connected {username}", type="session")
        self.emit(
            "message",
            {
                "type": "connect",
                "sessionid": session["sessionid"],
                "username": username,
            },
        )

        if self._config.get("session_auto_control") and self._storage.no_control():
            logger.info(
                f"Automatically granting control for to {username} with session {session['sessionid']}"
            )
            self._do_grant_control(session["sessionid"])

        return session

    def remove(self):
        """Remove the current session

        Remove a session from the storage engine and emit a disconnect event

        Returns:
            True if the session was successfully removed, False if not
        """
        # `require_valid_session` returns None when the request is valid; the
        # short circuit keeps `g.sessionid` from being read when it is not
        invalid = self.require_valid_session()
        if invalid or not self._storage.exists(g.sessionid):
            return False

        session = self._storage.get(g.sessionid)
        for key, value in session["data"].items():
            if key.startswith("sio_"):
                # NOTE(review): the namespace is passed without a leading
                # slash, unlike the "/<namespace>" used at registration —
                # confirm this is what flask_socketio expects.
                disconnect(sid=value, namespace=key.removeprefix("sio_"))

        self._storage.remove(g.sessionid)
        self._ui_state.pop(g.sessionid, None)
        log.get("user").info("Session disconnected", type="session")
        self.emit(
            "message",
            {"type": "disconnect", "sessionid": g.sessionid},
        )
        return True

    def sessions(self):
        """Return a list of sessions

        Returns:
            dict: ``total`` (number of sessions) and ``rows`` (the sessions,
            each flagged with ``operator_pending``)
        """
        sessions = self._storage.list()
        for s in sessions:
            s["operator_pending"] = s["sessionid"] == self._pending_control_request

        return {"total": len(sessions), "rows": sessions}

    def session(self, sessionid):
        """Return a specific session

        Args:
            sessionid (uuid): The session to return

        Returns:
            The session dict with a ``mirrored`` flag, or None if not found
        """
        session = self._storage.get(sessionid)

        if not session:
            return

        # A session is mirrored when any session lists it as its mirror target
        session["mirrored"] = any(
            sessionid == ses["data"].get("mirror") for ses in self._storage.list()
        )

        return session

    def update(self, data):
        """Update data within the current session

        Args:
            data (dict): Keywords to update
        """
        self._storage.update(g.sessionid, **data)

    def _generate_token(self, sessionid):
        """Generate a session token

        Generates a JWT (HS256) storing the sessionid. Token expiry is defined
        by the config value session_length

        Args:
            sessionid (uuid): The sessionid

        Returns:
            A session token used to validate all requests
        """
        encoded = jwt.encode(
            {
                "exp": time.time() + self._config["session_length"],
                "sessionid": sessionid,
            },
            self._config["secret"],
            algorithm="HS256",
        )
        return encoded

    def _verify_token(self, token):
        """Verify a token is valid

        Checks the signature of the token is valid and returns the encoded
        sessionid. An expired token also removes the matching session from
        storage and emits an ``expired`` message.

        Args:
            token (str): The token

        Returns:
            The sessionid (str) from the token if valid, otherwise an error dict
        """
        try:
            data = jwt.decode(token, self._config["secret"], algorithms=["HS256"])
        except jwt.ExpiredSignatureError:
            logger.info("SignatureExpired {token}".format(token=token))
            sessionid = self._storage.remove(None, token=token)
            if sessionid:
                self._ui_state.pop(sessionid, None)
                self.emit(
                    "message",
                    {"type": "expired", "sessionid": sessionid},
                )
                logger.info("Removing expired session {sid}".format(sid=sessionid))

            return {"error": "Token Expired"}

        except jwt.InvalidTokenError:
            return {"error": "Invalid Token"}

        # A correctly signed token without a usable sessionid claim is
        # reported like any other invalid token instead of raising KeyError
        sessionid = data.get("sessionid")
        if not isinstance(sessionid, str):
            return {"error": "Invalid Token"}

        return sessionid

    def request_control(self, sessionid):
        """Request session control

        Request control for the sessionid and emit an event notifying other
        clients that sessionid is now in control.

        If the user is staff, or no session is in control, the request is
        automatically granted, otherwise the current user in control must
        respond

        Args:
            sessionid (uuid): The sessionid

        Returns:
            True if sessionid is now in control, False if the request is
            pending
        """
        if g.user.staff() or self._storage.no_control():
            self._do_grant_control(sessionid)
            return True

        self._pending_control_request = sessionid
        self.start_control_request_timeout()
        log.get("user").info(f"Session {sessionid} requesting control", type="session")
        self.emit(
            "message",
            {"type": "request_control", "sessionid": sessionid},
        )
        return False

    def start_control_request_timeout(self):
        """Start a greenlet that counts down the control request timeout

        If the current operator does not respond within this timeframe
        the requester will automatically be granted control
        """
        if not self._pending_control_request:
            return

        self._pending_control_request_timeout = self._config["session_control_timeout"]
        # Reuse a countdown that is still running: a second greenlet would
        # decrement the shared counter as well and halve the timeout
        if self._greenlet is None or self._greenlet.ready():
            self._greenlet = gevent.spawn(self._control_request_timeout)

    def _control_request_timeout(self):
        """Count down once a second and grant control when the timeout expires"""
        while self._pending_control_request:
            # `<=` rather than `==` so a negative or non-integer configured
            # timeout still expires
            if self._pending_control_request_timeout <= 0:
                sessionid = self._pending_control_request
                # Clear the request first so one arriving while the grant is
                # being emitted is not overwritten afterwards
                self._pending_control_request = None
                self.emit(
                    "message",
                    {
                        "type": "timeout_control",
                        "sessionid": sessionid,
                    },
                )
                self._do_grant_control(sessionid)
                # Re-check the loop condition: a new request may be pending
                continue

            self._pending_control_request_timeout -= 1
            # Cooperative sleep so other greenlets keep running
            gevent.sleep(1)

    def respond_request_control(self, grant):
        """Respond to a request for control if not admin

        Args:
            grant (bool): Whether to grant control or not

        Returns:
            True if response acknowledged, False if the current session is not
            in control or there is no pending request
        """
        if not self._storage.has_control(g.sessionid):
            return False

        pending = self._pending_control_request
        if pending is None:
            # Nothing to answer (e.g. the request already timed out)
            return False

        if grant:
            self._do_grant_control(pending)
        else:
            log.get("user").info(
                f"Session {pending} denied control",
                type="session",
            )
            self.emit(
                "message",
                {
                    "type": "deny_control",
                    "sessionid": pending,
                    "operator": False,
                },
            )

        self._pending_control_request = None
        return True

    def _do_grant_control(self, sessionid):
        """Give control to sessionid in storage and notify all clients

        Returns:
            True, always
        """
        self._storage.take_control(sessionid)

        log.get("user").info("Session granted control", type="session")
        self.emit(
            "message",
            {"type": "control", "sessionid": sessionid, "operator": True},
        )
        return True

    def yield_control(self, sessionid):
        """Yield control for the current session

        Args:
            sessionid (uuid): Sessionid to yield

        Returns:
            True if session was in control and has now yielded, False otherwise
        """
        yielded = self._storage.yield_control(sessionid)
        if not yielded:
            return False

        log.get("user").info("Session yielded control", type="session")
        # NOTE(review): "operator" is True here, identical to the message sent
        # when control is granted; confirm with the client whether a yield
        # should report False.
        self.emit(
            "message",
            {"type": "control", "sessionid": sessionid, "operator": True},
        )
        return True

    def require_valid_session(self, **kwargs):
        """Require a valid session for this request

        Require a valid session for a flask request.
        First checks if there is a bewit present and valid.
        Then parses the request header to retrieve a token, tries to decode it,
        and sets g.sessionid to the current sessionid. Returns various error
        messages if the token is not present, valid, etc

        Args:
            require_blsession (bool): Whether this resource requires a valid
                blsession

        Returns:
            None if valid, otherwise an ``(error dict, status code)`` tuple
        """
        logger.debug("Session: require_valid_session")

        bewit = request.args.get("bewit")
        if bewit:
            url = f"http://localhost{request.path}?bewit={bewit}"
            session = self._sign.verify(url)
            # `verify` yields the sessionid as a str, anything else is an error
            if not isinstance(session, str):
                return session, 401

            return self._verify_session(session, **kwargs)

        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return {"error": "No token provided"}, 401

        # Expected form is "<scheme> <token>"; a header without a token part
        # used to raise IndexError and surface as a server error
        parts = auth_header.split(" ")
        if len(parts) < 2:
            return {"error": "No token provided"}, 401

        resp = self._verify_token(parts[1])
        if not isinstance(resp, str):
            return resp, 401

        return self._verify_session(resp, **kwargs)

    def _verify_session(self, session, **kwargs):
        """Check the session exists and populate flask ``g`` for the request

        Sets g.sessionid, g.login, g.user and g.blsession.

        Args:
            session (str): The sessionid
            require_blsession (bool): Whether a valid blsession is required

        Returns:
            None if valid, otherwise an ``(error dict, status code)`` tuple
        """
        if not self._storage.exists(session):
            return {"error": "That session is not valid"}, 401

        # Called with no fields: presumably refreshes the session's last
        # access time — confirm against the storage implementation
        self._storage.update(session)

        g.sessionid = session

        ses = self._storage.get(session)

        # g.login is set before the user lookup, which takes no arguments
        g.login = ses["data"]["username"]
        g.user = self._metadata.get_user()
        if not g.user:
            return {"error": "The user does not exist in the database"}, 401

        g.blsession = None
        if kwargs.get("require_blsession"):
            valid = self._metadata.verify_session(ses["data"].get("blsession", None))
            if not valid:
                return {"error": "No blsession selected"}, 403

            g.blsession = valid

    def require_control(self):
        """Require control for this request

        Require that the current session has control to parse the current
        flask request. Use to block access to say moving a motor if the
        session is not in control.

        Returns:
            None if session is in control, otherwise an error dict
        """
        logger.debug("Session: require_control")

        # `g.get` so a request on which no session was ever set reports an
        # error instead of raising AttributeError
        sessionid = g.get("sessionid")
        if not sessionid:
            return {"error": "No valid session"}

        if not self._storage.has_control(sessionid):
            return {"error": "That session does not have control"}

    def veto(self, state):
        """Veto all current sessions from control

        Remove control from all sessions, useful to veto all control when say
        a scan is in progress. Not implemented: currently a no-op.

        Args:
            state (boolean): The veto state
        """
        pass

    # UI Persistence
    def _session_from_sio(self, namespace=""):
        """Get a sessionid from SocketIO sid

        Returns:
            The sessionid whose ``sio_<namespace>`` matches the sid of the
            current request, or None if there is none
        """
        if request.sid:
            for session in self._storage.list():
                if session["data"].get(f"sio_{namespace}") == request.sid:
                    return session["sessionid"]

    def request_mirror(self, **kwargs):
        """Request mirroring of another session

        Args:
            sessionid (str): The sessionid to mirror, may be None

        Returns:
            True if the current session exists and was updated, False otherwise
        """
        if not self._storage.exists(g.sessionid):
            return False

        mirrorid = kwargs.get("sessionid")
        self._storage.update(g.sessionid, mirror=mirrorid)

        self.emit(
            "message",
            {
                "type": "mirror",
                "sessionid": g.sessionid,
                "mirrorid": mirrorid,
            },
        )

        return True

    def setup_persistence(self):
        """Setup UI persistence

        These methods are used by redux-persist in the UI to persist
        state via SocketIO to allow session mirroring
        """
        self.on("get_item", namespace="/persist")(self.ui_get_item)
        self.on("get_all_items", namespace="/persist")(self.ui_get_all_items)
        self.on("set_item", namespace="/persist")(self.ui_set_item)
        self.on("remove_item", namespace="/persist")(self.ui_remove_item)

    def _ensure_session(self):
        """Ensure session exists in _ui_state

        Returns:
            The sessionid of the current ``/persist`` socket, or None if the
            socket does not belong to a known session
        """
        sessionid = self._session_from_sio("persist")
        if sessionid:
            self._ui_state.setdefault(sessionid, {})

        return sessionid

    def ui_get_item(self, payload):
        """Get an item from _ui_state

        Args:
            payload: The key to read

        Returns:
            The stored value, or None if the key or session is unknown
        """
        sessionid = self._ensure_session()
        if not sessionid:
            logger.error(
                f"Requesting ui_get_item for unknown sessionid, sio {request.sid}"
            )
            return

        return self._ui_state[sessionid].get(payload)

    def ui_get_all_items(self, *args):
        """Get all keys from _ui_state

        Returns:
            list: The keys stored for the current session, or None if the
            session is unknown
        """
        sessionid = self._ensure_session()
        if not sessionid:
            logger.error(
                f"Requesting ui_get_all_items for unknown sessionid, sio {request.sid}"
            )
            return

        # A list rather than a dict_keys view: the value is returned from a
        # SocketIO handler and a view cannot be JSON serialised
        return list(self._ui_state[sessionid])

    def ui_set_item(self, payload):
        """Set a key in _ui_state

        The new value is also pushed to every session mirroring this one.

        Args:
            payload: A ``(key, value)`` pair
        """
        sessionid = self._ensure_session()
        if not sessionid:
            logger.error(
                f"Requesting ui_set_item for unknown sessionid, sio {request.sid}"
            )
            return

        key, val = payload
        self._ui_state[sessionid][key] = val

        for mirror in self._storage.list():
            data = mirror["data"]
            # Only mirrors with an open /persist socket can receive the update
            if data.get("mirror") == sessionid and data.get("sio_persist"):
                self.emit(
                    "state_update",
                    {key: val},
                    namespace="/persist",
                    room=data["sio_persist"],
                )

    def ui_remove_item(self, payload):
        """Remove a key from _ui_state

        Args:
            payload: The key to remove
        """
        sessionid = self._ensure_session()
        if not sessionid:
            logger.error(
                f"Requesting ui_remove_item for unknown sessionid, sio {request.sid}"
            )
            return

        try:
            del self._ui_state[sessionid][payload]
        except KeyError:
            logger.warning(
                f"Trying to remove none existant key {payload} from _ui_state[{sessionid}]"
            )