Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/abstract/__init__.py: 90%

251 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4from abc import ABC, abstractmethod 

5from marshmallow import fields 

6import traceback 

7import typing 

8import functools 

9 

10from daiquiri.core.schema.hardware import HardwareSchema, HardwareObjectBaseSchema 

11 

12 

13import logging 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class ProtocolHandler: 

19 """A protocol handler instantiates a hardware object from a specific protocol 

20 

21 i.e. initialise a motor object from bliss, or a shutter from tango 

22 """ 

23 

24 library = None 

25 

26 def __init__(self, *args, **kwargs): 

27 self._app = kwargs.get("app") 

28 

29 def disconnect(self): 

30 """Called at the termination of the handler. 

31 

32 Implement it if the protocol hold resources. 

33 """ 

34 pass 

35 

36 @abstractmethod 

37 def get(self, *args, **kwargs): 

38 """Get the specific object from the protocol handler. 

39 

40 This function checks that kwargs conforms to Schema defined for the 

41 protocol handler (see core/hardware/bliss/__init__.py for a concrete example) 

42 

43 Returns: 

44 The hardware object instance for the specific protocol 

45 

46 """ 

47 pass 

48 

49 

50class AbstractHardwareProperty2(ABC): 

51 """ 

52 Implement an automomous property. 

53 """ 

54 

55 def __init__(self, parent: MappedHardwareObject): 

56 self._object = parent 

57 self.__update = None 

58 

59 def _connect_update(self, callback): 

60 """Called by daiquiri at the initialization to be informed by changes""" 

61 assert self.__update is None 

62 self.__update = callback 

63 

64 def emit_update(self, value): 

65 """To be called by the property itself when the hardware property have changed""" 

66 update = self.__update 

67 if update is not None: 

68 update(value) 

69 

70 @abstractmethod 

71 def read_hardware(self, obj): 

72 """Read the value from the subsystem""" 

73 ... 

74 

75 @abstractmethod 

76 def write_hardware(self, obj, value): 

77 """Write the value to the subsystem""" 

78 ... 

79 

80 @abstractmethod 

81 def connect_hardware(self, obj): 

82 """Do the connection of this property to the subsystem""" 

83 ... 

84 

85 

86class HardwareProperty: 

87 """Describe a property from the device controls system. 

88 

89 Provides translation function between the controls system specific 

90 nomenclature and the abstract one. This can be overwritten. 

91 

92 Arguments: 

93 name: Name used by the controls system 

94 """ 

95 

96 def __init__(self, name: str, getter: callable = None): 

97 self._name = name 

98 self._getter = getter 

99 

100 @property 

101 def name(self) -> str: 

102 return self._name 

103 

104 @property 

105 def getter(self) -> typing.Optional[callable]: 

106 return self._getter 

107 

108 def translate_from(self, value): 

109 """Translate the value from the controls layer to the abstraction layer 

110 i.e for getters 

111 

112 Arguments: 

113 value: The property value to translate. 

114 

115 Returns: 

116 The translated value 

117 """ 

118 return value 

119 

120 def translate_to(self, value): 

121 """Translate the value to the controls layer from the abstraction layer 

122 i.e for setters 

123 

124 Arguments: 

125 value: The property value to translate. 

126 

127 Returns: 

128 The translated value 

129 """ 

130 return value 

131 

132 

133class HardwareObject(ABC): 

134 """Base HardwareObject from which all inherit 

135 

136 The base hardware object defines the objects procotol, type, its properties 

137 and callables schema, and mechanisms to subscribe to property changes 

138 

139 Attributes: 

140 _object (obj): The instance of the control system object. 

141 _protocol (str): The protocol for this object, i.e. bliss 

142 _type (str): The object type, i.e. motor, shutter 

143 

144 """ 

145 

146 _object = None 

147 _online = False 

148 _state_ok = [] 

149 

150 _protocol = None 

151 _type = None 

152 

153 _properties = HardwareSchema() 

154 _callables = HardwareSchema() 

155 

156 def schema_name(self): 

157 ty = self._type 

158 return ty[0].upper() + ty[1:] 

159 

160 def __init__(self, *args, **kwargs): 

161 self._callbacks = {} 

162 self._online_callbacks = [] 

163 

164 self._id = kwargs.get("id", None) 

165 self._name = kwargs.get("name", "") 

166 self._require_staff = kwargs.get("require_staff", False) 

167 self._alias = None 

168 self._user_tags = [] 

169 

170 def get(self, prop: str): 

