Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/multivisor/_httpclient.py: 66%

121 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1import json 

2 

3import requests 

4import typing 

5import gevent 

6import logging 

7 

8logger = logging.getLogger(__name__) 

9 

10 

11class _MultivisorApi: 

12 """Remote access to Multivisor service based on HTTP. 

13 

14 It's a raw copy-paste from https://github.com/tiagocoutinho/multivisor/blob/develop/multivisor/client/http.py 

15 With small changes. 

16 

17 Under GNU GENERAL PUBLIC LICENSE 

18 """ 

19 

20 def __init__(self, url): 

21 self.url = url 

22 self._status = None 

23 self.notifications = [] 

24 self.on_process_changed = None 

25 

26 def stop_processes(self, *names): 

27 return self.post("/api/process/stop", data=dict(uid=[",".join(names)])) 

28 

29 def restart_processes(self, *names): 

30 return self.post("/api/process/restart", data=dict(uid=[",".join(names)])) 

31 

32 @property 

33 def status(self): 

34 if self._status is None: 

35 self._status = self._get_status() 

36 return self._status 

37 

38 @staticmethod 

39 def _update_status_stats(status): 

40 supervisors, processes = status["supervisors"], status["processes"] 

41 s_stats = dict( 

42 running=sum((s["running"] for s in status["supervisors"].values())), 

43 total=len(supervisors), 

44 ) 

45 s_stats["stopped"] = s_stats["total"] - s_stats["running"] 

46 p_stats = dict( 

47 running=sum((p["running"] for p in status["processes"].values())), 

48 total=len(processes), 

49 ) 

50 p_stats["stopped"] = p_stats["total"] - p_stats["running"] 

51 stats = dict(supervisors=s_stats, processes=p_stats) 

52 status["stats"] = stats 

53 return stats 

54 

55 def _get_status(self): 

56 status = self.get("/api/data").json() 

57 # reorganize status per process 

58 status["processes"] = processes = {} 

59 for supervisor in status["supervisors"].values(): 

60 processes.update(supervisor["processes"]) 

61 self._update_status_stats(status) 

62 return status 

63 

64 def refresh_status(self): 

65 self._status = None 

66 return self.status 

67 

68 def get(self, url, params=None, **kwargs): 

69 result = requests.get(self.url + url, params=params, timeout=10, **kwargs) 

70 result.raise_for_status() 

71 return result 

72 

73 def post(self, url, data=None, json=None, **kwargs): 

74 result = requests.post( 

75 self.url + url, data=data, json=json, timeout=10, **kwargs 

76 ) 

77 result.raise_for_status() 

78 return result 

79 

80 def __getitem__(self, item): 

81 return self.get(item).json() 

82 

83 def __setitem__(self, item, value): 

84 self.post(item, data=value) 

85 

86 def events(self): 

87 stream = self.get("/api/stream", stream=True) 

88 for line in stream.iter_lines(): 

89 if line: 

90 line = line.decode("utf-8") 

91 if line.startswith("data:"): 

92 line = line[5:] 

93 try: 

94 yield json.loads(line) 

95 except ValueError: 

96 print("error", line) 

97 

98 def run(self): 

99 for event in self.events(): 

100 status = self.status 

101 name, payload = event["event"], event["payload"] 

102 if name == "process_changed": 

103 status["processes"][payload["uid"]].update(payload) 

104 self.on_process_changed(payload) 

105 self._update_status_stats(status) 

106 elif name == "notification": 

107 # self.notifications.append(payload) 

108 pass 

109 

110 

111class Multivisor: 

112 def __init__(self, url): 

113 self.__url = url 

114 self.__monitor = None 

115 self.__callbacks: typing.Dict[str, typing.List[typing.Callable]] = {} 

116 self._api = _MultivisorApi(url) 

117 self._api.on_process_changed = self._on_process_changed 

118 self._api.refresh_status() 

119 self._spawn() 

120 

121 def _spawn(self): 

122 self.__monitor = gevent.spawn(self._api.run) 

123 

124 def disconnect(self): 

125 self.__monitor.kill() 

126 self.__monitor = None 

127 self.__callbacks = None 

128 

129 @property 

130 def url(self): 

131 return self.__url 

132 

133 def start(self, name): 

134 # There is no start, use restart 

135 self._api.restart_processes(name) 

136 

137 def restart(self, name): 

138 self._api.restart_processes(name) 

139 

140 def stop(self, name): 

141 self._api.stop_processes(name) 

142 

143 def _on_process_changed(self, payload): 

144 uid = payload["uid"] 

145 statename = payload.get("statename") 

146 if statename is not None: 

147 callbacks = self.__callbacks.get(uid) 

148 if callbacks: 

149 for c in callbacks: 

150 c(name=uid, state=statename) 

151 

152 def register_state(self, name, callback): 

153 callbacks = self.__callbacks.get(name) 

154 if callbacks is None: 

155 callbacks = [] 

156 self.__callbacks[name] = callbacks 

157 callbacks.append(callback) 

158 

159 def unregister_state(self, name, callback): 

160 callbacks = self.__callbacks.get(name) 

161 if callbacks is None: 

162 raise ValueError(f"Service uid {name} was not registered") 

163 callbacks.remove(callback) 

164 

165 def get_state(self, name): 

166 status = self._api.status 

167 meta = status["processes"].get(name, {"statename": "UNKNOWN"}) 

168 return meta.get("statename", "UNKNOWN")