Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/__init__.py: 72%

134 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# import traceback 

4from marshmallow import ValidationError, INCLUDE 

5 

6from daiquiri.core import CoreBase 

7from daiquiri.core.schema.hardware import HOConfigSchema, HardwareGroupSchema 

8from daiquiri.core.utils import loader 

9from daiquiri.resources.utils import ConfigDict 

10from daiquiri.core.exceptions import InvalidYAML 

11 

12import logging 

13from daiquiri.core.hardware.abstract import HardwareObject 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class Hardware(CoreBase): 

19 """Core Hardware Control 

20 

21 This initialises all hardware defined in hardware.yml, first loading the protocol 

22 handlers, then initialising each object with the relevant handler. A list of groups 

23 is built from the yaml, as well as a list of object types along with their schema 

24 

25 """ 

26 

27 _blueprint = False 

28 

29 def __init__(self, *args, **kwargs): 

30 self._handlers = {} 

31 self._objects = [] 

32 self._types = {} 

33 self._groups = [] 

34 CoreBase.__init__(self, *args, **kwargs) 

35 

36 def setup(self): 

37 yml = "hardware.yml" 

38 try: 

39 self._config = ConfigDict(yml) 

40 except (IOError, RuntimeError): 

41 logger.warning( 

42 "Configuration file missing, hardware component not configured" 

43 ) 

44 else: 

45 self.read_config(self._config, yml) 

46 

47 def read_config(self, config, filename=None): 

48 self._handlers["daiquiri"] = loader( 

49 "daiquiri.core.hardware", 

50 "Handler", 

51 "daiquiri", 

52 app=self._app, 

53 ) 

54 

55 for h in config["protocols"]: 

56 logger.debug("Loading handler {hand}".format(hand=h)) 

57 self._handlers[h["id"]] = loader( 

58 "daiquiri.core.hardware", "Handler", h["type"], app=self._app, **h 

59 ) 

60 

61 library = self._handlers[h["id"]].library 

62 if library and library not in self._base_config["versions"]: 

63 self._base_config["versions"].append(library) 

64 

65 hoschema = HOConfigSchema() 

66 ids = [] 

67 subconfigs = [] 

68 for obj in config["objects"]: 

69 try: 

70 obj = hoschema.load(obj, unknown=INCLUDE) 

71 except ValidationError as err: 

72 raise InvalidYAML( 

73 { 

74 "message": "Hardware object definition is invalid", 

75 "file": filename, 

76 "obj": obj, 

77 "errors": err.messages, 

78 } 

79 ) 

80 

81 if obj["protocol"] in self._handlers: 

82 if obj["id"] in ids: 

83 auto_message = ( 

84 obj.get("auto_id", "") 

85 and " This id was generated automatically from its `url`, please specify an `id` property." 

86 ) 

87 raise InvalidYAML( 

88 { 

89 "message": f"Hardware object id: {obj['id']} is not unique." 

90 + auto_message, 

91 "file": filename, 

92 "obj": obj, 

93 } 

94 ) 

95 ids.append(obj["id"]) 

96 

97 if self.get_object(obj["id"]): 

98 continue 

99 

100 o = self.register_object_from_config(obj, register_sub_objects=False) 

101 # Store the sub object configs to load them later 

102 subconfigs.extend(o.get_subobject_configs()) 

103 else: 

104 raise Exception( 

105 f'No protocol handler {obj["protocol"]} for {obj["name"]}' 

106 ) 

107 

108 # Load the sub object configs 

109 for config in subconfigs: 

110 self.register_object_from_config(config) 

111 

112 hgschema = HardwareGroupSchema() 

113 groups = config.get("groups", None) 

114 self._groups = list(groups) if groups else [] 

115 

116 for g in self._groups: 

117 try: 

118 hgschema.load(g, unknown=INCLUDE) 

119 except ValidationError as err: 

120 raise InvalidYAML( 

121 { 

122 "message": "Hardware group definition is invalid", 

123 "file": filename, 

124 "obj": g, 

125 "errors": err.messages, 

126 } 

127 ) 

128 

129 for oid in g["objects"]: 

130 obj = self.get_object(oid) 

131 if obj is None: 

132 raise InvalidYAML( 

133 { 

134 "message": f'Could not find object: {oid} for group: {g["name"]}', 

135 "file": filename, 

136 "obj": g, 

137 } 

138 ) 

