Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/session/__init__.py: 65%

333 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4from __future__ import annotations 

5import gevent 

6import time 

7from functools import partial 

8 

9from flask import request, g 

10from flask_socketio import disconnect, ConnectionRefusedError 

11import jwt 

12from marshmallow import fields 

13 

14from daiquiri.core.logging import log 

15from daiquiri.core import ( 

16 CoreBase, 

17 CoreResource, 

18 marshal, 

19 no_blsession_required, 

20 require_staff, 

21) 

22from daiquiri.core.schema.session import SessionListSchema, SignSchema 

23from daiquiri.core.schema.metadata import paginated 

24from daiquiri.core.schema import ErrorSchema, MessageSchema 

25from daiquiri.core.bewit import SignedUrl 

26 

27# from daiquiri.core.user import User 

28from daiquiri.core.utils import loader 

29 

30import logging 

31 

32logger = logging.getLogger(__name__) 

33 

34 

35class SessionListResource(CoreResource): 

36 @marshal( 

37 out=[ 

38 [200, paginated(SessionListSchema), "List of sessions"], 

39 [401, ErrorSchema(), "Invalid session"], 

40 ] 

41 ) 

42 def get(self): 

43 """ 

44 Get a list of current sessions 

45 """ 

46 return self._parent.sessions(), 200 

47 

48 

49class SessionResource(CoreResource): 

50 @no_blsession_required 

51 @marshal( 

52 out=[ 

53 [200, SessionListSchema(), "A single session"], 

54 [401, ErrorSchema(), "Invalid session"], 

55 [404, ErrorSchema(), "Session not found"], 

56 ] 

57 ) 

58 def get(self): 

59 """ 

60 Get the current session 

61 """ 

62 ses = self._parent.session(g.sessionid) 

63 if ses: 

64 return ses, 200 

65 else: 

66 return {"error": "No such session"}, 404 

67 

68 

69class RequestControlResource(CoreResource): 

70 @marshal( 

71 out=[ 

72 [200, MessageSchema(), "Control granted"], 

73 [400, ErrorSchema(), "Could not get control"], 

74 ] 

75 ) 

76 def post(self): 

77 """ 

78 Request control for the current session 

79 """ 

80 ses = self._parent.request_control(g.sessionid) 

81 if ses: 

82 return {"message": "Session now has control"}, 200 

83 else: 

84 return {"error": "Cant take control"}, 400 

85 

86 @marshal( 

87 out=[ 

88 [200, MessageSchema(), "Control granted"], 

89 [400, ErrorSchema(), "Could not get control"], 

90 ] 

91 ) 

92 def delete(self): 

93 """ 

94 Yield control for the current session 

95 """ 

96 ses = self._parent.yield_control(g.sessionid) 

97 if ses: 

98 return {"message": "Session yielded control"}, 200 

99 else: 

100 return {"error": "Could not yield control"}, 400 

101 

102 

103class RespondRequestControlResource(CoreResource): 

104 @marshal( 

105 inp={"grant": fields.Bool(required=True)}, 

106 out=[ 

107 [200, MessageSchema(), "Response acknowledged"], 

108 [400, ErrorSchema(), "Could not acknowledge control request response"], 

109 ], 

110 ) 

111 def post(self, **kwargs): 

112 """ 

113 Respond to a request for control in the current session 

114 """ 

115 ses = self._parent.respond_request_control(kwargs["grant"]) 

116 if ses: 

117 return {"message": "Response acknowledged"}, 200 

118 else: 

119 return {"error": "Could not acknowledge response"}, 400 

120 

121 

122class SignResource(CoreResource): 

123 @no_blsession_required 

124 @marshal( 

125 inp=SignSchema, 

126 out=[ 

127 [200, SignSchema(), "Url signed"], 

128 [400, ErrorSchema(), "Could not sign Url"], 

129 ], 

130 ) 

131 def post(self, **kwargs): 

132 """Create a signed url 

133 

134 This allows a resource to be accessed temporarily a single time without 

135 an access token. Can be used to downlad files, etc 

136 """ 

137 bewit = self._parent.sign(kwargs["url"]) 

138 if bewit: 

139 return {"url": kwargs["url"], "bewit": bewit}, 200 