171 """Get a property from a hardware object 

172 

173 First checks the property is defined in the objects 

174 property schema, then delegates to the local getter 

175 implementation _get 

176 

177 Arguments: 

178 prop: The property to retreive. 

179 

180 Returns: 

181 The property value if the property exists, otherwise 

182 rasises an exception 

183 

184 """ 

185 if not self._online: 

186 return 

187 

188 if prop in self._properties: 

189 value = self._get(prop) 

190 return value 

191 else: 

192 raise KeyError("Unknown property {p}".format(p=prop)) 

193 

194 def set(self, prop: str, value): 

195 """Set a property on a hardware object 

196 

197 First checks the property is defined in the objects 

198 property schema, then delegates to the local setter 

199 implementation _set 

200 

201 Arguments: 

202 prop: The property to set. 

203 value: The property to set. 

204 

205 Returns: 

206 The the result from the object setter if the property exists 

207 otherwise raises an exception 

208 

209 """ 

210 if not self._online: 

211 return 

212 

213 if prop in self._properties: 

214 if self._properties.read_only(prop): 

215 raise AttributeError("Property {p} is read only".format(p=prop)) 

216 

217 value = self._properties.validate(prop, value) 

218 return self._set(prop, value) 

219 else: 

220 raise KeyError("Unknown property {p}".format(p=prop)) 

221 

222 def get_subobject_configs(self): 

223 """Returns a list of referenced objects own by this object.""" 

224 return [] 

225 

226 def call(self, function, value, **kwargs): 

227 """Calls a function on a hardware object 

228 

229 First checks the function is defined in the objects 

230 callables schema, then delegates to the local call 

231 implementation _call 

232 

233 Args: 

234 function (str): The function to call. 

235 value: The value to call the function with. 

236 

237 Returns: 

238 The the result from the object function if the function exists 

239 otherwise rasises an exception 

240 

241 """ 

242 if not self._online: 

243 return 

244 

245 if function in self._callables: 

246 value = self._callables.validate(function, value) 

247 # try: 

248 ret = self._call(function, value, **kwargs) 

249 if ret: 

250 return ret 

251 else: 

252 return True 

253 # except Exception as e: 

254 # return e 

255 else: 

256 raise KeyError("Unknown function {fn}".format(fn=function)) 

257 

258 @abstractmethod 

259 def _get(self, prop: str): 

260 """Local implementation of getter""" 

261 pass 

262 

263 @abstractmethod 

264 def _set(self, prop: str, value): 

265 """Local implementation of setter""" 

266 pass 

267 

268 @abstractmethod 

269 def _call(self, function, value): 

270 """Local implementation of call""" 

271 pass 

272 

273 def state(self, **kwargs): 

274 """Gets the current state of a hardware object 

275 

276 Builds a dictionary of the basic info of the object, plus its properties, and 

277 callables. 

278 

279 Returns: 

280 A dict of the hardware object status 

281 

282 """ 

283 info = { 

284 k: getattr(self, "_" + k) 

285 for k in ["name", "type", "id", "protocol", "online"] 

286 } 

287 info["callables"] = [k for k in self._callables] 

288 info["errors"] = [] 

289 properties = {} 

290 for p in self._properties: 

291 try: 

292 properties[p] = self.get(p) 

293 except Exception as e: 

294 properties[p] = None 

295 info["online"] = False 

296 info["errors"].append( 

297 { 

298 "property": p, 

299 "exception": str(e), 

300 "traceback": "".join(traceback.format_tb(e.__traceback__)), 

301 } 

302 ) 

303 logger.exception(f"Couldn't get property `{p}` for `{self.name()}`") 

304 

305 try: 

306 info["properties"] = self._properties.dump(properties) 

307 except Exception: 

308 logger.error( 

309 "Error while serializing %s (%s)", 

310 self._name, 

311 properties, 

312 exc_info=True, 

313 ) 

314 info["properties"] = {} 

315 info["online"] = False 

316 

317 try: 

318 info["properties"]["state_ok"] = self.state_ok() 

319 except Exception: 

320 logger.debug( 

321 f"Could not determine `state_ok` for {self._name}", exec_info=True 

322 ) 

323 

324 info["require_staff"] = self.require_staff() 

325 info["alias"] = self.alias() 

326 info["user_tags"] = self.user_tags() 

327 

328 return info 

329 

330 def schema(self): 

331 """Returns the schema for the current hardware object 

332 

333 The hardware schema is built from the `HardwareObjectBaseSchema` 

334 and the object specific _property and _callable schema 

335 (both instances of `HardwareSchema`) 

336 

337 Returns: 

338 An instance of a schema 

339 

340 """ 

