Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/core/hardware/__init__.py: 73%

141 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-29 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# import traceback 

4from marshmallow import ValidationError, INCLUDE 

5 

6from daiquiri.core import CoreBase 

7from daiquiri.core.schema.hardware import HOConfigSchema, HardwareGroupSchema 

8from daiquiri.core.utils import loader 

9from daiquiri.resources.utils import ConfigDict 

10from daiquiri.core.exceptions import InvalidYAML 

11 

12import logging 

13from daiquiri.core.hardware.abstract import HardwareObject 

14 

15logger = logging.getLogger(__name__) 

16 

17 

class Hardware(CoreBase):
    """Core Hardware Control

    This initialises all hardware defined in hardware.yml, first loading the protocol
    handlers, then initialising each object with the relevant handler. A list of groups
    is built from the yaml, as well as a list of object types along with their schema

    """

    _blueprint = False

    def __init__(self, *args, **kwargs):
        """Create the empty registries, then delegate to ``CoreBase.__init__``.

        All positional and keyword arguments are forwarded unchanged.
        """
        # The registries are created before the base initialiser runs.
        # NOTE(review): presumably because CoreBase.__init__ triggers setup(),
        # which fills them - confirm against CoreBase.
        self._handlers = {}
        self._objects = []
        self._types = {}
        self._groups = []
        # Defined here so the attribute exists even when hardware.yml is missing
        # and read_config() never runs.
        self._monitors = []
        CoreBase.__init__(self, *args, **kwargs)

    def setup(self):
        """Load ``hardware.yml`` and configure the hardware from it.

        If the file cannot be loaded (``IOError`` or ``RuntimeError``) a warning
        is logged and the hardware component is left unconfigured.
        """
        yml = "hardware.yml"
        try:
            self._config = ConfigDict(yml)
        except (IOError, RuntimeError):
            logger.warning(
                "Configuration file missing, hardware component not configured"
            )
        else:
            self.read_config(self._config, yml)

    def read_config(self, config, filename=None):
        """Instantiate handlers, objects, monitors and groups from a config.

        Args:
            config: Mapping with the keys ``protocols`` and ``objects``, and
                optionally ``groups``.
            filename: Name of the file the config came from; only used in
                error reports.

        Raises:
            InvalidYAML: If an object or group definition fails schema
                validation, an object id is duplicated, or a group references
                an unknown object.
            Exception: If an object references a protocol with no handler.
        """
        # The built-in handler is always available, whatever the config says.
        self._handlers["daiquiri"] = loader(
            "daiquiri.core.hardware",
            "Handler",
            "daiquiri",
            app=self._app,
        )

        for h in config["protocols"]:
            logger.debug(f"Loading handler {h}")
            self._handlers[h["id"]] = loader(
                "daiquiri.core.hardware", "Handler", h["type"], app=self._app, **h
            )

            # Record the library backing this handler once in the base config.
            library = self._handlers[h["id"]].library
            if library and library not in self._base_config["versions"]:
                self._base_config["versions"].append(library)

        hoschema = HOConfigSchema()
        seen_ids = set()
        subconfigs = []
        for obj in config["objects"]:
            try:
                obj = hoschema.load(obj, unknown=INCLUDE)
            except ValidationError as err:
                raise InvalidYAML(
                    {
                        "message": "Hardware object definition is invalid",
                        "file": filename,
                        "obj": obj,
                        "errors": err.messages,
                    }
                ) from err

            if obj["protocol"] not in self._handlers:
                raise Exception(
                    f'No protocol handler {obj["protocol"]} for {obj["name"]}'
                )

            if obj["id"] in seen_ids:
                # Always a string, so the concatenation below cannot fail when
                # `auto_id` is present but falsy.
                auto_message = (
                    " This id was generated automatically from its `url`, please specify an `id` property."
                    if obj.get("auto_id")
                    else ""
                )
                raise InvalidYAML(
                    {
                        "message": f"Hardware object id: {obj['id']} is not unique."
                        + auto_message,
                        "file": filename,
                        "obj": obj,
                    }
                )
            seen_ids.add(obj["id"])

            # Already registered (e.g. on reload): keep the existing instance.
            if self.get_object(obj["id"]):
                continue

            o = self.register_object_from_config(obj, register_sub_objects=False)
            # Store the sub object configs to load them later
            subconfigs.extend(o.get_subobject_configs())

        # Load the sub object configs
        for conf in subconfigs:
            self.register_object_from_config(conf)

        # The previous message interpolated the stale loop variable `h`, which
        # is unbound when no protocols are configured.
        logger.debug("Create monitors")
        self._monitors = []
        for h in config["protocols"]:
            handler = self._handlers[h["id"]]
            monitor = handler.create_monitor()
            if monitor is not None:
                self._monitors.append(monitor)

        hgschema = HardwareGroupSchema()
        groups = config.get("groups", None)
        self._groups = list(groups) if groups else []

        for g in self._groups:
            try:
                hgschema.load(g, unknown=INCLUDE)
            except ValidationError as err:
                raise InvalidYAML(
                    {
                        "message": "Hardware group definition is invalid",
                        "file": filename,
                        "obj": g,
                        "errors": err.messages,
                    }
                ) from err

            for oid in g["objects"]:
                obj = self.get_object(oid)
                if obj is None:
                    raise InvalidYAML(
                        {
                            "message": f'Could not find object: {oid} for group: {g["name"]}',
                            "file": filename,
                            "obj": g,
                        }
                    )

    def register_object(self, obj: HardwareObject, register_sub_objects=True):
        """Register a new object to this hardware repository

        The object's schema is registered the first time an object of its
        type is seen.

        Args:
            obj: Object to register
            register_sub_objects: If true (default), sub components are loaded
                and registered
        """
        logger.info(f"Register {obj._protocol}://{obj.id()}")
        self._objects.append(obj)
        if obj.type() not in self._types:
            self._types[obj.type()] = obj.schema().__class__.__name__
            self._schema.register(obj.schema())

        if register_sub_objects:
            for config in obj.get_subobject_configs():
                self.register_object_from_config(config)

    def register_object_from_config(self, config: dict, register_sub_objects=True):
        """Register an object from its config.

        If the object id was already registered, the object is skipped.

        The config has to contain a `protocol` and an `id` key.

        Args:
            config: Configuration of the object to register
            register_sub_objects: If true (default), sub components are loaded
                and registered

        Returns:
            The newly created object, or `None` when the id was already
            registered.
        """
        objectid = config["id"]
        obj = self.get_object(objectid)
        if obj:
            logger.debug(
                "Object id %s already registered. COnfiguration skipped", objectid
            )
            return

        obj = self._handlers[config["protocol"]].get(**config)
        self.register_object(obj, register_sub_objects=register_sub_objects)
        return obj

    def remove_object(self, obj: HardwareObject):
        """Remove an object from this hardware repository"""
        logger.info(f"Removing {obj.id()}")
        self._objects.remove(obj)

    def reload(self):
        """Re-run `setup()` and drop objects no longer listed in the config."""
        self.setup()
        ids = {o["id"] for o in self._config["objects"]}
        # NOTE(review): only ids from the top-level `objects` list are kept, so
        # objects registered from sub-object configs look like they are removed
        # here as well - confirm this is intended.
        removed = [o for o in self.get_objects() if o.id() not in ids]
        for obj in removed:
            self.remove_object(obj)

    def disconnect(self):
        """Disconnect every loaded protocol handler."""
        for h in self._handlers.values():
            h.disconnect()

    def get_objects(self, *args, **kwargs):
        """Get a list of hardware objects

        Kwargs:
            type (str): Filter the list of objects by a type (i.e. motor)
            group (int): Filter the objects by a group id

        Returns:
            The internal list of objects when no filter is given (not a copy),
            otherwise a lazy `filter` iterator over the matching objects.
        """
        objs = self._objects

        ty = kwargs.get("type", None)
        if ty:
            objs = filter(lambda obj: obj.type() == ty, objs)

        group = kwargs.get("group", None)
        if group:
            # `group` is rebound from the id to the group definition; the lambda
            # reads it only when the filter is consumed.
            group = self.get_group(group)
            objs = filter(lambda obj: obj.id() in group["objects"], objs)

        return objs

    def get_object(self, objectid):
        """Get a specific object

        Args:
            objectid (str): The object id

        Returns:
            The first registered object with this id, or `None` if there is none
        """
        for o in self._objects:
            if o.id() == objectid:
                return o
        return None

    def get_types(self):
        """Return the dict of object types loaded and their schema"""
        return self._types

    def get_groups(self, groupid=None, objectid=None):
        """Get the list of groups loaded

        Args:
            groupid: If given and a group with this id exists, that group's
                dict is returned directly, without refreshing its state.
            objectid: If given, only groups containing this object id are
                returned.

        Returns:
            A single group dict when `groupid` matches, otherwise a list of
            group dicts. The `state` key of each listed group is updated in
            place: true only if every object of the group exists, is online
            and reports an ok state.
        """
        ret = []
        for g in self._groups:
            if groupid:
                if groupid == g["groupid"]:
                    return g

            if objectid:
                if objectid in g["objects"]:
                    ret.append(g)
            else:
                ret.append(g)

        logger.debug(
            "get_groups groupid=%s, objectid=%s to_update=%s", groupid, objectid, ret
        )
        # logger.warning("Call stack:\n%s", "".join(traceback.format_stack()))
        for r in ret:
            r["state"] = True
            for o in r["objects"]:
                obj = self.get_object(o)
                # An object may have been removed since the groups were loaded;
                # a missing object makes the group not ok instead of crashing.
                r["state"] = (
                    r["state"]
                    and obj is not None
                    and obj.online()
                    and obj.state_ok()
                )

        return ret

    def get_group(self, groupid):
        """Get a specific group

        Args:
            groupid (int): The groupid to return details for

        Returns:
            The group for the groupid, or `None` if no group matches
        """
        for g in self._groups:
            if g["groupid"] == groupid:
                return g
        return None

287 return g