Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/stomp.py: 25%

151 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import json 

5import logging 

6import uuid 

7from pprint import pformat 

8 

9import gevent 

10from stompest.config import StompConfig 

11from stompest.protocol import StompSpec 

12from stompest.sync import Stomp as StompClient 

13from stompest.error import StompConnectionError, StompConnectTimeout, StompProtocolError 

14 

15from daiquiri.core import CoreBase 

16from daiquiri.core.utils import make_json_safe 

17 

18 

19logger = logging.getLogger(__name__) 

20 

21stompest_logger = logging.getLogger("stompest.sync.client") 

22stompest_logger.setLevel(logging.ERROR) 

23 

24 

25def format_body(body): 

26 """Parse zocalo recipe body""" 

27 message = {} 

28 

29 if not isinstance(body, dict): 

30 logger.warning("Message body is empty") 

31 return message 

32 

33 try: 

34 current_step = body["recipe"][str(body["recipe-pointer"])] 

35 except KeyError: 

36 logger.warning("Could not find recipe step") 

37 return message 

38 else: 

39 message.update(current_step["parameters"]) 

40 if isinstance(body["payload"], dict): 

41 message.update(body["payload"]) 

42 environment = body["environment"] 

43 

44 for key in sorted(environment, key=len, reverse=True): 

45 for mk, value in message.items(): 

46 if isinstance(value, str): 

47 if "$" + key == value: 

48 message[mk] = value.replace("$" + key, str(environment[key])) 

49 

50 return message 

51 

52 

53class Stomp(CoreBase): 

54 """Core Stomp Interface 

55 

56 Initialise stomp to connect to the message broker. If a local queue for this 

57 instance of daiquiri has been enabled subscribe to the queue. Otherwise just 

58 allow daiquiri to be able to send messages 

59 """ 

60 

61 _namespace = "stomp" 

62 _client = None 

63 _running = False 

64 _last_heart_beat = 0 

65 _callbacks = [] 

66 _backoff = 0 

67 

68 _last_emit_time = 0 

69 _emit_timeout = None 

70 _messages_received = {} 

71 

72 def read_client(self): 

73 """Read queue 

74 

75 Read from queue and parse incoming messages 

76 """ 

77 while self._running: 

78 try: 

79 if self._client.canRead(1): 

80 frame = self._client.receiveFrame() 

81 self._client.ack(frame) 

82 if frame.command == "MESSAGE": 

83 message = format_body(json.loads(frame.body)) 

84 self.on_message_recieved(frame.headers, message) 

85 

86 now = time.time() 

87 if now - self._last_heart_beat > 30: 

88 self._client.beat() 

89 self._last_heart_beat = now 

90 

91 except StompConnectionError: 

92 logger.warning("Could not connect to broker") 

93 try: 

94 self.connect_and_subscribe() 

95 except StompConnectTimeout: 

96 logger.warning("Timeout connecting to broker") 

97 self._backoff += 1 

98 

99 gevent.sleep(0.1) 

100 

101 if self._backoff > 0: 

102 logger.warning("Waiting for backoff") 

103 gevent.sleep(self._backoff * 10) 

104 self._backoff -= 1 

105 

106 def connect_and_subscribe(self): 

107 logger.info("Trying to connect to stomp") 

108 prefix = self._config.get("stomp_queue_prefix", "zocalo") 

109 queue = f"{prefix}.transient.{self._config['meta_beamline']}.notification" 

110 logger.info(f"Subscribing to queue: {queue}") 

111 

112 self._client.connect() 

113 try: 

114 self._client.subscribe( 

115 f"/queue/{queue}", 

116 { 

117 StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL, 

118 StompSpec.ID_HEADER: f"daiquiri_{self._config['meta_beamline']}", 

119 }, 

120 ) 

121 except StompProtocolError as e: 

122 logger.warning(f"StompProtocolError: {str(e)}") 

123 

124 logger.info("Connected to broker and subscribed to queue") 

125 

126 def setup(self): 

127 self._stomp_config = StompConfig( 

128 f"tcp://{self._config['stomp_host']}:{self._config['stomp_port']}", 

129 version=StompSpec.VERSION_1_1, 

130 ) 

131 

132 if self._config.get("stomp_queue"): 

133 self._client = StompClient(self._stomp_config) 

134 self._running = True 

135 self._read_thread = gevent.spawn(self.read_client) 

136 

137 def __del__(self): 