140 else: 

141 return {"error": "Could not sign url"}, 400 

142 

143 

144class MirrorResource(CoreResource): 

145 @require_staff 

146 @marshal( 

147 inp={ 

148 "sessionid": fields.Str( 

149 allow_none=True, metadata={"description": "The sessionid to mirror"} 

150 ) 

151 }, 

152 out=[ 

153 [200, MessageSchema(), "Session mirroring enabled"], 

154 [400, ErrorSchema(), "Could not mirror session"], 

155 ], 

156 ) 

157 def post(self, **kwargs): 

158 """Request mirroring of a session""" 

159 mirror = self._parent.request_mirror(**kwargs) 

160 if mirror: 

161 return {"message": "Mirroring enabled"}, 200 

162 else: 

163 return {"error": "Could not mirror session"}, 400 

164 

165 

166class Session(CoreBase): 

167 """Session Handler 

168 

169 Loads the session storage mechanism dynamically from the config.yaml session_type. 

170 """ 

171 

172 _namespace = "session" 

173 

174 _require_session = True 

175 _require_blsession = True 

176 _pending_control_request = None 

177 

178 _ui_state = {} 

179 

180 def __init__(self, *args, **kwargs): 

181 CoreBase.__init__(self, *args, **kwargs) 

182 self._session_start_timeout: dict[str, float] = {} 

183 

184 def set_metadata(self, metadata): 

185 self._metadata = metadata 

186 

187 def setup(self): 

188 self._sign = SignedUrl(self._config["url_secret"]) 

189 

190 self._session = self 

191 self._storage = loader( 

192 "daiquiri.core.session", "SessionStorage", self._config["session_type"] 

193 ) 

194 self._storage.set_generate_token(self._generate_token) 

195 

196 self.register_route(SessionListResource, "") 

197 self.register_route(SessionResource, "/current") 

198 self.register_route(RequestControlResource, "/current/control") 

199 self.register_route(RespondRequestControlResource, "/current/control/respond") 

200 self.register_route(SignResource, "/sign") 

201 self.register_route(MirrorResource, "/mirror") 

202 

203 for namespace in [ 

204 "app", 

205 "metadata", 

206 "persist", 

207 "queue", 

208 "session", 

209 "stomp", 

210 ]: 

211 self.register_namespace(namespace) 

212 

213 self.setup_persistence() 

214 

215 self.on("connect", namespace=None)(partial(self._on_socket_connect, "")) 

216 self.on("disconnect", namespace=None)(partial(self._on_socket_disconnect, "")) 

217 

218 def register_namespace(self, namespace): 

219 self.on("connect", namespace=f"/{namespace}")( 

220 partial(self._on_socket_connect, namespace) 

221 ) 

222 self.on("disconnect", namespace=f"/{namespace}")( 

223 partial(self._on_socket_disconnect, namespace) 

224 ) 

225 

226 def _on_socket_connect(self, namespace, *args, **kwargs): 

227 """Authenticate SocketIO Connection 

228 

229 Makes sure a token is present on the socketio request and checks its validity in 

230 the current session list 

231 

232 Returns: 

233 None if session is valid, else False which disconnects the SocketIO request 

234 """ 

235 if "token" in request.args: 

236 sessionid = self._verify_token(request.args["token"]) 

237 if isinstance(sessionid, str): 

238 if self._storage.exists(sessionid): 

239 self._storage.update(sessionid, **{f"sio_{namespace}": request.sid}) 

240 self._stop_session_timeout(sessionid) 

241 logger.debug(f"Got token for socketio login {sessionid}") 

242 return True 

243 else: 

244 logger.debug( 

245 f"Invalid session for {sessionid}, token: {request.args['token']}" 

246 ) 

247 else: 

248 logger.debug("Invalid token") 

249 

250 raise ConnectionRefusedError({"error": "Invalid Session"}) 

251 

252 def _on_socket_disconnect(self, namespace, *args, **kwargs): 

253 """SocketIO disconnect event 

254 

255 Used to start session timeout 

256 """ 

257 self._start_session_timeout(self._session_from_sio(namespace)) 

258 

259 def _start_session_timeout(self, sessionid: str): 

