Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/hardware.py: 99%

159 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import gevent 

5from flask import g 

6from marshmallow import fields, ValidationError 

7import numpy 

8 

9from daiquiri.core import marshal, require_control 

10from daiquiri.core.logging import log 

11from daiquiri.core.components import Component, ComponentResource 

12from daiquiri.core.schema import ErrorSchema, ValidationErrorSchema 

13from daiquiri.core.schema.hardware import ( 

14 HardwareObjectBaseSchema, 

15 HardwareGroupSchema, 

16 HardwareTypeSchema, 

17 SetObjectProperty, 

18 CallObjectFunction, 

19) 

20from daiquiri.core.schema.metadata import paginated 

21 

22import logging 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27class HardwaresResource(ComponentResource): 

28 @marshal( 

29 inp={ 

30 "type": fields.Str(metadata={"description": "Filter by a specific type"}), 

31 "group": fields.Str( 

32 metadata={"description": "Filter by a specific group name"} 

33 ), 

34 }, 

35 out=[[200, paginated(HardwareObjectBaseSchema), "List of object statuses"]], 

36 ) 

37 def get(self, **kwargs): 

38 """Get a list of all hardware statuses""" 

39 return ( 

40 self._parent.get_objects( 

41 group=kwargs.get("group", None), type=kwargs.get("type", None) 

42 ), 

43 200, 

44 ) 

45 

46 

47class HardwareResource(ComponentResource): 

48 @marshal( 

49 out=[ 

50 [200, HardwareObjectBaseSchema(), "A single object status"], 

51 [404, ErrorSchema(), "Object not found"], 

52 ] 

53 ) 

54 def get(self, id, **kwargs): 

55 """Get the status of a particular hardware object""" 

56 obj = self._parent.get_object(id) 

57 if obj: 

58 return obj.state(), 200 

59 else: 

60 return {"error": "No such object"}, 404 

61 

62 @require_control 

63 @marshal( 

64 inp=SetObjectProperty, 

65 out=[ 

66 [200, SetObjectProperty(), "Object property updated"], 

67 [400, ErrorSchema(), "Could not set property"], 

68 [422, ValidationErrorSchema(), "Invalid value"], 

69 [404, ErrorSchema(), "Object not found"], 

70 ], 

71 ) 

72 def put(self, id, **kwargs): 

73 """Update a property on a hardware object""" 

74 obj = self._parent.get_object(id) 

75 if obj: 

76 try: 

77 obj.set(kwargs["property"], kwargs["value"]) 

78 log.get("user").info( 

79 f"Requested property {kwargs['property']} change to {kwargs['value']} for {obj.id()}", 

80 type="hardware", 

81 ) 

82 return {"property": kwargs["property"], "value": kwargs["value"]}, 200 

83 except ValidationError as e: 

84 return {"messages": e.messages}, 422 

85 # To catch gevent.Timeout as well 

86 except BaseException as e: 

87 log.get("user").exception( 

88 f"Could not change property {kwargs['property']}: {str(e)} for {obj.id()}", 

89 type="hardware", 

90 ) 

91 return {"error": str(e)}, 400 

92 else: 

93 return {"error": "No such object"}, 404 

94 

95 @require_control 

96 @marshal( 

97 inp=CallObjectFunction, 

98 out=[ 

99 [200, CallObjectFunction(), "Object function called"], 

100 [400, ErrorSchema(), "Could not call function"], 

101 [422, ValidationErrorSchema(), "Invalid value"], 

102 [404, ErrorSchema(), "Object not found"], 

103 ], 

104 ) 

105 def post(self, id, **kwargs): 

106 """Call a function on a hardware object""" 

107 obj = self._parent.get_object(id) 

108 if obj: 

109 try: 

110 resp = obj.call(kwargs["function"], kwargs.get("value", None)) 

111 log.get("user").info( 

112 f"Requested function {kwargs['function']} with value {kwargs.get('value', None)} for {obj.id()}", 

113 type="hardware", 

114 ) 

