Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/hardware.py: 95%

170 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4import time 

5import gevent 

6from flask import g 

7from marshmallow import fields, ValidationError 

8import numpy 

9 

10from daiquiri.core import marshal, require_control 

11from daiquiri.core.logging import log 

12from daiquiri.core.components import Component, ComponentResource 

13from daiquiri.core.schema import ErrorSchema, ValidationErrorSchema 

14from daiquiri.core.schema.hardware import ( 

15 HardwareObjectBaseSchema, 

16 HardwareGroupSchema, 

17 HardwareTypeSchema, 

18 SetObjectProperty, 

19 CallObjectFunction, 

20) 

21from daiquiri.core.schema.metadata import paginated 

22 

23import logging 

24 

25logger = logging.getLogger(__name__) 

26 

27 

class HardwaresResource(ComponentResource):
    # Collection endpoint: returns the state of every hardware object,
    # optionally narrowed by the `type` and/or `group` request arguments.
    @marshal(
        inp={
            "type": fields.Str(metadata={"description": "Filter by a specific type"}),
            "group": fields.Str(
                metadata={"description": "Filter by a specific group name"}
            ),
        },
        out=[[200, paginated(HardwareObjectBaseSchema), "List of object statuses"]],
    )
    def get(self, **kwargs):
        """Get a list of all hardware statuses"""
        # A filter that was not supplied is forwarded as None.
        group_filter = kwargs.get("group")
        type_filter = kwargs.get("type")
        listing = self._parent.get_objects(group=group_filter, type=type_filter)
        return listing, 200

46 

47 