260 """Start session timeout 

261 

262 Start a 10s timeout for the current session 

263 

264 Args: 

265 sessionid(str): The sessionid 

266 """ 

267 new_timeout = sessionid not in self._session_start_timeout 

268 self._session_start_timeout[sessionid] = time.time() 

269 

270 if new_timeout: 

271 logger.info(f"Starting session timeout {sessionid}") 

272 gevent.spawn(self._session_timeout_process, sessionid) 

273 

274 def _session_timeout_process(self, sessionid: str, sleep=5, timeout=30): 

275 """Session timeout process 

276 

277 This greenlet will wait for 10 seconds, if the timeout has 

278 not been cancelled the session will be removed 

279 

280 Args: 

281 sessionid(str): The sessionid 

282 sleep(int): Time to sleep 

283 timeout(int): Time in seconds after which to timeout 

284 """ 

285 delete = False 

286 while True: 

287 time.sleep(sleep) 

288 start_timeout = self._session_start_timeout.get(sessionid, None) 

289 if start_timeout is None: 

290 # timeout was cancelled 

291 break 

292 

293 if time.time() >= start_timeout + timeout: 

294 delete = True 

295 del self._session_start_timeout[sessionid] 

296 break 

297 

298 if delete: 

299 if self._storage.exists(sessionid): 

300 self._storage.remove(sessionid) 

301 log.get("user").info("Session disconnected", type="session") 

302 self.emit( 

303 "message", 

304 {"type": "disconnect", "sessionid": sessionid}, 

305 ) 

306 

307 for session in self._storage.list(): 

308 if session["data"].get("mirror") == sessionid: 

309 self.emit( 

310 "message", 

311 {"type": "mirror_disconnect", "sessionid": sessionid}, 

312 room=session["data"]["sio_session"], 

313 ) 

314 

315 del session["data"]["mirror"] 

316 

317 def _stop_session_timeout(self, sessionid): 

318 """Cancel a session timeout 

319 

320 Args: 

321 sessionid(str): The sessionid 

322 """ 

323 logger.info(f"Cancelling session timeout {sessionid}") 

324 if sessionid in self._session_start_timeout: 

325 del self._session_start_timeout[sessionid] 

326 

327 def sign(self, url): 

328 """Create a signed url 

329 

330 Args: 

331 url(str): The url to sign 

332 Returns 

333 bewit(str): The bewit token 

334 """ 

335 bewit = self._sign.sign(url, g.sessionid) 

336 if bewit: 

337 return bewit 

338 

339 def create(self, data): 

340 """Create a new session 

341 

342 Delegates to the session storage engine to create the session and emit a connect 

343 event 

344 

345 Args: 

346 data (dict): Dictionary of session parameters to save 

347 

348 Returns: 

349 The session id 

350 """ 

351 session = self._storage.create(data) 

352 username = data.get("username", None) 

353 log.get("user").info(f"New session connected {username}", type="session") 

354 self.emit( 

355 "message", 

356 { 

357 "type": "connect", 

358 "sessionid": session["sessionid"], 

359 "username": username, 

360 }, 

361 ) 

362 

363 if self._config.get("session_auto_control") and self._storage.no_control(): 

364 logger.info( 

365 f"Automatically granting control for to {username} with session {session['sessionid']}" 

366 ) 

367 self._do_grant_control(session["sessionid"]) 

368 

369 return session 

370 

371 def remove(self): 

372 """Remove the current session 

373 

374 Remove a session from the storage engine and emit a disconnect event 

375 

376 Returns 

377 True is session successfully remove, False if not 

378 """ 

379 invalid = self.require_valid_session() 

380 if not invalid: 

381 if self._storage.exists(g.sessionid): 

382 session = self._storage.get(g.sessionid) 

383 for key, value in session["data"].items(): 

384 if key.startswith("sio_"): 

385 disconnect(sid=value, namespace=key.replace("sio_", "")) 

386 

387 self._storage.remove(g.sessionid) 

388 log.get("user").info("Session disconnected", type="session") 

389 self.emit( 

390 "message", 

391 {"type": "disconnect", "sessionid": g.sessionid}, 

392 ) 

393 return True 

394 else: 

395 return False 

