Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/imageviewer/source.py: 77%
219 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4import gevent
5import numpy
6from typing import Any, Dict
8from daiquiri.core.exceptions import InvalidYAML
9from daiquiri.core.transform.imageviewer import ImageViewerCanvas
10from daiquiri.core.transform.units import get_exponent
11from daiquiri.core.utils import timed, dict_nd_to_list
13import logging
15logger = logging.getLogger(__name__)
class Source:
    """ImageViewer Image Source

    This represents an image source on the imageviewer, in most cases this is a camera
    which can be mounted on x and y axes (plus fine), and potentially with the ability to
    zoom

    Only one `Source` can be marked as the origin, this is the image from which for example
    the beam is marked relative to

    The offset in the imageviewer of this object is calculated from the axes and converted to
    `self._base_unit` by default nanometers which should give enough precision long term

    """

    _base_unit = "nm"
    _origin = False

    # Key in `zoominfo` (and canvas zoom level) used when no zoom motor is configured
    _fixed_zoomlevel = "FIXED"
    # Fallbacks used whenever the config omits the corresponding key, so that the
    # constructor, `update_config` and the `beamsize` property all agree
    _default_beamsize = (0.7e3, 0.3e3, 100e3)
    _default_downstream = False

    def __init__(self, config, id, hardware, emit, config_file=None):
        """Build the source from its configuration

        Args:
            config (dict): Configuration of this source
            id: Identifier reported as `sourceid` / `id` in emitted messages
            hardware: Object providing `get_object(name)` to resolve hardware objects
            emit (callable): Called as `emit("message", payload)` to notify clients
            config_file (str): Name of the config file, only used in error reports

        Raises:
            InvalidYAML: If a mandatory key is missing or a hardware object
                cannot be resolved
        """
        self._config = config
        self._config_file = config_file
        self._id = id
        self._hardware = hardware
        self._emit = emit
        self._last_emit_source = 0
        self._emit_source_timeout = None
        self.canvas = None
        self._reference_inverse_matrix = None
        # Stays None when the source uses a FIXED zoom instead of a zoom motor
        self._zoom = None

        for config_label in ["url", "name", "zoominfo", "scale"]:
            config_value = self._get_from_config(config_label)
            setattr(self, f"_{config_label}", config_value)

        self._additional_urls = self._get_from_config(
            "additional_urls", default=[], allow_missing=True
        )

        self._device = self._get_hwobj_from_config("device")
        self._device.subscribe("state", self._device_changed)
        self._origin = self._get_from_config(
            "origin", default=False, allow_missing=True
        )

        motors, unitdict = self._motors_from_config()

        zoominfo = self._get_from_config("zoominfo")

        # Use a zoom motor to change zoom level
        if self._get_from_config("zoom", allow_missing=True):
            self._zoom = self._get_hwobj_from_config("zoom")
            self._zoom.subscribe("position", self._zoom_changed)
            self._zoom.subscribe("state", self._zoom_changed)
            zoomlevel = self._zoom.get("position")

        # Otherwise use a fixed configuration
        elif self._fixed_zoomlevel in zoominfo:
            zoomlevel = self._fixed_zoomlevel

        # Neither specified so raise
        else:
            self._raise_config_error(
                "Either a `zoom` motor or a FIXED `zoominfo` key must be provided"
            )

        self._additional = {}
        additional = self._get_from_config("additional", default={}, allow_missing=True)
        for name, obj_name in additional.items():
            hwobj = hardware.get_object(obj_name)
            if hwobj is None:
                # Only report the entry that failed, not the ones already resolved
                self._raise_config_error(
                    "Can't find hardware object", err_info={name: obj_name}
                )
            self._additional[name] = hwobj
            hwobj.subscribe("position", self._additional_changed)
            hwobj.subscribe("state", self._additional_changed)

        # Canvas object used for coordinate transformations
        self.canvas = ImageViewerCanvas(
            motors,
            units=self._base_unit,
            unitdict=unitdict,
            sampleoffset=None,  # TODO:
            beamoffset=None,  # TODO:
            beamsize=self._beamsize_from_config(),
            beamangle=config.get("beamangle", 0),
            focaloffset=None,  # TODO:
            vlmimageshape=(100, 100),  # can be anything
            zoomlevel=zoomlevel,
            zoominfo=zoominfo,
            downstream=config.get("downstream", self._default_downstream),
        )

        self.fine_fixed = self._get_from_config(
            "fine_fixed", default=False, allow_missing=True
        )

    def _motors_from_config(self):
        """Retrieve motor names from config and instantiate the hardware objects

        Each resolved motor is subscribed to `_translate_changed` for both its
        `position` and `state` properties.

        Returns:
            tuple: `(motors, unitdict)` where `motors` maps a canvas role to its
                hardware object and `unitdict` maps `motobj.id()` to its unit

        Raises:
            InvalidYAML: If a motor or its unit cannot be determined
        """
        motors = {}
        unitdict = {}
        # Mapping from motor role to configuration field
        # Roles are used by the canvas
        config_motors = {
            "x": "motor_x",
            "x_fine": "motor_x_fine",
            "y": "motor_y",
            "y_fine": "motor_y_fine",
            "z": "motor_z",
            "z_fine": "motor_z_fine",
        }
        for label, config_label in config_motors.items():
            # Only the fine motors are optional
            allow_missing = config_label.endswith("fine")
            motobj = self._get_hwobj_from_config(
                config_label, allow_missing=allow_missing
            )
            if motobj is None:
                continue

            unit_label = f"{config_label}_unit"
            unit = self._get_from_config(unit_label, allow_missing=allow_missing)
            if not unit:
                # Fall back to the unit reported by the hardware object
                unit = motobj.get("unit")
            if not unit:
                self._raise_config_error(f"Missing '{unit_label}' key")

            motobj.subscribe("position", self._translate_changed)
            motobj.subscribe("state", self._translate_changed)
            motors[label] = motobj
            unitdict[motobj.id()] = unit
        return motors, unitdict

    def _get_hwobj_from_config(self, config_label, allow_missing=False):
        """Instantiate a hardware object from the config

        Args:
            config_label (str): Config key holding the hardware object id
            allow_missing (bool): When true, a missing/empty key or an unknown
                object yields `None` instead of raising

        Returns:
            The hardware object, or `None` (only possible with `allow_missing`)

        Raises:
            InvalidYAML: If the key or the object is missing and
                `allow_missing` is false
        """
        obj_id = self._get_from_config(
            config_label, default="", allow_missing=allow_missing
        )
        if allow_missing and not obj_id:
            return None

        hwobj = self._hardware.get_object(obj_id)
        if hwobj is None and not allow_missing:
            self._raise_config_error(
                "Can't find hardware object", err_info={config_label: obj_id}
            )
        return hwobj

    def _get_from_config(self, config_label, default=None, allow_missing=False):
        """Get a value for the config

        Args:
            config_label (str): Key to read
            default: Value returned when the key is absent
            allow_missing (bool): Whether the key is optional

        Raises:
            InvalidYAML: If the key is absent and `allow_missing` is false
        """
        if not allow_missing and config_label not in self._config:
            self._raise_config_error(f"Missing '{config_label}' key")
        return self._config.get(config_label, default)

    def _beamsize_from_config(self):
        """Beam size from the config, or the class default when not configured"""
        return self._config.get("beamsize", list(self._default_beamsize))

    def update_config(self, config):
        """Replace the config and push the canvas related values to the canvas

        `beamsize` and `downstream` fall back to the same defaults as the
        constructor, so a config that was valid at construction stays valid here.

        Raises:
            InvalidYAML: If `zoominfo` is missing from the new config
        """
        self._config = config
        zoominfo = self._get_from_config("zoominfo")
        # Keep the copy checked by `get_scale_label` in sync with the canvas
        self._zoominfo = zoominfo
        self.canvas.zoominfo = zoominfo
        self.canvas.beamsize = self._beamsize_from_config()
        self.canvas.downstream = self._get_from_config(
            "downstream", default=self._default_downstream, allow_missing=True
        )

    def _raise_config_error(self, message, err_info=None):
        """Raise `InvalidYAML` describing a problem with this source's config

        Args:
            message (str): Description of the problem
            err_info (dict): Extra entries merged into the reported object
        """
        err_obj = {"key": "sources", "name": self._config.get("name")}
        if err_info:
            err_obj.update(err_info)
        raise InvalidYAML(
            {"message": message, "file": self._config_file, "obj": err_obj}
        )

    @property
    def config(self) -> Dict[str, Any]:
        """The `fine_fixed` and `allow_fixed_axes` config values (default False)"""
        return {
            "fine_fixed": self._get_from_config(
                "fine_fixed", default=False, allow_missing=True
            ),
            "allow_fixed_axes": self._get_from_config(
                "allow_fixed_axes", default=False, allow_missing=True
            ),
        }

    @property
    def has_fine(self) -> bool:
        """Whether a fine x or fine y motor is present in the config"""
        return any(
            motor_label in self._config
            for motor_label in ("motor_x_fine", "motor_y_fine")
        )

    @property
    def fine_fixed(self) -> bool:
        """Whether the fine axis is fixed and centred for movement"""
        return self.canvas.motors_to_world.trnx.smallest_fixed

    @fine_fixed.setter
    def fine_fixed(self, value: bool):
        self.canvas.motors_to_world.trnx.smallest_fixed = value
        self.canvas.motors_to_world.trny.smallest_fixed = value
        self.canvas.motors_to_world.trnz.smallest_fixed = value

        # Fine and coarse cannot both be fixed
        if value and self.coarse_fixed:
            self.coarse_fixed = False

    @property
    def coarse_fixed(self) -> bool:
        """Whether the coarse axis is fixed for movement"""
        return self.canvas.motors_to_world.trnx.largest_fixed

    @coarse_fixed.setter
    def coarse_fixed(self, value: bool):
        self.canvas.motors_to_world.trnx.largest_fixed = value
        self.canvas.motors_to_world.trny.largest_fixed = value
        self.canvas.motors_to_world.trnz.largest_fixed = value

        # Fine and coarse cannot both be fixed
        if value and self.fine_fixed:
            self.fine_fixed = False

    @property
    def name(self):
        """The configured `name` of this source"""
        return self._name

    @property
    def device(self):
        """The hardware object resolved from the `device` config key"""
        return self._device

    @property
    def unit(self):
        """Result of `get_exponent` for the base unit of this source"""
        return get_exponent(self._base_unit)

    @property
    def id(self):
        """The id this source was created with"""
        return self._id

    @property
    def origin(self):
        """Whether this source is marked as the origin"""
        return self._origin

    @property
    def beamsize(self) -> Dict[str, float]:
        """Return the beamsize (in nm)"""
        bs = self._beamsize_from_config()
        return {"x": bs[0], "y": bs[1], "z": bs[2]}

    @timed
    def info(self):
        """Get info from this image source

        This includes name, offsets, scale, limits, url, etc
        """
        info = {
            "sourceid": self._id,
            "name": self._name,
            "type": "video",
            "origin": self._origin,
            "url": self._url,
            "additional_urls": self._additional_urls,
            "scale": self._scale,
            "additional": self.get_additional(),
        }
        info.update(self.canvas.vlm_image_info)
        info["polylines"] = polylines = {}
        info["markings"] = markings = {}
        polylines["limits"] = self.canvas.reach_polyline.tolist()
        polylines["fine motors"] = self.canvas.fine_polyline.tolist()
        beam_info = self.canvas.beam_info
        beam_info["origin"] = True
        markings["beam"] = beam_info
        info["reference"] = {"beam": self.calculate_reference_beam(beam_info["center"])}

        return dict_nd_to_list(info)

    def _device_changed(self, obj, prop, value):
        """Callback forwarding a change of the device to the clients"""
        self._emit(
            "message",
            {"type": "device", "prop": prop, "value": value},
        )

    def _zoom_changed(self, obj, prop, value):
        """Callback when the zoom changes"""
        if prop == "position" and obj == self._zoom:
            try:
                label = self.get_scale_label()
            except RuntimeError:
                logger.info("Zoom scale label currently unknown")
                return

            if label:
                self.canvas.zoomlevel = label
                self._emit(
                    "message",
                    {
                        "type": "source",
                        "info": dict_nd_to_list(
                            {
                                **self.canvas.vlm_image_info,
                                "sourceid": self._id,
                                "markings": {"beam": self.canvas.beam_info},
                            }
                        ),
                    },
                )

    def _additional_changed(self, obj, prop, value):
        """Callback when an additional position changes"""
        for name, ob in self._additional.items():
            if prop == "position" and obj == ob:
                self._emit(
                    "message",
                    {
                        "type": "origin",
                        "id": self._id,
                        "prop": "additional",
                        "object": name,
                        "value": value,
                    },
                )

    def get_additional(self):
        """Get a dict of the additional positions

        Returns:
            positions(dict): A dict of positions and their values
        """
        return {name: obj.get("position") for name, obj in self._additional.items()}

    def move_to_additional(self, additional, wait=True):
        """Move to additional positions

        Args:
            additional (dict): Mapping of additional name to target position,
                names not configured on this source are ignored
            wait (bool): Whether to wait for the moved objects once all moves
                have been requested
        """
        if not additional:
            return

        for a, v in additional.items():
            logger.debug("moving to additional %s %s", a, v)
            if a in self._additional:
                self._additional[a].move(v)

        if wait:
            for a in additional:
                if a in self._additional:
                    self._additional[a].wait()

    def _translate_changed(self, obj, prop, value):
        """Callback when a translation changes"""
        if prop == "position":
            self._queue_emit_source()

    def _queue_emit_source(self):
        """Emit the source info now, or schedule it if one was emitted recently

        An update is emitted immediately when the last immediate emit is more
        than 0.2s old, otherwise any pending delayed emit is replaced by one
        scheduled 0.5s from now.
        """
        now = time.time()
        if now - self._last_emit_source > 0.2:
            self._emit_source()
            self._last_emit_source = now
        else:
            if self._emit_source_timeout:
                self._emit_source_timeout.kill()

            self._emit_source_timeout = gevent.spawn_later(0.5, self._emit_source)

    def _emit_source(self):
        """Emit the current image info, beam marking, fine polyline and reference"""
        # Motor callbacks can fire before the canvas has been created
        if self.canvas is None:
            return

        beam_info = self.canvas.beam_info
        self._emit(
            "message",
            {
                "type": "source",
                "info": dict_nd_to_list(
                    {
                        **self.canvas.vlm_image_info,
                        "sourceid": self._id,
                        "markings": {"beam": beam_info},
                        "polylines": {"fine motors": self.canvas.fine_polyline},
                        "reference": {
                            "beam": self.calculate_reference_beam(beam_info["center"])
                        },
                    }
                ),
            },
        )

    def set_zoom(self, zoom):
        """Move the zoom motor to `zoom`

        Raises:
            RuntimeError: If this source has no zoom motor (FIXED zoom)
        """
        if self._zoom is None:
            raise RuntimeError("This source has no zoom motor configured")
        self._zoom.move(zoom)

    def get_scale_label(self):
        """Get the label for the current scale

        Without a zoom motor the FIXED label is used.

        Raises:
            RuntimeError: If the label is not a key of the configured `zoominfo`
        """
        if self._zoom is None:
            label = self._fixed_zoomlevel
        else:
            label = self._zoom.get("position")

        if label not in self._zoominfo:
            raise RuntimeError(
                "Zoom scale '{}' not in configuration ({})".format(
                    label, list(self._zoominfo.keys())
                )
            )
        return label

    def set_reference_inverse_matrix(self, matrix):
        """Store the matrix used by `calculate_reference_beam`"""
        self._reference_inverse_matrix = matrix

    def calculate_reference_beam(self, center):
        """Transform the beam center with the reference inverse matrix

        Args:
            center: Beam center, only its first two components are used

        Returns:
            tuple: First two components of the transformed position, or `None`
                when no numpy matrix has been set
        """
        if not isinstance(self._reference_inverse_matrix, numpy.ndarray):
            return None

        ref_pos = numpy.dot(
            # Beam center is in inverted y sense compared with reference image
            self._reference_inverse_matrix,
            numpy.array([center[0], -center[1], 1]),
        )
        return (ref_pos[0], ref_pos[1])