115 return {"function": kwargs["function"], "response": resp}, 200 

116 except ValidationError as e: 

117 return {"messages": e.messages}, 422 

118 # To catch gevent.Timeout as well 

119 except BaseException as e: 

120 log.get("user").exception( 

121 f"Could not call function {kwargs['function']}: {e} for {obj.id()}", 

122 type="hardware", 

123 ) 

124 return {"error": "Could not call function: {err}".format(err=e)}, 400 

125 else: 

126 return {"error": "No such object"}, 404 

127 

128 

129class HardwareGroupsResource(ComponentResource): 

130 @marshal(out=[[200, paginated(HardwareGroupSchema), "List of hardware groups"]]) 

131 def get(self): 

132 """Get a list of the hardware object groups""" 

133 return self._parent.get_groups(), 200 

134 

135 

136class HardwareTypesResource(ComponentResource): 

137 @marshal(out=[[200, paginated(HardwareTypeSchema), "List of hardware types"]]) 

138 def get(self): 

139 """Get a list of the different hardware types in use""" 

140 return self._parent.get_types(), 200 

141 

142 

143class Hardware(Component): 

144 """The Hardware Feature 

145 

146 This makes all loaded hardware available via flask resources, properties can be changed 

147 via put, and functions called via post requests. 

148 

149 Hardware changes are notified via socketio events 

150 

151 """ 

152 

153 _config_export = ["monitor"] 

154 _last_value = {} 

155 

156 def subscribe(self): 

157 for o in self._hardware.get_objects(): 

158 o.subscribe("all", self._obj_param_change) 

159 o.subscribe_online(self._obj_online_change) 

160 

161 def setup(self): 

162 self._emit_timeout = {} 

163 self._last_emit_time = {} 

164 self._group_status_timeout = {} 

165 self._last_group_emit = {} 

166 self.subscribe() 

167 

168 self.register_route(HardwaresResource, "") 

169 self.register_route(HardwareResource, "/<string:id>") 

170 self.register_route(HardwareGroupsResource, "/groups") 

171 self.register_route(HardwareTypesResource, "/types") 

172 

173 def reload(self): 

174 self.subscribe() 

175 

176 def _ensure_last_val(self, obj, prop): 

177 """Ensure a last value 

178 

179 Args: 

180 obj (str): The object id 

181 prop (str): The property 

182 

183 Ensure a last value for obj/prop 

184 """ 

185 if obj not in self._last_value: 

186 self._last_value[obj] = {} 

187 if obj not in self._emit_timeout: 

188 self._emit_timeout[obj] = {} 

189 if obj not in self._last_emit_time: 

190 self._last_emit_time[obj] = {} 

191 

192 if prop not in self._last_value[obj]: 

193 self._last_value[obj][prop] = None 

194 if prop not in self._emit_timeout[obj]: 

195 self._emit_timeout[obj][prop] = None 

196 if prop not in self._last_emit_time[obj]: 

197 self._last_emit_time[obj][prop] = 0 

198 

199 if obj not in self._last_group_emit: 

200 self._last_group_emit[obj] = 0 

201 if obj not in self._group_status_timeout: 

202 self._group_status_timeout[obj] = None 

203 

204 def _obj_online_change(self, obj, value): 

205 """Hardware online callback 

206 

207 Emits socketio event on online change 

208 """ 

209 self._ensure_last_val(obj.id(), "online") 

210 

211 if self._last_value[obj.id()]["online"] != value: 

212 self.emit("online", {"id": obj.id(), "state": value}) 

213 self._queue_update_group_status(obj.id()) 

214 else: 

215 logger.debug(f"Duplicate update for {obj.id()}, online, {value}") 

216 

217 def _obj_param_change(self, obj, prop, value): 

218 """Hardware parameter change callback 

219 

220 Emits a socketio event on parameter change 

221 

222 Args: 

223 obj (obj): The hardware object whos parameter changed 

224 prop (str): The property that changed 

225 value (mixed): The new value 

226 """ 

