Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/abstract/__init__.py: 89%
270 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
4from abc import ABC, abstractmethod
5from marshmallow import fields
6import traceback
7import typing
8import functools
10from daiquiri.core.schema.hardware import HardwareSchema, HardwareObjectBaseSchema
13import logging
15logger = logging.getLogger(__name__)
class ProtocolHandler:
    """Build hardware objects for one specific control-system protocol.

    A handler instantiates a hardware object from its protocol, for example
    a motor object from bliss or a shutter from tango.
    """

    library = None

    def __init__(self, *args, **kwargs):
        # Only the optional ``app`` keyword is stored; positional arguments
        # and any other keywords are accepted but not used here.
        self._app = kwargs.get("app")

    def disconnect(self):
        """Hook called when the handler is terminated.

        Override it when the protocol holds resources.
        """
        return None

    # NOTE(review): this class does not derive from ``ABC``, so the
    # ``abstractmethod`` marker is not enforced at instantiation time.
    @abstractmethod
    def get(self, *args, **kwargs):
        """Return the hardware object for this protocol.

        Implementations are expected to check that ``kwargs`` conforms to the
        schema defined for the protocol handler (see
        core/hardware/bliss/__init__.py for a concrete example).

        Returns:
            The hardware object instance for the specific protocol
        """
        return None

    def create_monitor(self):
        """Create a global monitor for the global state of the handler.

        The base implementation provides no monitor and returns ``None``.
        """
        return None
class AbstractHardwareProperty2(ABC):
    """
    Implement an autonomous property.

    The property keeps a reference to its parent object and a single update
    callback, which it invokes through `emit_update` when its value changes.
    """

    def __init__(self, parent: MappedHardwareObject):
        # No listener until `_connect_update` is called.
        self.__update = None
        self._object = parent

    def _connect_update(self, callback):
        """Called by daiquiri at the initialization to be informed by changes

        Only one callback can be connected; connecting a second one trips the
        assertion below.
        """
        assert self.__update is None
        self.__update = callback

    def emit_update(self, value):
        """To be called by the property itself when the hardware property has changed

        Does nothing when no callback has been connected yet.
        """
        listener = self.__update
        if listener is None:
            return
        listener(value)

    @abstractmethod
    def read_hardware(self, obj):
        """Read the value from the subsystem"""

    @abstractmethod
    def write_hardware(self, obj, value):
        """Write the value to the subsystem"""

    @abstractmethod
    def connect_hardware(self, obj):
        """Do the connection of this property to the subsystem"""
class HardwareProperty:
    """Description of a property exposed by the device controls system.

    Carries the name used by the controls system and offers two translation
    hooks between the controls system nomenclature and the abstract one.
    Both hooks are identity functions here and can be overridden.

    Arguments:
        name: Name used by the controls system
        getter: Optional callable stored alongside the name
    """

    def __init__(self, name: str, getter: callable = None):
        self._getter = getter
        self._name = name

    @property
    def name(self) -> str:
        """Name of the property in the controls system."""
        return self._name

    @property
    def getter(self) -> typing.Optional[callable]:
        """The getter given at construction, or ``None``."""
        return self._getter

    def translate_from(self, value):
        """Translate a value from the controls layer to the abstraction layer

        Used for getters.

        Arguments:
            value: The property value to translate.

        Returns:
            The translated value (unchanged in this base implementation)
        """
        return value

    def translate_to(self, value):
        """Translate a value from the abstraction layer to the controls layer

        Used for setters.

        Arguments:
            value: The property value to translate.

        Returns:
            The translated value (unchanged in this base implementation)
        """
        return value
