Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/bliss/lima.py: 85%

185 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4 

5import typing 

6import gevent 

7from daiquiri.core.hardware.abstract.lima import ( 

8 Lima as AbstractLima, 

9) 

10from daiquiri.core.hardware.abstract import AbstractHardwareProperty2 

11from daiquiri.core.hardware.bliss.object import BlissObject 

12from bliss.controllers.lima.lima_base import Lima as BlissLima 

13from bliss.common import event 

14import tango 

15 

16import logging 

17 

# Module-level logger, named after this module's import path
_logger = logging.getLogger(__name__)

19 

20 

class _RawImageRoiProperty(AbstractHardwareProperty2):
    """Read-only property exposing ``image._image_params._roi``.

    The value is pushed through ``emit_update`` each time a ``_roi`` event
    is received from the BLISS image parameters.
    """

    def read_hardware(self, obj: BlissLima):
        """Return the ``_roi`` value held by the BLISS image parameters."""
        image_params = obj.image._image_params
        return image_params._roi

    def write_hardware(self, obj: BlissLima, value):
        """Reject any write.

        Raises:
            NotImplementedError: always, the raw ROI is not editable.
        """
        raise NotImplementedError("Raw roi can't be edited")

    def connect_hardware(self, obj: BlissLima):
        """Register `_update` as listener of the ``_roi`` event."""
        image_params = obj.image._image_params
        event.connect(image_params, "_roi", self._update)

    def _update(self, value, *args, **kwargs):
        """Event callback: forward the received value with `emit_update`."""
        self.emit_update(value)

33 

34 

class _RoiProperty(AbstractHardwareProperty2):
    """Editable ROI property backed by the BLISS Lima image object.

    Reads come from ``image._image_params._cur_roi``; writes go through
    the ``image.roi`` attribute.
    """

    def read_hardware(self, obj: BlissLima):
        """Return the ``_cur_roi`` value held by the BLISS image parameters."""
        return obj.image._image_params._cur_roi

    def write_hardware(self, obj: BlissLima, value):
        """Assign `value` to ``obj.image.roi``."""
        image = obj.image
        image.roi = value

    def connect_hardware(self, obj: BlissLima):
        """Register `_update` as listener of the ``_cur_roi`` event."""
        image_params = obj.image._image_params
        event.connect(image_params, "_cur_roi", self._update)

    def _update(self, value, *args, **kwargs):
        """Event callback: forward the received value with `emit_update`."""
        self.emit_update(value)

48 

49 

class _SizeProperty(AbstractHardwareProperty2):
    """Read-only property derived from the current ROI.

    The exposed value is the slice ``[2:]`` of ``_cur_roi`` (presumably the
    width and height of the ROI -- to be confirmed against BLISS).
    """

    def __init__(self, parent):
        super().__init__(parent)
        # Last online/offline status seen through the "state" property
        self._online = False

    def read_hardware(self, obj: BlissLima):
        """Return the items of the current ROI from index 2 onward."""
        return obj.image._image_params._cur_roi[2:]

    def write_hardware(self, obj: BlissLima, value):
        """Reject any write.

        Raises:
            NotImplementedError: always, the size is not editable.
        """
        raise NotImplementedError("Size can't be edited")

    def connect_hardware(self, obj: BlissLima):
        """Listen to ``_cur_roi`` events and to the "state" property."""
        event.connect(obj.image._image_params, "_cur_roi", self._update)
        # The state subscription detects when the detector gets connected.
        # BLISS is not able to update the size and ROI alone at the very
        # first use; this forces the update of the property in Redis.
        self._object.subscribe("state", self._on_hardware_state_change)

    def _on_hardware_state_change(self, obj, prop, value):
        """React to a change of the "state" property.

        Only offline <-> online transitions are considered. When going
        online with a ROI still equal to ``[0, 0, 0, 0]``, BLISS is asked
        to recompute the current ROI.
        """
        is_online = value != "OFFLINE"
        if self._online == is_online:
            # No transition, nothing to do
            return
        self._online = is_online
        if not is_online:
            return

        lima = self._object._object
        current_roi = lima.image._image_params._cur_roi
        if current_roi == [0, 0, 0, 0]:
            # Still the default value: reset it
            lima.image._update_cur_roi(update_dependencies=False)

    def _update(self, value, *args, **kwargs):
        """Event callback: emit the ROI items from index 2 onward."""
        self.emit_update(value[2:])

