Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/abstract/__init__.py: 90%
251 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
4from abc import ABC, abstractmethod
5from marshmallow import fields
6import traceback
7import typing
8import functools
10from daiquiri.core.schema.hardware import HardwareSchema, HardwareObjectBaseSchema
13import logging
15logger = logging.getLogger(__name__)
class ProtocolHandler:
    """Build hardware objects on top of one specific protocol

    A protocol handler instantiates a hardware object from a given control
    system, i.e. a motor object from bliss, or a shutter from tango.
    """

    library = None

    def __init__(self, *args, **kwargs):
        # Only the optional `app` keyword is retained, positional arguments
        # and any other keyword are ignored by this base class.
        self._app = kwargs["app"] if "app" in kwargs else None

    def disconnect(self):
        """Hook called when the handler is terminated.

        The base implementation does nothing. Override it when the protocol
        holds resources that have to be released.
        """
        return None

    # NOTE(review): this class does not derive from `ABC`, so the decorator
    # below does not prevent instantiation of handlers lacking `get`.
    @abstractmethod
    def get(self, *args, **kwargs):
        """Return the requested object from the protocol handler.

        Implementations are expected to check that `kwargs` conforms to the
        schema defined for the protocol handler (see
        core/hardware/bliss/__init__.py for a concrete example).

        Returns:
            The hardware object instance for the specific protocol
        """
        return None
class AbstractHardwareProperty2(ABC):
    """
    Implement an autonomous property.

    The property keeps a reference to its parent hardware object and pushes
    value changes through a single callback registered by daiquiri.
    """

    def __init__(self, parent: MappedHardwareObject):
        # No update callback until `_connect_update` is called
        self.__update = None
        self._object = parent

    def _connect_update(self, callback):
        """Called by daiquiri at the initialization to be informed of changes

        Only one callback can be registered, a second registration fails the
        assertion.
        """
        assert self.__update is None
        self.__update = callback

    def emit_update(self, value):
        """To be called by the property itself when the hardware property has changed

        Does nothing as long as no callback was connected.
        """
        listener = self.__update
        if listener is None:
            return
        listener(value)

    @abstractmethod
    def read_hardware(self, obj):
        """Read the value from the subsystem"""

    @abstractmethod
    def write_hardware(self, obj, value):
        """Write the value to the subsystem"""

    @abstractmethod
    def connect_hardware(self, obj):
        """Do the connection of this property to the subsystem"""
class HardwareProperty:
    """Describe a property from the device controls system.

    Provides translation functions between the controls system specific
    nomenclature and the abstract one. These can be overridden.

    Arguments:
        name: Name used by the controls system
        getter: Optional callable stored with the property and exposed through
            the `getter` attribute. It is not called by this class
            (presumably consumed by the protocol specific objects - to be
            confirmed against the callers).
    """

    def __init__(
        self,
        name: str,
        getter: typing.Optional[typing.Callable[..., typing.Any]] = None,
    ):
        self._name = name
        self._getter = getter

    @property
    def name(self) -> str:
        """Name of the property in the controls system"""
        return self._name

    @property
    def getter(self) -> typing.Optional[typing.Callable[..., typing.Any]]:
        """The callable given at construction, or `None`"""
        return self._getter

    def translate_from(self, value):
        """Translate the value from the controls layer to the abstraction layer
        i.e for getters

        The default implementation returns the value unchanged.

        Arguments:
            value: The property value to translate.

        Returns:
            The translated value
        """
        return value

    def translate_to(self, value):
        """Translate the value to the controls layer from the abstraction layer
        i.e for setters

        The default implementation returns the value unchanged.

        Arguments:
            value: The property value to translate.

        Returns:
            The translated value
        """
        return value
