Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/imageviewer/source.py: 77%

219 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4import gevent 

5import numpy 

6from typing import Any, Dict 

7 

8from daiquiri.core.exceptions import InvalidYAML 

9from daiquiri.core.transform.imageviewer import ImageViewerCanvas 

10from daiquiri.core.transform.units import get_exponent 

11from daiquiri.core.utils import timed, dict_nd_to_list 

12 

13import logging 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class Source: 

19 """ImageViewer Image Source 

20 

21 This represents an image source on the imagerviewer, in most cases this is a camera 

22 which can be mounted on x and y axes (plus fine), and potentially with the ability to 

23 zoom 

24 

25 Only one `Source` can be marked as the origin, this is the image from which for example 

26 the beam is marked relative to 

27 

28 The offset in the imageviewer of this object is calculated from the axes and converted to 

29 `self._base_unit` by default nanometers which should give enough precision long term 

30 

31 """ 

32 

33 _base_unit = "nm" 

34 _origin = False 

35 

36 def __init__(self, config, id, hardware, emit, config_file=None): 

37 self._config = config 

38 self._config_file = config_file 

39 self._id = id 

40 self._hardware = hardware 

41 self._emit = emit 

42 self._last_emit_source = 0 

43 self._emit_source_timeout = None 

44 self.canvas = None 

45 self._reference_inverse_matrix = None 

46 

47 for config_label in ["url", "name", "zoominfo", "scale"]: 

48 config_value = self._get_from_config(config_label) 

49 setattr(self, f"_{config_label}", config_value) 

50 

51 self._additional_urls = self._get_from_config( 

52 "additional_urls", default=[], allow_missing=True 

53 ) 

54 

55 self._device = self._get_hwobj_from_config("device") 

56 self._device.subscribe("state", self._device_changed) 

57 self._origin = self._get_from_config( 

58 "origin", default=False, allow_missing=True 

59 ) 

60 

61 motors, unitdict = self._motors_from_config() 

62 

63 zoominfo = self._get_from_config("zoominfo") 

64 

65 # Use a zoom motor to change zoom level 

66 if self._get_from_config("zoom", allow_missing=True): 

67 self._zoom = self._get_hwobj_from_config("zoom") 

68 self._zoom.subscribe("position", self._zoom_changed) 

69 self._zoom.subscribe("state", self._zoom_changed) 

70 zoomlevel = self._zoom.get("position") 

71 

72 # Otherwise use a fixed configuration 

73 elif "FIXED" in zoominfo: 

74 zoomlevel = "FIXED" 

75 

76 # Neither specified so raise 

77 else: 

78 self._raise_config_error( 

79 "Either a `zoom` motor or a FIXED `zoominfo` key must be provided" 

80 ) 

81 

82 self._additional = {} 

83 additional = self._get_from_config("additional", default={}, allow_missing=True) 

84 err_info = {} 

85 for name, obj_name in additional.items(): 

86 err_info[name] = obj_name 

87 hwobj = hardware.get_object(obj_name) 

88 if hwobj is None: 

89 self._raise_config_error( 

90 "Can't find hardware object", err_info=err_info 

91 ) 

92 self._additional[name] = hwobj 

93 hwobj.subscribe("position", self._additional_changed) 

94 hwobj.subscribe("state", self._additional_changed) 

95 

96 # Canvas object used for coordinate transformations 

97 self.canvas = ImageViewerCanvas( 

98 motors, 

99 units=self._base_unit, 

100 unitdict=unitdict, 

101 sampleoffset=None, # TODO: 

102 beamoffset=None, # TODO: 

103 beamsize=config.get("beamsize", [0.7e3, 0.3e3, 100e3]), 

104 beamangle=config.get("beamangle", 0), 

105 focaloffset=None, # TODO: 

106 vlmimageshape=(100, 100), # can be anything 

107 zoomlevel=zoomlevel, 

108 zoominfo=zoominfo, 

109 downstream=config.get("downstream", False), 

110 ) 

111 

112 self.fine_fixed = self._get_from_config( 

113 "fine_fixed", default=False, allow_missing=True 

114 ) 

115 

116 def _motors_from_config(self): 

117 """Retrieve motor names from config and instantiate the hardware objects""" 

118 motors = {} 

119 unitdict = {} 

120 # Mapping from motor role to configuration field 

121 # Roles are used buy the canvas 

122 config_motors = { 

123 "x": "motor_x", 

124 "x_fine": "motor_x_fine", 

125 "y": "motor_y", 

126 "y_fine": "motor_y_fine", 

127 "z": "motor_z", 

128 "z_fine": "motor_z_fine", 

129 } 

130 for label, config_label in config_motors.items(): 

131 allow_missing = config_label.endswith("fine") 

132 motobj = self._get_hwobj_from_config( 

133 config_label, allow_missing=allow_missing 

134 ) 