84 

85 

class _RotationProperty(AbstractHardwareProperty2):
    """Image rotation, exposed as an integer angle.

    BLISS represents the rotation with the strings ``"NONE"``, ``"90"``,
    ``"180"`` and ``"270"``; this property maps them to ``0``, ``90``,
    ``180`` and ``270``.
    """

    # BLISS rotation name -> angle
    FROM_HARDWARE = {"NONE": 0, "90": 90, "180": 180, "270": 270}
    # Angle -> BLISS rotation name (inverse of FROM_HARDWARE)
    TO_HARDWARE = {angle: name for name, angle in FROM_HARDWARE.items()}

    def read_hardware(self, obj: BlissLima):
        """Return the current rotation as an integer angle.

        Raises:
            KeyError: if BLISS reports a rotation name missing from
                `FROM_HARDWARE`.
        """
        hardware_name = obj.image._image_params.rotation
        return self.FROM_HARDWARE[hardware_name]

    def write_hardware(self, obj: BlissLima, value):
        """Set the rotation from an integer angle.

        Raises:
            KeyError: if `value` is not a key of `TO_HARDWARE`.
        """
        obj.image.rotation = self.TO_HARDWARE[value]

    def connect_hardware(self, obj: BlissLima):
        """Register `_update` as listener of the ``rotation`` event."""
        image_params = obj.image._image_params
        event.connect(image_params, "rotation", self._update)

    def _update(self, value, *args, **kwargs):
        """Event callback: convert the BLISS name to an angle and emit it."""
        self.emit_update(self.FROM_HARDWARE[value])

114 

115 

class _BinningProperty(AbstractHardwareProperty2):
    """Image binning, passed through unchanged between BLISS and daiquiri."""

    def read_hardware(self, obj: BlissLima):
        """Return the ``binning`` value held by the BLISS image parameters."""
        image_params = obj.image._image_params
        return image_params.binning

    def write_hardware(self, obj: BlissLima, value):
        """Assign `value` to ``obj.image.binning``."""
        image = obj.image
        image.binning = value

    def connect_hardware(self, obj: BlissLima):
        """Register `_update` as listener of the ``binning`` event."""
        image_params = obj.image._image_params
        event.connect(image_params, "binning", self._update)

    def _update(self, value, *args, **kwargs):
        """Event callback: forward the received value with `emit_update`."""
        self.emit_update(value)

128 

129 

class _FlipProperty(AbstractHardwareProperty2):
    """Image flip, passed through unchanged between BLISS and daiquiri."""

    def read_hardware(self, obj: BlissLima):
        """Return the ``flip`` value held by the BLISS image parameters."""
        image_params = obj.image._image_params
        return image_params.flip

    def write_hardware(self, obj: BlissLima, value):
        """Assign `value` to ``obj.image.flip``."""
        image = obj.image
        image.flip = value

    def connect_hardware(self, obj: BlissLima):
        """Register `_update` as listener of the ``flip`` event."""
        image_params = obj.image._image_params
        event.connect(image_params, "flip", self._update)

    def _update(self, value, *args, **kwargs):
        """Event callback: forward the received value with `emit_update`."""
        self.emit_update(value)

142 

143 