138 if self._running: 

139 self._running = False 

140 self._read_thread.join() 

141 

142 if self._client: 

143 self._client.disconnect() 

144 

145 def add_listener(self, callback): 

146 """Add a stomp listener 

147 

148 Args: 

149 callback(function): Callback should be of the form callback(headers, message) 

150 """ 

151 if not callable(callback): 

152 raise AttributeError(f"Callback is not callable: {callback}") 

153 

154 if callback not in self._callbacks: 

155 self._callbacks.append(callback) 

156 else: 

157 raise AttributeError(f"Callback is already registered: {callback}") 

158 

159 def on_message_recieved(self, headers, message): 

160 """Incoming message handler 

161 

162 Args: 

163 headers (dict): Message header 

164 message (dict): Message body 

165 """ 

166 logger.debug("Message recieved from stomp: %s", pformat(message)) 

167 for callback in self._callbacks: 

168 callback(headers, message) 

169 

170 if message.get("daiquiri_id"): 

171 self._messages_received[message.get("daiquiri_id")] = { 

172 "time": time.time(), 

173 "message": message, 

174 } 

175 

176 self._queue_emit_message(headers, message) 

177 

178 def _queue_emit_message(self, headers, message): 

179 """Debounce event emission 

180 

181 Try to not spam client 

182 """ 

183 if self._emit_timeout is not None: 

184 self._emit_timeout.kill() 

185 self._emit_timeout = None 

186 

187 now = time.time() 

188 if now - self._last_emit_time > 0.2: 

189 self._emit_message(headers, message) 

190 else: 

191 self._emit_timeout = gevent.spawn_later( 

192 0.2, self._emit_message, headers, message 

193 ) 

194 

195 def _emit_message(self, headers, message): 

196 self.emit("message", message) 

197 self._last_emit_time = time.time() 

198 

199 def send_message(self, message, destination=None): 

200 """Send a message to a queue 

201 

202 Args: 

203 message (dict): Dictionary of the message parameters 

204 destination (str): The queue to send the message to 

205 """ 

206 if destination is None: 

207 destination = self._config["stomp_destination"] 

208 

209 if not message["parameters"]: 

210 message["parameters"] = {} 

211 message["parameters"]["daiquiri_id"] = str(uuid.uuid4()) 

212 

213 try: 

214 safe_message = make_json_safe(message) 

215 client = StompClient(self._stomp_config) 

216 client.connect() 

217 client.send( 

218 f"/queue/{destination}", json.dumps(safe_message).encode("utf-8") 

219 ) 

220 client.disconnect() 

221 return message["parameters"]["daiquiri_id"] 

222 except Exception: 

223 logger.exception("Problem sending message to broker, message not sent") 

224 

225 def start_recipe(self, datacollectionid, recipe, parameters): 

226 """Start a processing recipe 

227 

228 Helper function to launch a processing recipe 

229 """ 

230 parameters.update({"ispyb_dcid": datacollectionid}) 

231 uuid = self.send_message({"recipes": [recipe], "parameters": parameters}) 

232 self.emit( 

233 "message", 

234 { 

235 "type": "start_recipe", 

236 "recipe": recipe, 

237 "datacollectionid": datacollectionid, 

238 "daiquiri_id": uuid, 

239 }, 

240 ) 

241 return uuid 

242 

243 def wait_for(self, uuid, timeout=60 * 5): 

244 start = time.time() 

245 while not self._messages_received.get(uuid): 

246 time.sleep(1) 

247 

248 if time.time() - start > timeout: 

249 logger.warning( 

250 f"Did not get response for uuid: {uuid} after timeout {timeout}s" 

251 ) 

252 return 

253 

254 return self._messages_received[uuid].get("message") 

255 

256 def send_event(self, datacollectionid, event): 

257 """Send a processing event 

258 

259 Helper function to send an event message 

260 """ 

261 if event not in ["start", "end"]: 

262 raise AttributeError(f"Unknown event {event}") 

263 

264 uuid = self.send_message( 

265 { 

266 "recipes": ["mimas"], 

267 "parameters": {"ispyb_dcid": datacollectionid, "event": event}, 

268 } 

269 ) 

270 

271 self.emit( 

272 "message", 

273 { 

274 "type": "event", 

275 "event": event, 

276 "datacollectionid": datacollectionid, 

277 "daiquiri_id": uuid, 

278 }, 

279 ) 

280 

281 return uuid