Celery

The celery component can be used to monitor workers and execute arbitrary tasks.

The basic configuration requires a broker URL and a backend URL, along with a list of tasks:

component: celery
broker_url: redis://localhost:6379/1
backend_url: redis://localhost:6379/0
tasks:
  - actor: sleep
    description: Sleep for 60s

With RabbitMQ as the broker and, additionally, a dead letter queue:

component: celery
broker_url: amqp://daiquiri:daiquiri@localhost:5672/daiquiri
backend_url: redis://localhost:6379/0
broker_dlq: true
broker_queue: default
tasks:
  - actor: sleep
    description: Sleep for 60s
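
To make the parts of the two connection URLs above easier to read, here is a small standard-library sketch that splits them into their components. The helper name describe is hypothetical and not part of daiquiri or Celery; it only illustrates how such URLs are structured (for amqp the path is the virtual host, for redis it is the database number).

```python
from urllib.parse import urlsplit


def describe(url: str) -> dict:
    """Split a connection URL into its components (illustrative helper)."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # For amqp this is the virtual host; for redis, the database number
        "path": parts.path.lstrip("/"),
    }


broker = describe("amqp://daiquiri:daiquiri@localhost:5672/daiquiri")
backend = describe("redis://localhost:6379/0")
print(broker)
print(backend)
```

Checking the URLs this way before starting daiquiri can help catch a wrong port or virtual host early.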

Task Actors

Each actor specifies which task it runs and can additionally specify a parameter schema, the values of which are passed to the celery task as kwargs.

For example, a simple sleep actor in implementors/celery/sleep.py:

from marshmallow import fields
from daiquiri.core.components.celery import CeleryTaskActor, CeleryTaskSchema


class SleepActorSchema(CeleryTaskSchema):
    name = fields.Str()


class SleepActor(CeleryTaskActor):
    schema = SleepActorSchema
    task = "sidecar.celery.sleep.sleep"

The celery task will receive name as a kwarg if it is specified.

For the example celery task:

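import time

# `app` is the Celery application instance, defined elsewhere
@app.task()
def sleep(**kwargs) -> int:
    # Schema values from the actor arrive here as keyword arguments
    print("running sleep", kwargs)
    time.sleep(60)
    print("sleep finished")
    return 1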

and its output:

[2022-05-17 15:36:42,535: WARNING/ForkPoolWorker-6] running sleep
[2022-05-17 15:36:42,536: WARNING/ForkPoolWorker-6]
[2022-05-17 15:36:42,537: WARNING/ForkPoolWorker-6] {'name': 'a name'}
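
The log above spreads a single print call over several records. One plausible reading, which is an assumption about how the worker redirects stdout and not something this page states, is that each write to stdout becomes its own log record. The standard-library sketch below only demonstrates the Python side: print issues a separate write for each argument, each separator, and the line ending. WriteRecorder is a hypothetical stand-in and is not Celery code.

```python
class WriteRecorder:
    """Hypothetical file-like object that records every write() call."""

    def __init__(self) -> None:
        self.writes = []

    def write(self, data: str) -> int:
        # Store each chunk exactly as print hands it over
        self.writes.append(data)
        return len(data)

    def flush(self) -> None:
        # Nothing is buffered, so there is nothing to flush
        pass


recorder = WriteRecorder()
kwargs = {"name": "a name"}
print("running sleep", kwargs, file=recorder)

# Show each chunk that print wrote, one per line
for chunk in recorder.writes:
    print(repr(chunk))
```

If a single log record is preferred, formatting the message into one string first (for example with an f-string) results in one write for the message text.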

Beamline Notification

If beamline_queue is defined in the YAML configuration, daiquiri will subscribe to a queue called notification.{meta_beamline} in order to receive notifications. By default, these notifications are forwarded to the UI.
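
As a minimal sketch of how the queue name is formed, assuming the {meta_beamline} placeholder is filled from a configuration value of the same name: the function notification_queue and the beamline name bl01 below are hypothetical and serve only as an illustration.

```python
QUEUE_TEMPLATE = "notification.{meta_beamline}"


def notification_queue(meta_beamline: str) -> str:
    """Build the notification queue name for a beamline (illustrative)."""
    return QUEUE_TEMPLATE.format(meta_beamline=meta_beamline)


# "bl01" is a made-up beamline name used only for this example
queue_name = notification_queue("bl01")
print(queue_name)
```

A publisher that wants its messages to reach the UI would need to send them to the queue with this name.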