class _AccMaxExpoTimeProperty(AbstractHardwareProperty2):
    """Property bound to ``accumulation.max_expo_time`` of the BLISS object.

    The event connection may fail when the property is created; in that
    case it is retried once the "state" property reports a value other
    than ``"OFFLINE"``.
    """

    def __init__(self, parent):
        super().__init__(parent)
        # True once the `max_expo_time` event has been connected
        self._connected = False

    def read_hardware(self, obj: BlissLima):
        """Return ``max_expo_time``, or ``None`` while not yet connected."""
        return obj.accumulation.max_expo_time if self._connected else None

    def write_hardware(self, obj: BlissLima, value):
        """Assign `value` to ``obj.accumulation.max_expo_time``."""
        accumulation = obj.accumulation
        accumulation.max_expo_time = value

    def connect_hardware(self, obj: BlissLima):
        """Connect the ``max_expo_time`` event, or defer the connection."""
        try:
            event.connect(obj.accumulation, "max_expo_time", self._update)
        except Exception:
            # NOTE: At the very first start, `obj.accumulation` raises an
            # exception if the detector is not there. The workaround is to
            # delay the connection until the detector is online, by
            # following the changes of the state property.
            self._object.subscribe("state", self._on_hardware_state_change)
        else:
            self._connected = True

    def _on_hardware_state_change(self, obj: Lima, prop, value):
        """Finish the deferred connection when the detector is not offline."""
        if self._connected or value == "OFFLINE":
            return

        lima = self._object._object
        event.connect(lima.accumulation, "max_expo_time", self._update)
        self._connected = True
        # Publish the value which can now be read
        self.emit_update(self.read_hardware(lima))

    def _update(self, value, *args, **kwargs):
        """Event callback: forward the received value with `emit_update`."""
        self.emit_update(value)

183 

184 

class _TangoStaticProperty(AbstractHardwareProperty2):
    """
    Information which does not change during the life cycle of the detector.

    These are constant values which can be fetched from the device.

    For now, this is updated each time the daiquiri hardware "state"
    property switches between offline and online.
    """

    def __init__(self, parent):
        AbstractHardwareProperty2.__init__(self, parent)
        # Last online/offline status seen through the "state" property
        self._online = False

    def _read_camera_pixel_size(
        self, proxy: tango.DeviceProxy
    ) -> typing.Tuple[float, float]:
        """Returns the camera pixel size (unbinned) in micrometer

        The result is always a tuple of two plain floats, whatever the
        container type returned by the Tango attribute.
        """
        ps = proxy.camera_pixelsize

        # Lima returns pixel size in meter
        # Some cameras were returning it in micron (PCO, Andor, Andor3)
        camera_type = proxy.camera_type.lower()

        # Handle patched and non patched Lima cameras: a value above 0.1
        # sounds like it is already in micron
        already_micron = camera_type in ["pco", "andor", "andor3"] and ps[0] > 0.1
        if already_micron:
            return float(ps[0]), float(ps[1])
        return float(ps[0] * 1e6), float(ps[1] * 1e6)

    def _read_image_max_size(self, proxy: tango.DeviceProxy):
        """Returns ``image_max_dim`` as a tuple of two ints"""
        w, h = proxy.image_max_dim
        return int(w), int(h)

    def read_hardware(self, obj: BlissLima):
        """Read the static description of the detector.

        Returns:
            A dict with the keys ``lima_version``, ``lima_type``,
            ``camera_type``, ``camera_model``, ``camera_pixelsize`` and
            ``image_max_dim``, or ``None`` if the device does not answer
            to a ping.

        Raises:
            RuntimeError: if the BLISS object has no Tango proxy.
        """
        proxy = obj._proxy
        if proxy is None:
            raise RuntimeError("The detector is not online")

        try:
            proxy.ping()
        except Exception:
            return None

        return {
            "lima_version": proxy.lima_version,
            "lima_type": proxy.lima_type,
            "camera_type": proxy.camera_type,
            "camera_model": proxy.camera_model,
            "camera_pixelsize": self._read_camera_pixel_size(proxy),
            "image_max_dim": self._read_image_max_size(proxy),
        }

    def write_hardware(self, obj: BlissLima, value):
        """Reject any write.

        Raises:
            NotImplementedError: always, this property is read only.
        """
        raise NotImplementedError("structural property is a read only value")

    def connect_hardware(self, obj: BlissLima):
        """Follow the "state" property to know when to refresh the value."""
        # Connect this property to the changes of the state property
        self._object.subscribe("state", self._on_hardware_state_change)

    def _on_hardware_state_change(self, obj, prop, value):
        """Emit fresh static information on offline <-> online transitions.

        Going online emits the result of `read_hardware`; going offline
        emits ``None``.
        """
        online = value != "OFFLINE"
        if self._online == online:
            return
        self._online = online
        if self._online:
            result = self.read_hardware(self._object._object)
            self.emit_update(result)
        else:
            self.emit_update(None)

