Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/abstract/__init__.py: 89%

270 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4from abc import ABC, abstractmethod 

5from marshmallow import fields 

6import traceback 

7import typing 

8import functools 

9 

10from daiquiri.core.schema.hardware import HardwareSchema, HardwareObjectBaseSchema 

11 

12 

13import logging 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class ProtocolHandler: 

19 """A protocol handler instantiates a hardware object from a specific protocol 

20 

21 i.e. initialise a motor object from bliss, or a shutter from tango 

22 """ 

23 

24 library = None 

25 

26 def __init__(self, *args, **kwargs): 

27 self._app = kwargs.get("app") 

28 

29 def disconnect(self): 

30 """Called at the termination of the handler. 

31 

32 Implement it if the protocol hold resources. 

33 """ 

34 pass 

35 

36 @abstractmethod 

37 def get(self, *args, **kwargs): 

38 """Get the specific object from the protocol handler. 

39 

40 This function checks that kwargs conforms to Schema defined for the 

41 protocol handler (see core/hardware/bliss/__init__.py for a concrete example) 

42 

43 Returns: 

44 The hardware object instance for the specific protocol 

45 

46 """ 

47 pass 

48 

49 def create_monitor(self): 

50 """Create a global monitor at global state of the handler""" 

51 return None 

52 

53 

54class AbstractHardwareProperty2(ABC): 

55 """ 

56 Implement an automomous property. 

57 """ 

58 

59 def __init__(self, parent: MappedHardwareObject): 

60 self._object = parent 

61 self.__update = None 

62 

63 def _connect_update(self, callback): 

64 """Called by daiquiri at the initialization to be informed by changes""" 

65 assert self.__update is None 

66 self.__update = callback 

67 

68 def emit_update(self, value): 

69 """To be called by the property itself when the hardware property have changed""" 

70 update = self.__update 

71 if update is not None: 

72 update(value) 

73 

74 @abstractmethod 

75 def read_hardware(self, obj): 

76 """Read the value from the subsystem""" 

77 ... 

78 

79 @abstractmethod 

80 def write_hardware(self, obj, value): 

81 """Write the value to the subsystem""" 

82 ... 

83 

84 @abstractmethod 

85 def connect_hardware(self, obj): 

86 """Do the connection of this property to the subsystem""" 

87 ... 

88 

89 

90class HardwareProperty: 

91 """Describe a property from the device controls system. 

92 

93 Provides translation function between the controls system specific 

94 nomenclature and the abstract one. This can be overwritten. 

95 

96 Arguments: 

97 name: Name used by the controls system 

98 """ 

99 

100 def __init__(self, name: str, getter: callable = None): 

101 self._name = name 

102 self._getter = getter 

103 

104 @property 

105 def name(self) -> str: 

106 return self._name 

107 

108 @property 

109 def getter(self) -> typing.Optional[callable]: 

110 return self._getter 

111 

112 def translate_from(self, value): 

113 """Translate the value from the controls layer to the abstraction layer 

114 i.e for getters 

115 

116 Arguments: 

117 value: The property value to translate. 

118 

119 Returns: 

120 The translated value 

121 """ 

122 return value 

123 

124 def translate_to(self, value): 

125 """Translate the value to the controls layer from the abstraction layer 

126 i.e for setters 

127 

128 Arguments: 

129 value: The property value to translate. 

130 

131 Returns: 

132 The translated value 

133 """ 

134 return value 

135 

136 

137class HardwareObject(ABC): 

138 """Base HardwareObject from which all inherit 

139 

140 The base hardware object defines the objects procotol, type, its properties 

141 and callables schema, and mechanisms to subscribe to property changes 

142 

143 Attributes: 

144 _object (obj): The instance of the control system object. 

145 _protocol (str): The protocol for this object, i.e. bliss 

146 _type (str): The object type, i.e. motor, shutter 

147 

148 """ 

149 

150 _object = None 

151 _online = False 

152 _state_ok = [] 

153 

154 _protocol = None 

155 _type = None 

156 

157 _properties = HardwareSchema() 

158 _callables = HardwareSchema() 

159 

160 def schema_name(self): 

161 ty = self._type 

162 return ty[0].upper() + ty[1:] 

163 

164 def __init__(self, *args, **kwargs): 

165 self._callbacks = {} 

166 self._online_callbacks = [] 

167 self._locked_callbacks = [] 

168 self._locked: str | None = None 

169 

170 self._id = kwargs.get("id", None) 

171 self._name = kwargs.get("name", "") 

172 self._require_staff = kwargs.get("require_staff", False) 

173 self._alias = None 

174 self._user_tags = [] 

175 

176 def get(self, prop: str): 

