Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/tango/object.py: 68%
187 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import gevent
6import tango
7from tango.gevent import DeviceProxy
9from daiquiri.core.hardware.abstract import MappedHardwareObject, HardwareProperty
11import logging
13logger = logging.getLogger(__name__)
def is_int_str(v) -> bool:
    """Tell whether `v` can be converted with `int()`.

    Arguments:
        v: Value to check, typically a string read from a tango
            attribute configuration.

    Returns:
        True if `int(v)` succeeds, False otherwise.
    """
    try:
        int(v)
    except (TypeError, ValueError):
        # `int(None)` (or any unsupported type) raises TypeError rather than
        # ValueError; both simply mean "not an integer string" here.
        return False
    return True
def is_float_str(v) -> bool:
    """Tell whether `v` can be converted with `float()`.

    Arguments:
        v: Value to check, typically a string read from a tango
            attribute configuration.

    Returns:
        True if `float(v)` succeeds, False otherwise.
    """
    try:
        float(v)
    except (TypeError, ValueError):
        # `float(None)` (or any unsupported type) raises TypeError rather than
        # ValueError; both simply mean "not a float string" here.
        return False
    return True
class TangoStateProperty(HardwareProperty):
    """Property converting between `tango.DevState` members and their names"""

    def translate_from(self, value):
        """Returns the `name` attribute of the value received from the device"""
        return value.name

    def translate_to(self, value):
        """Returns the `tango.DevState` member registered under `value`

        `tango.DevState.UNKNOWN` is returned when the lookup in
        `tango.DevState.__members__` gives nothing.
        """
        members = tango.DevState.__members__
        found = members.get(value)
        return tango.DevState.UNKNOWN if found is None else found
class TangoAttrProperty(HardwareProperty):
    """Property exposing one facet of a tango attribute

    The property name selects the facet (for example "quality", "data_type"
    or "display_unit"), while the tango attribute name itself is resolved
    lazily through a callable.
    """

    def __init__(self, name: str, attrname: callable):
        """
        Attribute
            name: Name of the attribute property
            attrname: Callable returning the attribute name
        """
        HardwareProperty.__init__(self, name=name)
        self._attrname = attrname

    def translate_from(self, value):
        """Convert a value coming from tango into its daiquiri representation"""
        facet = self.name
        if facet in ("quality", "data_type"):
            # These facets are exposed through their `name` attribute
            return value.name
        if facet == "display_unit" and (value is None or value == "No display unit"):
            # This values are part of the Tango specification
            return ""
        return value

    def translate_to(self, value):
        """Convert a daiquiri value before it is sent to tango"""
        # NOTE(review): this delegates to the base `translate_from`, not
        # `translate_to` - confirm against `HardwareProperty` that it is intended
        return HardwareProperty.translate_from(self, value)

    def get_attrname(self, obj) -> str:
        """Returns the attrname used from a mapping object"""
        resolver = self._attrname
        return resolver(obj)