260 

261 

class _TangoStateProperty(AbstractHardwareProperty2):
    """
    State of Lima from the `acq_status` Tango attribute.

    For now this is polled every `_POLLING_PERIOD` seconds (5 by default).

    FIXME: Replace the polling by event subscription
    """

    FROM_HARDWARE = {
        "Ready": "READY",
        "Fault": "FAULT",
        "Running": "ACQUIRING",
        "Configuration": "CONFIGURATION",
        "?": "UNKNOWN",  # Valid value which can be returned by the LimaCCDs
    }

    # Delay in seconds between two reads of the state
    _POLLING_PERIOD = 5

    def __init__(self, parent):
        AbstractHardwareProperty2.__init__(self, parent)
        # Last state emitted by the polling loop
        self._value: str = "OFFLINE"
        # Greenlet running `_polling`, created by `connect_hardware`
        self._gpolling = None

    def _polling(self):
        """Endless loop emitting the state each time it changes.

        Errors raised while reading the state are logged and do not stop
        the loop.
        """
        while True:
            gevent.sleep(self._POLLING_PERIOD)
            try:
                state = self.read_hardware(self._object._object)
                if state != self._value:
                    self._value = state
                    self.emit_update(state)
            except Exception:
                _logger.exception("Error while fetching state")

    def read_hardware(self, obj: BlissLima):
        """Return the daiquiri state name of the detector.

        Returns:
            ``"OFFLINE"`` if the ping of the Tango proxy fails, else the
            translation of ``acq_status`` through `FROM_HARDWARE`
            (``"UNKNOWN"`` for values missing from the table).
        """
        proxy = obj._proxy
        try:
            proxy.ping()
        except Exception:
            return "OFFLINE"
        return self.FROM_HARDWARE.get(proxy.acq_status, "UNKNOWN")

    def write_hardware(self, obj: BlissLima, value):
        """Reject any write.

        Raises:
            NotImplementedError: always, the state is read only.
        """
        raise NotImplementedError("state is a read only value")

    def connect_hardware(self, obj: BlissLima):
        """Start the polling greenlet."""
        self._gpolling = gevent.spawn(self._polling)

309 

310 

class Lima(BlissObject, AbstractLima):
    """Daiquiri hardware object wrapping a BLISS Lima detector."""

    def _create_properties(self):
        """Return a new property instance for each exposed property name."""
        factories = {
            "state": _TangoStateProperty,
            "static": _TangoStaticProperty,
            "rotation": _RotationProperty,
            "binning": _BinningProperty,
            "size": _SizeProperty,
            "raw_roi": _RawImageRoiProperty,
            "roi": _RoiProperty,
            "flip": _FlipProperty,
            "acc_max_expo_time": _AccMaxExpoTimeProperty,
        }
        # Properties are created in the declaration order of the table
        return {name: factory(self) for name, factory in factories.items()}

324 

325 

# Module-level alias of the `Lima` class
# NOTE(review): presumably the name looked up by the hardware loader to find
# the class provided by this module -- confirm against the loader
Default = Lima