Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/stomp.py: 40%
162 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import json
5import logging
6import uuid
7from pprint import pformat
9import gevent
10from stompest.config import StompConfig
11from stompest.protocol import StompSpec
12from stompest.sync import Stomp as StompClient
13from stompest.error import StompConnectionError, StompConnectTimeout, StompProtocolError
15from daiquiri.core.components import Component
16from daiquiri.core.schema.component import ComponentSchema
17from daiquiri.core.utils import make_json_safe
19from marshmallow import fields
# Module-level logger used throughout this component.
logger = logging.getLogger(__name__)

# Only let errors from the stompest synchronous client through to the log.
stompest_logger = logging.getLogger("stompest.sync.client")
stompest_logger.setLevel(logging.ERROR)
def format_body(body):
    """Parse a zocalo recipe body into a flat message dictionary.

    The message is built from the ``parameters`` of the recipe step selected
    by ``recipe-pointer``, overlaid with ``payload`` when the payload is a
    dictionary. Afterwards any string value that is exactly ``$<key>`` for a
    key of ``environment`` is replaced by the string form of that
    environment value.

    Missing ``parameters``, ``payload`` or ``environment`` entries are
    treated as empty rather than raising, so a partial body cannot take down
    the caller.

    Args:
        body: The decoded message body, expected to be a dict.

    Returns:
        dict: The flattened message; empty if ``body`` is not a dict or the
            current recipe step cannot be found.
    """
    message = {}

    if not isinstance(body, dict):
        logger.warning("Message body is empty")
        return message

    try:
        current_step = body["recipe"][str(body["recipe-pointer"])]
    except KeyError:
        logger.warning("Could not find recipe step")
        return message

    message.update(current_step.get("parameters") or {})

    payload = body.get("payload")
    if isinstance(payload, dict):
        message.update(payload)

    environment = body.get("environment") or {}

    # Longest keys first, as in the original implementation. Only values
    # that are exactly "$<key>" are substituted; assigning to existing keys
    # while iterating is safe because the dict size never changes.
    for key in sorted(environment, key=len, reverse=True):
        placeholder = "$" + key
        for name, value in message.items():
            if isinstance(value, str) and value == placeholder:
                message[name] = str(environment[key])

    return message
class StompConfigSchema(ComponentSchema):
    # Broker credentials; neither field is marked as required.
    stomp_user = fields.String()
    stomp_pass = fields.String()

    # Broker address; both fields are required.
    stomp_host = fields.String(required=True)
    stomp_port = fields.Integer(required=True)

    # Enables the background reader that subscribes to the notification queue.
    stomp_queue = fields.Boolean(
        metadata={"description": "Whether to subscribe to notification queue"}
    )

    # Default queue used by Stomp.send_message when no destination is given.
    stomp_destination = fields.String()

    # Prefix of the notification queue name ("zocalo" when not configured).
    stomp_queue_prefix = fields.String()
