Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/stomp.py: 40%

162 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import json 

5import logging 

6import uuid 

7from pprint import pformat 

8 

9import gevent 

10from stompest.config import StompConfig 

11from stompest.protocol import StompSpec 

12from stompest.sync import Stomp as StompClient 

13from stompest.error import StompConnectionError, StompConnectTimeout, StompProtocolError 

14 

15from daiquiri.core.components import Component 

16from daiquiri.core.schema.component import ComponentSchema 

17from daiquiri.core.utils import make_json_safe 

18 

19from marshmallow import fields 

20 

21logger = logging.getLogger(__name__) 

22 

23stompest_logger = logging.getLogger("stompest.sync.client") 

24stompest_logger.setLevel(logging.ERROR) 

25 

26 

27def format_body(body): 

28 """Parse zocalo recipe body""" 

29 message = {} 

30 

31 if not isinstance(body, dict): 

32 logger.warning("Message body is empty") 

33 return message 

34 

35 try: 

36 current_step = body["recipe"][str(body["recipe-pointer"])] 

37 except KeyError: 

38 logger.warning("Could not find recipe step") 

39 return message 

40 else: 

41 message.update(current_step["parameters"]) 

42 if isinstance(body["payload"], dict): 

43 message.update(body["payload"]) 

44 environment = body["environment"] 

45 

46 for key in sorted(environment, key=len, reverse=True): 

47 for mk, value in message.items(): 

48 if isinstance(value, str): 

49 if "$" + key == value: 

50 message[mk] = value.replace("$" + key, str(environment[key])) 

51 

52 return message 

53 

54 

55class StompConfigSchema(ComponentSchema): 

56 stomp_user = fields.Str() 

57 stomp_pass = fields.Str() 

58 stomp_host = fields.Str(required=True) 

59 stomp_port = fields.Int(required=True) 

60 stomp_queue = fields.Bool( 

61 metadata={"description": "Whether to subscribe to notification queue"} 

62 ) 

63 stomp_destination = fields.Str() 

64 stomp_queue_prefix = fields.Str() 

65 

66 

67class Stomp(Component): 

68 """Core Stomp Interface 

69 

70 Initialise stomp to connect to the message broker. If a local queue for this 

71 instance of daiquiri has been enabled subscribe to the queue. Otherwise just 

72 allow daiquiri to be able to send messages 

73 """ 

74 

75 _config_schema = StompConfigSchema() 

76 _namespace = "stomp" 

77 _client = None 

78 _running = False 

79 _last_heart_beat = 0 

80 _callbacks = [] 

81 _backoff = 0 

82 

83 _last_emit_time = 0 

84 _emit_timeout = None 

85 _messages_received = {} 

86 

87 def read_client(self): 

88 """Read queue 

89 

90 Read from queue and parse incoming messages 

91 """ 

92 while self._running: 

93 try: 

94 if self._client.canRead(1): 

95 frame = self._client.receiveFrame() 

96 self._client.ack(frame) 

97 if frame.command == "MESSAGE": 

98 message = format_body(json.loads(frame.body)) 

99 self.on_message_recieved(frame.headers, message) 

100 

101 now = time.time() 

102 if now - self._last_heart_beat > 30: 

103 self._client.beat() 

104 self._last_heart_beat = now 

105 

106 except StompConnectionError: 

107 logger.warning("Could not connect to broker") 

108 try: 

109 self.connect_and_subscribe() 

110 except StompConnectTimeout: 

111 logger.warning("Timeout connecting to broker") 

112 self._backoff += 1 

113 

114 gevent.sleep(0.1) 

115 

116 if self._backoff > 0: 

117 logger.warning("Waiting for backoff") 

118 gevent.sleep(self._backoff * 10) 

119 self._backoff -= 1 

120 

121 def connect_and_subscribe(self): 

122 logger.info("Trying to connect to stomp") 

123 prefix = self._config.get("stomp_queue_prefix", "zocalo") 

124 queue = f"{prefix}.transient.{self._config['meta_beamline']}.notification" 

125 logger.info(f"Subscribing to queue: {queue}") 

126 

127 self._client.connect() 

128 try: 

129 self._client.subscribe( 

130 f"/queue/{queue}", 

131 { 

132 StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL, 

133 StompSpec.ID_HEADER: f"daiquiri_{self._config['meta_beamline']}", 

134 }, 

135 ) 

136 except StompProtocolError as e: 

137 logger.warning(f"StompProtocolError: {str(e)}") 

138 

139 logger.info("Connected to broker and subscribed to queue") 

140 

141 def setup(self): 

142 self._stomp_config = StompConfig( 

143 f"tcp://{self._config['stomp_host']}:{self._config['stomp_port']}", 

144 login=self._config.get("stomp_user"), 

145 passcode=self._config.get("stomp_pass"), 

146 version=StompSpec.VERSION_1_1, 

147 ) 