177 """Get a property from a hardware object 

178 

179 First checks the property is defined in the objects 

180 property schema, then delegates to the local getter 

181 implementation _get 

182 

183 Arguments: 

184 prop: The property to retreive. 

185 

186 Returns: 

187 The property value if the property exists, otherwise 

188 rasises an exception 

189 

190 """ 

191 if not self._online: 

192 return 

193 

194 if prop in self._properties: 

195 value = self._get(prop) 

196 return value 

197 else: 

198 raise KeyError(f"Unknown property '{prop}' from object {self._name}") 

199 

200 def set(self, prop: str, value): 

201 """Set a property on a hardware object 

202 

203 First checks the property is defined in the objects 

204 property schema, then delegates to the local setter 

205 implementation _set 

206 

207 Arguments: 

208 prop: The property to set. 

209 value: The property to set. 

210 

211 Returns: 

212 The the result from the object setter if the property exists 

213 otherwise raises an exception 

214 

215 """ 

216 if not self._online: 

217 return 

218 

219 if prop in self._properties: 

220 if self._properties.read_only(prop): 

221 raise AttributeError("Property {p} is read only".format(p=prop)) 

222 

223 value = self._properties.validate(prop, value) 

224 return self._set(prop, value) 

225 else: 

226 raise KeyError("Unknown property {p}".format(p=prop)) 

227 

228 def get_subobject_configs(self): 

229 """Returns a list of referenced objects own by this object.""" 

230 return [] 

231 

232 def call(self, function, value, **kwargs): 

233 """Calls a function on a hardware object 

234 

235 First checks the function is defined in the objects 

236 callables schema, then delegates to the local call 

237 implementation _call 

238 

239 Args: 

240 function (str): The function to call. 

241 value: The value to call the function with. 

242 

243 Returns: 

244 The the result from the object function if the function exists 

245 otherwise rasises an exception 

246 

247 """ 

248 if not self._online: 

249 return 

250 

251 if function in self._callables: 

252 value = self._callables.validate(function, value) 

253 # try: 

254 ret = self._call(function, value, **kwargs) 

255 if ret: 

256 return ret 

257 else: 

258 return True 

259 # except Exception as e: 

260 # return e 

261 else: 

262 raise KeyError("Unknown function {fn}".format(fn=function)) 

263 

264 @abstractmethod 

265 def _get(self, prop: str): 

266 """Local implementation of getter""" 

267 pass 

268 

269 @abstractmethod 

270 def _set(self, prop: str, value): 

271 """Local implementation of setter""" 

272 pass 

273 

274 @abstractmethod 

275 def _call(self, function, value): 

276 """Local implementation of call""" 

277 pass 

278 

279 def state(self, **kwargs): 

280 """Gets the current state of a hardware object 

281 

282 Builds a dictionary of the basic info of the object, plus its properties, and 

283 callables. 

284 

285 Returns: 

286 A dict of the hardware object status 

287 

288 """ 

289 info = { 

290 k: getattr(self, "_" + k) 

291 for k in ["name", "type", "id", "protocol", "online", "locked"] 

292 } 

293 info["callables"] = [k for k in self._callables] 

294 info["errors"] = [] 

295 properties = {} 

296 for p in self._properties: 

297 try: 

298 properties[p] = self.get(p) 

299 except Exception as e: 

300 properties[p] = None 

301 info["online"] = False 

302 info["errors"].append( 

303 { 

304 "property": p, 

305 "exception": str(e), 

306 "traceback": "".join(traceback.format_tb(e.__traceback__)), 

307 } 

308 ) 

309 logger.exception(f"Couldn't get property `{p}` for `{self.name()}`") 

310 

311 try: 

312 info["properties"] = self._properties.dump(properties) 

313 except Exception: 

314 logger.error( 

315 "Error while serializing %s (%s)", 

316 self._name, 

317 properties, 

318 exc_info=True, 

319 ) 

320 info["properties"] = {} 

321 info["online"] = False 

322 

323 try: 

324 info["properties"]["state_ok"] = self.state_ok() 

325 except Exception: 

326 logger.debug( 

327 f"Could not determine `state_ok` for {self._name}", exec_info=True 

328 ) 

329 

330 info["require_staff"] = self.require_staff() 

331 info["alias"] = self.alias() 

332 info["user_tags"] = self.user_tags() 

333 

334 return info 

335 

336 def schema(self): 

337 """Returns the schema for the current hardware object 

338 

339 The hardware schema is built from the `HardwareObjectBaseSchema` 

340 and the object specific _property and _callable schema 

341 (both instances of `HardwareSchema`) 

342 

343 Returns: 

344 An instance of a schema 

345 

346 """ 

347 schema = type( 

348 f"HW{self.schema_name()}Schema", 

349 (HardwareObjectBaseSchema,), 

350 { 

351 "properties": fields.Nested(self._properties.__class__), 

352 "callables": fields.Nested(self._callables.__class__), 

353 }, 

354 ) 