341 schema = type( 

342 f"HW{self.schema_name()}Schema", 

343 (HardwareObjectBaseSchema,), 

344 { 

345 "properties": fields.Nested(self._properties.__class__), 

346 "callables": fields.Nested(self._callables.__class__), 

347 }, 

348 ) 

349 return schema() 

350 

351 def set_online(self, state): 

352 """Set the online state of the device 

353 

354 Sets the state and execute any registered callbacks 

355 

356 Args: 

357 state (boolean): Set the online state 

358 """ 

359 self._online = state 

360 

361 for cb in self._online_callbacks: 

362 cb(self, self._online) 

363 

364 def subscribe_online(self, fn): 

365 """Subscribe to the online state of the hardware object 

366 

367 Add a function to a list of callbacks for when the online state of the object change 

368 

369 Args: 

370 fn: (:callable) The function to call when this property changes. 

371 

372 """ 

373 if not callable(fn): 

374 raise AttributeError("Callback function must be callable") 

375 

376 if not (fn in self._online_callbacks): 

377 self._online_callbacks.append(fn) 

378 

379 def id(self): 

380 return self._id 

381 

382 def name(self): 

383 return self._name 

384 

385 def type(self): 

386 return self._type 

387 

388 def object(self): 

389 return self._object 

390 

391 def online(self): 

392 return self._online 

393 

394 def alias(self) -> typing.Optional[str]: 

395 return self._alias 

396 

397 def user_tags(self) -> typing.List[str]: 

398 return self._user_tags 

399 

400 def require_staff(self): 

401 return self._require_staff 

402 

403 def state_ok(self): 

404 """Returns if the current object state is `ok`""" 

405 state = self.get("state") 

406 if isinstance(state, list): 

407 st = False 

408 for ok in self._state_ok: 

409 if ok in state: 

410 st = True 

411 return st 

412 

413 else: 

414 return state in self._state_ok 

415 

416 def subscribe(self, prop: str, fn): 

417 """Subscribe to property changes on the hardware object 

418 

419 Add a function to a list of callbacks for when properties on the object change 

420 

421 Arguments: 

422 prop: The property to subscribe to. Can pass 'all' to subscribe to all changes 

423 fn: (:callable) The function to call when this property changes. 

424 

425 """ 

426 if not callable(fn): 

427 raise AttributeError("Callback function must be callable") 

428 

429 if prop in self._properties or prop == "all": 

430 if not (prop in self._callbacks): 

431 self._callbacks[prop] = [] 

432 

433 if not (fn in self._callbacks[prop]): 

434 self._callbacks[prop].append(fn) 

435 

436 else: 

437 raise KeyError(f"No such property: {prop}") 

438 

439 def unsubscribe(self, prop: str, fn): 

440 """Unsubscribe from a property change on the hardware object 

441 

442 Arguments: 

443 prop: The property to unsubscribe from. 

444 fn: (:callable) The function to unsubscribe 

445 

446 """ 

447 if prop in self._callbacks: 

448 if fn in self._callbacks[prop]: 

449 # logger.debug("Unsubscribe from property %s, %s", prop, fn) 

450 self._callbacks[prop].remove(fn) 

451 return True 

452 

453 return False 

454 

455 def _execute_callbacks(self, name, value): 

456 if name in self._callbacks: 

457 for cb in self._callbacks[name]: 

458 cb(self, name, value) 

459 

460 if "all" in self._callbacks: 

461 for cb in self._callbacks["all"]: 

462 cb(self, name, value) 

463 

464 def _update(self, name: str, prop: HardwareProperty, value): 

465 """Internal function to call when a property has changed 

466 

467 This delegates to all subscribes that a property has changed 

468 

469 Arguments: 

470 name: Property name of the abstract device 

471 prop: Property of the control system hardware 

472 value: The new value (from the control system). 

473 

474 """ 

475 # logger.debug('{c}._update {n} - {p}: {v}'.format(c=self.__class__.__name__, n=self._address, p=prop, v=value)) 

476 if not isinstance(prop, AbstractHardwareProperty2): 

477 value = prop.translate_from(value) 

478 if name in self._properties: 

479 self._execute_callbacks(name, value) 

480 

481 # If `state` changes emit an updated `state_ok` 

482 if name == "state": 

483 self._execute_callbacks("state_ok", self.state_ok()) 

484 

485 

486class MappedHardwareObject(HardwareObject): 

