Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/multivisor/_httpclient.py: 66%
121 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1import json
3import requests
4import typing
5import gevent
6import logging
8logger = logging.getLogger(__name__)
11class _MultivisorApi:
12 """Remote access to Multivisor service based on HTTP.
14 It's a raw copy-paste from https://github.com/tiagocoutinho/multivisor/blob/develop/multivisor/client/http.py
15 With small changes.
17 Under GNU GENERAL PUBLIC LICENSE
18 """
20 def __init__(self, url):
21 self.url = url
22 self._status = None
23 self.notifications = []
24 self.on_process_changed = None
26 def stop_processes(self, *names):
27 return self.post("/api/process/stop", data=dict(uid=[",".join(names)]))
29 def restart_processes(self, *names):
30 return self.post("/api/process/restart", data=dict(uid=[",".join(names)]))
32 @property
33 def status(self):
34 if self._status is None:
35 self._status = self._get_status()
36 return self._status
38 @staticmethod
39 def _update_status_stats(status):
40 supervisors, processes = status["supervisors"], status["processes"]
41 s_stats = dict(
42 running=sum((s["running"] for s in status["supervisors"].values())),
43 total=len(supervisors),
44 )
45 s_stats["stopped"] = s_stats["total"] - s_stats["running"]
46 p_stats = dict(
47 running=sum((p["running"] for p in status["processes"].values())),
48 total=len(processes),
49 )
50 p_stats["stopped"] = p_stats["total"] - p_stats["running"]
51 stats = dict(supervisors=s_stats, processes=p_stats)
52 status["stats"] = stats
53 return stats
55 def _get_status(self):
56 status = self.get("/api/data").json()
57 # reorganize status per process
58 status["processes"] = processes = {}
59 for supervisor in status["supervisors"].values():
60 processes.update(supervisor["processes"])
61 self._update_status_stats(status)
62 return status
64 def refresh_status(self):
65 self._status = None
66 return self.status
68 def get(self, url, params=None, **kwargs):
69 result = requests.get(self.url + url, params=params, timeout=10, **kwargs)
70 result.raise_for_status()
71 return result
73 def post(self, url, data=None, json=None, **kwargs):
74 result = requests.post(
75 self.url + url, data=data, json=json, timeout=10, **kwargs
76 )
77 result.raise_for_status()
78 return result
80 def __getitem__(self, item):
81 return self.get(item).json()
83 def __setitem__(self, item, value):
84 self.post(item, data=value)
86 def events(self):
87 stream = self.get("/api/stream", stream=True)
88 for line in stream.iter_lines():
89 if line:
90 line = line.decode("utf-8")
91 if line.startswith("data:"):
92 line = line[5:]
93 try:
94 yield json.loads(line)
95 except ValueError:
96 print("error", line)
98 def run(self):
99 for event in self.events():
100 status = self.status
101 name, payload = event["event"], event["payload"]
102 if name == "process_changed":
103 status["processes"][payload["uid"]].update(payload)
104 self.on_process_changed(payload)
105 self._update_status_stats(status)
106 elif name == "notification":
107 # self.notifications.append(payload)
108 pass
class Multivisor:
    """Client for a Multivisor service that dispatches process state changes.

    Construction fetches the current status and spawns a gevent greenlet
    running the API event loop. Callbacks registered per process uid are
    called as ``callback(name=uid, state=statename)``.
    """

    def __init__(self, url):
        self.__url = url
        self.__monitor = None
        self.__callbacks: typing.Dict[str, typing.List[typing.Callable]] = {}
        self._api = _MultivisorApi(url)
        self._api.on_process_changed = self._on_process_changed
        self._api.refresh_status()
        self._spawn()

    def _spawn(self):
        """Start the greenlet that runs the API event loop."""
        self.__monitor = gevent.spawn(self._api.run)

    def disconnect(self):
        """Kill the monitoring greenlet and drop all registered callbacks.

        Safe to call more than once. The registry is reset to an empty dict
        rather than None, so later register/unregister calls and late events
        do not fail on a None registry.
        """
        monitor, self.__monitor = self.__monitor, None
        if monitor is not None:
            monitor.kill()
        self.__callbacks = {}

    @property
    def url(self):
        """URL this client was created with."""
        return self.__url

    def start(self, name):
        """Start process `name`."""
        # There is no start, use restart
        self._api.restart_processes(name)

    def restart(self, name):
        """Restart process `name`."""
        self._api.restart_processes(name)

    def stop(self, name):
        """Stop process `name`."""
        self._api.stop_processes(name)

    def _on_process_changed(self, payload):
        """Forward a process-changed payload to the callbacks of its uid.

        Nothing is dispatched when the payload carries no "statename".
        """
        uid = payload["uid"]
        statename = payload.get("statename")
        if statename is None:
            return
        # Iterate over a snapshot: a callback may unregister itself (or
        # others) while being dispatched, which would otherwise skip entries.
        for callback in tuple(self.__callbacks.get(uid, ())):
            callback(name=uid, state=statename)

    def register_state(self, name, callback):
        """Register `callback` for state changes of process uid `name`."""
        self.__callbacks.setdefault(name, []).append(callback)

    def unregister_state(self, name, callback):
        """Remove `callback` from the callbacks of process uid `name`.

        Raises:
            ValueError: if no callback was ever registered for `name`, or if
                `callback` is not among the registered ones.
        """
        callbacks = self.__callbacks.get(name)
        if callbacks is None:
            raise ValueError(f"Service uid {name} was not registered")
        callbacks.remove(callback)

    def get_state(self, name):
        """Return the cached "statename" of process `name`, or "UNKNOWN"."""
        status = self._api.status
        meta = status["processes"].get(name, {"statename": "UNKNOWN"})
        return meta.get("statename", "UNKNOWN")