Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/hardware.py: 99%
159 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import gevent
5from flask import g
6from marshmallow import fields, ValidationError
7import numpy
9from daiquiri.core import marshal, require_control
10from daiquiri.core.logging import log
11from daiquiri.core.components import Component, ComponentResource
12from daiquiri.core.schema import ErrorSchema, ValidationErrorSchema
13from daiquiri.core.schema.hardware import (
14 HardwareObjectBaseSchema,
15 HardwareGroupSchema,
16 HardwareTypeSchema,
17 SetObjectProperty,
18 CallObjectFunction,
19)
20from daiquiri.core.schema.metadata import paginated
22import logging
# Module-level logger, named after this module, used for diagnostic messages
# (e.g. the "Duplicate update" debug lines emitted by the Hardware component).
logger = logging.getLogger(__name__)
# REST resource listing the state of every hardware object, optionally
# narrowed by the `type` and/or `group` query arguments.
class HardwaresResource(ComponentResource):
    @marshal(
        inp={
            "type": fields.Str(metadata={"description": "Filter by a specific type"}),
            "group": fields.Str(
                metadata={"description": "Filter by a specific group name"}
            ),
        },
        out=[[200, paginated(HardwareObjectBaseSchema), "List of object statuses"]],
    )
    def get(self, **kwargs):
        """Get a list of all hardware statuses"""
        # Both filters are optional; an absent filter is forwarded as None.
        group_filter = kwargs.get("group")
        type_filter = kwargs.get("type")

        # The parent component builds the listing ({"total": ..., "rows": ...}).
        listing = self._parent.get_objects(group=group_filter, type=type_filter)
        return listing, 200
# REST resource for a single hardware object: read its state (GET), set one of
# its properties (PUT) or call one of its functions (POST). PUT and POST are
# wrapped with `require_control`.
class HardwareResource(ComponentResource):
    @marshal(
        out=[
            [200, HardwareObjectBaseSchema(), "A single object status"],
            [404, ErrorSchema(), "Object not found"],
        ]
    )
    def get(self, id, **kwargs):
        """Get the status of a particular hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        return obj.state(), 200

    @require_control
    @marshal(
        inp=SetObjectProperty,
        out=[
            [200, SetObjectProperty(), "Object property updated"],
            [400, ErrorSchema(), "Could not set property"],
            [422, ValidationErrorSchema(), "Invalid value"],
            [404, ErrorSchema(), "Object not found"],
        ],
    )
    def put(self, id, **kwargs):
        """Update a property on a hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        try:
            obj.set(kwargs["property"], kwargs["value"])
            log.get("user").info(
                f"Requested property {kwargs['property']} change to {kwargs['value']} for {obj.id()}",
                type="hardware",
            )
            return {"property": kwargs["property"], "value": kwargs["value"]}, 200
        except ValidationError as e:
            # Value rejected by validation: report the messages as a 422.
            return {"messages": e.messages}, 422
        # gevent.Timeout derives from BaseException, so it has to be listed
        # explicitly. Catching BaseException itself would also swallow
        # KeyboardInterrupt, SystemExit and GreenletExit and turn them into a
        # 400 response, so only these two families are handled here.
        except (Exception, gevent.Timeout) as e:
            log.get("user").exception(
                f"Could not change property {kwargs['property']}: {str(e)} for {obj.id()}",
                type="hardware",
            )
            return {"error": str(e)}, 400

    @require_control
    @marshal(
        inp=CallObjectFunction,
        out=[
            [200, CallObjectFunction(), "Object function called"],
            [400, ErrorSchema(), "Could not call function"],
            [422, ValidationErrorSchema(), "Invalid value"],
            [404, ErrorSchema(), "Object not found"],
        ],
    )
    def post(self, id, **kwargs):
        """Call a function on a hardware object"""
        obj = self._parent.get_object(id)
        if not obj:
            return {"error": "No such object"}, 404

        try:
            # `value` is optional; None is passed when it was not supplied.
            resp = obj.call(kwargs["function"], kwargs.get("value", None))
            log.get("user").info(
                f"Requested function {kwargs['function']} with value {kwargs.get('value', None)} for {obj.id()}",
                type="hardware",
            )
            return {"function": kwargs["function"], "response": resp}, 200
        except ValidationError as e:
            # Value rejected by validation: report the messages as a 422.
            return {"messages": e.messages}, 422
        # See `put`: gevent.Timeout is a BaseException subclass and must be
        # named explicitly; interpreter-exit and greenlet-kill signals are
        # deliberately left to propagate.
        except (Exception, gevent.Timeout) as e:
            log.get("user").exception(
                f"Could not call function {kwargs['function']}: {e} for {obj.id()}",
                type="hardware",
            )
            return {"error": f"Could not call function: {e}"}, 400