139 

140 def register_object(self, obj: HardwareObject, register_sub_objects=True): 

141 """Register a new object to this hardware repository 

142 

143 Attributes: 

144 obj: Object to register 

145 register_sub_objects: If true (default), sub components are loaded 

146 and registered 

147 """ 

148 logger.info(f"Register {obj._protocol}://{obj.id()}") 

149 self._objects.append(obj) 

150 if not (obj.type() in self._types): 

151 self._types[obj.type()] = obj.schema().__class__.__name__ 

152 self._schema.register(obj.schema()) 

153 

154 if register_sub_objects: 

155 subconfigs = obj.get_subobject_configs() 

156 for config in subconfigs: 

157 self.register_object_from_config(config) 

158 

159 def register_object_from_config(self, config: dict, register_sub_objects=True): 

160 """ 

161 Register an object from its config. 

162 

163 If the object id was already registered, the object is skipped. 

164 

165 It have to contains a `protocol` and an `id` key. 

166 

167 Attributes: 

168 config: Object to register 

169 register_sub_objects: If true (default), sub components are loaded 

170 and registered 

171 """ 

172 objectid = config["id"] 

173 obj = self.get_object(objectid) 

174 if obj: 

175 logger.debug( 

176 "Object id %s already registered. COnfiguration skipped", objectid 

177 ) 

178 return 

179 

180 obj = self._handlers[config["protocol"]].get(**config) 

181 self.register_object(obj, register_sub_objects=register_sub_objects) 

182 return obj 

183 

184 def remove_object(self, obj: HardwareObject): 

185 """Remove an object from this hardware repository""" 

186 logger.info(f"Removing {obj.id()}") 

187 self._objects.remove(obj) 

188 

189 def reload(self): 

190 self.setup() 

191 ids = [o["id"] for o in self._config["objects"]] 

192 removed = [o for o in self.get_objects() if o.id() not in ids] 

193 for obj in removed: 

194 self.remove_object(obj) 

195 

196 def disconnect(self): 

197 for h in self._handlers.values(): 

198 h.disconnect() 

199 

200 def get_objects(self, *args, **kwargs): 

201 """Get a list of hardware objects 

202 

203 Kwargs: 

204 type (str): Filter the list of objects by a type (i.e. motor) 

205 group (int): Filter the objects by a group id 

206 

207 Returns: 

208 A list of objects 

209 """ 

210 objs = self._objects 

211 

212 ty = kwargs.get("type", None) 

213 if ty: 

214 objs = filter(lambda obj: obj.type() == ty, objs) 

215 

216 group = kwargs.get("group", None) 

217 if group: 

218 group = self.get_group(group) 

219 objs = filter(lambda obj: obj.id() in group["objects"], objs) 

220 

221 return objs 

222 

223 def get_object(self, objectid): 

224 """Get a specific object 

225 

226 Args: 

227 objectid (str): The object id 

228 

229 Returns 

230 The object 

231 """ 

232 for o in self._objects: 

233 if o.id() == objectid: 

234 return o 

235 return None 

236 

237 def get_types(self): 

238 """Return the dict of object types loaded and their schema""" 

239 return self._types 

240 

241 def get_groups(self, groupid=None, objectid=None): 

242 """Get the list of groups loaded""" 

243 groups = self._groups 

244 ret = [] 

245 for g in groups: 

246 if groupid: 

247 if groupid == g["groupid"]: 

248 return g 

249 

250 if objectid: 

251 if objectid in g["objects"]: 

252 ret.append(g) 

253 else: 

254 ret.append(g) 

255 

256 logger.debug( 

257 "get_groups groupid=%s, objectid=%s to_update=%s", groupid, objectid, ret 

258 ) 

259 # logger.warning("Call stack:\n%s", "".join(traceback.format_stack())) 

260 for r in ret: 

261 r["state"] = True 

262 for o in r["objects"]: 

263 obj = self.get_object(o) 

264 r["state"] = r["state"] and obj.online() and obj.state_ok() 

265 

266 return ret 

267 

268 def get_group(self, groupid): 

269 """Get a specific group 

270 

271 Args: 

272 groupid (int): The groupid to return details for 

273 

274 Returns: 

275 The group for the groupid 

276 """ 

277 for g in self._groups: 

278 if g["groupid"] == groupid: 

279 return g