Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/bliss/lima.py: 85%
185 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
5import typing
6import gevent
7from daiquiri.core.hardware.abstract.lima import (
8 Lima as AbstractLima,
9)
10from daiquiri.core.hardware.abstract import AbstractHardwareProperty2
11from daiquiri.core.hardware.bliss.object import BlissObject
12from bliss.controllers.lima.lima_base import Lima as BlissLima
13from bliss.common import event
14import tango
16import logging
18_logger = logging.getLogger(__name__)
21class _RawImageRoiProperty(AbstractHardwareProperty2):
22 def read_hardware(self, obj: BlissLima):
23 return obj.image._image_params._roi
25 def write_hardware(self, obj: BlissLima, value):
26 raise NotImplementedError("Raw roi can't be edited")
28 def connect_hardware(self, obj: BlissLima):
29 event.connect(obj.image._image_params, "_roi", self._update)
31 def _update(self, value, *args, **kwargs):
32 self.emit_update(value)
35class _RoiProperty(AbstractHardwareProperty2):
36 def read_hardware(self, obj: BlissLima):
37 roi = obj.image._image_params._cur_roi
38 return roi
40 def write_hardware(self, obj: BlissLima, value):
41 obj.image.roi = value
43 def connect_hardware(self, obj: BlissLima):
44 event.connect(obj.image._image_params, "_cur_roi", self._update)
46 def _update(self, value, *args, **kwargs):
47 self.emit_update(value)
50class _SizeProperty(AbstractHardwareProperty2):
51 def __init__(self, parent):
52 AbstractHardwareProperty2.__init__(self, parent)
53 self._online = False
55 def read_hardware(self, obj: BlissLima):
56 roi = obj.image._image_params._cur_roi
57 return roi[2:]
59 def write_hardware(self, obj: BlissLima, value):
60 raise NotImplementedError("Size can't be edited")
62 def connect_hardware(self, obj: BlissLima):
63 event.connect(obj.image._image_params, "_cur_roi", self._update)
64 # Event when the detector is connected
65 # BLISS is not able to update the size and ROI alone at the very first use
66 # this will force the update of the property in Redis
67 self._object.subscribe("state", self._on_hardware_state_change)
69 def _on_hardware_state_change(self, obj, prop, value):
70 online = value != "OFFLINE"
71 if self._online == online:
72 return
73 self._online = online
74 if self._online:
75 obj = self._object._object
76 roi = obj.image._image_params._cur_roi
77 if roi == [0, 0, 0, 0]:
78 # If it's the default value, reset
79 obj.image._update_cur_roi(update_dependencies=False)
81 def _update(self, value, *args, **kwargs):
82 value = value[2:]
83 self.emit_update(value)
86class _RotationProperty(AbstractHardwareProperty2):
87 FROM_HARDWARE = {
88 "NONE": 0,
89 "90": 90,
90 "180": 180,
91 "270": 270,
92 }
93 TO_HARDWARE = {
94 0: "NONE",
95 90: "90",
96 180: "180",
97 270: "270",
98 }
100 def read_hardware(self, obj: BlissLima):
101 r = obj.image._image_params.rotation
102 return self.FROM_HARDWARE[r]
104 def write_hardware(self, obj: BlissLima, value):
105 v = self.TO_HARDWARE[value]
106 obj.image.rotation = v
108 def connect_hardware(self, obj: BlissLima):
109 event.connect(obj.image._image_params, "rotation", self._update)
111 def _update(self, value, *args, **kwargs):
112 v = self.FROM_HARDWARE[value]
113 self.emit_update(v)
116class _BinningProperty(AbstractHardwareProperty2):
117 def read_hardware(self, obj: BlissLima):
118 return obj.image._image_params.binning
120 def write_hardware(self, obj: BlissLima, value):
121 obj.image.binning = value
123 def connect_hardware(self, obj: BlissLima):
124 event.connect(obj.image._image_params, "binning", self._update)
126 def _update(self, value, *args, **kwargs):
127 self.emit_update(value)
130class _FlipProperty(AbstractHardwareProperty2):
131 def read_hardware(self, obj: BlissLima):
132 return obj.image._image_params.flip
134 def write_hardware(self, obj: BlissLima, value):
135 obj.image.flip = value
137 def connect_hardware(self, obj: BlissLima):
138 event.connect(obj.image._image_params, "flip", self._update)
140 def _update(self, value, *args, **kwargs):
141 self.emit_update(value)
144class _AccMaxExpoTimeProperty(AbstractHardwareProperty2):
145 def __init__(self, parent):
146 AbstractHardwareProperty2.__init__(self, parent)
147 self._connected = False
149 def read_hardware(self, obj: BlissLima):
150 if not self._connected:
151 return None
152 return obj.accumulation.max_expo_time
154 def write_hardware(self, obj: BlissLima, value):
155 obj.accumulation.max_expo_time = value
157 def connect_hardware(self, obj: BlissLima):
158 try:
159 event.connect(obj.accumulation, "max_expo_time", self._update)
160 self._connected = True
161 except Exception:
162 # NOTE: At the very first start, the `obj.accumulation` will raise an
163 # exception the detector is not there. The work around is to delay
164 # until the detector is online
165 # Connect this property to the changes of the state property
166 self._object.subscribe("state", self._on_hardware_state_change)
168 def _on_hardware_state_change(self, obj: Lima, prop, value):
169 if self._connected:
170 return
172 online = value != "OFFLINE"
173 if not online:
174 return
176 event.connect(self._object._object.accumulation, "max_expo_time", self._update)
177 self._connected = True
178 result = self.read_hardware(self._object._object)
179 self.emit_update(result)
181 def _update(self, value, *args, **kwargs):
182 self.emit_update(value)
185class _TangoStaticProperty(AbstractHardwareProperty2):
186 """
187 Information which does not change during the life cycle of the detector.
189 It's constant values which can be fetched from the device.
191 For now, this is updated at the time the daiquiri hardware "state"
192 change it's value.
193 """
195 def __init__(self, parent):
196 AbstractHardwareProperty2.__init__(self, parent)
197 self._online = False
199 def _read_camera_pixel_size(
200 self, proxy: tango.DeviceProxy
201 ) -> typing.Tuple[float, float]:
202 """Returns the camera pixel size (unbinned) in micrometer"""
203 ps = proxy.camera_pixelsize
205 # Lima returns pixel size in meter
206 # Some cameras was returning returning it in micron (PCO, Andor, Andor3)
207 camera_type = proxy.camera_type.lower()
208 if camera_type in ["pco", "andor", "andor3"]:
209 # Handle patched and non patched Lima cameras
210 if ps[0] > 0.1:
211 # Sounds like it's already in micron
212 pass
213 else:
214 ps = ps[0] * 1e6, ps[1] * 1e6
215 else:
216 ps = ps[0] * 1e6, ps[1] * 1e6
218 return ps
220 def _read_image_max_size(self, proxy: tango.DeviceProxy):
221 w, h = proxy.image_max_dim
222 return int(w), int(h)
224 def read_hardware(self, obj: BlissLima):
225 proxy = obj._proxy
226 if proxy is None:
227 raise RuntimeError("The detector is not online")
229 try:
230 proxy.ping()
231 except Exception:
232 return None
234 return {
235 "lima_version": proxy.lima_version,
236 "lima_type": proxy.lima_type,
237 "camera_type": proxy.camera_type,
238 "camera_model": proxy.camera_model,
239 "camera_pixelsize": self._read_camera_pixel_size(proxy),
240 "image_max_dim": self._read_image_max_size(proxy),
241 }
243 def write_hardware(self, obj: BlissLima, value):
244 raise NotImplementedError("structural property is a read only value")
246 def connect_hardware(self, obj: BlissLima):
247 # Connect this property to the changes of the state property
248 self._object.subscribe("state", self._on_hardware_state_change)
250 def _on_hardware_state_change(self, obj, prop, value):
251 online = value != "OFFLINE"
252 if self._online == online:
253 return
254 self._online = online
255 if self._online:
256 result = self.read_hardware(self._object._object)
257 self.emit_update(result)
258 else:
259 self.emit_update(None)
262class _TangoStateProperty(AbstractHardwareProperty2):
263 """
264 State of Lima from the `acq_state` Tango attribute.
266 For now this is polled every 5 secondes.
268 FIXME: Replace the polling by even subscription
269 """
271 FROM_HARDWARE = {
272 "Ready": "READY",
273 "Fault": "FAULT",
274 "Running": "ACQUIRING",
275 "Configuration": "CONFIGURATION",
276 "?": "UNKNOWN", # Valid value which can be returned by the LimaCCDs
277 }
279 def __init__(self, parent):
280 AbstractHardwareProperty2.__init__(self, parent)
281 self._value: str = "OFFLINE"
282 self._gpolling = None
284 def _polling(self):
285 while True:
286 gevent.sleep(5)
287 try:
288 v = self.read_hardware(self._object._object)
289 if self._value != v:
290 self._value = v
291 self.emit_update(v)
292 except Exception:
293 _logger.error("Error while fetching state", exc_info=True)
295 def read_hardware(self, obj: BlissLima):
296 proxy = obj._proxy
297 try:
298 proxy.ping()
299 except Exception:
300 return "OFFLINE"
301 r = proxy.acq_status
302 return self.FROM_HARDWARE.get(r, "UNKNOWN")
304 def write_hardware(self, obj: BlissLima, value):
305 raise NotImplementedError("state is a read only value")
307 def connect_hardware(self, obj: BlissLima):
308 self._gpolling = gevent.spawn(self._polling)
311class Lima(BlissObject, AbstractLima):
312 def _create_properties(self):
313 return {
314 "state": _TangoStateProperty(self),
315 "static": _TangoStaticProperty(self),
316 "rotation": _RotationProperty(self),
317 "binning": _BinningProperty(self),
318 "size": _SizeProperty(self),
319 "raw_roi": _RawImageRoiProperty(self),
320 "roi": _RoiProperty(self),
321 "flip": _FlipProperty(self),
322 "acc_max_expo_time": _AccMaxExpoTimeProperty(self),
323 }
326Default = Lima