class HardwareObject(ABC):
    """Base HardwareObject from which all inherit

    The base hardware object defines the object's protocol, type, its
    properties and callables schema, and mechanisms to subscribe to property
    changes.

    Attributes:
        _object (obj): The instance of the control system object.
        _protocol (str): The protocol for this object, i.e. bliss
        _type (str): The object type, i.e. motor, shutter
        _online (bool): Online flag. While it is false `get`, `set` and
            `call` return `None` without delegating to the implementation.
        _state_ok (list): Values of the `state` property for which
            `state_ok` is true.
        _properties (HardwareSchema): Schema of the object properties.
        _callables (HardwareSchema): Schema of the object callables.

    """

    _object = None
    _online = False
    _state_ok = []

    _protocol = None
    _type = None

    _properties = HardwareSchema()
    _callables = HardwareSchema()

    def schema_name(self):
        """Return the object type with its first character upper-cased"""
        ty = self._type
        return ty[0].upper() + ty[1:]

    def __init__(self, *args, **kwargs):
        """Initialise the bookkeeping shared by every hardware object

        Keyword Arguments:
            id: Identifier of the object (default `None`).
            name: Name of the object (default `""`).
            require_staff: Stored as is and reported by `require_staff()`
                (default `False`).
        """
        # Property name (or "all") -> list of subscribed callbacks
        self._callbacks = {}
        self._online_callbacks = []

        self._id = kwargs.get("id", None)
        self._name = kwargs.get("name", "")
        self._require_staff = kwargs.get("require_staff", False)
        self._alias = None
        self._user_tags = []

    def get(self, prop: str):
        """Get a property from a hardware object

        First checks the property is defined in the object's
        property schema, then delegates to the local getter
        implementation `_get`

        Arguments:
            prop: The property to retrieve.

        Returns:
            The property value if the property exists, `None` if the object
            is offline.

        Raises:
            KeyError: If the property is not part of the property schema.

        """
        if not self._online:
            return

        if prop in self._properties:
            value = self._get(prop)
            return value
        else:
            raise KeyError("Unknown property {p}".format(p=prop))

    def set(self, prop: str, value):
        """Set a property on a hardware object

        First checks the property is defined in the object's
        property schema, then delegates to the local setter
        implementation `_set`

        Arguments:
            prop: The property to set.
            value: The value to set, validated against the property schema
                before being passed to `_set`.

        Returns:
            The result from the object setter if the property exists, `None`
            if the object is offline.

        Raises:
            KeyError: If the property is not part of the property schema.
            AttributeError: If the property is read only.

        """
        if not self._online:
            return

        if prop in self._properties:
            if self._properties.read_only(prop):
                raise AttributeError("Property {p} is read only".format(p=prop))

            value = self._properties.validate(prop, value)
            return self._set(prop, value)
        else:
            raise KeyError("Unknown property {p}".format(p=prop))

    def get_subobject_configs(self):
        """Returns a list of referenced objects owned by this object."""
        return []

    def call(self, function, value, **kwargs):
        """Calls a function on a hardware object

        First checks the function is defined in the object's
        callables schema, then delegates to the local call
        implementation `_call`. Exceptions raised by `_call` propagate to the
        caller.

        Args:
            function (str): The function to call.
            value: The value to call the function with, validated against the
                callables schema first.
            **kwargs: Forwarded untouched to `_call`.

        Returns:
            The result from the object function if it is truthy, `True` for
            any falsy result, `None` if the object is offline.

        Raises:
            KeyError: If the function is not part of the callables schema.

        """
        if not self._online:
            return

        if function in self._callables:
            value = self._callables.validate(function, value)
            ret = self._call(function, value, **kwargs)
            # Any falsy result (None, False, 0, ...) is reported as True
            return ret or True
        else:
            raise KeyError("Unknown function {fn}".format(fn=function))

    @abstractmethod
    def _get(self, prop: str):
        """Local implementation of getter"""
        pass

    @abstractmethod
    def _set(self, prop: str, value):
        """Local implementation of setter"""
        pass

    @abstractmethod
    def _call(self, function, value, **kwargs):
        """Local implementation of call

        `call` forwards its keyword arguments here, so implementations have
        to accept them.
        """
        pass

    def state(self, **kwargs):
        """Gets the current state of a hardware object

        Builds a dictionary of the basic info of the object, plus its properties, and
        callables.

        A property that fails to be read is reported as `None`, the failure
        is appended to `errors` and the reported `online` flag is forced to
        `False`. A failure while serializing the properties yields empty
        `properties` and also forces `online` to `False`.

        Returns:
            A dict of the hardware object status

        """
        info = {
            k: getattr(self, "_" + k)
            for k in ["name", "type", "id", "protocol", "online"]
        }
        info["callables"] = [k for k in self._callables]
        info["errors"] = []
        properties = {}
        for p in self._properties:
            try:
                properties[p] = self.get(p)
            except Exception as e:
                properties[p] = None
                info["online"] = False
                info["errors"].append(
                    {
                        "property": p,
                        "exception": str(e),
                        "traceback": "".join(traceback.format_tb(e.__traceback__)),
                    }
                )
                logger.exception(f"Couldn't get property `{p}` for `{self.name()}`")

        try:
            info["properties"] = self._properties.dump(properties)
        except Exception:
            logger.error(
                "Error while serializing %s (%s)",
                self._name,
                properties,
                exc_info=True,
            )
            info["properties"] = {}
            info["online"] = False

        try:
            info["properties"]["state_ok"] = self.state_ok()
        except Exception:
            # Best effort: `state_ok` is simply left out when it can not be
            # computed (for instance when there is no `state` property).
            logger.debug(
                "Could not determine `state_ok` for %s", self._name, exc_info=True
            )

        info["require_staff"] = self.require_staff()
        info["alias"] = self.alias()
        info["user_tags"] = self.user_tags()

        return info

    def schema(self):
        """Returns the schema for the current hardware object

        The hardware schema is built from the `HardwareObjectBaseSchema`
        and the object specific `_properties` and `_callables` schema
        (both instances of `HardwareSchema`)

        Returns:
            An instance of a schema

        """
        schema = type(
            f"HW{self.schema_name()}Schema",
            (HardwareObjectBaseSchema,),
            {
                "properties": fields.Nested(self._properties.__class__),
                "callables": fields.Nested(self._callables.__class__),
            },
        )
        return schema()

    def set_online(self, state):
        """Set the online state of the device

        Sets the state and executes any registered callbacks

        Args:
            state (boolean): Set the online state
        """
        self._online = state

        for cb in self._online_callbacks:
            cb(self, self._online)

    def subscribe_online(self, fn):
        """Subscribe to the online state of the hardware object

        Add a function to a list of callbacks for when the online state of the object changes.
        A function already registered is not added a second time.

        Args:
            fn: (:callable) The function to call when the online state
                changes, called as `fn(obj, online)`.

        Raises:
            AttributeError: If `fn` is not callable.

        """
        if not callable(fn):
            raise AttributeError("Callback function must be callable")

        if fn not in self._online_callbacks:
            self._online_callbacks.append(fn)

    def id(self):
        return self._id

    def name(self):
        return self._name

    def type(self):
        return self._type

    def object(self):
        return self._object

    def online(self):
        return self._online

    def alias(self) -> typing.Optional[str]:
        return self._alias

    def user_tags(self) -> typing.List[str]:
        return self._user_tags

    def require_staff(self):
        return self._require_staff

    def state_ok(self):
        """Returns if the current object state is `ok`

        When the `state` property is a list, the state is ok as soon as one
        of the `_state_ok` values is part of it. Otherwise the state itself
        has to be one of the `_state_ok` values.
        """
        state = self.get("state")
        if isinstance(state, list):
            return any(ok in state for ok in self._state_ok)

        return state in self._state_ok

    def subscribe(self, prop: str, fn):
        """Subscribe to property changes on the hardware object

        Add a function to a list of callbacks for when properties on the object change.
        A function already registered for the property is not added a second
        time.

        Arguments:
            prop: The property to subscribe to. Can pass 'all' to subscribe to all changes
            fn: (:callable) The function to call when this property changes,
                called as `fn(obj, name, value)`.

        Raises:
            AttributeError: If `fn` is not callable.
            KeyError: If `prop` is neither a known property nor 'all'.

        """
        if not callable(fn):
            raise AttributeError("Callback function must be callable")

        if prop in self._properties or prop == "all":
            callbacks = self._callbacks.setdefault(prop, [])
            if fn not in callbacks:
                callbacks.append(fn)

        else:
            raise KeyError(f"No such property: {prop}")

    def unsubscribe(self, prop: str, fn):
        """Unsubscribe from a property change on the hardware object

        Arguments:
            prop: The property to unsubscribe from.
            fn: (:callable) The function to unsubscribe

        Returns:
            `True` if the function was registered and has been removed,
            `False` otherwise.

        """
        if prop in self._callbacks:
            if fn in self._callbacks[prop]:
                self._callbacks[prop].remove(fn)
                return True

        return False

    def _execute_callbacks(self, name, value):
        """Call the callbacks subscribed to `name`, then the ones subscribed to 'all'"""
        if name in self._callbacks:
            for cb in self._callbacks[name]:
                cb(self, name, value)

        if "all" in self._callbacks:
            for cb in self._callbacks["all"]:
                cb(self, name, value)

    def _update(self, name: str, prop: HardwareProperty, value):
        """Internal function to call when a property has changed

        This notifies all subscribers that a property has changed

        Arguments:
            name: Property name of the abstract device
            prop: Property of the control system hardware
            value: The new value (from the control system).

        """
        # Values emitted by an `AbstractHardwareProperty2` are used as is,
        # the others are translated to the abstraction layer first.
        if not isinstance(prop, AbstractHardwareProperty2):
            value = prop.translate_from(value)
        if name in self._properties:
            self._execute_callbacks(name, value)

            # If `state` changes emit an updated `state_ok`
            if name == "state":
                self._execute_callbacks("state_ok", self.state_ok())
