Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/queue.py: 57%

283 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import gevent 

5import uuid 

6 

7from marshmallow import fields 

8 

9from daiquiri.core.utils import loader 

10from daiquiri.core.logging import log 

11from daiquiri.core import CoreBase, CoreResource, marshal, require_control 

12from daiquiri.core.components import ComponentActor, ComponentActorKilled 

13from daiquiri.core.schema import ErrorSchema, MessageSchema 

14from daiquiri.core.schema.queue import ( 

15 QueueStatusSchema, 

16 ChangeQueueStatusSchema, 

17 ChangeQueueSettingSchema, 

18 MoveQueueItemSchema, 

19 ClearQueueItemsSchema, 

20 KillQueueSchema, 

21) 

22 

23import logging 

24 

25logger = logging.getLogger(__name__) 

26 

27 

class PauseActor(ComponentActor):
    """Marker actor that performs no work when executed.

    `Queue.process` stops the queue when it pops an instance of this class.
    """

    name = "pause"

    def method(*args, **kwargs):
        """Do nothing; any arguments are accepted and ignored."""

33 

34 

class SleepActor(ComponentActor):
    """Actor that blocks for a number of seconds when executed."""

    name = "sleep"

    def method(*args, **kwargs):
        """Sleep for ``kwargs["time"]`` seconds, or 10 when it is not given."""
        duration = kwargs.get("time", 10)
        time.sleep(duration)

40 

41 

# REST resource for the queue as a whole: read its status, start or pause it,
# change a setting, or clear it. All work is delegated to `self._parent`.
class QueueResource(CoreResource):
    @marshal(
        inp={"status": fields.Str()},
        out=[[200, QueueStatusSchema(), "Current queue status"]],
    )
    def get(self, **kwargs):
        """Get current queue status"""
        return self._parent.status(**kwargs), 200

    @marshal(
        inp=ChangeQueueStatusSchema,
        out=[
            [200, ChangeQueueStatusSchema(), "New queue state"],
            [400, ErrorSchema(), "Could not change queue state"],
        ],
    )
    @require_control
    def put(self, *args, **kwargs):
        """Start or pause the queue"""
        # A missing `state` is treated as a request to pause.
        state = kwargs.get("state", False)
        if state:
            self._parent.start()
        else:
            self._parent.stop()

        # Echo the value that was acted on. Indexing kwargs["state"] here
        # would raise KeyError after the queue had already been paused.
        return {"state": state}, 200

    @marshal(
        inp=ChangeQueueSettingSchema,
        out=[
            [200, MessageSchema(), "Setting changed successfully"],
            [400, ErrorSchema(), "Could not change queue setting"],
        ],
    )
    @require_control
    def patch(self, *args, **kwargs):
        """Update a queue setting"""
        # `update_queue` returns a falsy value when nothing was changed.
        if not self._parent.update_queue(**kwargs):
            return {"error": "Could not update queue setting"}, 400

        return {"message": "Queue setting updated"}, 200

    @marshal(inp=ClearQueueItemsSchema, out=[[200, MessageSchema(), "Queue cleared"]])
    @require_control
    def delete(self, *args, **kwargs):
        """Clear the queue"""
        log.get("user").info("Queue was cleared", type="queue")
        self._parent.clear(**kwargs)
        return {"message": "Queue cleared"}, 200

92 

93 

# REST resource addressing a single queued item by its uuid.
class QueueItemResource(CoreResource):
    @marshal(
        out=[
            [200, MessageSchema(), "Queue item removed"],
            [400, ErrorSchema(), "Could not remove queue item"],
        ]
    )
    @require_control
    def delete(self, uuid):
        """Remove an item from the queue"""
        # Report the failure first; the success path falls through.
        if not self._parent.remove(uuid):
            return {"error": "Could not remove queue item"}, 400

        log.get("user").info(f"Queue item {uuid} was removed", type="queue")
        return {"message": "Queue item removed"}, 200

110 

111 

# REST resource that kills whatever the queue is currently running.
class QueueKillResource(CoreResource):
    @marshal(
        inp=KillQueueSchema,
        out=[
            [200, MessageSchema(), "Queue item killed"],
            [400, ErrorSchema(), "Could not kill queue item"],
        ],
    )
    @require_control
    def post(self, **kwargs):
        """Kill the currently running queue item

        Args:
            stop (bool) : Whether the queue should stop after
                the current queue item was killed
        """
        killed_uid = self._parent.kill()

        # The stop request is honoured whether or not anything was killed.
        if kwargs.get("stop"):
            self._parent.stop()

        if not killed_uid:
            return {"error": "Could not kill queue item"}, 400

        log.get("user").info(f"Queue item {killed_uid} was killed", type="queue")
        return {"message": "Queue item killed"}, 200

136 