class HardwareResource(ComponentResource):
    # Single-object endpoint: read the state (GET), set a property (PUT) or
    # call a function (POST) on the hardware object identified by `id`.
    # Every handler answers 404 when the parent component returns no object.
    @marshal(
        out=[
            [200, HardwareObjectBaseSchema(), "A single object status"],
            [404, ErrorSchema(), "Object not found"],
        ]
    )
    def get(self, id, **kwargs):
        """Get the status of a particular hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404
        return obj.state(), 200

    @require_control
    @marshal(
        inp=SetObjectProperty,
        out=[
            [200, SetObjectProperty(), "Object property updated"],
            [400, ErrorSchema(), "Could not set property"],
            [422, ValidationErrorSchema(), "Invalid value"],
            [404, ErrorSchema(), "Object not found"],
        ],
    )
    def put(self, id, **kwargs):
        """Update a property on a hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        try:
            obj.set(kwargs["property"], kwargs["value"])
            log.get("user").info(
                f"Requested property {kwargs['property']} change to {kwargs['value']} for {obj.id()}",
                type="hardware",
            )
            return {"property": kwargs["property"], "value": kwargs["value"]}, 200
        except ValidationError as e:
            return {"messages": e.messages}, 422
        # gevent.Timeout derives from BaseException, so it has to be listed
        # explicitly. Catching BaseException itself would also swallow
        # KeyboardInterrupt, SystemExit and GreenletExit, which must propagate.
        except (Exception, gevent.Timeout) as e:
            log.get("user").exception(
                f"Could not change property {kwargs['property']}: {e} for {obj.id()}",
                type="hardware",
            )
            return {"error": str(e)}, 400

    @require_control
    @marshal(
        inp=CallObjectFunction,
        out=[
            [200, CallObjectFunction(), "Object function called"],
            [400, ErrorSchema(), "Could not call function"],
            [422, ValidationErrorSchema(), "Invalid value"],
            [404, ErrorSchema(), "Object not found"],
        ],
    )
    def post(self, id, **kwargs):
        """Call a function on a hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        try:
            # `value` is optional; None is passed when it was not supplied.
            resp = obj.call(kwargs["function"], kwargs.get("value", None))
            log.get("user").info(
                f"Requested function {kwargs['function']} with value {kwargs.get('value', None)} for {obj.id()}",
                type="hardware",
            )
            return {"function": kwargs["function"], "response": resp}, 200
        except ValidationError as e:
            return {"messages": e.messages}, 422
        # gevent.Timeout derives from BaseException, so it has to be listed
        # explicitly. Catching BaseException itself would also swallow
        # KeyboardInterrupt, SystemExit and GreenletExit, which must propagate.
        except (Exception, gevent.Timeout) as e:
            log.get("user").exception(
                f"Could not call function {kwargs['function']}: {e} for {obj.id()}",
                type="hardware",
            )
            return {"error": f"Could not call function: {e}"}, 400

128 

129 

class HardwareGroupsResource(ComponentResource):
    # Read-only endpoint exposing the parent component's group listing.
    @marshal(out=[[200, paginated(HardwareGroupSchema), "List of hardware groups"]])
    def get(self):
        """Get a list of the hardware object groups"""
        group_listing = self._parent.get_groups()
        return group_listing, 200

135 

136 

class HardwareTypesResource(ComponentResource):
    # Read-only endpoint exposing the parent component's type listing.
    @marshal(out=[[200, paginated(HardwareTypeSchema), "List of hardware types"]])
    def get(self):
        """Get a list of the different hardware types in use"""
        type_listing = self._parent.get_types()
        return type_listing, 200

142 

143 

class Hardware(Component):
    """The Hardware Feature

    This makes all loaded hardware available via flask resources, properties can be changed
    via put, and functions called via post requests.

    Hardware changes are notified via socketio events

    """

    _config_export = ["monitor"]
    # Class-level fallback kept for backward compatibility. `setup` installs a
    # per-instance dict so cached values are not shared between instances.
    _last_value = {}

    # Minimum delay, in seconds, between two "change" events for the same
    # object/property pair.
    _EMIT_INTERVAL = 0.2
    # Minimum delay, in seconds, between two group status refreshes triggered
    # by the same object.
    _GROUP_EMIT_INTERVAL = 2

    def subscribe(self):
        """Register this component's callbacks on every hardware object

        Each object gets the parameter ("all"), online and locked callbacks.
        """
        for o in self._hardware.get_objects():
            o.subscribe("all", self._obj_param_change)
            o.subscribe_online(self._obj_online_change)
            o.subscribe_locked(self._obj_locked_change)

    def setup(self):
        """Initialise the per-instance caches, subscribe and register the routes"""
        # The caches must exist before `subscribe`, as callbacks rely on them.
        self._last_value = {}
        self._emit_timeout = {}
        self._last_emit_time = {}
        self._group_status_timeout = {}
        self._last_group_emit = {}
        self.subscribe()

        self.register_route(HardwaresResource, "")
        self.register_route(HardwareResource, "/<string:id>")
        self.register_route(HardwareGroupsResource, "/groups")
        self.register_route(HardwareTypesResource, "/types")

    def reload(self):
        """Subscribe to the hardware objects again"""
        self.subscribe()

    def _ensure_last_val(self, obj, prop):
        """Ensure a last value

        Args:
            obj (str): The object id
            prop (str): The property

        Ensure a last value for obj/prop: creates, when missing, the cache
        entries used for the last value, the pending emit, the last emit time
        and the group status throttling.
        """
        self._last_value.setdefault(obj, {}).setdefault(prop, None)
        self._emit_timeout.setdefault(obj, {}).setdefault(prop, None)
        self._last_emit_time.setdefault(obj, {}).setdefault(prop, 0)

        self._last_group_emit.setdefault(obj, 0)
        self._group_status_timeout.setdefault(obj, None)

    def _obj_online_change(self, obj, value):
        """Hardware online callback

        Emits socketio event on online change
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, "_online")
        cache = self._last_value[objectid]
        if cache["_online"] != value:
            cache["_online"] = value
            self.emit("online", {"id": objectid, "state": value})
            self._queue_update_group_status(objectid)
        else:
            logger.debug("Duplicate update for %s, online, %s", objectid, value)

    def _obj_locked_change(self, obj, reason: str | None):
        """Hardware locked callback

        Emits socketio event on lock change
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, "_locked")
        cache = self._last_value[objectid]
        if cache["_locked"] != reason:
            cache["_locked"] = reason
            self.emit("locked", {"id": objectid, "state": reason})
        else:
            logger.debug("Duplicate update for %s, locked, %s", objectid, reason)

    def _obj_param_change(self, obj, prop, value):
        """Hardware parameter change callback

        Emits a socketio event on parameter change

        Args:
            obj (obj): The hardware object whose parameter changed
            prop (str): The property that changed
            value (mixed): The new value
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, prop)

        if isinstance(value, numpy.ndarray):
            # The comparison with `_last_value` and the socketio event do not
            # support numpy arrays, so convert to a plain list.
            if value.size > 10:
                # Warn in case huge data is about to be transmitted by mistake
                logger.warning(
                    "Prop '%s' are about to transmit a numpy array (size %s) using a socketio event, with a slow conversion",
                    prop,
                    value.size,
                )
            value = value.tolist()

        if self._last_value[objectid][prop] != value:
            self._queue_emit_value(obj, prop, value)
            self._last_value[objectid][prop] = value
        else:
            # Lazy formatting: `value` may be large
            logger.debug("Duplicate update for %s, %s, %s", objectid, prop, value)

    def _queue_emit_value(self, obj, prop, value):
        """Emit a property change now, or schedule it if one was emitted recently

        Any emit still pending for this object/property is cancelled first, so
        only the latest value is sent.
        """
        pending = self._emit_timeout[obj.id()]
        if pending[prop] is not None:
            pending[prop].kill()
            pending[prop] = None

        now = time.time()
        if now - self._last_emit_time[obj.id()][prop] > self._EMIT_INTERVAL:
            self._emit_value(obj, prop, value)
        else:
            pending[prop] = gevent.spawn_later(
                self._EMIT_INTERVAL, self._emit_value, obj, prop, value
            )

    def _emit_value(self, obj, prop, value):
        """Emit a "change" event for obj/prop and record the emit time"""
        self.emit("change", {"id": obj.id(), "data": {prop: value}})
        self._last_emit_time[obj.id()][prop] = time.time()

        # A state change also triggers a refresh of the related group status
        if prop == "state":
            self._queue_update_group_status(obj.id())

    def _queue_update_group_status(self, objectid):
        """Update the group status now, or schedule it if one ran recently

        Any update still pending for this object is cancelled first.
        """
        if self._group_status_timeout[objectid] is not None:
            self._group_status_timeout[objectid].kill()
            self._group_status_timeout[objectid] = None

        now = time.time()
        if now - self._last_group_emit[objectid] > self._GROUP_EMIT_INTERVAL:
            self.update_group_status(objectid)
        else:
            self._group_status_timeout[objectid] = gevent.spawn_later(
                self._GROUP_EMIT_INTERVAL, self.update_group_status, objectid
            )

    def update_group_status(self, objectid):
        """
        Emits a socketio event for the group(s) related to objectid
        """
        for group in self._hardware.get_groups(objectid=objectid):
            groupid = group["groupid"]
            state = group["state"]
            # Group states are cached under the reserved "group" key
            self._ensure_last_val("group", groupid)

            if self._last_value["group"][groupid] != state:
                self.emit(
                    "group",
                    {"groupid": groupid, "data": {"state": state}},
                )
                self._last_value["group"][groupid] = state
            else:
                logger.debug("Duplicate update for group %s, %s", groupid, state)

        self._last_group_emit[objectid] = time.time()

    def get_objects(self, *args, **kwargs):
        """Get a list of all hardware object states

        Arguments are forwarded unchanged to the hardware `get_objects` call.

        Returns:
            dict: `total` (number of objects) and `rows` (their states)
        """
        objects = [o.state() for o in self._hardware.get_objects(*args, **kwargs)]
        return {"total": len(objects), "rows": objects}

    def get_object(self, objectid):
        """Get a specific object

        Args:
            objectid (str): The objects id

        Returns:
            object (obj): The hardware object, or None when it does not exist
                or requires staff and the current user is not staff
        """
        obj = self._hardware.get_object(objectid)
        if not obj:
            return None
        # The staff check on the current user only applies to objects that
        # require it
        if not obj.require_staff() or g.user.staff():
            return obj
        return None

    def get_groups(self):
        """Get a list of the hardware groups

        Returns:
            dict: `total` (number of groups) and `rows` (the groups)
        """
        groups = self._hardware.get_groups()
        return {"total": len(groups), "rows": groups}

    def get_types(self):
        """Get a list of the hardware types and their schemas

        Returns:
            dict: `total` (number of types) and `rows` (type/schema pairs)
        """
        types = [
            {"type": k, "schema": s} for k, s in self._hardware.get_types().items()
        ]
        return {"total": len(types), "rows": types}