Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/stomp.py: 25%
151 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import json
5import logging
6import uuid
7from pprint import pformat
9import gevent
10from stompest.config import StompConfig
11from stompest.protocol import StompSpec
12from stompest.sync import Stomp as StompClient
13from stompest.error import StompConnectionError, StompConnectTimeout, StompProtocolError
15from daiquiri.core import CoreBase
16from daiquiri.core.utils import make_json_safe
19logger = logging.getLogger(__name__)
21stompest_logger = logging.getLogger("stompest.sync.client")
22stompest_logger.setLevel(logging.ERROR)
25def format_body(body):
26 """Parse zocalo recipe body"""
27 message = {}
29 if not isinstance(body, dict):
30 logger.warning("Message body is empty")
31 return message
33 try:
34 current_step = body["recipe"][str(body["recipe-pointer"])]
35 except KeyError:
36 logger.warning("Could not find recipe step")
37 return message
38 else:
39 message.update(current_step["parameters"])
40 if isinstance(body["payload"], dict):
41 message.update(body["payload"])
42 environment = body["environment"]
44 for key in sorted(environment, key=len, reverse=True):
45 for mk, value in message.items():
46 if isinstance(value, str):
47 if "$" + key == value:
48 message[mk] = value.replace("$" + key, str(environment[key]))
50 return message
53class Stomp(CoreBase):
54 """Core Stomp Interface
56 Initialise stomp to connect to the message broker. If a local queue for this
57 instance of daiquiri has been enabled subscribe to the queue. Otherwise just
58 allow daiquiri to be able to send messages
59 """
61 _namespace = "stomp"
62 _client = None
63 _running = False
64 _last_heart_beat = 0
65 _callbacks = []
66 _backoff = 0
68 _last_emit_time = 0
69 _emit_timeout = None
70 _messages_received = {}
72 def read_client(self):
73 """Read queue
75 Read from queue and parse incoming messages
76 """
77 while self._running:
78 try:
79 if self._client.canRead(1):
80 frame = self._client.receiveFrame()
81 self._client.ack(frame)
82 if frame.command == "MESSAGE":
83 message = format_body(json.loads(frame.body))
84 self.on_message_recieved(frame.headers, message)
86 now = time.time()
87 if now - self._last_heart_beat > 30:
88 self._client.beat()
89 self._last_heart_beat = now
91 except StompConnectionError:
92 logger.warning("Could not connect to broker")
93 try:
94 self.connect_and_subscribe()
95 except StompConnectTimeout:
96 logger.warning("Timeout connecting to broker")
97 self._backoff += 1
99 gevent.sleep(0.1)
101 if self._backoff > 0:
102 logger.warning("Waiting for backoff")
103 gevent.sleep(self._backoff * 10)
104 self._backoff -= 1
106 def connect_and_subscribe(self):
107 logger.info("Trying to connect to stomp")
108 prefix = self._config.get("stomp_queue_prefix", "zocalo")
109 queue = f"{prefix}.transient.{self._config['meta_beamline']}.notification"
110 logger.info(f"Subscribing to queue: {queue}")
112 self._client.connect()
113 try:
114 self._client.subscribe(
115 f"/queue/{queue}",
116 {
117 StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
118 StompSpec.ID_HEADER: f"daiquiri_{self._config['meta_beamline']}",
119 },
120 )
121 except StompProtocolError as e:
122 logger.warning(f"StompProtocolError: {str(e)}")
124 logger.info("Connected to broker and subscribed to queue")
126 def setup(self):
127 self._stomp_config = StompConfig(
128 f"tcp://{self._config['stomp_host']}:{self._config['stomp_port']}",
129 version=StompSpec.VERSION_1_1,
130 )
132 if self._config.get("stomp_queue"):
133 self._client = StompClient(self._stomp_config)
134 self._running = True
135 self._read_thread = gevent.spawn(self.read_client)
137 def __del__(self):
138 if self._running:
139 self._running = False
140 self._read_thread.join()
142 if self._client:
143 self._client.disconnect()
145 def add_listener(self, callback):
146 """Add a stomp listener
148 Args:
149 callback(function): Callback should be of the form callback(headers, message)
150 """
151 if not callable(callback):
152 raise AttributeError(f"Callback is not callable: {callback}")
154 if callback not in self._callbacks:
155 self._callbacks.append(callback)
156 else:
157 raise AttributeError(f"Callback is already registered: {callback}")
159 def on_message_recieved(self, headers, message):
160 """Incoming message handler
162 Args:
163 headers (dict): Message header
164 message (dict): Message body
165 """
166 logger.debug("Message recieved from stomp: %s", pformat(message))
167 for callback in self._callbacks:
168 callback(headers, message)
170 if message.get("daiquiri_id"):
171 self._messages_received[message.get("daiquiri_id")] = {
172 "time": time.time(),
173 "message": message,
174 }
176 self._queue_emit_message(headers, message)
178 def _queue_emit_message(self, headers, message):
179 """Debounce event emission
181 Try to not spam client
182 """
183 if self._emit_timeout is not None:
184 self._emit_timeout.kill()
185 self._emit_timeout = None
187 now = time.time()
188 if now - self._last_emit_time > 0.2:
189 self._emit_message(headers, message)
190 else:
191 self._emit_timeout = gevent.spawn_later(
192 0.2, self._emit_message, headers, message
193 )
195 def _emit_message(self, headers, message):
196 self.emit("message", message)
197 self._last_emit_time = time.time()
199 def send_message(self, message, destination=None):
200 """Send a message to a queue
202 Args:
203 message (dict): Dictionary of the message parameters
204 destination (str): The queue to send the message to
205 """
206 if destination is None:
207 destination = self._config["stomp_destination"]
209 if not message["parameters"]:
210 message["parameters"] = {}
211 message["parameters"]["daiquiri_id"] = str(uuid.uuid4())
213 try:
214 safe_message = make_json_safe(message)
215 client = StompClient(self._stomp_config)
216 client.connect()
217 client.send(
218 f"/queue/{destination}", json.dumps(safe_message).encode("utf-8")
219 )
220 client.disconnect()
221 return message["parameters"]["daiquiri_id"]
222 except Exception:
223 logger.exception("Problem sending message to broker, message not sent")
225 def start_recipe(self, datacollectionid, recipe, parameters):
226 """Start a processing recipe
228 Helper function to launch a processing recipe
229 """
230 parameters.update({"ispyb_dcid": datacollectionid})
231 uuid = self.send_message({"recipes": [recipe], "parameters": parameters})
232 self.emit(
233 "message",
234 {
235 "type": "start_recipe",
236 "recipe": recipe,
237 "datacollectionid": datacollectionid,
238 "daiquiri_id": uuid,
239 },
240 )
241 return uuid
243 def wait_for(self, uuid, timeout=60 * 5):
244 start = time.time()
245 while not self._messages_received.get(uuid):
246 time.sleep(1)
248 if time.time() - start > timeout:
249 logger.warning(
250 f"Did not get response for uuid: {uuid} after timeout {timeout}s"
251 )
252 return
254 return self._messages_received[uuid].get("message")
256 def send_event(self, datacollectionid, event):
257 """Send a processing event
259 Helper function to send an event message
260 """
261 if event not in ["start", "end"]:
262 raise AttributeError(f"Unknown event {event}")
264 uuid = self.send_message(
265 {
266 "recipes": ["mimas"],
267 "parameters": {"ispyb_dcid": datacollectionid, "event": event},
268 }
269 )
271 self.emit(
272 "message",
273 {
274 "type": "event",
275 "event": event,
276 "datacollectionid": datacollectionid,
277 "daiquiri_id": uuid,
278 },
279 )
281 return uuid