355 return schema() 

356 

357 def set_online(self, state): 

358 """Set the online state of the device 

359 

360 Sets the state and execute any registered callbacks 

361 

362 Args: 

363 state (boolean): Set the online state 

364 """ 

365 self._online = state 

366 

367 for cb in self._online_callbacks: 

368 cb(self, self._online) 

369 

370 def set_locked(self, reason: str | None): 

371 """Set the device locked for a reason. 

372 

373 Argument: 

374 reason: Locking reason. If none the device is not locked. 

375 """ 

376 if self._locked == reason: 

377 return 

378 self._locked = reason 

379 for cb in self._locked_callbacks: 

380 cb(self, self._locked) 

381 

382 def subscribe_online(self, fn): 

383 """Subscribe to the online state of the hardware object 

384 

385 Add a function to a list of callbacks for when the online state of the object change 

386 

387 Args: 

388 fn: (:callable) The function to call when this property changes. 

389 

390 """ 

391 if not callable(fn): 

392 raise AttributeError("Callback function must be callable") 

393 

394 if fn not in self._online_callbacks: 

395 self._online_callbacks.append(fn) 

396 

397 def subscribe_locked(self, fn): 

398 """Subscribe to the locked state of the hardware object 

399 

400 Add a function to a list of callbacks for when the locked state of the object change 

401 

402 Args: 

403 fn: (:callable) The function to call when this property changes. 

404 

405 """ 

406 if not callable(fn): 

407 raise AttributeError("Callback function must be callable") 

408 

409 if fn not in self._locked_callbacks: 

410 self._locked_callbacks.append(fn) 

411 

412 def id(self): 

413 return self._id 

414 

415 def name(self): 

416 return self._name 

417 

418 def type(self): 

419 return self._type 

420 

421 def object(self): 

422 return self._object 

423 

424 def online(self): 

425 return self._online 

426 

427 def locked(self) -> str | None: 

428 return self._locked 

429 

430 def alias(self) -> typing.Optional[str]: 

431 return self._alias 

432 

433 def user_tags(self) -> typing.List[str]: 

434 return self._user_tags 

435 

436 def require_staff(self): 

437 return self._require_staff 

438 

439 def state_ok(self): 

440 """Returns if the current object state is `ok`""" 

441 state = self.get("state") 

442 return self._is_state_ok(state) 

443 

444 def _is_state_ok(self, state: str): 

445 if isinstance(state, list): 

446 st = False 

447 for ok in self._state_ok: 

448 if ok in state: 

449 st = True 

450 return st 

451 else: 

452 return state in self._state_ok 

453 

454 def subscribe(self, prop: str, fn): 

455 """Subscribe to property changes on the hardware object 

456 

457 Add a function to a list of callbacks for when properties on the object change 

458 

459 Arguments: 

460 prop: The property to subscribe to. Can pass 'all' to subscribe to all changes 

461 fn: (:callable) The function to call when this property changes. 

462 

463 """ 

464 if not callable(fn): 

465 raise AttributeError("Callback function must be callable") 

466 

467 if prop in self._properties or prop == "all": 

468 if not (prop in self._callbacks): 

469 self._callbacks[prop] = [] 

470 

471 if not (fn in self._callbacks[prop]): 

472 self._callbacks[prop].append(fn) 

473 

474 else: 

475 raise KeyError(f"No such property: {prop}") 

476 

477 def unsubscribe(self, prop: str, fn): 

478 """Unsubscribe from a property change on the hardware object 

479 

480 Arguments: 

481 prop: The property to unsubscribe from. 

482 fn: (:callable) The function to unsubscribe 

483 

484 """ 

485 if prop in self._callbacks: 

486 if fn in self._callbacks[prop]: 

487 # logger.debug("Unsubscribe from property %s, %s", prop, fn) 

488 self._callbacks[prop].remove(fn) 

489 return True 

490 

491 return False 

492 

493 def _execute_callbacks(self, name, value): 

494 if name in self._callbacks: 

495 for cb in self._callbacks[name]: 

496 cb(self, name, value) 

497 

498 if "all" in self._callbacks: 

499 for cb in self._callbacks["all"]: 

500 cb(self, name, value) 

501 

502 def _update(self, name: str, prop: HardwareProperty, value): 

503 """Internal function to call when a property has changed 

504 

505 This delegates to all subscribes that a property has changed 

506 

507 Arguments: 

508 name: Property name of the abstract device 

509 prop: Property of the control system hardware 

510 value: The new value (from the control system). 

511 

512 """ 

513 # logger.debug('{c}._update {n} - {p}: {v}'.format(c=self.__class__.__name__, n=self._address, p=prop, v=value)) 

514 if not isinstance(prop, AbstractHardwareProperty2): 

515 value = prop.translate_from(value) 

516 if name in self._properties: 