137 

# REST resource that repositions a queued item within the stack.
class MoveQueueItemResource(CoreResource):
    @marshal(
        inp=MoveQueueItemSchema,
        out=[
            [200, MessageSchema(), "Queue item moved"],
            [400, ErrorSchema(), "Could not move queue item"],
        ],
    )
    @require_control
    def post(self, **kwargs):
        """Move a queue item to a different position"""
        uid = kwargs["uid"]
        target = kwargs["position"]

        if not self._parent.move_item(uid, target):
            return {"error": "Could not move queue item"}, 400

        log.get("user").info(f"Queue item {uid} was moved", type="queue")
        return {"message": "Queue item moved"}, 200

156 

157 

# REST resource that appends a pause marker to the queue.
class PauseItemResource(CoreResource):
    @marshal(
        out=[
            [200, MessageSchema(), "Pause item added"],
            [400, ErrorSchema(), "Could not add pause item"],
        ]
    )
    @require_control
    def post(self):
        """Add a pause item to the queue (pauses the queue when reached)"""
        pause_uid = self._parent.add_pause()
        if not pause_uid:
            return {"error": "Could not add pause item"}, 400

        log.get("user").info(f"Pause item {pause_uid} was added", type="queue")
        return {"message": "Pause item added"}, 200

174 

175 

# REST resource that appends a sleep item of a given duration to the queue.
class SleepItemResource(CoreResource):
    @marshal(
        inp={"time": fields.Int(required=True)},
        out=[
            [200, MessageSchema(), "Sleep item added"],
            [400, ErrorSchema(), "Could not add sleep item"],
        ],
    )
    @require_control
    def post(self, **kwargs):
        """Add a sleep item to the queue (sleeps for n seconds when reached)"""
        seconds = kwargs["time"]
        sleep_uid = self._parent.add_sleep(seconds)
        if not sleep_uid:
            return {"error": "Could not add sleep item"}, 400

        log.get("user").info(
            f"Sleep item {sleep_uid} for {seconds}s was added", type="queue"
        )
        return {"message": "Sleep item added"}, 200

195 

196 

