Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/hardware.py: 95%
170 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
4import time
5import gevent
6from flask import g
7from marshmallow import fields, ValidationError
8import numpy
10from daiquiri.core import marshal, require_control
11from daiquiri.core.logging import log
12from daiquiri.core.components import Component, ComponentResource
13from daiquiri.core.schema import ErrorSchema, ValidationErrorSchema
14from daiquiri.core.schema.hardware import (
15 HardwareObjectBaseSchema,
16 HardwareGroupSchema,
17 HardwareTypeSchema,
18 SetObjectProperty,
19 CallObjectFunction,
20)
21from daiquiri.core.schema.metadata import paginated
23import logging
25logger = logging.getLogger(__name__)
class HardwaresResource(ComponentResource):
    """Resource listing hardware object states, optionally filtered by type or group."""

    @marshal(
        inp={
            "type": fields.Str(metadata={"description": "Filter by a specific type"}),
            "group": fields.Str(
                metadata={"description": "Filter by a specific group name"}
            ),
        },
        out=[[200, paginated(HardwareObjectBaseSchema), "List of object statuses"]],
    )
    def get(self, **kwargs):
        """Get a list of all hardware statuses"""
        # Both filters are optional; a missing filter is forwarded as None
        group_filter = kwargs.get("group", None)
        type_filter = kwargs.get("type", None)
        listing = self._parent.get_objects(group=group_filter, type=type_filter)
        return listing, 200
