Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/multivisor/object.py: 72%

39 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1# from blinker import signal 

2import logging 

3from daiquiri.core.hardware.abstract import MappedHardwareObject, HardwareProperty 

4from ._httpclient import Multivisor 

5 

6logger = logging.getLogger(__name__) 

7 

8 

class MultivisorObject(MappedHardwareObject):
    """
    Base object to handle a hardware object from Multivisor.

    A hardware object can be defined this way in Daiquiri, from the
    Multivisor UID of the service.

    .. code-block:: yaml

        - url: multivisor://mainstation:id00:metadata

    Arguments:
        app: Link to flask
        address: Address of the object inside Multivisor (it is the multivisor uid)
        multivisor: Access to the Multivisor API
    """

    _protocol = "multivisor"

    def __init__(self, app, address: str, multivisor: Multivisor, **kwargs):
        """Store the Multivisor handles, initialise the base class and connect.

        Arguments:
            app: Link to flask
            address: Multivisor UID of the service; kept as ``self._uid``
            multivisor: Access to the Multivisor API
            **kwargs: Forwarded unchanged to the parent class constructor
        """
        self._app = app
        self._uid = address
        self._multivisor = multivisor
        super().__init__(**kwargs)
        # Connect only once the base class is initialised: `_connect`
        # immediately pushes a first state through `_state_updated`.
        self._connect()

    def __repr__(self) -> str:
        """Return a short description with the Multivisor UID and class name."""
        return f"<Multivisor: {self._uid} ({self.__class__.__name__})>"

    def _connect(self) -> None:
        """Connect this daiquiri hardware to the remote service.

        Registers `_state_updated` as the state callback for this UID, then
        reads the current state once and feeds it through the same callback.
        """
        self._multivisor.register_state(self._uid, self._state_updated)
        state = self._multivisor.get_state(self._uid)
        self._state_updated(self._uid, state)

    def _state_updated(self, name, state) -> None:
        """Handle a state value reported for this service.

        The object is flagged online unless the state is ``"UNKNOWN"``; the
        ``state`` property is then updated with the new value.

        Arguments:
            name: Identifier passed by the caller (unused here)
            state: New state of the service
        """
        prop = self._property_map["state"]
        self.set_online(state != "UNKNOWN")
        self._update("state", prop, state)

    def _disconnect(self) -> None:
        """Disconnect this daiquiri hardware from the remote service.

        Unregisters the state callback and flags the object as offline.
        """
        self._multivisor.unregister_state(self._uid, self._state_updated)
        self.set_online(False)

    def _do_set(self, prop: HardwareProperty, value):
        """Setting a property is not supported for Multivisor objects.

        Raises:
            Exception: Always.
        """
        raise Exception("Unsupported")

    def _do_get(self, prop: HardwareProperty):
        """Read a property from Multivisor.

        Only the ``state`` property is supported.

        Arguments:
            prop: Property to read; selected by its ``name``

        Returns:
            The state returned by the Multivisor API for this UID.

        Raises:
            Exception: If `prop` is anything other than ``state``.
        """
        if prop.name == "state":
            state = self._multivisor.get_state(self._uid)
            return state
        raise Exception("Unsupported")

    def _do_call(self, function, value, **kwargs) -> None:
        """Execute a service command through Multivisor.

        Arguments:
            function: One of ``"start"``, ``"restart"`` or ``"stop"``
            value: Unused by the supported functions
            **kwargs: Unused by the supported functions

        Raises:
            Exception: If `function` is not one of the supported commands.
        """
        if function == "start":
            self._multivisor.start(self._uid)
        elif function == "restart":
            self._multivisor.restart(self._uid)
        elif function == "stop":
            self._multivisor.stop(self._uid)
        else:
            # Fail loudly, like `_do_set` / `_do_get`, instead of silently
            # ignoring a command that was never sent to Multivisor.
            raise Exception(f"Unsupported function: {function}")

67 self._multivisor.restart(self._uid) 

68 elif function == "stop": 

69 self._multivisor.stop(self._uid)