517 self._execute_callbacks(name, value) 

518 

519 # If `state` changes emit an updated `state_ok` 

520 if name == "state": 

521 self._execute_callbacks("state_ok", self._is_state_ok(value)) 

522 

523 

524class MappedHardwareObject(HardwareObject): 

525 """Hardware object that maps properties via a simple dict 

526 

527 HardwareObject that has a simple map between abstract properties and their 

528 actual properties on the object with fallback to a function on the parent 

529 """ 

530 

531 PROPERTY_MAP: typing.Dict[str, HardwareProperty] = {} 

532 CALLABLE_MAP: typing.Dict[str, str] = {} 

533 

534 def __init__(self, *args, **kwargs): 

535 HardwareObject.__init__(self, *args, **kwargs) 

536 self._property_map: typing.Dict[ 

537 str, HardwareProperty | AbstractHardwareProperty2 

538 ] = self._create_properties() 

539 self._callable_map: typing.Dict[str, str] = dict(self.CALLABLE_MAP) 

540 

541 for name, prop in self._property_map.items(): 

542 if isinstance(prop, AbstractHardwareProperty2): 

543 prop._connect_update(functools.partial(self._update, name, prop)) 

544 

545 def _create_properties( 

546 self, 

547 ) -> typing.Dict[str, HardwareProperty | AbstractHardwareProperty2]: 

548 """Return the properties to be used for this hardware object 

549 

550 The default implementation reads the descriptions from the 

551 class attribute `PROPERTY_MAP`. 

552 """ 

553 return dict(self.PROPERTY_MAP) 

554 

555 def _set(self, prop: str, value): 

556 """Set a property on the child object 

557 

558 First try from the simple property map which maps properties to attributes 

559 on the child object. Delegates to _do_set which locally implements the setter 

560 

561 Second, if not in the map, try calling the function _get_<prop> on the parent 

562 

563 Args: 

564 prop: The property to set. 

565 value: Its value. 

566 """ 

567 hprop = self._property_map.get(prop) 

568 if hprop is not None: 

569 if not isinstance(hprop, AbstractHardwareProperty2): 

570 value = hprop.translate_to(value) 

571 self._do_set(hprop, value) 

572 else: 

573 raise KeyError( 

574 f"Couldnt find a setter for property `{prop}` on `{self.name()}`" 

575 ) 

576 

577 @abstractmethod 

578 def _do_set(self, prop, value): 

579 """Local implementation of how to set the property on the object""" 

580 pass 

581 

582 def _get(self, prop: str): 

583 """Get a property from the child object 

584 

585 First try from the simple property map which maps properties to attributes 

586 on the child object. Delegates to _do_get which locally implements the getter 

587 

588 Second, if not in the map, try calling the function _get_<prop> on the parent 

589 

590 Arguments: 

591 prop: The property to set. 

592 

593 Returns: 

594 The property value 

595 """ 

596 hprop = self._property_map.get(prop) 

597 if hprop is not None: 

598 try: 

599 hvalue = self._do_get(hprop) 

600 if not isinstance(hprop, AbstractHardwareProperty2): 

601 hvalue = hprop.translate_from(hvalue) 

602 return hvalue 

603 except NotImplementedError: 

604 logger.info("Object %s does not implement %s", self._id, prop) 

605 

606 # `state_ok` is dynamically generated from `state` 

607 elif prop == "state_ok": 

608 pass 

609 else: 

610 raise KeyError( 

611 f"Couldnt find a getter for property `{prop}` on `{self.name()}`" 

612 ) 

613 

614 @abstractmethod 

615 def _do_get(self, prop): 

616 """Local implementation of how to get the property from the object""" 

617 pass 

618 

619 def _call(self, function, value, **kwargs): 

620 """Call a function on the child object 

621 

622 First try from the simple function map which maps to function names 

623 on the child object. Delegates to _do_call which locally implements the getter 

624 

625 Second, if not in the map, try calling the function _call_<fn> on the parent 

626 

627 Args: 

628 function (str): The function to call. 

629 value: The value to call the function with 

630 

631 Returns: 

632 True if function successfully called 

633 """ 

634 if function in self._callable_map: 

635 ret = self._do_call(function, value, **kwargs) 

636 elif hasattr(self, "_call_{fn}".format(fn=function)): 

637 ret = getattr(self, "_call_{fn}".format(fn=function))(value, **kwargs) 

638 

639 else: 

640 raise KeyError( 

641 f"Couldnt find a handler for function `{function}` on `{self.name()}`" 

642 ) 

643 

644 if ret is None: 

645 # When a function returns nothing 

646 ret = True 

647 

648 return ret 

649 

650 @abstractmethod 

651 def _do_call(self, function, value, **kwargs): 

652 """ 

653 Local implementation of how to get the function from the object 

654 """ 

655 pass