Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/tango/object.py: 68%

187 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import gevent 

5 

6import tango 

7from tango.gevent import DeviceProxy 

8 

9from daiquiri.core.hardware.abstract import MappedHardwareObject, HardwareProperty 

10 

11import logging 

12 

13logger = logging.getLogger(__name__) 

14 

15 

def is_int_str(v) -> bool:
    """Tell whether `v` can be converted with `int()`.

    Used to validate textual settings such as an event period, which may
    hold a placeholder instead of a number.

    Arguments:
        v: Candidate value, usually a string.

    Returns:
        True if `int(v)` succeeds, False otherwise. Unsupported types
        (for example `None`) and non-finite floats also yield False
        instead of raising.
    """
    try:
        int(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: `v` is None or another non-convertible type.
        # OverflowError: `v` is a float infinity.
        return False
    return True

22 

23 

def is_float_str(v) -> bool:
    """Tell whether `v` can be converted with `float()`.

    Used to validate textual settings such as a change-event threshold,
    which may hold a placeholder instead of a number.

    Arguments:
        v: Candidate value, usually a string.

    Returns:
        True if `float(v)` succeeds, False otherwise. Unsupported types
        (for example `None`) and integers too large for a float also
        yield False instead of raising.
    """
    try:
        float(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: `v` is None or another non-convertible type.
        # OverflowError: `v` is an int that does not fit in a float.
        return False
    return True

30 

31 

class TangoStateProperty(HardwareProperty):
    """Property translating between `tango.DevState` members and their names."""

    def translate_from(self, value):
        """Return the `name` of the state received from the device."""
        return value.name

    def translate_to(self, value):
        """Return the `tango.DevState` member called `value`.

        Names that are not part of `tango.DevState.__members__` map to
        `tango.DevState.UNKNOWN`.
        """
        member = tango.DevState.__members__.get(value)
        return tango.DevState.UNKNOWN if member is None else member

41 

42 

class TangoAttrProperty(HardwareProperty):
    """Property exposing one facet of a Tango attribute.

    The property name selects the facet (for example "value", "quality",
    "data_type" or "display_unit"), while the Tango attribute it applies
    to is resolved lazily through the `attrname` callable.
    """

    def __init__(self, name: str, attrname: callable):
        """
        Attribute
            name: Name of the attribute property
            attrname: Callable returning the attribute name
        """
        HardwareProperty.__init__(self, name=name)
        self._attrname = attrname

    def translate_from(self, value):
        """Convert a value read from the device for this property.

        "quality" and "data_type" are reduced to the `name` of the value,
        an unset "display_unit" becomes an empty string, and anything
        else is returned unchanged.
        """
        if self.name in ("quality", "data_type"):
            return value.name
        if self.name == "display_unit":
            if value is None or value == "No display unit":
                # These values are part of the Tango specification
                return ""
        return value

    def translate_to(self, value):
        """Convert a value before it is sent to the device.

        Delegates to the base class's `translate_to`; the previous code
        called the base `translate_from`, i.e. the opposite direction.
        """
        return HardwareProperty.translate_to(self, value)

    def get_attrname(self, obj) -> str:
        """Returns the attrname used from a mapping object"""
        return self._attrname(obj)

70 

71 

class TangoObject(MappedHardwareObject):
    """Daiquiri hardware object backed by a gevent-aware Tango `DeviceProxy`.

    A watchdog greenlet keeps trying to (re)connect to the device. Once
    connected, property updates are received through Tango change events.
    """

    _protocol = "tango"

    def __init__(self, app, tango_url, **kwargs):
        """Create the device proxy and start the reconnection watchdog.

        Arguments:
            app: Application object; its `debug` attribute selects how
                failures are logged.
            tango_url: Address of the Tango device, handed to `DeviceProxy`.
            **kwargs: Forwarded to the parent constructor.
        """
        self._app = app
        # Maps a tango attribute name to the id of its event subscription
        self._subscriptions = {}
        self._tango_url = tango_url
        self._object = DeviceProxy(self._tango_url)
        super().__init__(**kwargs)

        self._watchdog = gevent.spawn(self.try_device)
        self._watchdog.name = f"{__name__}.{tango_url}"

        # TODO: Temporary fix for
        # https://github.com/tango-controls/TangoTickets/issues/34
        time.sleep(1)

    def __repr__(self) -> str:
        return f"<Tango: {self._tango_url} ({self.__class__.__name__})>"

    def _connect(self):
        """Try to connect this daiquiri hardware to the remote tango hardware

        On failure of the initial broadcast the object is put back offline
        and the event subscriptions made by this attempt are released.

        raises:
            tango.ConnectionFailed: Connection have failed
            tango.DevFailed: Another tango thing have failed
            Exception: Another thing have failed
        """
        logger.info(f"Trying to connect to {self._tango_url}")
        self._object.ping()
        self._subscribe_device()
        self.set_online(True)
        try:
            self._broadcast()
        except Exception:
            # Undo the whole attempt. Only flagging the object offline would
            # keep the subscriptions alive, and the next watchdog retry
            # would subscribe again and lose track of the previous ids.
            self._disconnect()
            raise

    def _disconnect(self):
        """Disconnect this daiquiri hardware from the remote tango hardware"""
        self.set_online(False)
        self._unsubscribe_device()

    def try_device(self):
        """Watch dog trying to reconnect to the remote hardware if it was
        disconnected

        Runs forever, checking the online flag every 10 seconds.
        """
        while True:
            # Full traceback in debug mode only, a single info line otherwise
            log_failure = logger.exception if self._app.debug else logger.info
            if not self._online:
                try:
                    self._connect()
                except Exception:
                    log_failure(
                        f"Could not connect to {self._tango_url}. Retrying in 10s"
                    )
            gevent.sleep(10)

    def _broadcast(self):
        """Read the connected hardware properties from the proxy and update this
        daiquiri hardware"""
        for name, prop in self._property_map.items():
            self._update(name, prop, self._do_get(prop))

    def _force_subscribe_event(self, tango_attr_name, event_type, callback):
        """Force polling a tango attribute.

        Trying first to subscribe.

        If it fails because the events are not configured on the server
        side, setup the server side events and subscribe again.

        Arguments:
            tango_attr_name: Name of the tango attribute to subscribe to
            event_type: Tango event type requested
            callback: Callable invoked with the event data

        Returns:
            The event id returned by `subscribe_event`

        raises:
            tango.DevFailed: If the subscription failed
            RuntimeError: If the event configuration have failed
        """
        obj = self._object
        try:
            return obj.subscribe_event(
                tango_attr_name, event_type, callback, green_mode=tango.GreenMode.Gevent
            )
        except tango.DevFailed as e:
            # PyTango stores the error stack in `e.args`, each entry carrying
            # its own `reason`. The exception object itself normally has no
            # `reason` attribute, so checking only that attribute made the
            # fallback below unreachable.
            reasons = [getattr(e, "reason", None)]
            reasons.extend(getattr(error, "reason", None) for error in e.args)
            recoverable = (
                "API_EventPropertiesNotSet",
                "API_AttributePollingNotStarted",
            )
            if not any(reason in recoverable for reason in reasons):
                raise

        info: tango.AttributeInfoEx = obj.get_attribute_config(tango_attr_name)
        changes = []
        if not is_int_str(info.events.per_event.period):
            changes.append("polling period")
            info.events.per_event.period = "1000"
        valid_rel_change = is_float_str(info.events.ch_event.rel_change)
        valid_abs_change = is_float_str(info.events.ch_event.abs_change)
        if not valid_abs_change and not valid_rel_change:
            # A change event needs at least one usable threshold
            changes.append("rel_change")
            info.events.ch_event.rel_change = "0.001"

        if changes:
            msg = " + ".join(changes)
            logger.info(f"Active {msg} for {self._tango_url} {tango_attr_name}")
            try:
                info.name = tango_attr_name
                obj.set_attribute_config(info)
            except tango.DevFailed as e:
                raise RuntimeError(
                    f"Failed to configure events {self._tango_url} {tango_attr_name}"
                ) from e

        logger.info(f"Try using event for {self._tango_url} {tango_attr_name}")
        return obj.subscribe_event(
            tango_attr_name, event_type, callback, green_mode=tango.GreenMode.Gevent
        )

    def _subscribe_device(self):
        """Subscribe events for this hardware

        A failing subscription is logged and skipped so that the other
        properties are still subscribed.
        """
        log_failure = logger.exception if self._app.debug else logger.info
        for prop in self._property_map.values():
            if isinstance(prop, TangoAttrProperty):
                # FIXME: This have to be properly implemented
                continue
            try:
                tango_attr_name = prop.name
                event_id = self._force_subscribe_event(
                    prop.name, tango.EventType.CHANGE_EVENT, self._push_event
                )
            except Exception:
                log_failure(
                    f"Could not subscribe to property {self._tango_url} {prop.name}"
                )
            else:
                self._subscriptions[tango_attr_name] = event_id

    def _unsubscribe_device(self):
        """Unsubscribe registered events for this daiquiri hardware

        Failures are logged and ignored; the subscription table is always
        emptied afterwards.
        """
        log_failure = logger.exception if self._app.debug else logger.info
        for tango_attr_name, eid in self._subscriptions.items():
            try:
                self._object.unsubscribe_event(eid)
            except Exception:
                log_failure(f"Couldnt unsubscribe from {tango_attr_name}")

        self._subscriptions = {}

    def _push_event(self, event: tango.EventData):
        """Callback triggered when the remote tango hardware fire an event

        Never raises: any failure is logged with its traceback.
        """
        try:
            self._protected_push_event(event)
        except Exception:
            logger.error("Error while processing push_event", exc_info=True)

    def _protected_push_event(self, event: tango.EventData):
        """Callback triggered when the remote tango hardware fire an event

        Any exceptions raised are caught by `_push_event`.
        """
        log_failure = logger.exception if self._app.debug else logger.info

        if not self._online:
            # Events received while offline are dropped
            return

        if event.errors:
            error = event.errors[0]
            log_failure(f"Error in push_event for {event.attr_name}: {error.desc}")
            # NOTE: any event error triggers a disconnection, not only
            # 'API_EventTimeout'
            self._disconnect()
            return

        if event.attr_value is not None:
            # Update the first mapped property matching the attribute name
            for name, prop in self._property_map.items():
                if event.attr_value.name == prop.name:
                    self._update(name, prop, event.attr_value.value)
                    break

    def _do_set(self, prop: HardwareProperty, value):
        """Write `value` to the tango attribute described by `prop`

        raises:
            RuntimeError: If `prop` is a `TangoAttrProperty` other than
                "value"; attribute configuration cannot be written
        """
        obj = self._object
        if isinstance(prop, TangoAttrProperty):
            if prop.name != "value":
                raise RuntimeError(
                    f"Setter for '{prop.name}' attribute config is not implemented"
                )
            attrname = prop.get_attrname(self)
            return obj.write_attribute(attrname, value)
        return obj.write_attribute(prop.name, value)

    def _do_get(self, prop: HardwareProperty):
        """Read the current value of `prop` from the tango device

        For a `TangoAttrProperty` the property name selects what is read
        from the attribute: its name, its reading (value, data type,
        quality) or one field of its configuration. Other properties use
        their own getter when defined, else the attribute named like the
        property is read.

        raises:
            RuntimeError: If the `TangoAttrProperty` name is not supported
        """
        obj = self._object
        if isinstance(prop, TangoAttrProperty):
            attrname = prop.get_attrname(self)
            if prop.name == "name":
                return attrname
            if prop.name == "value":
                return obj.read_attribute(attrname).value
            if prop.name == "data_type":
                return obj.read_attribute(attrname).type
            if prop.name == "quality":
                return obj.read_attribute(attrname).quality
            if prop.name in ("unit", "format", "display_unit", "label", "description"):
                # These names match the fields of the attribute configuration
                return getattr(obj.get_attribute_config(attrname), prop.name)
            raise RuntimeError(f"Unsupported {prop.name} attribute")

        getter = prop.getter
        if getter is not None:
            return getter(self)
        return obj.read_attribute(prop.name).value

    def _do_call(self, function, value, **kwargs):
        """Execute the tango command `function` with `value` as argument"""
        return self._object.command_inout(function, value, **kwargs)