396 

397 def sessions(self): 

398 """Return a list of sessions""" 

399 sessions = self._storage.list() 

400 for s in sessions: 

401 s["operator_pending"] = ( 

402 True if s["sessionid"] == self._pending_control_request else False 

403 ) 

404 

405 return {"total": len(sessions), "rows": sessions} 

406 

407 def session(self, sessionid): 

408 """Return a specific session 

409 

410 Args: 

411 sessionid (uuid): The session to return 

412 

413 Returns 

414 The session dict 

415 """ 

416 session = self._storage.get(sessionid) 

417 

418 if not session: 

419 return 

420 

421 mirrored = False 

422 for ses in self._storage.list(): 

423 if sessionid == ses["data"].get("mirror"): 

424 mirrored = True 

425 

426 session["mirrored"] = mirrored 

427 

428 return session 

429 

430 def update(self, data): 

431 """Update data within the current session 

432 

433 Kwargs: 

434 data (dict): Keywords to update 

435 """ 

436 self._storage.update(g.sessionid, **data) 

437 

438 def _generate_token(self, sessionid): 

439 """Generate a session token 

440 

441 Generates a session token with similar properties to a JWT, storing the sessionid 

442 within the generated token. Token expirey defined in config session_length 

443 

444 Args: 

445 sessionid (uuid): The sessionid 

446 

447 Returns 

448 A session token used to validate all requests 

449 """ 

450 encoded = jwt.encode( 

451 { 

452 "exp": time.time() + self._config["session_length"], 

453 "sessionid": sessionid, 

454 }, 

455 self._config["secret"], 

456 algorithm="HS256", 

457 ) 

458 return encoded 

459 

460 def _verify_token(self, token): 

461 """Verify a token is valid 

462 

463 Checks the signature of the token is valid and return the encoded sessionid 

464 

465 Args: 

466 token (str): The token 

467 

468 Returns: 

469 The sessionid from the token if valid, otherwise an error dict 

470 """ 

471 try: 

472 data = jwt.decode(token, self._config["secret"], algorithms=["HS256"]) 

473 except jwt.ExpiredSignatureError: 

474 logger.info("SignatureExpired {token}".format(token=token)) 

475 sessionid = self._storage.remove(None, token=token) 

476 if sessionid: 

477 self.emit( 

478 "message", 

479 {"type": "expired", "sessionid": sessionid}, 

480 ) 

481 logger.info("Removing expired session {sid}".format(sid=sessionid)) 

482 

483 return {"error": "Token Expired"} 

484 

485 except jwt.InvalidTokenError: 

486 return {"error": "Invalid Token"} 

487 

488 return data["sessionid"] 

489 

490 def request_control(self, sessionid): 

491 """Requst session control 

492 

493 Request control for the sessionid and emit an event notifying other clients that 

494 sessionid is now in control. 

495 

496 If the user is staff the request is automatically granted, otherwise the current 

497 user in control must respond 

498 

499 Args: 

500 sessionid (uuid): The sessionid 

501 Returns: 

502 True if sessionid now in control 

503 """ 

504 if g.user.staff() or self._storage.no_control(): 

505 self._do_grant_control(sessionid) 

506 return True 

507 

508 else: 

509 self._pending_control_request = sessionid 

510 self.start_control_request_timeout() 

511 log.get("user").info( 

512 f"Session {sessionid} requesting control", type="session" 

513 ) 

514 self.emit( 

515 "message", 

516 {"type": "request_control", "sessionid": sessionid}, 

517 ) 

518 

519 def start_control_request_timeout(self): 

520 """Start a greelet that counts down the control request timeout 

521 

522 If the current operator does not respond within this timeframe 

523 the requester will automatically be granted control 

524 """ 

525 if self._pending_control_request: 

526 self._pending_control_request_timeout = self._config[ 

527 "session_control_timeout" 

528 ] 

529 self._greenlet = gevent.spawn(self._control_request_timeout) 

530 

531 def _control_request_timeout(self): 

532 while self._pending_control_request: 

533 if self._pending_control_request_timeout == 0: 

534 self.emit( 

535 "message", 

536 { 

537 "type": "timeout_control", 

538 "sessionid": self._pending_control_request, 

539 }, 

540 ) 

