Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/__init__.py: 72%
134 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# import traceback
4from marshmallow import ValidationError, INCLUDE
6from daiquiri.core import CoreBase
7from daiquiri.core.schema.hardware import HOConfigSchema, HardwareGroupSchema
8from daiquiri.core.utils import loader
9from daiquiri.resources.utils import ConfigDict
10from daiquiri.core.exceptions import InvalidYAML
12import logging
13from daiquiri.core.hardware.abstract import HardwareObject
15logger = logging.getLogger(__name__)
18class Hardware(CoreBase):
19 """Core Hardware Control
21 This initialises all hardware defined in hardware.yml, first loading the protocol
22 handlers, then initialising each object with the relevant handler. A list of groups
23 is built from the yaml, as well as a list of object types along with their schema
25 """
27 _blueprint = False
29 def __init__(self, *args, **kwargs):
30 self._handlers = {}
31 self._objects = []
32 self._types = {}
33 self._groups = []
34 CoreBase.__init__(self, *args, **kwargs)
36 def setup(self):
37 yml = "hardware.yml"
38 try:
39 self._config = ConfigDict(yml)
40 except (IOError, RuntimeError):
41 logger.warning(
42 "Configuration file missing, hardware component not configured"
43 )
44 else:
45 self.read_config(self._config, yml)
47 def read_config(self, config, filename=None):
48 self._handlers["daiquiri"] = loader(
49 "daiquiri.core.hardware",
50 "Handler",
51 "daiquiri",
52 app=self._app,
53 )
55 for h in config["protocols"]:
56 logger.debug("Loading handler {hand}".format(hand=h))
57 self._handlers[h["id"]] = loader(
58 "daiquiri.core.hardware", "Handler", h["type"], app=self._app, **h
59 )
61 library = self._handlers[h["id"]].library
62 if library and library not in self._base_config["versions"]:
63 self._base_config["versions"].append(library)
65 hoschema = HOConfigSchema()
66 ids = []
67 subconfigs = []
68 for obj in config["objects"]:
69 try:
70 obj = hoschema.load(obj, unknown=INCLUDE)
71 except ValidationError as err:
72 raise InvalidYAML(
73 {
74 "message": "Hardware object definition is invalid",
75 "file": filename,
76 "obj": obj,
77 "errors": err.messages,
78 }
79 )
81 if obj["protocol"] in self._handlers:
82 if obj["id"] in ids:
83 auto_message = (
84 obj.get("auto_id", "")
85 and " This id was generated automatically from its `url`, please specify an `id` property."
86 )
87 raise InvalidYAML(
88 {
89 "message": f"Hardware object id: {obj['id']} is not unique."
90 + auto_message,
91 "file": filename,
92 "obj": obj,
93 }
94 )
95 ids.append(obj["id"])
97 if self.get_object(obj["id"]):
98 continue
100 o = self.register_object_from_config(obj, register_sub_objects=False)
101 # Store the sub object configs to load them later
102 subconfigs.extend(o.get_subobject_configs())
103 else:
104 raise Exception(
105 f'No protocol handler {obj["protocol"]} for {obj["name"]}'
106 )
108 # Load the sub object configs
109 for config in subconfigs:
110 self.register_object_from_config(config)
112 hgschema = HardwareGroupSchema()
113 groups = config.get("groups", None)
114 self._groups = list(groups) if groups else []
116 for g in self._groups:
117 try:
118 hgschema.load(g, unknown=INCLUDE)
119 except ValidationError as err:
120 raise InvalidYAML(
121 {
122 "message": "Hardware group definition is invalid",
123 "file": filename,
124 "obj": g,
125 "errors": err.messages,
126 }
127 )
129 for oid in g["objects"]:
130 obj = self.get_object(oid)
131 if obj is None:
132 raise InvalidYAML(
133 {
134 "message": f'Could not find object: {oid} for group: {g["name"]}',
135 "file": filename,
136 "obj": g,
137 }
138 )
140 def register_object(self, obj: HardwareObject, register_sub_objects=True):
141 """Register a new object to this hardware repository
143 Attributes:
144 obj: Object to register
145 register_sub_objects: If true (default), sub components are loaded
146 and registered
147 """
148 logger.info(f"Register {obj._protocol}://{obj.id()}")
149 self._objects.append(obj)
150 if not (obj.type() in self._types):
151 self._types[obj.type()] = obj.schema().__class__.__name__
152 self._schema.register(obj.schema())
154 if register_sub_objects:
155 subconfigs = obj.get_subobject_configs()
156 for config in subconfigs:
157 self.register_object_from_config(config)
159 def register_object_from_config(self, config: dict, register_sub_objects=True):
160 """
161 Register an object from its config.
163 If the object id was already registered, the object is skipped.
165 It have to contains a `protocol` and an `id` key.
167 Attributes:
168 config: Object to register
169 register_sub_objects: If true (default), sub components are loaded
170 and registered
171 """
172 objectid = config["id"]
173 obj = self.get_object(objectid)
174 if obj:
175 logger.debug(
176 "Object id %s already registered. COnfiguration skipped", objectid
177 )
178 return
180 obj = self._handlers[config["protocol"]].get(**config)
181 self.register_object(obj, register_sub_objects=register_sub_objects)
182 return obj
184 def remove_object(self, obj: HardwareObject):
185 """Remove an object from this hardware repository"""
186 logger.info(f"Removing {obj.id()}")
187 self._objects.remove(obj)
189 def reload(self):
190 self.setup()
191 ids = [o["id"] for o in self._config["objects"]]
192 removed = [o for o in self.get_objects() if o.id() not in ids]
193 for obj in removed:
194 self.remove_object(obj)
196 def disconnect(self):
197 for h in self._handlers.values():
198 h.disconnect()
200 def get_objects(self, *args, **kwargs):
201 """Get a list of hardware objects
203 Kwargs:
204 type (str): Filter the list of objects by a type (i.e. motor)
205 group (int): Filter the objects by a group id
207 Returns:
208 A list of objects
209 """
210 objs = self._objects
212 ty = kwargs.get("type", None)
213 if ty:
214 objs = filter(lambda obj: obj.type() == ty, objs)
216 group = kwargs.get("group", None)
217 if group:
218 group = self.get_group(group)
219 objs = filter(lambda obj: obj.id() in group["objects"], objs)
221 return objs
223 def get_object(self, objectid):
224 """Get a specific object
226 Args:
227 objectid (str): The object id
229 Returns
230 The object
231 """
232 for o in self._objects:
233 if o.id() == objectid:
234 return o
235 return None
237 def get_types(self):
238 """Return the dict of object types loaded and their schema"""
239 return self._types
241 def get_groups(self, groupid=None, objectid=None):
242 """Get the list of groups loaded"""
243 groups = self._groups
244 ret = []
245 for g in groups:
246 if groupid:
247 if groupid == g["groupid"]:
248 return g
250 if objectid:
251 if objectid in g["objects"]:
252 ret.append(g)
253 else:
254 ret.append(g)
256 logger.debug(
257 "get_groups groupid=%s, objectid=%s to_update=%s", groupid, objectid, ret
258 )
259 # logger.warning("Call stack:\n%s", "".join(traceback.format_stack()))
260 for r in ret:
261 r["state"] = True
262 for o in r["objects"]:
263 obj = self.get_object(o)
264 r["state"] = r["state"] and obj.online() and obj.state_ok()
266 return ret
268 def get_group(self, groupid):
269 """Get a specific group
271 Args:
272 groupid (int): The groupid to return details for
274 Returns:
275 The group for the groupid
276 """
277 for g in self._groups:
278 if g["groupid"] == groupid:
279 return g