# REST resource exposing the hardware groups known to the parent component.
class HardwareGroupsResource(ComponentResource):
    @marshal(out=[[200, paginated(HardwareGroupSchema), "List of hardware groups"]])
    def get(self):
        """Get a list of the hardware object groups"""
        # The parent component returns the paginated payload directly.
        group_listing = self._parent.get_groups()
        return group_listing, 200
# REST resource exposing the hardware types known to the parent component.
class HardwareTypesResource(ComponentResource):
    @marshal(out=[[200, paginated(HardwareTypeSchema), "List of hardware types"]])
    def get(self):
        """Get a list of the different hardware types in use"""
        # The parent component returns the paginated payload directly.
        type_listing = self._parent.get_types()
        return type_listing, 200
class Hardware(Component):
    """The Hardware Feature

    This makes all loaded hardware available via flask resources, properties can be changed
    via put, and functions called via post requests.

    Hardware changes are notified via socketio events
    """

    _config_export = ["monitor"]

    # Last value seen per object id and property, used to drop duplicate
    # updates. The class-level dict is only a fallback for code paths that run
    # before `setup`; `setup` rebinds a per-instance dict so that state is not
    # shared between component instances.
    _last_value = {}

    # Minimum delay, in seconds, between two "change" events for the same
    # object/property pair.
    _emit_interval = 0.2
    # Minimum delay, in seconds, between two group status refreshes triggered
    # by the same object.
    _group_emit_interval = 2

    def subscribe(self):
        """Register the parameter and online callbacks on every hardware object"""
        for o in self._hardware.get_objects():
            o.subscribe("all", self._obj_param_change)
            o.subscribe_online(self._obj_online_change)

    def setup(self):
        """Initialise the bookkeeping dicts, subscribe to hardware and register routes"""
        # All bookkeeping must exist before `subscribe`, which installs the
        # callbacks that read and write these dicts.
        self._last_value = {}
        self._emit_timeout = {}
        self._last_emit_time = {}
        self._group_status_timeout = {}
        self._last_group_emit = {}
        self.subscribe()

        self.register_route(HardwaresResource, "")
        self.register_route(HardwareResource, "/<string:id>")
        self.register_route(HardwareGroupsResource, "/groups")
        self.register_route(HardwareTypesResource, "/types")

    def reload(self):
        """Subscribe to the hardware objects again"""
        self.subscribe()

    def _ensure_last_val(self, obj, prop):
        """Ensure a last value

        Args:
            obj (str): The object id
            prop (str): The property

        Ensure a last value for obj/prop, i.e. make sure every bookkeeping
        dict has an entry for this object (and property) so later lookups
        cannot raise KeyError. Existing entries are left untouched.
        """
        self._last_value.setdefault(obj, {}).setdefault(prop, None)
        self._emit_timeout.setdefault(obj, {}).setdefault(prop, None)
        self._last_emit_time.setdefault(obj, {}).setdefault(prop, 0)

        self._last_group_emit.setdefault(obj, 0)
        self._group_status_timeout.setdefault(obj, None)

    def _obj_online_change(self, obj, value):
        """Hardware online callback

        Emits socketio event on online change

        Args:
            obj (obj): The hardware object whose online state changed
            value (mixed): The new online state
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, "online")

        if self._last_value[objectid]["online"] != value:
            self.emit("online", {"id": objectid, "state": value})
            # Remember what was emitted, otherwise the comparison above would
            # always be made against the initial None and duplicates would
            # never be filtered.
            self._last_value[objectid]["online"] = value
            self._queue_update_group_status(objectid)
        else:
            logger.debug("Duplicate update for %s, online, %s", objectid, value)

    def _obj_param_change(self, obj, prop, value):
        """Hardware parameter change callback

        Emits a socketio event on parameter change

        Args:
            obj (obj): The hardware object whose parameter changed
            prop (str): The property that changed
            value (mixed): The new value
        """
        objectid = obj.id()
        self._ensure_last_val(objectid, prop)

        if isinstance(value, numpy.ndarray):
            # The comparison with `_last_value` and the socketio event do not
            # support numpy arrays, so the array is converted to a list.
            if value.size > 10:
                # Warn in case huge data is about to be transmitted by mistake
                logger.warning(
                    "Prop '%s' are about to transmit a numpy array (size %s) using a socketio event, with a slow conversion",
                    prop,
                    value.size,
                )
            value = value.tolist()

        if self._last_value[objectid][prop] != value:
            self._queue_emit_value(obj, prop, value)
            self._last_value[objectid][prop] = value
        else:
            # Lazy formatting: `value` may be large and debug is usually off.
            logger.debug("Duplicate update for %s, %s, %s", objectid, prop, value)

    def _queue_emit_value(self, obj, prop, value):
        """Emit a property change now, or schedule it if one was emitted recently

        Any emit still pending for the same object/property is cancelled, so
        only the most recent value is sent.

        Args:
            obj (obj): The hardware object
            prop (str): The property that changed
            value (mixed): The new value
        """
        objectid = obj.id()

        pending = self._emit_timeout[objectid][prop]
        if pending is not None:
            pending.kill()
            self._emit_timeout[objectid][prop] = None

        elapsed = time.time() - self._last_emit_time[objectid][prop]
        if elapsed > self._emit_interval:
            self._emit_value(obj, prop, value)
        else:
            self._emit_timeout[objectid][prop] = gevent.spawn_later(
                self._emit_interval, self._emit_value, obj, prop, value
            )

    def _emit_value(self, obj, prop, value):
        """Emit the "change" socketio event for a single property

        Args:
            obj (obj): The hardware object
            prop (str): The property that changed
            value (mixed): The new value
        """
        objectid = obj.id()
        self.emit("change", {"id": objectid, "data": {prop: value}})
        self._last_emit_time[objectid][prop] = time.time()

        # A change of the "state" property also triggers a group status update.
        if prop == "state":
            self._queue_update_group_status(objectid)

    def _queue_update_group_status(self, objectid):
        """Update the group status now, or schedule it if one ran recently

        Any update still pending for the same object is cancelled first.

        Args:
            objectid (str): The object id whose groups should be refreshed
        """
        pending = self._group_status_timeout[objectid]
        if pending is not None:
            pending.kill()
            self._group_status_timeout[objectid] = None

        elapsed = time.time() - self._last_group_emit[objectid]
        if elapsed > self._group_emit_interval:
            self.update_group_status(objectid)
        else:
            self._group_status_timeout[objectid] = gevent.spawn_later(
                self._group_emit_interval, self.update_group_status, objectid
            )

    def update_group_status(self, objectid):
        """
        Emits a socketio event for the group(s) related to objectid

        Args:
            objectid (str): The object id whose groups should be checked
        """
        for group in self._hardware.get_groups(objectid=objectid):
            groupid = group["groupid"]
            state = group["state"]
            # Group states are tracked under the pseudo object id "group".
            self._ensure_last_val("group", groupid)

            if self._last_value["group"][groupid] != state:
                self.emit("group", {"groupid": groupid, "data": {"state": state}})
                self._last_value["group"][groupid] = state
            else:
                logger.debug("Duplicate update for group %s, %s", groupid, state)

        self._last_group_emit[objectid] = time.time()

    def get_objects(self, *args, **kwargs):
        """Get a list of all hardware object states

        Positional and keyword arguments are forwarded unchanged to the
        hardware layer's `get_objects`.

        Returns:
            dict: `{"total": <number of rows>, "rows": [<object state>, ...]}`
        """
        objects = [o.state() for o in self._hardware.get_objects(*args, **kwargs)]
        return {"total": len(objects), "rows": objects}

    def get_object(self, objectid):
        """Get a specific object

        Args:
            objectid (str): The objects id

        Returns:
            object (obj): The hardware object, or None when it does not exist
                or when it requires staff and `g.user.staff()` is falsy
        """
        obj = self._hardware.get_object(objectid)
        if not obj:
            return None

        # Objects that do not require staff are always visible; the others
        # only when the current user is staff.
        if not obj.require_staff() or g.user.staff():
            return obj

        return None

    def get_groups(self):
        """Get a list of the hardware groups

        Returns:
            dict: `{"total": <number of rows>, "rows": <groups>}`
        """
        groups = self._hardware.get_groups()
        return {"total": len(groups), "rows": groups}

    def get_types(self):
        """Get a list of the hardware types and their schemas

        Returns:
            dict: `{"total": <number of rows>, "rows": [{"type": ..., "schema": ...}, ...]}`
        """
        types = [
            {"type": k, "schema": s} for k, s in self._hardware.get_types().items()
        ]
        return {"total": len(types), "rows": types}