227 self._ensure_last_val(obj.id(), prop) 

228 

229 if isinstance(value, numpy.ndarray): 

230 # the following compare with `_last_value` and the socketio event do not support numpy array 

231 if value.size > 10: 

232 # Warn in case you are about to transmit huge data by mistakes 

233 logging.warning( 

234 "Prop '%s' are about to transmit a numpy array (size %s) using a socketio event, with a slow conversion", 

235 prop, 

236 value.size, 

237 ) 

238 value = value.tolist() 

239 

240 if self._last_value[obj.id()][prop] != value: 

241 self._queue_emit_value(obj, prop, value) 

242 self._last_value[obj.id()][prop] = value 

243 

244 else: 

245 logger.debug(f"Duplicate update for {obj.id()}, {prop}, {value}") 

246 

247 def _queue_emit_value(self, obj, prop, value): 

248 if self._emit_timeout[obj.id()][prop] is not None: 

249 self._emit_timeout[obj.id()][prop].kill() 

250 self._emit_timeout[obj.id()][prop] = None 

251 

252 now = time.time() 

253 if now - self._last_emit_time[obj.id()][prop] > 0.2: 

254 self._emit_value(obj, prop, value) 

255 else: 

256 self._emit_timeout[obj.id()][prop] = gevent.spawn_later( 

257 0.2, self._emit_value, obj, prop, value 

258 ) 

259 

260 def _emit_value(self, obj, prop, value): 

261 data = {} 

262 data[prop] = value 

263 self.emit("change", {"id": obj.id(), "data": data}) 

264 self._last_emit_time[obj.id()][prop] = time.time() 

265 

266 if prop == "state": 

267 self._queue_update_group_status(obj.id()) 

268 

269 def _queue_update_group_status(self, objectid): 

270 if self._group_status_timeout[objectid] is not None: 

271 self._group_status_timeout[objectid].kill() 

272 self._group_status_timeout[objectid] = None 

273 

274 now = time.time() 

275 if now - self._last_group_emit[objectid] > 2: 

276 self.update_group_status(objectid) 

277 else: 

278 self._group_status_timeout[objectid] = gevent.spawn_later( 

279 2, self.update_group_status, objectid 

280 ) 

281 

282 def update_group_status(self, objectid): 

283 """ 

284 Emits a socketio event for the group(s) related to objectid 

285 """ 

286 for group in self._hardware.get_groups(objectid=objectid): 

287 self._ensure_last_val("group", group["groupid"]) 

288 

289 if self._last_value["group"][group["groupid"]] != group["state"]: 

290 self.emit( 

291 "group", 

292 {"groupid": group["groupid"], "data": {"state": group["state"]}}, 

293 ) 

294 self._last_value["group"][group["groupid"]] = group["state"] 

295 else: 

296 logger.debug( 

297 f"Duplicate update for group {group['groupid']}, {group['state']}" 

298 ) 

299 

300 self._last_group_emit[objectid] = time.time() 

301 

302 def get_objects(self, *args, **kwargs): 

303 """Get a list of all hardware object states""" 

304 objects = [o.state() for o in self._hardware.get_objects(*args, **kwargs)] 

305 return {"total": len(objects), "rows": objects} 

306 

307 def get_object(self, objectid): 

308 """Get a specific object 

309 

310 Args: 

311 objectid (str): The objects id 

312 

313 Returns: 

314 object (obj): The hardware object 

315 """ 

316 obj = self._hardware.get_object(objectid) 

317 if obj: 

318 if (obj.require_staff() and g.user.staff()) or not obj.require_staff(): 

319 return obj 

320 

321 def get_groups(self): 

322 """Get a list of the hardware groups""" 

323 groups = self._hardware.get_groups() 

324 return {"total": len(groups), "rows": groups} 

325 

326 def get_types(self): 

327 """Get a list of the hardware types and their schemas""" 

328 types = [ 

329 {"type": k, "schema": s} for k, s in self._hardware.get_types().items() 

330 ] 

331 return {"total": len(types), "rows": types}