class MappedHardwareObject(HardwareObject):
    """Hardware object that maps properties via a simple dict

    HardwareObject with a simple map between the abstract properties and
    their description on the control system side, and a map of the callable
    functions with a fallback to `_call_<fn>` methods of the object itself.
    """

    PROPERTY_MAP: typing.Dict[str, HardwareProperty] = {}
    CALLABLE_MAP: typing.Dict[str, str] = {}

    def __init__(self, *args, **kwargs):
        HardwareObject.__init__(self, *args, **kwargs)
        self._property_map: typing.Dict[
            str, HardwareProperty | AbstractHardwareProperty2
        ] = self._create_properties()
        self._callable_map: typing.Dict[str, str] = dict(self.CALLABLE_MAP)

        # Autonomous properties report their changes through `_update`
        for key, description in self._property_map.items():
            if not isinstance(description, AbstractHardwareProperty2):
                continue
            description._connect_update(
                functools.partial(self._update, key, description)
            )

    def _create_properties(
        self,
    ) -> typing.Dict[str, HardwareProperty | AbstractHardwareProperty2]:
        """Return the properties to be used for this hardware object

        By default this is a copy of the class attribute `PROPERTY_MAP`.
        """
        return dict(self.PROPERTY_MAP)

    def _set(self, prop: str, value):
        """Set a property on the child object

        The property description is looked up in the property map. The value
        is translated to the controls layer (except for
        `AbstractHardwareProperty2` descriptions) and handed over to
        `_do_set` which locally implements the setter.

        Args:
            prop: The property to set.
            value: Its value.

        Raises:
            KeyError: If the property is not in the property map.
        """
        description = self._property_map.get(prop)
        if description is None:
            raise KeyError(
                f"Couldnt find a setter for property `{prop}` on `{self.name()}`"
            )

        if not isinstance(description, AbstractHardwareProperty2):
            value = description.translate_to(value)
        self._do_set(description, value)

    @abstractmethod
    def _do_set(self, prop, value):
        """Local implementation of how to set the property on the object"""

    def _get(self, prop: str):
        """Get a property from the child object

        The property description is looked up in the property map and read
        with `_do_get` which locally implements the getter. The value is
        translated from the controls layer, except for
        `AbstractHardwareProperty2` descriptions.

        Arguments:
            prop: The property to get.

        Returns:
            The property value. `None` for the unmapped `state_ok` and when
            the read raises `NotImplementedError`.

        Raises:
            KeyError: If the property is neither mapped nor `state_ok`.
        """
        description = self._property_map.get(prop)
        if description is None:
            # `state_ok` is dynamically generated from `state`
            if prop == "state_ok":
                return None
            raise KeyError(
                f"Couldnt find a getter for property `{prop}` on `{self.name()}`"
            )

        try:
            raw = self._do_get(description)
            if isinstance(description, AbstractHardwareProperty2):
                return raw
            return description.translate_from(raw)
        except NotImplementedError:
            logger.info("Object %s does not implement %s", self._id, prop)
            return None

    @abstractmethod
    def _do_get(self, prop):
        """Local implementation of how to get the property from the object"""

    def _call(self, function, value, **kwargs):
        """Call a function on the child object

        A function listed in the callable map is delegated to `_do_call`
        which locally implements the call. Otherwise the method
        `_call_<function>` of this object is used when it exists.

        Args:
            function (str): The function to call.
            value: The value to call the function with
            **kwargs: Forwarded to the handler.

        Returns:
            The result of the handler, or `True` when it returned `None`

        Raises:
            KeyError: If no handler is found for the function.
        """
        if function in self._callable_map:
            result = self._do_call(function, value, **kwargs)
        else:
            handler_name = "_call_{fn}".format(fn=function)
            if not hasattr(self, handler_name):
                raise KeyError(
                    f"Couldnt find a handler for function `{function}` on `{self.name()}`"
                )
            result = getattr(self, handler_name)(value, **kwargs)

        # When a function returns nothing
        return True if result is None else result

    @abstractmethod
    def _do_call(self, function, value, **kwargs):
        """
        Local implementation of how to call the function on the object
        """