148 

149 if self._config.get("stomp_queue"): 

150 self._client = StompClient(self._stomp_config) 

151 self._running = True 

152 self._read_thread = gevent.spawn(self.read_client) 

153 

154 def __del__(self): 

155 if self._running: 

156 self._running = False 

157 self._read_thread.join() 

158 

159 if self._client: 

160 self._client.disconnect() 

161 

162 def add_listener(self, callback): 

163 """Add a stomp listener 

164 

165 Args: 

166 callback(function): Callback should be of the form callback(headers, message) 

167 """ 

168 if not callable(callback): 

169 raise AttributeError(f"Callback is not callable: {callback}") 

170 

171 if callback not in self._callbacks: 

172 self._callbacks.append(callback) 

173 else: 

174 raise AttributeError(f"Callback is already registered: {callback}") 

175 

176 def on_message_recieved(self, headers, message): 

177 """Incoming message handler 

178 

179 Args: 

180 headers (dict): Message header 

181 message (dict): Message body 

182 """ 

183 logger.debug("Message recieved from stomp: %s", pformat(message)) 

184 for callback in self._callbacks: 

185 callback(headers, message) 

186 

187 if message.get("daiquiri_id"): 

188 self._messages_received[message.get("daiquiri_id")] = { 

189 "time": time.time(), 

190 "message": message, 

191 } 

192 

193 self._queue_emit_message(headers, message) 

194 

195 def _queue_emit_message(self, headers, message): 

196 """Debounce event emission 

197 

198 Try to not spam client 

199 """ 

200 if self._emit_timeout is not None: 

201 self._emit_timeout.kill() 

202 self._emit_timeout = None 

203 

204 now = time.time() 

205 if now - self._last_emit_time > 0.2: 

206 self._emit_message(headers, message) 

207 else: 

208 self._emit_timeout = gevent.spawn_later( 

209 0.2, self._emit_message, headers, message 

210 ) 

211 

212 def _emit_message(self, headers, message): 

213 self.emit("message", message) 

214 self._last_emit_time = time.time() 

215 

216 def send_message(self, message, destination=None): 

217 """Send a message to a queue 

218 

219 Args: 

220 message (dict): Dictionary of the message parameters 

221 destination (str): The queue to send the message to 

222 """ 

223 if destination is None: 

224 destination = self._config["stomp_destination"] 

225 

226 if not message["parameters"]: 

227 message["parameters"] = {} 

228 message["parameters"]["daiquiri_id"] = str(uuid.uuid4()) 

229 

230 try: 

231 safe_message = make_json_safe(message) 

232 client = StompClient(self._stomp_config) 

233 client.connect() 

234 client.send( 

235 f"/queue/{destination}", json.dumps(safe_message).encode("utf-8") 

236 ) 

237 client.disconnect() 

238 return message["parameters"]["daiquiri_id"] 

239 except Exception: 

240 logger.exception("Problem sending message to broker, message not sent") 

241 

242 def start_recipe(self, datacollectionid, recipe, parameters): 

243 """Start a processing recipe 

244 

245 Helper function to launch a processing recipe 

246 """ 

247 parameters.update({"ispyb_dcid": datacollectionid}) 

248 uuid = self.send_message({"recipes": [recipe], "parameters": parameters}) 

249 self.emit( 

250 "message", 

251 { 

252 "type": "start_recipe", 

253 "recipe": recipe, 

254 "datacollectionid": datacollectionid, 

255 "daiquiri_id": uuid, 

256 }, 

257 ) 

258 return uuid 

259 

260 def wait_for(self, uuid, timeout=60 * 5): 

261 start = time.time() 

262 while not self._messages_received.get(uuid): 

263 time.sleep(1) 

264 

265 if time.time() - start > timeout: 

266 logger.warning( 

267 f"Did not get response for uuid: {uuid} after timeout {timeout}s" 

268 ) 

269 return 

270 

271 return self._messages_received[uuid].get("message") 

272 

273 def send_event(self, datacollectionid, event): 

274 """Send a processing event 

275 

276 Helper function to send an event message 

277 """ 

278 if event not in ["start", "end"]: 

279 raise AttributeError(f"Unknown event {event}") 

280 

281 uuid = self.send_message( 

282 { 

283 "recipes": ["mimas"], 

284 "parameters": {"ispyb_dcid": datacollectionid, "event": event}, 

285 } 

286 ) 

287 

288 self.emit( 

289 "message", 

290 { 

291 "type": "event", 

292 "event": event, 

293 "datacollectionid": datacollectionid, 

294 "daiquiri_id": uuid, 

295 }, 

296 ) 

297 

298 return uuid