135 if motobj is None: 

136 continue 

137 config_label += "_unit" 

138 unit = self._get_from_config(config_label, allow_missing=allow_missing) 

139 if not unit: 

140 unit = motobj.get("unit") 

141 if not unit: 

142 self._raise_config_error(f"Missing '{config_label}' key") 

143 motobj.subscribe("position", self._translate_changed) 

144 motobj.subscribe("state", self._translate_changed) 

145 motors[label] = motobj 

146 unitdict[motobj.id()] = unit 

147 return motors, unitdict 

148 

149 def _get_hwobj_from_config(self, config_label, allow_missing=False): 

150 """Instantiate a hardware object from the config""" 

151 obj_id = self._get_from_config( 

152 config_label, default="", allow_missing=allow_missing 

153 ) 

154 if allow_missing and not obj_id: 

155 return None 

156 err_info = {} 

157 err_info[config_label] = obj_id 

158 hwobj = self._hardware.get_object(obj_id) 

159 if not allow_missing and hwobj is None: 

160 self._raise_config_error("Can't find hardware object", err_info=err_info) 

161 return hwobj 

162 

163 def _get_from_config(self, config_label, default=None, allow_missing=False): 

164 """Get a value for the config""" 

165 if not allow_missing and config_label not in self._config: 

166 self._raise_config_error(f"Missing '{config_label}' key") 

167 return self._config.get(config_label, default) 

168 

169 def update_config(self, config): 

170 self._config = config 

171 self.canvas.zoominfo = self._get_from_config("zoominfo") 

172 self.canvas.beamsize = self._get_from_config("beamsize") 

173 self.canvas.downstream = self._get_from_config("downstream") 

174 

175 def _raise_config_error(self, message, err_info=None): 

176 err_obj = {"key": "sources", "name": self._config.get("name")} 

177 if err_info: 

178 err_obj.update(err_info) 

179 raise InvalidYAML( 

180 {"message": message, "file": self._config_file, "obj": err_obj} 

181 ) 

182 

183 @property 

184 def config(self) -> Dict[str, Any]: 

185 return { 

186 "fine_fixed": self._get_from_config( 

187 "fine_fixed", default=False, allow_missing=True 

188 ), 

189 "allow_fixed_axes": self._get_from_config( 

190 "allow_fixed_axes", default=False, allow_missing=True 

191 ), 

192 } 

193 

194 @property 

195 def has_fine(self) -> bool: 

196 for motor_label in ["motor_x_fine", "motor_y_fine"]: 

197 if motor_label in self._config: 

198 return True 

199 return False 

200 

201 @property 

202 def fine_fixed(self) -> bool: 

203 """Whether the fine axis is fixed and centred for movement""" 

204 return self.canvas.motors_to_world.trnx.smallest_fixed 

205 

206 @fine_fixed.setter 

207 def fine_fixed(self, value: bool): 

208 self.canvas.motors_to_world.trnx.smallest_fixed = value 

209 self.canvas.motors_to_world.trny.smallest_fixed = value 

210 self.canvas.motors_to_world.trnz.smallest_fixed = value 

211 

212 if value and self.coarse_fixed: 

213 self.coarse_fixed = False 

214 

215 @property 

216 def coarse_fixed(self) -> bool: 

217 """Whether the coarse axis is fixed for movement""" 

218 return self.canvas.motors_to_world.trnx.largest_fixed 

219 

220 @coarse_fixed.setter 

221 def coarse_fixed(self, value: bool): 

222 self.canvas.motors_to_world.trnx.largest_fixed = value 

223 self.canvas.motors_to_world.trny.largest_fixed = value 

224 self.canvas.motors_to_world.trnz.largest_fixed = value 

225 

226 if value and self.fine_fixed: 

227 self.fine_fixed = False 

228 

229 @property 

230 def name(self): 

231 return self._name 

232 

233 @property 

234 def device(self): 

235 return self._device 

236 

237 @property 

238 def unit(self): 

239 return get_exponent(self._base_unit) 

240 

241 @property 

242 def id(self): 

243 return self._id 

244 

245 @property 

246 def origin(self): 

247 return self._origin 

248 

249 @property 

250 def beamsize(self) -> Dict[str, float]: 

251 """Return the beamsize (in nm)""" 

252 bs = self._get_from_config("beamsize") 

253 return {"x": bs[0], "y": bs[1], "z": bs[2]} 

254 

255 @timed 

256 def info(self): 

257 """Get info from this image source 

258 

259 This includes name, offsets, scale, limits, url, etc 

260 """ 

261 info = { 

262 "sourceid": self._id, 

263 "name": self._name, 

264 "type": "video", 

265 "origin": self._origin, 

266 "url": self._url, 

267 "additional_urls": self._additional_urls, 

268 "scale": self._scale, 

269 "additional": self.get_additional(), 

270 } 