487 """Hardware object that maps properties via a simple dict 

488 

489 HardwareObject that has a simple map between abstract properties and their 

490 actual properties on the object with fallback to a function on the parent 

491 """ 

492 

493 PROPERTY_MAP: typing.Dict[str, HardwareProperty] = {} 

494 CALLABLE_MAP: typing.Dict[str, str] = {} 

495 

496 def __init__(self, *args, **kwargs): 

497 HardwareObject.__init__(self, *args, **kwargs) 

498 self._property_map: typing.Dict[ 

499 str, HardwareProperty | AbstractHardwareProperty2 

500 ] = self._create_properties() 

501 self._callable_map: typing.Dict[str, str] = dict(self.CALLABLE_MAP) 

502 

503 for name, prop in self._property_map.items(): 

504 if isinstance(prop, AbstractHardwareProperty2): 

505 prop._connect_update(functools.partial(self._update, name, prop)) 

506 

507 def _create_properties( 

508 self, 

509 ) -> typing.Dict[str, HardwareProperty | AbstractHardwareProperty2]: 

510 """Return the properties to be used for this hardware object 

511 

512 The default implementation reads the descriptions from the 

513 class attribute `PROPERTY_MAP`. 

514 """ 

515 return dict(self.PROPERTY_MAP) 

516 

517 def _set(self, prop: str, value): 

518 """Set a property on the child object 

519 

520 First try from the simple property map which maps properties to attributes 

521 on the child object. Delegates to _do_set which locally implements the setter 

522 

523 Second, if not in the map, try calling the function _get_<prop> on the parent 

524 

525 Args: 

526 prop: The property to set. 

527 value: Its value. 

528 """ 

529 hprop = self._property_map.get(prop) 

530 if hprop is not None: 

531 if not isinstance(hprop, AbstractHardwareProperty2): 

532 value = hprop.translate_to(value) 

533 self._do_set(hprop, value) 

534 else: 

535 raise KeyError( 

536 f"Couldnt find a setter for property `{prop}` on `{self.name()}`" 

537 ) 

538 

539 @abstractmethod 

540 def _do_set(self, prop, value): 

541 """Local implementation of how to set the property on the object""" 

542 pass 

543 

544 def _get(self, prop: str): 

545 """Get a property from the child object 

546 

547 First try from the simple property map which maps properties to attributes 

548 on the child object. Delegates to _do_get which locally implements the getter 

549 

550 Second, if not in the map, try calling the function _get_<prop> on the parent 

551 

552 Arguments: 

553 prop: The property to set. 

554 

555 Returns: 

556 The property value 

557 """ 

558 hprop = self._property_map.get(prop) 

559 if hprop is not None: 

560 try: 

561 hvalue = self._do_get(hprop) 

562 if not isinstance(hprop, AbstractHardwareProperty2): 

563 hvalue = hprop.translate_from(hvalue) 

564 return hvalue 

565 except NotImplementedError: 

566 logger.info("Object %s does not implement %s", self._id, prop) 

567 

568 # `state_ok` is dynamically generated from `state` 

569 elif prop == "state_ok": 

570 pass 

571 else: 

572 raise KeyError( 

573 f"Couldnt find a getter for property `{prop}` on `{self.name()}`" 

574 ) 

575 

576 @abstractmethod 

577 def _do_get(self, prop): 

578 """Local implementation of how to get the property from the object""" 

579 pass 

580 

581 def _call(self, function, value, **kwargs): 

582 """Call a function on the child object 

583 

584 First try from the simple function map which maps to function names 

585 on the child object. Delegates to _do_call which locally implements the getter 

586 

587 Second, if not in the map, try calling the function _call_<fn> on the parent 

588 

589 Args: 

590 function (str): The function to call. 

591 value: The value to call the function with 

592 

593 Returns: 

594 True if function successfully called 

595 """ 

596 if function in self._callable_map: 

597 ret = self._do_call(function, value, **kwargs) 

598 elif hasattr(self, "_call_{fn}".format(fn=function)): 

599 ret = getattr(self, "_call_{fn}".format(fn=function))(value, **kwargs) 

600 

601 else: 

602 raise KeyError( 

603 f"Couldnt find a handler for function `{function}` on `{self.name()}`" 

604 ) 

605 

606 if ret is None: 

607 # When a function returns nothing 

608 ret = True 

609 

610 return ret 

611 

612 @abstractmethod 

613 def _do_call(self, function, value, **kwargs): 

614 """ 

615 Local implementation of how to get the function from the object 

616 """ 

617 pass