class HardwareObject(ABC):
    """Base HardwareObject from which all inherit

    The base hardware object defines the object's protocol, type, its
    properties and callables schema, and mechanisms to subscribe to property
    changes

    Attributes:
        _object (obj): The instance of the control system object.
        _protocol (str): The protocol for this object, i.e. bliss
        _type (str): The object type, i.e. motor, shutter
        _state_ok (list): States for which `state_ok` reports True.
    """

    _object = None
    _online = False
    _state_ok = []

    _protocol = None
    _type = None

    _properties = HardwareSchema()
    _callables = HardwareSchema()

    def schema_name(self):
        """Return the object type with its first letter upper-cased."""
        ty = self._type
        return ty[0].upper() + ty[1:]

    def __init__(self, *args, **kwargs):
        # Property name (or "all") -> list of subscribed callbacks
        self._callbacks = {}
        self._online_callbacks = []
        self._locked_callbacks = []
        # Locking reason, `None` when the device is not locked
        self._locked: str | None = None

        self._id = kwargs.get("id", None)
        self._name = kwargs.get("name", "")
        self._require_staff = kwargs.get("require_staff", False)
        self._alias = None
        self._user_tags = []

    def get(self, prop: str):
        """Get a property from a hardware object

        First checks the property is defined in the object's
        property schema, then delegates to the local getter
        implementation _get

        Arguments:
            prop: The property to retrieve.

        Returns:
            The property value if the property exists. `None` when the
            object is offline.

        Raises:
            KeyError: If the property is not part of the property schema.
        """
        if not self._online:
            return

        if prop in self._properties:
            return self._get(prop)

        raise KeyError(f"Unknown property '{prop}' from object {self._name}")

    def set(self, prop: str, value):
        """Set a property on a hardware object

        First checks the property is defined in the object's
        property schema, then delegates to the local setter
        implementation _set

        Arguments:
            prop: The property to set.
            value: The value to set.

        Returns:
            The result from the object setter if the property exists.
            `None` when the object is offline.

        Raises:
            KeyError: If the property is not part of the property schema.
            AttributeError: If the property is read only.
        """
        if not self._online:
            return

        if prop not in self._properties:
            raise KeyError("Unknown property {p}".format(p=prop))

        if self._properties.read_only(prop):
            raise AttributeError("Property {p} is read only".format(p=prop))

        value = self._properties.validate(prop, value)
        return self._set(prop, value)

    def get_subobject_configs(self):
        """Returns a list of referenced objects owned by this object."""
        return []

    def call(self, function, value, **kwargs):
        """Calls a function on a hardware object

        First checks the function is defined in the object's
        callables schema, then delegates to the local call
        implementation _call

        Args:
            function (str): The function to call.
            value: The value to call the function with.
            **kwargs: Forwarded unchanged to `_call`.

        Returns:
            The result from the object function when it is truthy, otherwise
            `True`. `None` when the object is offline.

        Raises:
            KeyError: If the function is not part of the callables schema.
        """
        if not self._online:
            return

        if function not in self._callables:
            raise KeyError("Unknown function {fn}".format(fn=function))

        value = self._callables.validate(function, value)
        ret = self._call(function, value, **kwargs)
        # Any falsy result (None, False, 0, ...) is reported as True
        if ret:
            return ret
        return True

    @abstractmethod
    def _get(self, prop: str):
        """Local implementation of getter"""
        pass

    @abstractmethod
    def _set(self, prop: str, value):
        """Local implementation of setter"""
        pass

    @abstractmethod
    def _call(self, function, value, **kwargs):
        """Local implementation of call

        `call` forwards its keyword arguments here, so implementations
        have to accept them.
        """
        pass

    def state(self, **kwargs):
        """Gets the current state of a hardware object

        Builds a dictionary of the basic info of the object, plus its
        properties, and callables. A property that cannot be read is reported
        as `None`, recorded in `errors`, and flags the returned `online`
        entry as False.

        Returns:
            A dict of the hardware object status
        """
        info = {
            k: getattr(self, "_" + k)
            for k in ["name", "type", "id", "protocol", "online", "locked"]
        }
        info["callables"] = [k for k in self._callables]
        info["errors"] = []
        properties = {}
        for p in self._properties:
            try:
                properties[p] = self.get(p)
            except Exception as e:
                properties[p] = None
                info["online"] = False
                info["errors"].append(
                    {
                        "property": p,
                        "exception": str(e),
                        "traceback": "".join(traceback.format_tb(e.__traceback__)),
                    }
                )
                logger.exception(f"Couldn't get property `{p}` for `{self.name()}`")

        try:
            info["properties"] = self._properties.dump(properties)
        except Exception:
            logger.error(
                "Error while serializing %s (%s)",
                self._name,
                properties,
                exc_info=True,
            )
            info["properties"] = {}
            info["online"] = False

        try:
            info["properties"]["state_ok"] = self.state_ok()
        except Exception:
            # `exc_info` is the keyword understood by the logging module; a
            # misspelled keyword raises TypeError once DEBUG is enabled.
            logger.debug(
                f"Could not determine `state_ok` for {self._name}", exc_info=True
            )

        info["require_staff"] = self.require_staff()
        info["alias"] = self.alias()
        info["user_tags"] = self.user_tags()

        return info

    def schema(self):
        """Returns the schema for the current hardware object

        The hardware schema is built from the `HardwareObjectBaseSchema`
        and the object specific _property and _callable schema
        (both instances of `HardwareSchema`)

        Returns:
            An instance of a schema
        """
        schema = type(
            f"HW{self.schema_name()}Schema",
            (HardwareObjectBaseSchema,),
            {
                "properties": fields.Nested(self._properties.__class__),
                "callables": fields.Nested(self._callables.__class__),
            },
        )
        return schema()

    def set_online(self, state):
        """Set the online state of the device

        Sets the state and executes any registered callbacks

        Args:
            state (boolean): Set the online state
        """
        self._online = state

        for cb in self._online_callbacks:
            cb(self, self._online)

    def set_locked(self, reason: str | None):
        """Set the device locked for a reason.

        Callbacks are only executed when the reason actually changes.

        Argument:
            reason: Locking reason. If none the device is not locked.
        """
        if self._locked == reason:
            return
        self._locked = reason
        for cb in self._locked_callbacks:
            cb(self, self._locked)

    def subscribe_online(self, fn):
        """Subscribe to the online state of the hardware object

        Add a function to a list of callbacks for when the online state of
        the object changes. A function is registered at most once.

        Args:
            fn: (:callable) The function to call when the online state changes.

        Raises:
            AttributeError: If `fn` is not callable.
        """
        if not callable(fn):
            raise AttributeError("Callback function must be callable")

        if fn not in self._online_callbacks:
            self._online_callbacks.append(fn)

    def subscribe_locked(self, fn):
        """Subscribe to the locked state of the hardware object

        Add a function to a list of callbacks for when the locked state of
        the object changes. A function is registered at most once.

        Args:
            fn: (:callable) The function to call when the locked state changes.

        Raises:
            AttributeError: If `fn` is not callable.
        """
        if not callable(fn):
            raise AttributeError("Callback function must be callable")

        if fn not in self._locked_callbacks:
            self._locked_callbacks.append(fn)

    def id(self):
        return self._id

    def name(self):
        return self._name

    def type(self):
        return self._type

    def object(self):
        return self._object

    def online(self):
        return self._online

    def locked(self) -> str | None:
        return self._locked

    def alias(self) -> typing.Optional[str]:
        return self._alias

    def user_tags(self) -> typing.List[str]:
        return self._user_tags

    def require_staff(self):
        return self._require_staff

    def state_ok(self):
        """Returns if the current object state is `ok`"""
        state = self.get("state")
        return self._is_state_ok(state)

    def _is_state_ok(self, state: str | typing.List[str]):
        """Check a state against the accepted `_state_ok` values.

        A list of states is ok as soon as it contains one accepted value; a
        single state is ok when it is itself an accepted value.
        """
        if isinstance(state, list):
            return any(ok in state for ok in self._state_ok)
        return state in self._state_ok

    def subscribe(self, prop: str, fn):
        """Subscribe to property changes on the hardware object

        Add a function to a list of callbacks for when properties on the
        object change. A function is registered at most once per property.

        Arguments:
            prop: The property to subscribe to. Can pass 'all' to subscribe to all changes
            fn: (:callable) The function to call when this property changes.

        Raises:
            AttributeError: If `fn` is not callable.
            KeyError: If `prop` is neither a known property nor 'all'.
        """
        if not callable(fn):
            raise AttributeError("Callback function must be callable")

        if prop not in self._properties and prop != "all":
            raise KeyError(f"No such property: {prop}")

        callbacks = self._callbacks.setdefault(prop, [])
        if fn not in callbacks:
            callbacks.append(fn)

    def unsubscribe(self, prop: str, fn):
        """Unsubscribe from a property change on the hardware object

        Arguments:
            prop: The property to unsubscribe from.
            fn: (:callable) The function to unsubscribe

        Returns:
            True if the function was subscribed and has been removed,
            False otherwise.
        """
        if prop in self._callbacks:
            if fn in self._callbacks[prop]:
                self._callbacks[prop].remove(fn)
                return True

        return False

    def _execute_callbacks(self, name, value):
        """Call the subscribers of `name`, then the subscribers of 'all'."""
        if name in self._callbacks:
            for cb in self._callbacks[name]:
                cb(self, name, value)

        if "all" in self._callbacks:
            for cb in self._callbacks["all"]:
                cb(self, name, value)

    def _update(self, name: str, prop: HardwareProperty, value):
        """Internal function to call when a property has changed

        This notifies all subscribers that a property has changed

        Arguments:
            name: Property name of the abstract device
            prop: Property of the control system hardware
            value: The new value (from the control system).
        """
        # Only plain `HardwareProperty` values go through `translate_from`
        if not isinstance(prop, AbstractHardwareProperty2):
            value = prop.translate_from(value)
        if name in self._properties:
            self._execute_callbacks(name, value)

        # If `state` changes emit an updated `state_ok`
        if name == "state":
            self._execute_callbacks("state_ok", self._is_state_ok(value))