541 self._do_grant_control(self._pending_control_request) 

542 self._pending_control_request = None 

543 

544 self._pending_control_request_timeout -= 1 

545 time.sleep(1) 

546 

547 def respond_request_control(self, grant): 

548 """Respond to a request for control if not admin 

549 

550 Args: 

551 grant (bool): Whether to grant control or not 

552 

553 Returns: 

554 True if response acknowledged 

555 """ 

556 if self._storage.has_control(g.sessionid): 

557 if grant: 

558 self._do_grant_control(self._pending_control_request) 

559 else: 

560 log.get("user").info( 

561 f"Session {self._pending_control_request} denied control", 

562 type="session", 

563 ) 

564 self.emit( 

565 "message", 

566 { 

567 "type": "deny_control", 

568 "sessionid": self._pending_control_request, 

569 "operator": False, 

570 }, 

571 ) 

572 

573 self._pending_control_request = None 

574 return True 

575 

576 def _do_grant_control(self, sessionid): 

577 self._storage.take_control(sessionid) 

578 

579 log.get("user").info("Session granted control", type="session") 

580 self.emit( 

581 "message", 

582 {"type": "control", "sessionid": sessionid, "operator": True}, 

583 ) 

584 return True 

585 

586 def yield_control(self, sessionid): 

587 """Yield control for the current session 

588 

589 Args: 

590 sessionid (uuid): Sessionid to yield 

591 

592 Returns: 

593 True if session was in control and has now yielded 

594 """ 

595 yielded = self._storage.yield_control(sessionid) 

596 if yielded: 

597 log.get("user").info("Session yielded control", type="session") 

598 self.emit( 

599 "message", 

600 {"type": "control", "sessionid": sessionid, "operator": True}, 

601 ) 

602 return True 

603 

604 def require_valid_session(self, **kwargs): 

605 """Require a valid session for this request 

606 

607 Require a valid session for a flask request. 

608 First checks if there is a bewit present and valid. 

609 Then parses the request header to retrieve a token, try to decode it, and set 

610 g.sessionid to the current sessionid. Return various error messages if the token 

611 is not present, valid, etc 

612 

613 Kwargs: 

614 require_blsession (bool): Whether this resource requires a valid blsession 

615 

616 Returns: 

617 error(dict): None if valid, otherwise an error dict 

618 """ 

619 logger.debug("Session: require_valid_session") 

620 

621 bewit = request.args.get("bewit") 

622 if bewit: 

623 url = f"http://localhost{request.path}?bewit={bewit}" 

624 session = self._sign.verify(url) 

625 if not isinstance(session, str): 

626 return session, 401 

627 else: 

628 return self._verify_session(session, **kwargs) 

629 

630 auth_header = request.headers.get("Authorization") 

631 if auth_header: 

632 auth_token = auth_header.split(" ")[1] 

633 else: 

634 auth_token = None 

635 

636 if auth_token is not None: 

637 resp = self._verify_token(auth_token) 

638 if not isinstance(resp, str): 

639 return resp, 401 

640 else: 

641 return self._verify_session(resp, **kwargs) 

642 else: 

643 return {"error": "No token provided"}, 401 

644 

645 def _verify_session(self, session, **kwargs): 

646 if not self._storage.exists(session): 

647 return {"error": "That session is not valid"}, 401 

648 else: 

649 self._storage.update(session) 

650 

651 g.sessionid = session 

652 

653 ses = self._storage.get(session) 

654 

655 g.login = ses["data"]["username"] 

656 g.user = self._metadata.get_user() 

657 if not g.user: 

658 return {"error": "The user does not exist in the database"}, 401 

659 

660 g.blsession = None 

661 if kwargs.get("require_blsession"): 

662 valid = self._metadata.verify_session( 

663 ses["data"].get("blsession", None) 

664 ) 

665 if not valid: 

666 return {"error": "No blsession selected"}, 403 

667 else: 

668 g.blsession = valid 

669 

670 def require_control(self): 

671 """Require control for this request 

672 

673 Require that the current session has control to parse the current flask request. 

674 Use to block access to say moving a motor if the session is not in control. 

675 

676 Returns: 

677 None if session is in control, otherwise an error dict 

678 

679 """ 

