Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/core/hardware/__init__.py: 73%
141 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# import traceback
4from marshmallow import ValidationError, INCLUDE
6from daiquiri.core import CoreBase
7from daiquiri.core.schema.hardware import HOConfigSchema, HardwareGroupSchema
8from daiquiri.core.utils import loader
9from daiquiri.resources.utils import ConfigDict
10from daiquiri.core.exceptions import InvalidYAML
12import logging
13from daiquiri.core.hardware.abstract import HardwareObject
15logger = logging.getLogger(__name__)
18class Hardware(CoreBase):
19 """Core Hardware Control
21 This initialises all hardware defined in hardware.yml, first loading the protocol
22 handlers, then initialising each object with the relevant handler. A list of groups
23 is built from the yaml, as well as a list of object types along with their schema
25 """
27 _blueprint = False
29 def __init__(self, *args, **kwargs):
30 self._handlers = {}
31 self._objects = []
32 self._types = {}
33 self._groups = []
34 CoreBase.__init__(self, *args, **kwargs)
36 def setup(self):
37 yml = "hardware.yml"
38 try:
39 self._config = ConfigDict(yml)
40 except (IOError, RuntimeError):
41 logger.warning(
42 "Configuration file missing, hardware component not configured"
43 )
44 else:
45 self.read_config(self._config, yml)
47 def read_config(self, config, filename=None):
48 self._handlers["daiquiri"] = loader(
49 "daiquiri.core.hardware",
50 "Handler",
51 "daiquiri",
52 app=self._app,
53 )
55 for h in config["protocols"]:
56 logger.debug("Loading handler {hand}".format(hand=h))
57 self._handlers[h["id"]] = loader(
58 "daiquiri.core.hardware", "Handler", h["type"], app=self._app, **h
59 )
61 library = self._handlers[h["id"]].library
62 if library and library not in self._base_config["versions"]:
63 self._base_config["versions"].append(library)
65 hoschema = HOConfigSchema()
66 ids = []
67 subconfigs = []
68 for obj in config["objects"]:
69 try:
70 obj = hoschema.load(obj, unknown=INCLUDE)
71 except ValidationError as err:
72 raise InvalidYAML(
73 {
74 "message": "Hardware object definition is invalid",
75 "file": filename,
76 "obj": obj,
77 "errors": err.messages,
78 }
79 )
81 if obj["protocol"] in self._handlers:
82 if obj["id"] in ids:
83 auto_message = (
84 obj.get("auto_id", "")
85 and " This id was generated automatically from its `url`, please specify an `id` property."
86 )
87 raise InvalidYAML(
88 {
89 "message": f"Hardware object id: {obj['id']} is not unique."
90 + auto_message,
91 "file": filename,
92 "obj": obj,
93 }
94 )
95 ids.append(obj["id"])
97 if self.get_object(obj["id"]):
98 continue
100 o = self.register_object_from_config(obj, register_sub_objects=False)
101 # Store the sub object configs to load them later
102 subconfigs.extend(o.get_subobject_configs())
103 else:
104 raise Exception(
105 f'No protocol handler {obj["protocol"]} for {obj["name"]}'
106 )
108 # Load the sub object configs
109 for conf in subconfigs:
110 self.register_object_from_config(conf)
112 logger.debug("Create monitors {hand}".format(hand=h))
113 self._monitors = []
114 for h in config["protocols"]:
115 handler = self._handlers[h["id"]]
116 monitor = handler.create_monitor()
117 if monitor is not None:
118 self._monitors.append(monitor)
120 hgschema = HardwareGroupSchema()
121 groups = config.get("groups", None)
122 self._groups = list(groups) if groups else []
124 for g in self._groups:
125 try:
126 hgschema.load(g, unknown=INCLUDE)
127 except ValidationError as err:
128 raise InvalidYAML(
129 {
130 "message": "Hardware group definition is invalid",
131 "file": filename,
132 "obj": g,
133 "errors": err.messages,
134 }
135 )
137 for oid in g["objects"]:
138 obj = self.get_object(oid)
139 if obj is None:
140 raise InvalidYAML(
141 {
142 "message": f'Could not find object: {oid} for group: {g["name"]}',
143 "file": filename,
144 "obj": g,
145 }
146 )
148 def register_object(self, obj: HardwareObject, register_sub_objects=True):
149 """Register a new object to this hardware repository
151 Attributes:
152 obj: Object to register
153 register_sub_objects: If true (default), sub components are loaded
154 and registered
155 """
156 logger.info(f"Register {obj._protocol}://{obj.id()}")
157 self._objects.append(obj)
158 if not (obj.type() in self._types):
159 self._types[obj.type()] = obj.schema().__class__.__name__
160 self._schema.register(obj.schema())
162 if register_sub_objects:
163 subconfigs = obj.get_subobject_configs()
164 for config in subconfigs:
165 self.register_object_from_config(config)
167 def register_object_from_config(self, config: dict, register_sub_objects=True):
168 """
169 Register an object from its config.
171 If the object id was already registered, the object is skipped.
173 It have to contains a `protocol` and an `id` key.
175 Attributes:
176 config: Object to register
177 register_sub_objects: If true (default), sub components are loaded
178 and registered
179 """
180 objectid = config["id"]
181 obj = self.get_object(objectid)
182 if obj:
183 logger.debug(
184 "Object id %s already registered. COnfiguration skipped", objectid
185 )
186 return
188 obj = self._handlers[config["protocol"]].get(**config)
189 self.register_object(obj, register_sub_objects=register_sub_objects)
190 return obj
192 def remove_object(self, obj: HardwareObject):
193 """Remove an object from this hardware repository"""
194 logger.info(f"Removing {obj.id()}")
195 self._objects.remove(obj)
197 def reload(self):
198 self.setup()
199 ids = [o["id"] for o in self._config["objects"]]
200 removed = [o for o in self.get_objects() if o.id() not in ids]
201 for obj in removed:
202 self.remove_object(obj)
204 def disconnect(self):
205 for h in self._handlers.values():
206 h.disconnect()
208 def get_objects(self, *args, **kwargs):
209 """Get a list of hardware objects
211 Kwargs:
212 type (str): Filter the list of objects by a type (i.e. motor)
213 group (int): Filter the objects by a group id
215 Returns:
216 A list of objects
217 """
218 objs = self._objects
220 ty = kwargs.get("type", None)
221 if ty:
222 objs = filter(lambda obj: obj.type() == ty, objs)
224 group = kwargs.get("group", None)
225 if group:
226 group = self.get_group(group)
227 objs = filter(lambda obj: obj.id() in group["objects"], objs)
229 return objs
231 def get_object(self, objectid):
232 """Get a specific object
234 Args:
235 objectid (str): The object id
237 Returns
238 The object
239 """
240 for o in self._objects:
241 if o.id() == objectid:
242 return o
243 return None
245 def get_types(self):
246 """Return the dict of object types loaded and their schema"""
247 return self._types
249 def get_groups(self, groupid=None, objectid=None):
250 """Get the list of groups loaded"""
251 groups = self._groups
252 ret = []
253 for g in groups:
254 if groupid:
255 if groupid == g["groupid"]:
256 return g
258 if objectid:
259 if objectid in g["objects"]:
260 ret.append(g)
261 else:
262 ret.append(g)
264 logger.debug(
265 "get_groups groupid=%s, objectid=%s to_update=%s", groupid, objectid, ret
266 )
267 # logger.warning("Call stack:\n%s", "".join(traceback.format_stack()))
268 for r in ret:
269 r["state"] = True
270 for o in r["objects"]:
271 obj = self.get_object(o)
272 r["state"] = r["state"] and obj.online() and obj.state_ok()
274 return ret
276 def get_group(self, groupid):
277 """Get a specific group
279 Args:
280 groupid (int): The groupid to return details for
282 Returns:
283 The group for the groupid
284 """
285 for g in self._groups:
286 if g["groupid"] == groupid:
287 return g