class Stomp(Component):
    """Core Stomp Interface

    Initialise stomp to connect to the message broker. If a local queue for this
    instance of daiquiri has been enabled subscribe to the queue. Otherwise just
    allow daiquiri to be able to send messages
    """

    _config_schema = StompConfigSchema()
    _namespace = "stomp"
    _client = None
    _running = False
    _last_heart_beat = 0
    # NOTE(review): this list is a class attribute and is therefore shared by
    # every instance of the component - confirm only one instance ever exists.
    _callbacks = []
    _backoff = 0

    _last_emit_time = 0
    _emit_timeout = None
    # NOTE(review): also shared between instances, and entries are never
    # removed, so this grows for the lifetime of the process.
    _messages_received = {}

    def read_client(self):
        """Read queue

        Read from queue and parse incoming messages. Runs until ``_running``
        is cleared, reconnecting (with a backoff) when the connection to the
        broker is lost.
        """
        while self._running:
            try:
                if self._client.canRead(1):
                    frame = self._client.receiveFrame()
                    # Only MESSAGE frames can be acknowledged; trying to ack
                    # anything else (e.g. ERROR or RECEIPT) is a protocol
                    # error that would terminate this greenlet.
                    if frame.command == "MESSAGE":
                        self._client.ack(frame)
                        self._handle_frame(frame)

                now = time.time()
                if now - self._last_heart_beat > 30:
                    self._client.beat()
                    self._last_heart_beat = now

            except StompConnectionError:
                logger.warning("Could not connect to broker")
                try:
                    self.connect_and_subscribe()
                except (StompConnectTimeout, StompConnectionError):
                    # Any connection failure during the reconnect attempt
                    # triggers the backoff instead of killing the reader.
                    logger.warning("Timeout connecting to broker")
                    self._backoff += 1

            gevent.sleep(0.1)

            if self._backoff > 0:
                logger.warning("Waiting for backoff")
                gevent.sleep(self._backoff * 10)
                self._backoff -= 1

    def _handle_frame(self, frame):
        """Decode a MESSAGE frame and dispatch it, logging any failure.

        A malformed body or a failing listener must not stop the reader
        loop, so errors are logged and the frame is dropped.
        """
        try:
            message = format_body(json.loads(frame.body))
            self.on_message_recieved(frame.headers, message)
        except Exception:
            logger.exception("Could not process message from broker")

    def connect_and_subscribe(self):
        """Connect the reader client to the broker and subscribe to the queue

        The queue name is built from the configured ``stomp_queue_prefix``
        (default ``zocalo``) and ``meta_beamline``. A ``StompProtocolError``
        raised while subscribing is logged and otherwise ignored.
        """
        logger.info("Trying to connect to stomp")
        prefix = self._config.get("stomp_queue_prefix", "zocalo")
        queue = f"{prefix}.transient.{self._config['meta_beamline']}.notification"
        logger.info(f"Subscribing to queue: {queue}")

        self._client.connect()
        try:
            self._client.subscribe(
                f"/queue/{queue}",
                {
                    StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
                    StompSpec.ID_HEADER: f"daiquiri_{self._config['meta_beamline']}",
                },
            )
        except StompProtocolError as e:
            logger.warning(f"StompProtocolError: {str(e)}")

        logger.info("Connected to broker and subscribed to queue")

    def setup(self):
        """Build the broker configuration and optionally start the reader

        The reader greenlet is only spawned when ``stomp_queue`` is enabled
        in the config; sending messages works either way.
        """
        self._stomp_config = StompConfig(
            f"tcp://{self._config['stomp_host']}:{self._config['stomp_port']}",
            login=self._config.get("stomp_user"),
            passcode=self._config.get("stomp_pass"),
            version=StompSpec.VERSION_1_1,
        )

        if self._config.get("stomp_queue"):
            self._client = StompClient(self._stomp_config)
            self._running = True
            self._read_thread = gevent.spawn(self.read_client)

    def __del__(self):
        """Stop the reader greenlet and disconnect the reader client"""
        if self._running:
            self._running = False
            self._read_thread.join()

        if self._client:
            self._client.disconnect()

    def add_listener(self, callback):
        """Add a stomp listener

        Args:
            callback(function): Callback should be of the form callback(headers, message)

        Raises:
            AttributeError: If the callback is not callable or already registered
        """
        if not callable(callback):
            raise AttributeError(f"Callback is not callable: {callback}")

        if callback not in self._callbacks:
            self._callbacks.append(callback)
        else:
            raise AttributeError(f"Callback is already registered: {callback}")

    def on_message_recieved(self, headers, message):
        """Incoming message handler

        Calls every registered listener, records the message under its
        ``daiquiri_id`` (if it has one) for ``wait_for``, then queues a
        debounced ``message`` event.

        Args:
            headers (dict): Message header
            message (dict): Message body
        """
        logger.debug("Message recieved from stomp: %s", pformat(message))
        for callback in self._callbacks:
            callback(headers, message)

        daiquiri_id = message.get("daiquiri_id")
        if daiquiri_id:
            self._messages_received[daiquiri_id] = {
                "time": time.time(),
                "message": message,
            }

        self._queue_emit_message(headers, message)

    def _queue_emit_message(self, headers, message):
        """Debounce event emission

        Try to not spam client: emit immediately if the last emit was more
        than 0.2s ago, otherwise schedule it, replacing any pending emit.
        """
        if self._emit_timeout is not None:
            self._emit_timeout.kill()
            self._emit_timeout = None

        now = time.time()
        if now - self._last_emit_time > 0.2:
            self._emit_message(headers, message)
        else:
            self._emit_timeout = gevent.spawn_later(
                0.2, self._emit_message, headers, message
            )

    def _emit_message(self, headers, message):
        """Emit the ``message`` event and record when it was sent"""
        self.emit("message", message)
        self._last_emit_time = time.time()

    def send_message(self, message, destination=None):
        """Send a message to a queue

        A fresh ``daiquiri_id`` is added to ``message["parameters"]`` (which
        is created if missing or empty) so that replies can be matched with
        ``wait_for``. A short-lived connection is used for each message.

        Args:
            message (dict): Dictionary of the message parameters
            destination (str): The queue to send the message to, defaults to
                the configured ``stomp_destination``

        Returns:
            str: The ``daiquiri_id`` of the message, or None if it could not
                be sent
        """
        if destination is None:
            destination = self._config["stomp_destination"]

        # .get so that a message without a "parameters" key is accepted
        if not message.get("parameters"):
            message["parameters"] = {}
        message["parameters"]["daiquiri_id"] = str(uuid.uuid4())

        client = None
        connected = False
        try:
            safe_message = make_json_safe(message)
            client = StompClient(self._stomp_config)
            client.connect()
            connected = True
            client.send(
                f"/queue/{destination}", json.dumps(safe_message).encode("utf-8")
            )
        except Exception:
            logger.exception("Problem sending message to broker, message not sent")
            return None
        finally:
            # Always release the connection, even when sending failed
            if connected:
                try:
                    client.disconnect()
                except Exception:
                    logger.warning(
                        "Could not cleanly disconnect from broker", exc_info=True
                    )

        return message["parameters"]["daiquiri_id"]

    def start_recipe(self, datacollectionid, recipe, parameters):
        """Start a processing recipe

        Helper function to launch a processing recipe. The caller's
        ``parameters`` dictionary is copied, not modified.

        Returns:
            str: The ``daiquiri_id`` of the sent message, or None on failure
        """
        recipe_parameters = {**parameters, "ispyb_dcid": datacollectionid}
        daiquiri_id = self.send_message(
            {"recipes": [recipe], "parameters": recipe_parameters}
        )
        self.emit(
            "message",
            {
                "type": "start_recipe",
                "recipe": recipe,
                "datacollectionid": datacollectionid,
                "daiquiri_id": daiquiri_id,
            },
        )
        return daiquiri_id

    def wait_for(self, uuid, timeout=60 * 5):
        """Wait for a message carrying the given ``daiquiri_id``

        Args:
            uuid (str): The ``daiquiri_id`` returned by ``send_message``
            timeout (int): Maximum time to wait in seconds

        Returns:
            dict: The received message, or None if the timeout expired
        """
        start = time.time()
        while not self._messages_received.get(uuid):
            # Cooperative sleep so the reader greenlet can run while waiting
            gevent.sleep(1)

            if time.time() - start > timeout:
                logger.warning(
                    f"Did not get response for uuid: {uuid} after timeout {timeout}s"
                )
                return

        return self._messages_received[uuid].get("message")

    def send_event(self, datacollectionid, event):
        """Send a processing event

        Helper function to send an event message

        Args:
            datacollectionid: The data collection the event refers to
            event (str): Either ``start`` or ``end``

        Returns:
            str: The ``daiquiri_id`` of the sent message, or None on failure

        Raises:
            AttributeError: If the event is not ``start`` or ``end``
        """
        if event not in ["start", "end"]:
            raise AttributeError(f"Unknown event {event}")

        daiquiri_id = self.send_message(
            {
                "recipes": ["mimas"],
                "parameters": {"ispyb_dcid": datacollectionid, "event": event},
            }
        )

        self.emit(
            "message",
            {
                "type": "event",
                "event": event,
                "datacollectionid": datacollectionid,
                "daiquiri_id": daiquiri_id,
            },
        )

        return daiquiri_id