680 logger.debug("Session: require_control") 

681 

682 if g.sessionid: 

683 if not self._storage.has_control(g.sessionid): 

684 return {"error": "That session does not have control"} 

685 else: 

686 return {"error": "No valid session"} 

687 

688 def veto(self, state): 

689 """Veto all current sessions from control 

690 

691 Remove control from all sessions, useful to veto all control when say a scan 

692 is in progress. 

693 

694 Args: 

695 state (boolean): The veto state 

696 """ 

697 pass 

698 

699 # UI Persistence 

700 def _session_from_sio(self, namespace=""): 

701 """Get a sessionid from SocketIO sid""" 

702 if request.sid: 

703 for session in self._storage.list(): 

704 if session["data"].get(f"sio_{namespace}") == request.sid: 

705 return session["sessionid"] 

706 

707 def request_mirror(self, **kwargs): 

708 """Request mirroring of another session""" 

709 if self._storage.exists(g.sessionid): 

710 self._storage.update(g.sessionid, mirror=kwargs.get("sessionid")) 

711 

712 self.emit( 

713 "message", 

714 { 

715 "type": "mirror", 

716 "sessionid": g.sessionid, 

717 "mirrorid": kwargs.get("sessionid"), 

718 }, 

719 ) 

720 

721 return True 

722 

723 def setup_persistence(self): 

724 """Setup UI persistence 

725 

726 These methods are used by redux-persist in the UI to persist 

727 state via SocketIO to allow session mirroring 

728 """ 

729 self.on("get_item", namespace="/persist")(self.ui_get_item) 

730 self.on("get_all_items", namespace="/persist")(self.ui_get_all_items) 

731 self.on("set_item", namespace="/persist")(self.ui_set_item) 

732 self.on("remove_item", namespace="/persist")(self.ui_remove_item) 

733 

734 def _ensure_session(self): 

735 """Ensure session exists in _ui_state""" 

736 sessionid = self._session_from_sio("persist") 

737 if sessionid: 

738 if sessionid not in self._ui_state: 

739 self._ui_state[sessionid] = {} 

740 

741 return sessionid 

742 

743 def ui_get_item(self, payload): 

744 """Get an item from _ui_state""" 

745 sessionid = self._ensure_session() 

746 if not sessionid: 

747 logger.error( 

748 f"Requesting ui_get_item for unknown sessionid, sio {request.sid}" 

749 ) 

750 return 

751 

752 return self._ui_state[sessionid].get(payload) 

753 

754 def ui_get_all_items(self, *args): 

755 """Get all keys from _ui_state""" 

756 sessionid = self._ensure_session() 

757 if not sessionid: 

758 logger.error( 

759 f"Requesting ui_get_all_items for unknown sessionid, sio {request.sid}" 

760 ) 

761 return 

762 

763 return self._ui_state[sessionid].keys() 

764 

765 def ui_set_item(self, payload): 

766 """Set a key in _ui_state""" 

767 sessionid = self._ensure_session() 

768 if not sessionid: 

769 logger.error( 

770 f"Requesting ui_set_item for unknown sessionid, sio {request.sid}" 

771 ) 

772 return 

773 

774 key, val = payload 

775 self._ui_state[sessionid][key] = val 

776 

777 for mirror in self._storage.list(): 

778 if mirror["data"].get("mirror") == sessionid: 

779 if mirror["data"].get("sio_persist"): 

780 self.emit( 

781 "state_update", 

782 {key: self._ui_state[sessionid][key]}, 

783 namespace="/persist", 

784 room=mirror["data"]["sio_persist"], 

785 ) 

786 

787 def ui_remove_item(self, payload): 

788 """Remove a key from _ui_state""" 

789 sessionid = self._ensure_session() 

790 if not sessionid: 

791 logger.error( 

792 f"Requesting ui_remove_item for unknown sessionid, sio {request.sid}" 

793 ) 

794 return 

795 

796 try: 

797 del self._ui_state[sessionid][payload] 

798 except KeyError: 

799 logger.warning( 

800 f"Trying to remove none existant key {payload} from _ui_state[{sessionid}]" 

801 )