class MappedHardwareObject(HardwareObject):
    """Hardware object that maps properties via a simple dict

    HardwareObject with a simple map between abstract property names and the
    property descriptions used to reach them on the underlying object, plus a
    map of callables with a fallback to `_call_<fn>` methods on this object.
    """

    PROPERTY_MAP: typing.Dict[str, HardwareProperty] = {}
    CALLABLE_MAP: typing.Dict[str, str] = {}

    def __init__(self, *args, **kwargs):
        HardwareObject.__init__(self, *args, **kwargs)
        self._property_map: typing.Dict[
            str, HardwareProperty | AbstractHardwareProperty2
        ] = self._create_properties()
        self._callable_map: typing.Dict[str, str] = dict(self.CALLABLE_MAP)

        # Autonomous properties report their changes through `_update`
        for name, prop in self._property_map.items():
            if not isinstance(prop, AbstractHardwareProperty2):
                continue
            prop._connect_update(functools.partial(self._update, name, prop))

    def _create_properties(
        self,
    ) -> typing.Dict[str, HardwareProperty | AbstractHardwareProperty2]:
        """Return the properties to be used for this hardware object

        The default implementation returns a copy of the descriptions held in
        the class attribute `PROPERTY_MAP`.
        """
        return dict(self.PROPERTY_MAP)

    def _set(self, prop: str, value):
        """Set a property on the child object

        Looks the property up in the property map, translates the value with
        `translate_to` for plain `HardwareProperty` entries, and delegates to
        `_do_set` which locally implements the setter.

        Args:
            prop: The property to set.
            value: Its value.

        Raises:
            KeyError: If the property is not in the property map.
        """
        hprop = self._property_map.get(prop)
        if hprop is None:
            raise KeyError(
                f"Couldnt find a setter for property `{prop}` on `{self.name()}`"
            )

        if not isinstance(hprop, AbstractHardwareProperty2):
            value = hprop.translate_to(value)
        self._do_set(hprop, value)

    @abstractmethod
    def _do_set(self, prop, value):
        """Local implementation of how to set the property on the object"""
        pass

    def _get(self, prop: str):
        """Get a property from the child object

        Looks the property up in the property map, delegates to `_do_get`
        which locally implements the getter, and translates the result with
        `translate_from` for plain `HardwareProperty` entries.

        Arguments:
            prop: The property to get.

        Returns:
            The property value. `None` for `state_ok` when it is not mapped,
            or when the getter raises `NotImplementedError`.

        Raises:
            KeyError: If the property is not in the property map and is not
                `state_ok`.
        """
        hprop = self._property_map.get(prop)
        if hprop is None:
            # `state_ok` is dynamically generated from `state`
            if prop == "state_ok":
                return None
            raise KeyError(
                f"Couldnt find a getter for property `{prop}` on `{self.name()}`"
            )

        try:
            hvalue = self._do_get(hprop)
            if not isinstance(hprop, AbstractHardwareProperty2):
                hvalue = hprop.translate_from(hvalue)
            return hvalue
        except NotImplementedError:
            logger.info("Object %s does not implement %s", self._id, prop)
        return None

    @abstractmethod
    def _do_get(self, prop):
        """Local implementation of how to get the property from the object"""
        pass

    def _call(self, function, value, **kwargs):
        """Call a function on the child object

        First try the callable map, delegating to `_do_call` which locally
        implements the call.

        Second, if not in the map, try calling the method `_call_<fn>` on
        this object.

        Args:
            function (str): The function to call.
            value: The value to call the function with

        Returns:
            The result of the function, or True when it returned nothing

        Raises:
            KeyError: If no handler exists for the function.
        """
        handler_name = "_call_{fn}".format(fn=function)
        if function in self._callable_map:
            ret = self._do_call(function, value, **kwargs)
        elif hasattr(self, handler_name):
            ret = getattr(self, handler_name)(value, **kwargs)
        else:
            raise KeyError(
                f"Couldnt find a handler for function `{function}` on `{self.name()}`"
            )

        # When a function returns nothing, report success
        return True if ret is None else ret

    @abstractmethod
    def _do_call(self, function, value, **kwargs):
        """
        Local implementation of how to call the function on the object
        """
        pass