class TangoObject(MappedHardwareObject):
    """Daiquiri hardware object backed by a tango device

    The device is reached through a gevent `DeviceProxy`. A watchdog greenlet
    (`try_device`) keeps trying to connect while the object is offline; once
    connected, change events pushed by the device update the mapped
    properties.
    """

    _protocol = "tango"

    # Subscription failures which can be recovered by configuring the events
    _RECOVERABLE_SUBSCRIBE_REASONS = frozenset(
        ("API_EventPropertiesNotSet", "API_AttributePollingNotStarted")
    )

    # `TangoAttrProperty` facets read from the result of `read_attribute`
    # (facet name -> attribute of the result)
    _ATTR_READ_FIELDS = {"value": "value", "data_type": "type", "quality": "quality"}

    # `TangoAttrProperty` facets read from the result of `get_attribute_config`
    # (the facet name is also the attribute of the result)
    _ATTR_CONFIG_FIELDS = frozenset(
        ("unit", "format", "display_unit", "label", "description")
    )

    def __init__(self, app, tango_url, **kwargs):
        """
        Arguments:
            app: Application object, its `debug` attribute selects how failures
                are logged
            tango_url: Tango url of the device to connect to
            kwargs: Forwarded to the parent constructor
        """
        self._app = app
        self._subscriptions = {}
        self._tango_url = tango_url
        self._object = DeviceProxy(self._tango_url)
        super().__init__(**kwargs)

        self._watchdog = gevent.spawn(self.try_device)
        self._watchdog.name = f"{__name__}.{tango_url}"

        # TODO: Temporary fix for
        # https://github.com/tango-controls/TangoTickets/issues/34
        time.sleep(1)

    def __repr__(self) -> str:
        return f"<Tango: {self._tango_url} ({self.__class__.__name__})>"

    def _connect(self):
        """Try to connect this daiquiri hardware to the remote tango hardware

        raises:
            tango.ConnectionFailed: Connection have failed
            tango.DevFailed: Another tango thing have failed
            Exception: Another thing have failed
        """
        logger.info(f"Trying to connect to {self._tango_url}")
        self._object.ping()
        self._subscribe_device()
        self.set_online(True)
        try:
            self._broadcast()
        except Exception:
            self.set_online(False)
            # The watchdog will call `_connect` again, which subscribes again.
            # Release the subscriptions made above so they are not duplicated
            # and their event ids are not lost.
            self._unsubscribe_device()
            raise

    def _disconnect(self):
        """Disconnect this daiquiri hardware from the remote tango hardware"""
        self.set_online(False)
        self._unsubscribe_device()

    def try_device(self):
        """Watch dog trying to reconnect to the remote hardware if it was
        disconnected"""
        while True:
            log_failure = logger.exception if self._app.debug else logger.info
            if not self._online:
                try:
                    self._connect()
                except Exception:
                    log_failure(
                        f"Could not connect to {self._tango_url}. Retrying in 10s"
                    )
            gevent.sleep(10)

    def _broadcast(self):
        """Read the connected hardware properties from the proxy and update this
        daiquiri hardware"""
        for name, prop in self._property_map.items():
            self._update(name, prop, self._do_get(prop))

    @staticmethod
    def _devfailed_reasons(error) -> set:
        """Returns the reasons carried by an exception raised by tango

        The reasons are read from the `reason` attribute of the exception
        itself, if any, and from the `reason` attribute of each of its `args`.
        """
        candidates = [error, *getattr(error, "args", ())]
        reasons = set()
        for candidate in candidates:
            reason = getattr(candidate, "reason", None)
            if isinstance(reason, str):
                reasons.add(reason)
        return reasons

    def _force_subscribe_event(self, tango_attr_name, event_type, callback):
        """Force polling a tango attribute.

        Trying first to subscribe.

        If it fails because the events are not set up, setup the server side
        events and subscribe again.

        Arguments:
            tango_attr_name: Name of the tango attribute
            event_type: Kind of event to subscribe to
            callback: Callable triggered when an event is received

        Returns:
            The identifier returned by `subscribe_event`

        raises:
            tango.DevFailed: If the subscription failed
            RuntimeError: If the event configuration have failed
        """
        obj = self._object
        try:
            return obj.subscribe_event(
                tango_attr_name, event_type, callback, green_mode=tango.GreenMode.Gevent
            )
        except tango.DevFailed as e:
            # The reason is not necessarily an attribute of the exception
            # itself, it can be carried by the errors stored in `e.args`
            reasons = self._devfailed_reasons(e)
            if not reasons & self._RECOVERABLE_SUBSCRIBE_REASONS:
                raise

        info: tango.AttributeInfoEx = obj.get_attribute_config(tango_attr_name)
        changes = []
        if not is_int_str(info.events.per_event.period):
            changes.append("polling period")
            info.events.per_event.period = "1000"
        valid_rel_change = is_float_str(info.events.ch_event.rel_change)
        valid_abs_change = is_float_str(info.events.ch_event.abs_change)
        if not valid_abs_change and not valid_rel_change:
            changes.append("rel_change")
            info.events.ch_event.rel_change = "0.001"

        if changes:
            msg = " + ".join(changes)
            logger.info(f"Active {msg} for {self._tango_url} {tango_attr_name}")
            try:
                info.name = tango_attr_name
                obj.set_attribute_config(info)
            except tango.DevFailed as e:
                raise RuntimeError(
                    f"Failed to configure events {self._tango_url} {tango_attr_name}"
                ) from e

        logger.info(f"Try using event for {self._tango_url} {tango_attr_name}")
        return obj.subscribe_event(
            tango_attr_name, event_type, callback, green_mode=tango.GreenMode.Gevent
        )

    def _subscribe_device(self):
        """Subscribe events for this hardware

        A failure to subscribe a property is logged and does not prevent the
        subscription of the other properties.
        """
        log_failure = logger.exception if self._app.debug else logger.info
        for prop in self._property_map.values():
            if isinstance(prop, TangoAttrProperty):
                # FIXME: This have to be properly implemented
                continue
            tango_attr_name = prop.name
            try:
                event_id = self._force_subscribe_event(
                    tango_attr_name, tango.EventType.CHANGE_EVENT, self._push_event
                )
            except Exception:
                log_failure(
                    f"Could not subscribe to property {self._tango_url} {prop.name}"
                )
            else:
                self._subscriptions[tango_attr_name] = event_id

    def _unsubscribe_device(self):
        """Unsubscribe registered events for this daiquiri hardware

        Failures are logged, and the known subscriptions are forgotten in any
        case.
        """
        log_failure = logger.exception if self._app.debug else logger.info
        for tango_attr_name, eid in self._subscriptions.items():
            try:
                self._object.unsubscribe_event(eid)
            except Exception:
                log_failure(f"Couldnt unsubscribe from {tango_attr_name}")

        self._subscriptions = {}

    def _push_event(self, event: tango.EventData):
        """Callback triggered when the remote tango hardware fire an event"""
        try:
            self._protected_push_event(event)
        except Exception:
            logger.error("Error while processing push_event", exc_info=True)

    def _protected_push_event(self, event: tango.EventData):
        """Callback triggered when the remote tango hardware fire an event

        Any exceptions raised are caught by `_push_event`.
        """
        if not self._online:
            return

        if event.errors:
            # No exception is being handled here: `logger.exception` would
            # append a meaningless "NoneType: None" traceback, so log at the
            # same level with `logger.error` instead.
            log_failure = logger.error if self._app.debug else logger.info
            error = event.errors[0]
            log_failure(f"Error in push_event for {event.attr_name}: {error.desc}")
            # if error.reason == 'API_EventTimeout':
            self._disconnect()
            return

        if event.attr_value is not None:
            for name, prop in self._property_map.items():
                if event.attr_value.name == prop.name:
                    self._update(name, prop, event.attr_value.value)
                    break

    def _do_set(self, prop: HardwareProperty, value):
        """Write `value` to the tango attribute behind `prop`

        raises:
            RuntimeError: If `prop` is a `TangoAttrProperty` which is not the
                "value" of the attribute
        """
        obj = self._object
        if not isinstance(prop, TangoAttrProperty):
            return obj.write_attribute(prop.name, value)
        if prop.name != "value":
            raise RuntimeError(
                f"Setter for '{prop.name}' attribute config is not implemented"
            )
        return obj.write_attribute(prop.get_attrname(self), value)

    def _do_get(self, prop: HardwareProperty):
        """Read the current value of `prop` from the tango device

        raises:
            RuntimeError: If `prop` is a `TangoAttrProperty` with an
                unsupported name
        """
        obj = self._object
        if isinstance(prop, TangoAttrProperty):
            attrname = prop.get_attrname(self)
            facet = prop.name
            if facet == "name":
                return attrname
            if facet in self._ATTR_READ_FIELDS:
                reading = obj.read_attribute(attrname)
                return getattr(reading, self._ATTR_READ_FIELDS[facet])
            if facet in self._ATTR_CONFIG_FIELDS:
                return getattr(obj.get_attribute_config(attrname), facet)
            raise RuntimeError(f"Unsupported {prop.name} attribute")

        getter = prop.getter
        if getter is not None:
            return getter(self)
        return obj.read_attribute(prop.name).value

    def _do_call(self, function, value, **kwargs):
        """Execute the tango command `function` with `value` as argument"""
        return self._object.command_inout(function, value, **kwargs)