271 info.update(self.canvas.vlm_image_info) 

272 info["polylines"] = polylines = {} 

273 info["markings"] = markings = {} 

274 polylines["limits"] = self.canvas.reach_polyline.tolist() 

275 polylines["fine motors"] = self.canvas.fine_polyline.tolist() 

276 beam_info = self.canvas.beam_info 

277 beam_info["origin"] = True 

278 markings["beam"] = beam_info 

279 info["reference"] = {"beam": self.calculate_reference_beam(beam_info["center"])} 

280 

281 return dict_nd_to_list(info) 

282 

283 def _device_changed(self, obj, prop, value): 

284 self._emit( 

285 "message", 

286 {"type": "device", "prop": prop, "value": value}, 

287 ) 

288 

289 def _zoom_changed(self, obj, prop, value): 

290 """Callback when the zoom changes""" 

291 if prop == "position" and obj == self._zoom: 

292 try: 

293 label = self.get_scale_label() 

294 except RuntimeError: 

295 logger.info("Zoom scale label currently unknown") 

296 return 

297 

298 if label: 

299 self.canvas.zoomlevel = label 

300 self._emit( 

301 "message", 

302 { 

303 "type": "source", 

304 "info": dict_nd_to_list( 

305 { 

306 **self.canvas.vlm_image_info, 

307 "sourceid": self._id, 

308 "markings": {"beam": self.canvas.beam_info}, 

309 } 

310 ), 

311 }, 

312 ) 

313 

314 def _additional_changed(self, obj, prop, value): 

315 """Callback when an additional position changes""" 

316 for name, ob in self._additional.items(): 

317 if prop == "position" and obj == ob: 

318 self._emit( 

319 "message", 

320 { 

321 "type": "origin", 

322 "id": self._id, 

323 "prop": "additional", 

324 "object": name, 

325 "value": value, 

326 }, 

327 ) 

328 

329 def get_additional(self): 

330 """Get a dict of the additional positions 

331 

332 Returns: 

333 positions(dict): A dict of positions and their values 

334 """ 

335 additional = {} 

336 for name, obj in self._additional.items(): 

337 additional[name] = obj.get("position") 

338 

339 return additional 

340 

341 def move_to_additional(self, additional, wait=True): 

342 """Move to additional positions""" 

343 if additional: 

344 for a, v in additional.items(): 

345 print("moving to additional", a, v) 

346 if a in self._additional: 

347 self._additional[a].move(v) 

348 

349 if wait: 

350 for a, v in additional.items(): 

351 if a in self._additional: 

352 self._additional[a].wait() 

353 

354 def _translate_changed(self, obj, prop, value): 

355 """Callback when a translation changes""" 

356 if prop == "position": 

357 self._queue_emit_source() 

358 

359 def _queue_emit_source(self): 

360 now = time.time() 

361 if now - self._last_emit_source > 0.2: 

362 self._emit_source() 

363 self._last_emit_source = now 

364 else: 

365 if self._emit_source_timeout: 

366 self._emit_source_timeout.kill() 

367 

368 self._emit_source_timeout = gevent.spawn_later(0.5, self._emit_source) 

369 

370 def _emit_source(self): 

371 if self.canvas is None: 

372 return 

373 

374 beam_info = self.canvas.beam_info 

375 self._emit( 

376 "message", 

377 { 

378 "type": "source", 

379 "info": dict_nd_to_list( 

380 { 

381 **self.canvas.vlm_image_info, 

382 "sourceid": self._id, 

383 "markings": {"beam": beam_info}, 

384 "polylines": {"fine motors": self.canvas.fine_polyline}, 

385 "reference": { 

386 "beam": self.calculate_reference_beam(beam_info["center"]) 

387 }, 

388 } 

389 ), 

390 }, 

391 ) 

392 

393 def set_zoom(self, zoom): 

394 self._zoom.move(zoom) 

395 

396 def get_scale_label(self): 

397 """Get the label for the current scale""" 

398 label = self._zoom.get("position") 

399 if label not in self._zoominfo: 

400 raise RuntimeError( 

401 "Zoom scale '{}' not in configuration ({})".format( 

402 label, list(self._zoominfo.keys()) 

403 ) 

404 ) 

405 return label 

406 

407 def set_reference_inverse_matrix(self, matrix): 

408 self._reference_inverse_matrix = matrix 

409 

410 def calculate_reference_beam(self, center): 

411 if not isinstance(self._reference_inverse_matrix, numpy.ndarray): 

412 return 

413 

414 ref_pos = numpy.dot( 

415 # Beam center is in inverted y sense compared with reference image 

416 self._reference_inverse_matrix, 

417 numpy.array([center[0], -center[1], 1]), 

418 ) 

419 return (ref_pos[0], ref_pos[1])