Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/queue.py: 57%
283 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import gevent
5import uuid
7from marshmallow import fields
9from daiquiri.core.utils import loader
10from daiquiri.core.logging import log
11from daiquiri.core import CoreBase, CoreResource, marshal, require_control
12from daiquiri.core.components import ComponentActor, ComponentActorKilled
13from daiquiri.core.schema import ErrorSchema, MessageSchema
14from daiquiri.core.schema.queue import (
15 QueueStatusSchema,
16 ChangeQueueStatusSchema,
17 ChangeQueueSettingSchema,
18 MoveQueueItemSchema,
19 ClearQueueItemsSchema,
20 KillQueueSchema,
21)
23import logging
25logger = logging.getLogger(__name__)
28class PauseActor(ComponentActor):
29 name = "pause"
31 def method(*args, **kwargs):
32 pass
35class SleepActor(ComponentActor):
36 name = "sleep"
38 def method(*args, **kwargs):
39 time.sleep(kwargs.get("time", 10))
42class QueueResource(CoreResource):
43 @marshal(
44 inp={"status": fields.Str()},
45 out=[[200, QueueStatusSchema(), "Current queue status"]],
46 )
47 def get(self, **kwargs):
48 """Get current queue status"""
49 return self._parent.status(**kwargs), 200
51 @marshal(
52 inp=ChangeQueueStatusSchema,
53 out=[
54 [200, ChangeQueueStatusSchema(), "New queue state"],
55 [400, ErrorSchema(), "Could not change queue state"],
56 ],
57 )
58 @require_control
59 def put(self, *args, **kwargs):
60 """Start or pause the queue"""
61 state = kwargs.get("state", False)
62 if state:
63 self._parent.start()
64 else:
65 self._parent.stop()
67 return {"state": kwargs["state"]}, 200
69 @marshal(
70 inp=ChangeQueueSettingSchema,
71 out=[
72 [200, MessageSchema(), "Setting changed successfully"],
73 [400, ErrorSchema(), "Could not change queue setting"],
74 ],
75 )
76 @require_control
77 def patch(self, *args, **kwargs):
78 """Update a queue setting"""
79 success = self._parent.update_queue(**kwargs)
80 if success:
81 return {"message": "Queue setting updated"}, 200
82 else:
83 return {"error": "Could not update queue setting"}, 400
85 @marshal(inp=ClearQueueItemsSchema, out=[[200, MessageSchema(), "Queue cleared"]])
86 @require_control
87 def delete(self, *args, **kwargs):
88 """Clear the queue"""
89 log.get("user").info("Queue was cleared", type="queue")
90 self._parent.clear(**kwargs)
91 return {"message": "Queue cleared"}, 200
94class QueueItemResource(CoreResource):
95 @marshal(
96 out=[
97 [200, MessageSchema(), "Queue item removed"],
98 [400, ErrorSchema(), "Could not remove queue item"],
99 ]
100 )
101 @require_control
102 def delete(self, uuid):
103 """Remove an item from the queue"""
104 success = self._parent.remove(uuid)
105 if success:
106 log.get("user").info(f"Queue item {uuid} was removed", type="queue")
107 return {"message": "Queue item removed"}, 200
108 else:
109 return {"error": "Could not remove queue item"}, 400
112class QueueKillResource(CoreResource):
113 @marshal(
114 inp=KillQueueSchema,
115 out=[
116 [200, MessageSchema(), "Queue item killed"],
117 [400, ErrorSchema(), "Could not kill queue item"],
118 ],
119 )
120 @require_control
121 def post(self, **kwargs):
122 """Kill the currently running queue item
124 Args:
125 stop (bool) : Whether the queue should stop after
126 the current queue item was killed
127 """
128 uuid = self._parent.kill()
129 if kwargs.get("stop"):
130 self._parent.stop()
131 if uuid:
132 log.get("user").info(f"Queue item {uuid} was killed", type="queue")
133 return {"message": "Queue item killed"}, 200
134 else:
135 return {"error": "Could not kill queue item"}, 400
138class MoveQueueItemResource(CoreResource):
139 @marshal(
140 inp=MoveQueueItemSchema,
141 out=[
142 [200, MessageSchema(), "Queue item moved"],
143 [400, ErrorSchema(), "Could not move queue item"],
144 ],
145 )
146 @require_control
147 def post(self, **kwargs):
148 """Move a queue item to a different position"""
149 moved = self._parent.move_item(kwargs["uid"], kwargs["position"])
151 if moved:
152 log.get("user").info(f"Queue item {kwargs['uid']} was moved", type="queue")
153 return {"message": "Queue item moved"}, 200
154 else:
155 return {"error": "Could not move queue item"}, 400
158class PauseItemResource(CoreResource):
159 @marshal(
160 out=[
161 [200, MessageSchema(), "Pause item added"],
162 [400, ErrorSchema(), "Could not add pause item"],
163 ]
164 )
165 @require_control
166 def post(self):
167 """Add a pause item to the queue (pauses the queue when reached)"""
168 uid = self._parent.add_pause()
169 if uid:
170 log.get("user").info(f"Pause item {uid} was added", type="queue")
171 return {"message": "Pause item added"}, 200
172 else:
173 return {"error": "Could not add pause item"}, 400
176class SleepItemResource(CoreResource):
177 @marshal(
178 inp={"time": fields.Int(required=True)},
179 out=[
180 [200, MessageSchema(), "Sleep item added"],
181 [400, ErrorSchema(), "Could not add sleep item"],
182 ],
183 )
184 @require_control
185 def post(self, **kwargs):
186 """Add a sleep item to the queue (sleeps for n seconds when reached)"""
187 uid = self._parent.add_sleep(kwargs["time"])
188 if uid:
189 log.get("user").info(
190 f"Sleep item {uid} for {kwargs['time']}s was added", type="queue"
191 )
192 return {"message": "Sleep item added"}, 200
193 else:
194 return {"error": "Could not add sleep item"}, 400
197class Queue(CoreBase):
198 """Core Queue functionality
200 The queue consists of a stack of `Actor` instances. The queue is started at initialise
201 time and then by default paused. The queue can be started and stopped via flask resources
203 """
205 _namespace = "queue"
206 _require_session = True
207 _require_blsession = True
208 _stack = []
209 _finished = []
210 _running_actor = None
211 _greenlet = None
213 _running = False
214 _ready = False
215 _check_actor_available = True
217 def setup(self):
218 self.register_route(QueueResource, "")
219 self.register_route(QueueKillResource, "/kill")
220 self.register_route(MoveQueueItemResource, "/move")
221 self.register_route(PauseItemResource, "/pause")
222 self.register_route(SleepItemResource, "/sleep")
223 self.register_route(QueueItemResource, "/<uuid>")
225 self._pause_on_fail = self._config.get("pause_queue_on_fail", True)
227 self.launch()
229 def push(self, actor):
230 """Push an Actor onto the stack
232 Pushes an actor onto the stack and emits a message
234 Args:
235 actor (obj): An instance of an `Actor`
236 """
237 if not isinstance(actor, ComponentActor):
238 raise AttributeError(
239 f"Actor: {actor} must be an instance of `ComponentActor`"
240 )
241 log.get("user").info(
242 f"Actor added to the queue ({actor.name}, {actor.uid})", type="queue"
243 )
244 self.emit(
245 "message",
246 {"type": "add", "message": "Adding Actor", "uid": actor.uid},
247 )
248 self._stack.append(actor)
250 def run_now(self, actor, pause=True):
251 """Push an Actor onto the stack and run it now
253 Args:
254 actor (obj): An instance of an `Actor`
255 pause (bool): Whether the queue should pause after this actor
256 """
257 if pause and len(self._stack) > 0:
258 pause = self._create_pause()
259 self._stack.insert(0, pause)
261 self._stack.insert(0, actor)
262 self.start()
264 def start(self):
265 """Start the queue running
267 Emits a message that the queue has started
268 """
269 self._running = True
270 log.get("user").info("Queue was started", type="queue")
271 self.emit("message", {"type": "status", "message": True})
273 def stop(self):
274 """Stop the queue
276 Emits a message that the queue has stopped
277 """
278 self._running = False
279 log.get("user").info("Queue was paused", type="queue")
280 self.emit("message", {"type": "status", "message": False})
282 def clear(self, **kwargs):
283 """Clear the queue
284 Args:
285 finished (bool) : Whether to clear the finished queue array or the stack
286 """
287 if kwargs.get("finished"):
288 for i in self._finished:
289 i.remove()
291 self._finished = []
292 else:
293 for i in self._stack:
294 i.remove()
296 self._stack = []
298 self.emit("message", {"type": "clear", "message": "Queue Cleared"})
300 def remove(self, uuid):
301 """Remove an item from the queue
303 Args:
304 uuid (uuid): The queue item uuid to remove
305 """
306 item = None
307 for i in self._stack:
308 if i.uid == uuid:
309 item = i
310 break
312 if item:
313 item.remove()
314 self._stack.remove(item)
316 self.emit(
317 "message",
318 {"type": "remove", "message": "Actor removed", "uid": uuid},
319 )
321 return True
323 def move_item(self, uid, position):
324 """Move a queue item to a different position
326 Args:
327 uid (uuid): The queue item uuid to move
328 position (int): The new queue item position
329 """
330 item = None
331 for i in self._stack:
332 if i.uid == uid:
333 item = i
335 if item:
336 self._stack.remove(item)
337 self._stack.insert(position, item)
339 self.emit(
340 "message",
341 {"type": "move", "message": "Actor moved", "uid": uid},
342 )
344 return True
346 def _create_pause(self):
347 """Create a pause Actor
349 Return:
350 actor (obj): An instance of an `Actor`
351 """
353 def cb(*args, **kwargs):
354 pass
356 pause = PauseActor(
357 uid=str(uuid.uuid4()),
358 name="pause",
359 started=cb,
360 error=cb,
361 finished=cb,
362 socketio=self._socketio,
363 )
364 pause.prepare()
365 return pause
367 def add_pause(self):
368 """Add a pause item to the queue"""
369 pause = self._create_pause()
371 self.push(pause)
372 return pause.uid
374 def add_sleep(self, sleep_time):
375 """Add a sleep item to the queue"""
377 def cb(*args, **kwargs):
378 pass
380 sleep = SleepActor(
381 uid=str(uuid.uuid4()),
382 name="sleep",
383 started=cb,
384 error=cb,
385 finished=cb,
386 socketio=self._socketio,
387 )
388 sleep.prepare(time=sleep_time)
390 self.push(sleep)
391 return sleep.uid
393 def update_queue(self, **kwargs):
394 """Update a queue setting"""
395 if kwargs["setting"] == "pause_on_fail":
396 try:
397 val = bool(kwargs["value"])
398 except ValueError:
399 logger.error(
400 f"Value {kwargs['pause_on_fail']} is not a valid value for pause_on_fail"
401 )
402 else:
403 self._pause_on_fail = val
404 self.emit(
405 "message",
406 {"type": "update", "message": "Setting updated"},
407 )
409 return True
411 def status(self, **kwargs):
412 """Return the queue status
414 Returns the queue status and a list of the queue stack
416 Returns:
417 running (bool): Whether the queue is running
418 stack (list(dict)): The list of queued actors
419 current (dict): The currently running actor
420 all (list(dict)): The list of finished, running, and queued actors
422 """
423 stack = []
424 for s in self._stack:
425 stack.append({**s.info(), "status": "queued"})
427 finished = []
428 for f in self._finished:
429 info = f.info()
430 finished.append(
431 {**info, "status": "failed" if info["failed"] else "finished"}
432 )
434 cur = None
435 if self._running_actor:
436 cur = {**self._running_actor.info(), "status": "running"}
438 allq = ([cur] if cur else []) + stack
440 if kwargs.get("status") == "Finished":
441 allq = finished + allq
443 return {
444 "running": self._running,
445 "ready": self._ready,
446 "stack": stack,
447 "current": cur,
448 "all": allq,
449 "pause_on_fail": self._pause_on_fail,
450 }
452 def kill(self):
453 """Kill the currently running actor"""
454 if self._greenlet:
455 act = self._running_actor
456 self._greenlet.kill(exception=ComponentActorKilled)
457 return act.uid
459 def launch(self):
460 """Launch the queue background task"""
461 logger.debug("Starting Queue background task")
462 self._task = gevent.spawn(self.process)
464 def _check_ready(self):
465 """Determine whether the queue is ready to process
467 Use an actor `queue/monitor` to determine if the queue is ready. This
468 actor should return a boolean value defining the readystate of the queue.
470 This can be used to pause the queue if for example beam is lost
471 """
472 if not self._check_actor_available:
473 return
475 ready = False
476 base = "{path}.{cls}".format(
477 path=self._config["implementors"].replace("/", "."),
478 cls=self.__class__.__name__.lower(),
479 )
481 def cb(*args, **kwargs):
482 pass
484 try:
485 actor = loader(
486 base,
487 "Actor",
488 "monitor",
489 uid=str(uuid.uuid4()),
490 name="monitor",
491 started=cb,
492 error=cb,
493 finished=cb,
494 socketio=self._socketio,
495 )
496 except ModuleNotFoundError:
497 logger.warning("`monitor` actor not available, disabling check")
498 self._check_actor_available = False
499 self._ready = True
500 return
501 except Exception:
502 logger.warning("Error loading `monitor` actor", exc_info=True)
503 self._ready = True
504 return
506 actor.prepare()
507 actor.execute()
508 if not actor._failed:
509 ready = actor.resp()
510 else:
511 try:
512 raise actor._exception
513 except Exception:
514 logger.warning("Error running `monitor` actor", exc_info=True)
515 ready = True
517 if ready != self._ready:
518 self._ready = ready
519 self.emit(
520 "message",
521 {
522 "type": "ready_state",
523 "message": "Queue ready" if ready else "Queue not ready",
524 },
525 )
527 def process(self):
528 """Queue Task
530 Iterate through the stack, pausing as needed
531 The queue is automatically paused when the stack is empty
533 """
534 while True:
535 self._check_ready()
536 if self._running_actor is None and self._running and self._ready:
537 if len(self._stack) > 0:
538 self._running_actor = self._stack.pop(0)
540 # Pause the queue on a stop action
541 if isinstance(self._running_actor, PauseActor):
542 self.stop()
544 log.get("user").info(
545 f"Queue processing new actor {self._running_actor.name} {self._running_actor.uid}",
546 type="queue",
547 )
548 self.emit(
549 "message",
550 {
551 "type": "start",
552 "message": "Starting Actor",
553 "uid": self._running_actor.uid,
554 },
555 )
556 logger.debug(
557 "Executing queued actor {name} {uid}".format(
558 name=self._running_actor.name, uid=self._running_actor.uid
559 )
560 )
562 try:
563 self._greenlet = gevent.spawn(self._running_actor.execute)
564 self._greenlet.join()
566 error = self._greenlet.value
567 if error:
568 raise error
570 log.get("user").info(
571 f"Actor finished {self._running_actor.name} {self._running_actor.uid}",
572 type="queue",
573 )
574 self.emit(
575 "message",
576 {
577 "type": "end",
578 "message": "Actor Finished",
579 "uid": self._running_actor.uid,
580 },
581 )
583 except ComponentActorKilled:
584 self._running = False
585 self.emit(
586 "message",
587 {
588 "type": "warning",
589 "message": "Actor killed, pausing queue",
590 "uid": self._running_actor.uid,
591 },
592 )
594 except Exception as e:
595 log.get("user").error(
596 f"Error running actor: {self._running_actor.name} {self._running_actor.uid}",
597 type="queue",
598 )
599 self.emit(
600 "message",
601 {
602 "type": "error",
603 "message": str(e),
604 "uid": self._running_actor.uid,
605 },
606 )
608 if self._pause_on_fail:
609 logger.warning("Queued actor failed, pausing queue")
610 self._running = False
611 else:
612 if len(self._stack) > 0:
613 logger.warning(
614 "Queued actor failed, waiting for 30s then continuing"
615 )
616 sleep = self.add_sleep(30)
617 self.move_item(sleep, 0)
619 if not isinstance(
620 self._running_actor, PauseActor
621 ) and not isinstance(self._running_actor, SleepActor):
622 self._finished.append(self._running_actor)
623 self._running_actor = None
624 self._greenlet = None
626 # stop queue when empty
627 else:
628 self._running = False
629 log.get("user").info("Queue completed, pausing", type="queue")
630 self.emit(
631 "message",
632 {"type": "finished", "message": "Queue Finished, Pausing"},
633 )
635 if not self._ready or not self._running:
636 time.sleep(4)
638 time.sleep(1)