class HardwareResource(ComponentResource):
    """Resource to read, update and call functions on a single hardware object."""

    @marshal(
        out=[
            [200, HardwareObjectBaseSchema(), "A single object status"],
            [404, ErrorSchema(), "Object not found"],
        ]
    )
    def get(self, id, **kwargs):
        """Get the status of a particular hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        return obj.state(), 200

    @require_control
    @marshal(
        inp=SetObjectProperty,
        out=[
            [200, SetObjectProperty(), "Object property updated"],
            [400, ErrorSchema(), "Could not set property"],
            [422, ValidationErrorSchema(), "Invalid value"],
            [404, ErrorSchema(), "Object not found"],
        ],
    )
    def put(self, id, **kwargs):
        """Update a property on a hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        try:
            obj.set(kwargs["property"], kwargs["value"])
            log.get("user").info(
                f"Requested property {kwargs['property']} change to {kwargs['value']} for {obj.id()}",
                type="hardware",
            )
            return {"property": kwargs["property"], "value": kwargs["value"]}, 200
        except ValidationError as e:
            return {"messages": e.messages}, 422
        # gevent.Timeout derives from BaseException, so it has to be listed
        # explicitly. KeyboardInterrupt, SystemExit and GreenletExit are
        # deliberately left to propagate instead of being turned into a 400.
        except (Exception, gevent.Timeout) as e:
            log.get("user").exception(
                f"Could not change property {kwargs['property']}: {e} for {obj.id()}",
                type="hardware",
            )
            return {"error": str(e)}, 400

    @require_control
    @marshal(
        inp=CallObjectFunction,
        out=[
            [200, CallObjectFunction(), "Object function called"],
            [400, ErrorSchema(), "Could not call function"],
            [422, ValidationErrorSchema(), "Invalid value"],
            [404, ErrorSchema(), "Object not found"],
        ],
    )
    def post(self, id, **kwargs):
        """Call a function on a hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        try:
            # The function argument is optional and defaults to None
            resp = obj.call(kwargs["function"], kwargs.get("value", None))
            log.get("user").info(
                f"Requested function {kwargs['function']} with value {kwargs.get('value', None)} for {obj.id()}",
                type="hardware",
            )
            return {"function": kwargs["function"], "response": resp}, 200
        except ValidationError as e:
            return {"messages": e.messages}, 422
        # gevent.Timeout derives from BaseException, so it has to be listed
        # explicitly. KeyboardInterrupt, SystemExit and GreenletExit are
        # deliberately left to propagate instead of being turned into a 400.
        except (Exception, gevent.Timeout) as e:
            log.get("user").exception(
                f"Could not call function {kwargs['function']}: {e} for {obj.id()}",
                type="hardware",
            )
            return {"error": f"Could not call function: {e}"}, 400
class HardwareGroupsResource(ComponentResource):
    """Resource listing the hardware groups provided by the parent component."""

    @marshal(out=[[200, paginated(HardwareGroupSchema), "List of hardware groups"]])
    def get(self):
        """Get a list of the hardware object groups"""
        groups = self._parent.get_groups()
        return groups, 200
class HardwareTypesResource(ComponentResource):
    """Resource listing the hardware types provided by the parent component."""

    @marshal(out=[[200, paginated(HardwareTypeSchema), "List of hardware types"]])
    def get(self):
        """Get a list of the different hardware types in use"""
        hardware_types = self._parent.get_types()
        return hardware_types, 200
class Hardware(Component):
    """The Hardware Feature

    This makes all loaded hardware available via flask resources, properties can be changed
    via put, and functions called via post requests.

    Hardware changes are notified via socketio events
    """

    _config_export = ["monitor"]
    # Class-level fallback only: `setup` rebinds a fresh dict on the instance so
    # that cached values are not shared between instances.
    _last_value = {}

    # Minimum delay (seconds) between two "change" events for one object property
    _EMIT_INTERVAL = 0.2
    # Minimum delay (seconds) between two group status updates for one object
    _GROUP_EMIT_INTERVAL = 2

    def subscribe(self):
        """Register the change, online and locked callbacks on every hardware object"""
        for o in self._hardware.get_objects():
            o.subscribe("all", self._obj_param_change)
            o.subscribe_online(self._obj_online_change)
            o.subscribe_locked(self._obj_locked_change)

    def setup(self):
        """Create the per-instance caches, subscribe to the hardware and register the routes"""
        self._last_value = {}
        self._emit_timeout = {}
        self._last_emit_time = {}
        self._group_status_timeout = {}
        self._last_group_emit = {}
        self.subscribe()

        self.register_route(HardwaresResource, "")
        self.register_route(HardwareResource, "/<string:id>")
        self.register_route(HardwareGroupsResource, "/groups")
        self.register_route(HardwareTypesResource, "/types")

    def reload(self):
        """Subscribe again to the hardware objects"""
        self.subscribe()

    def _ensure_last_val(self, obj, prop):
        """Ensure a last value

        Args:
            obj (str): The object id
            prop (str): The property

        Ensure a last value for obj/prop, as well as the matching entries
        used to throttle property and group emits.
        """
        # Per object / property state
        self._last_value.setdefault(obj, {}).setdefault(prop, None)
        self._emit_timeout.setdefault(obj, {}).setdefault(prop, None)
        self._last_emit_time.setdefault(obj, {}).setdefault(prop, 0)

        # Per object state used by the group status throttling
        self._last_group_emit.setdefault(obj, 0)
        self._group_status_timeout.setdefault(obj, None)

    def _obj_online_change(self, obj, value):
        """Hardware online callback

        Emits socketio event on online change
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, "_online")
        cache = self._last_value[objectid]
        if cache["_online"] != value:
            cache["_online"] = value
            self.emit("online", {"id": objectid, "state": value})
            self._queue_update_group_status(objectid)
        else:
            logger.debug(f"Duplicate update for {objectid}, online, {value}")

    def _obj_locked_change(self, obj, reason: str | None):
        """Hardware locked callback

        Emits socketio event on lock change
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, "_locked")
        cache = self._last_value[objectid]
        if cache["_locked"] != reason:
            cache["_locked"] = reason
            self.emit("locked", {"id": objectid, "state": reason})
        else:
            logger.debug(f"Duplicate update for {objectid}, locked, {reason}")

    def _obj_param_change(self, obj, prop, value):
        """Hardware parameter change callback

        Emits a socketio event on parameter change

        Args:
            obj (obj): The hardware object whose parameter changed
            prop (str): The property that changed
            value (mixed): The new value
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, prop)

        if isinstance(value, numpy.ndarray):
            # the following compare with `_last_value` and the socketio event do not support numpy array
            if value.size > 10:
                # Warn in case you are about to transmit huge data by mistakes
                logger.warning(
                    "Prop '%s' are about to transmit a numpy array (size %s) using a socketio event, with a slow conversion",
                    prop,
                    value.size,
                )
            value = value.tolist()

        if self._last_value[objectid][prop] != value:
            self._queue_emit_value(obj, prop, value)
            self._last_value[objectid][prop] = value
        else:
            logger.debug(f"Duplicate update for {objectid}, {prop}, {value}")

    def _queue_emit_value(self, obj, prop, value):
        """Emit a property value now, or schedule it if one was emitted recently

        Any emit still pending for this obj/prop is killed first. The value is
        emitted immediately when more than `_EMIT_INTERVAL` seconds have passed
        since the last emit for this obj/prop, otherwise the emit is scheduled
        after that delay.
        """
        objectid = obj.id()
        pending = self._emit_timeout[objectid][prop]
        if pending is not None:
            pending.kill()
            self._emit_timeout[objectid][prop] = None

        now = time.time()
        if now - self._last_emit_time[objectid][prop] > self._EMIT_INTERVAL:
            self._emit_value(obj, prop, value)
        else:
            self._emit_timeout[objectid][prop] = gevent.spawn_later(
                self._EMIT_INTERVAL, self._emit_value, obj, prop, value
            )

    def _emit_value(self, obj, prop, value):
        """Emit a socketio "change" event for obj/prop and record the emit time"""
        objectid = obj.id()
        self.emit("change", {"id": objectid, "data": {prop: value}})
        self._last_emit_time[objectid][prop] = time.time()

        # A state change may also change the state of the group(s) of the object
        if prop == "state":
            self._queue_update_group_status(objectid)

    def _queue_update_group_status(self, objectid):
        """Update the group status now, or schedule it if one was done recently

        Any update still pending for this object is killed first. The update
        runs immediately when more than `_GROUP_EMIT_INTERVAL` seconds have
        passed since the last one for this object, otherwise it is scheduled
        after that delay.
        """
        pending = self._group_status_timeout[objectid]
        if pending is not None:
            pending.kill()
            self._group_status_timeout[objectid] = None

        now = time.time()
        if now - self._last_group_emit[objectid] > self._GROUP_EMIT_INTERVAL:
            self.update_group_status(objectid)
        else:
            self._group_status_timeout[objectid] = gevent.spawn_later(
                self._GROUP_EMIT_INTERVAL, self.update_group_status, objectid
            )

    def update_group_status(self, objectid):
        """
        Emits a socketio event for the group(s) related to objectid
        """
        for group in self._hardware.get_groups(objectid=objectid):
            groupid = group["groupid"]
            # Group states are cached under the "group" key of `_last_value`
            self._ensure_last_val("group", groupid)

            if self._last_value["group"][groupid] != group["state"]:
                self.emit(
                    "group",
                    {"groupid": groupid, "data": {"state": group["state"]}},
                )
                self._last_value["group"][groupid] = group["state"]
            else:
                logger.debug(
                    f"Duplicate update for group {group['groupid']}, {group['state']}"
                )

        self._last_group_emit[objectid] = time.time()

    def get_objects(self, *args, **kwargs):
        """Get a list of all hardware object states"""
        objects = [o.state() for o in self._hardware.get_objects(*args, **kwargs)]
        return {"total": len(objects), "rows": objects}

    def get_object(self, objectid):
        """Get a specific object

        Args:
            objectid (str): The objects id

        Returns:
            object (obj): The hardware object, or None if there is no such
                object or if it requires staff and the current user is not staff
        """
        obj = self._hardware.get_object(objectid)
        if not obj:
            return None

        if not obj.require_staff() or g.user.staff():
            return obj

        return None

    def get_groups(self):
        """Get a list of the hardware groups"""
        groups = self._hardware.get_groups()
        return {"total": len(groups), "rows": groups}

    def get_types(self):
        """Get a list of the hardware types and their schemas"""
        types = [
            {"type": k, "schema": s} for k, s in self._hardware.get_types().items()
        ]
        return {"total": len(types), "rows": types}