class Queue(CoreBase):
    """Core Queue functionality

    The queue consists of a stack of `Actor` instances. The queue is started at initialise
    time and then by default paused. The queue can be started and stopped via flask resources

    """

    _namespace = "queue"
    _require_session = True
    _require_blsession = True
    # Class-level placeholders only: `setup` rebinds fresh per-instance lists so
    # queue contents are never shared between instances through these objects.
    _stack = []
    _finished = []
    _running_actor = None
    _greenlet = None

    _running = False
    _ready = False
    _check_actor_available = True

    @staticmethod
    def _noop_callback(*args, **kwargs):
        """Callback that ignores its arguments; used for internally created actors."""

    def setup(self):
        """Register the REST routes, read the config and launch the background task"""
        # Give this instance its own lists before anything can append to them.
        self._stack = []
        self._finished = []

        self.register_route(QueueResource, "")
        self.register_route(QueueKillResource, "/kill")
        self.register_route(MoveQueueItemResource, "/move")
        self.register_route(PauseItemResource, "/pause")
        self.register_route(SleepItemResource, "/sleep")
        self.register_route(QueueItemResource, "/<uuid>")

        self._pause_on_fail = self._config.get("pause_queue_on_fail", True)

        self.launch()

    def push(self, actor):
        """Push an Actor onto the stack

        Pushes an actor onto the stack and emits a message

        Args:
            actor (obj): An instance of an `Actor`

        Raises:
            AttributeError: If `actor` is not a `ComponentActor` instance
        """
        if not isinstance(actor, ComponentActor):
            raise AttributeError(
                f"Actor: {actor} must be an instance of `ComponentActor`"
            )
        log.get("user").info(
            f"Actor added to the queue ({actor.name}, {actor.uid})", type="queue"
        )
        self.emit(
            "message",
            {"type": "add", "message": "Adding Actor", "uid": actor.uid},
        )
        self._stack.append(actor)

    def run_now(self, actor, pause=True):
        """Push an Actor onto the stack and run it now

        Args:
            actor (obj): An instance of an `Actor`
            pause (bool): Whether the queue should pause after this actor
        """
        # A pause marker is only needed when something else is already queued;
        # it is inserted first so that it ends up directly behind `actor`.
        if pause and self._stack:
            self._stack.insert(0, self._create_pause())

        self._stack.insert(0, actor)
        self.start()

    def start(self):
        """Start the queue running

        Emits a message that the queue has started
        """
        self._running = True
        log.get("user").info("Queue was started", type="queue")
        self.emit("message", {"type": "status", "message": True})

    def stop(self):
        """Stop the queue

        Emits a message that the queue has stopped
        """
        self._running = False
        log.get("user").info("Queue was paused", type="queue")
        self.emit("message", {"type": "status", "message": False})

    def clear(self, **kwargs):
        """Clear the queue

        Args:
            finished (bool) : Whether to clear the finished queue array or the stack
        """
        if kwargs.get("finished"):
            for actor in self._finished:
                actor.remove()

            self._finished = []
        else:
            for actor in self._stack:
                actor.remove()

            self._stack = []

        self.emit("message", {"type": "clear", "message": "Queue Cleared"})

    def remove(self, uuid):
        """Remove an item from the queue

        Args:
            uuid (uuid): The queue item uuid to remove

        Returns:
            bool: True if the item was found and removed, False otherwise
        """
        item = next((actor for actor in self._stack if actor.uid == uuid), None)
        if item is None:
            return False

        item.remove()
        self._stack.remove(item)

        self.emit(
            "message",
            {"type": "remove", "message": "Actor removed", "uid": uuid},
        )

        return True

    def move_item(self, uid, position):
        """Move a queue item to a different position

        Args:
            uid (uuid): The queue item uuid to move
            position (int): The new queue item position

        Returns:
            bool: True if the item was found and moved, False otherwise
        """
        item = next((actor for actor in self._stack if actor.uid == uid), None)
        if item is None:
            return False

        self._stack.remove(item)
        self._stack.insert(position, item)

        self.emit(
            "message",
            {"type": "move", "message": "Actor moved", "uid": uid},
        )

        return True

    def _create_pause(self):
        """Create a pause Actor

        Return:
            actor (obj): An instance of an `Actor`
        """
        pause = PauseActor(
            uid=str(uuid.uuid4()),
            name="pause",
            started=self._noop_callback,
            error=self._noop_callback,
            finished=self._noop_callback,
            socketio=self._socketio,
        )
        pause.prepare()
        return pause

    def add_pause(self):
        """Add a pause item to the queue

        Returns:
            str: The uid of the new pause item
        """
        pause = self._create_pause()

        self.push(pause)
        return pause.uid

    def add_sleep(self, sleep_time):
        """Add a sleep item to the queue

        Args:
            sleep_time (int): Number of seconds the item sleeps for when run

        Returns:
            str: The uid of the new sleep item
        """
        sleep = SleepActor(
            uid=str(uuid.uuid4()),
            name="sleep",
            started=self._noop_callback,
            error=self._noop_callback,
            finished=self._noop_callback,
            socketio=self._socketio,
        )
        sleep.prepare(time=sleep_time)

        self.push(sleep)
        return sleep.uid

    def update_queue(self, **kwargs):
        """Update a queue setting

        Args:
            setting (str): The setting to change; only `pause_on_fail` is handled
            value: The new value. Strings are parsed as booleans
                (true/false, yes/no, on/off, 1/0); anything else goes through `bool()`

        Returns:
            bool: True if the setting was updated, False otherwise
        """
        if kwargs["setting"] != "pause_on_fail":
            return False

        raw = kwargs["value"]
        if isinstance(raw, str):
            # bool("false") is True, so strings have to be parsed explicitly
            text = raw.strip().lower()
            if text in ("true", "1", "yes", "on"):
                val = True
            elif text in ("false", "0", "no", "off", ""):
                val = False
            else:
                logger.error(f"Value {raw} is not a valid value for pause_on_fail")
                return False
        else:
            val = bool(raw)

        self._pause_on_fail = val
        self.emit(
            "message",
            {"type": "update", "message": "Setting updated"},
        )

        return True

    def status(self, **kwargs):
        """Return the queue status

        Returns the queue status and a list of the queue stack

        Args:
            status (str): When equal to "Finished" the finished actors are
                prepended to `all`

        Returns:
            running (bool): Whether the queue is running
            ready (bool): Whether the queue is ready to process
            stack (list(dict)): The list of queued actors
            current (dict): The currently running actor
            all (list(dict)): The list of finished, running, and queued actors
            pause_on_fail (bool): Whether the queue pauses when an actor fails

        """
        stack = [{**actor.info(), "status": "queued"} for actor in self._stack]

        cur = None
        if self._running_actor:
            cur = {**self._running_actor.info(), "status": "running"}

        allq = ([cur] if cur else []) + stack

        if kwargs.get("status") == "Finished":
            finished = []
            for actor in self._finished:
                info = actor.info()
                finished.append(
                    {**info, "status": "failed" if info["failed"] else "finished"}
                )
            allq = finished + allq

        return {
            "running": self._running,
            "ready": self._ready,
            "stack": stack,
            "current": cur,
            "all": allq,
            "pause_on_fail": self._pause_on_fail,
        }

    def kill(self):
        """Kill the currently running actor

        Returns:
            The uid of the killed actor, or None if no greenlet was running
        """
        if not self._greenlet:
            return None

        # Capture the actor first: `process` clears `_running_actor` once the
        # greenlet has ended.
        act = self._running_actor
        self._greenlet.kill(exception=ComponentActorKilled)
        return act.uid

    def launch(self):
        """Launch the queue background task"""
        logger.debug("Starting Queue background task")
        self._task = gevent.spawn(self.process)

    def _check_ready(self):
        """Determine whether the queue is ready to process

        Use an actor `queue/monitor` to determine if the queue is ready. This
        actor should return a boolean value defining the readystate of the queue.

        This can be used to pause the queue if for example beam is lost
        """
        if not self._check_actor_available:
            return

        base = "{path}.{cls}".format(
            path=self._config["implementors"].replace("/", "."),
            cls=self.__class__.__name__.lower(),
        )

        try:
            actor = loader(
                base,
                "Actor",
                "monitor",
                uid=str(uuid.uuid4()),
                name="monitor",
                started=self._noop_callback,
                error=self._noop_callback,
                finished=self._noop_callback,
                socketio=self._socketio,
            )
        except ModuleNotFoundError:
            # No monitor implemented: stop checking and treat the queue as ready
            logger.warning("`monitor` actor not available, disabling check")
            self._check_actor_available = False
            self._ready = True
            return
        except Exception:
            # Loading failed for another reason: ready for now, retry next call
            logger.warning("Error loading `monitor` actor", exc_info=True)
            self._ready = True
            return

        actor.prepare()
        actor.execute()
        if actor._failed:
            # A failing monitor must not block the queue
            logger.warning(
                "Error running `monitor` actor", exc_info=actor._exception
            )
            ready = True
        else:
            ready = actor.resp()

        # Only notify clients when the ready state actually changes
        if ready != self._ready:
            self._ready = ready
            self.emit(
                "message",
                {
                    "type": "ready_state",
                    "message": "Queue ready" if ready else "Queue not ready",
                },
            )

    def process(self):
        """Queue Task

        Iterate through the stack, pausing as needed
        The queue is automatically paused when the stack is empty

        """
        while True:
            self._check_ready()
            if self._running_actor is None and self._running and self._ready:
                if self._stack:
                    actor = self._stack.pop(0)
                    self._running_actor = actor

                    # Pause the queue on a stop action
                    if isinstance(actor, PauseActor):
                        self.stop()

                    log.get("user").info(
                        f"Queue processing new actor {actor.name} {actor.uid}",
                        type="queue",
                    )
                    self.emit(
                        "message",
                        {
                            "type": "start",
                            "message": "Starting Actor",
                            "uid": actor.uid,
                        },
                    )
                    logger.debug(f"Executing queued actor {actor.name} {actor.uid}")

                    try:
                        # Run in a separate greenlet so `kill` can interrupt it
                        self._greenlet = gevent.spawn(actor.execute)
                        self._greenlet.join()

                        # A truthy greenlet value is treated as the error to raise
                        error = self._greenlet.value
                        if error:
                            raise error

                        log.get("user").info(
                            f"Actor finished {actor.name} {actor.uid}",
                            type="queue",
                        )
                        self.emit(
                            "message",
                            {
                                "type": "end",
                                "message": "Actor Finished",
                                "uid": actor.uid,
                            },
                        )

                    except ComponentActorKilled:
                        self._running = False
                        self.emit(
                            "message",
                            {
                                "type": "warning",
                                "message": "Actor killed, pausing queue",
                                "uid": actor.uid,
                            },
                        )

                    except Exception as e:
                        log.get("user").error(
                            f"Error running actor: {actor.name} {actor.uid}",
                            type="queue",
                        )
                        self.emit(
                            "message",
                            {
                                "type": "error",
                                "message": str(e),
                                "uid": actor.uid,
                            },
                        )

                        if self._pause_on_fail:
                            logger.warning("Queued actor failed, pausing queue")
                            self._running = False
                        elif self._stack:
                            logger.warning(
                                "Queued actor failed, waiting for 30s then continuing"
                            )
                            # Put a 30s sleep at the head of the stack before
                            # the next actor runs
                            sleep = self.add_sleep(30)
                            self.move_item(sleep, 0)

                    # Pause and sleep items are not recorded as finished
                    if not isinstance(actor, (PauseActor, SleepActor)):
                        self._finished.append(actor)
                    self._running_actor = None
                    self._greenlet = None

                # stop queue when empty
                else:
                    self._running = False
                    log.get("user").info("Queue completed, pausing", type="queue")
                    self.emit(
                        "message",
                        {"type": "finished", "message": "Queue Finished, Pausing"},
                    )

            # Poll less often while the queue is idle or not ready
            if not self._ready or not self._running:
                time.sleep(4)

            time.sleep(1)