Celery
The celery component can be used to monitor workers and execute arbritrary tasks
The basic configuration requires a broker and backend url along with a list of tasks
component: celery
broker_url: redis://localhost:6379/1
backend_url: redis://localhost:6379/0
tasks:
- actor: sleep
description: Sleep for 60s
With rabbitmq and additionally a dead letter queue
component: celery
broker_url: amqp://daiquiri:daiquiri@localhost:5672/daiquiri
backend_url: redis://localhost:6379/0
broker_dlq: true
broker_queue: default
tasks:
- actor: sleep
description: Sleep for 60s
Task Actors
Each actor specifies which task it runs and can additionaly specify a parameter schema, the values of which are passed to the celery task as kwargs
.
For example a simple sleep actor implmentors/celery/sleep.py
:
from marshmallow import fields
from daiquiri.core.components.celery import CeleryTaskActor, CeleryTaskSchema
class SleepActorSchema(CeleryTaskSchema):
name = fields.Str()
class SleepActor(CeleryTaskActor):
schema = SleepActorSchema
task = "sidecar.celery.sleep.sleep"
The celery task will recieve name
as a kwarg
if it is specified.
For the example celery task:
@app.task()
def sleep(**kwargs) -> int:
print("running sleep", kwargs)
time.sleep(60)
print("sleep finished")
return 1
and its output:
[2022-05-17 15:36:42,535: WARNING/ForkPoolWorker-6] running sleep
[2022-05-17 15:36:42,536: WARNING/ForkPoolWorker-6]
[2022-05-17 15:36:42,537: WARNING/ForkPoolWorker-6] {'name': 'a name'}
Beamline Notification
If beamline_queue
is defined in the yaml configuration daiquiri will subscribe to queue called notification.{meta_beamline}
in order to receive notifications. These notifications are by default forwarded to the UI