Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/multivisor/object.py: 72%
39 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1# from blinker import signal
2import logging
3from daiquiri.core.hardware.abstract import MappedHardwareObject, HardwareProperty
4from ._httpclient import Multivisor
6logger = logging.getLogger(__name__)
9class MultivisorObject(MappedHardwareObject):
10 """
11 Base object to handle hardware object from Multivisor.
13 An hardware object can be defined such way in Daiquiri, from the
14 Multivisor UID of the service.
16 .. code-block: yaml
18 - url: multivisor://mainstation:id00:metadata
20 Arguments:
21 app: Link to flask
22 address: Address of the object inside Multivisor (it is the mulivisor uid)
23 multivisor: Access to the Multivisor API
24 """
26 _protocol = "multivisor"
28 def __init__(self, app, address: str, multivisor: Multivisor, **kwargs):
29 self._app = app
30 self._uid = address
31 self._multivisor = multivisor
32 super().__init__(**kwargs)
33 self._connect()
35 def __repr__(self) -> str:
36 return f"<Multivisor: {self._uid} ({self.__class__.__name__})>"
38 def _connect(self):
39 """Connect this daiquiri hardware to the remote service"""
40 self._multivisor.register_state(self._uid, self._state_updated)
41 state = self._multivisor.get_state(self._uid)
42 self._state_updated(self._uid, state)
44 def _state_updated(self, name, state):
45 prop = self._property_map["state"]
46 self.set_online(state != "UNKNOWN")
47 self._update("state", prop, state)
49 def _disconnect(self):
50 """Disconnect this daiquiri hardware from the remote service"""
51 self._multivisor.unregister_state(self._uid, self._state_updated)
52 self.set_online(False)
54 def _do_set(self, prop: HardwareProperty, value):
55 raise Exception("Unsupported")
57 def _do_get(self, prop: HardwareProperty):
58 if prop.name == "state":
59 state = self._multivisor.get_state(self._uid)
60 return state
61 raise Exception("Unsupported")
63 def _do_call(self, function, value, **kwargs):
64 if function == "start":
65 self._multivisor.start(self._uid)
66 elif function == "restart":
67 self._multivisor.restart(self._uid)
68 elif function == "stop":
69 self._multivisor.stop(self._uid)