Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/component.py: 26%
162 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4import logging
5import typing
6import pint
7from ruamel.yaml import YAML
9from daiquiri.core.components import Component
10from daiquiri.core.components.dcutilsmixin import DCUtilsMixin
12from .detectors_resource import TomoDetectorsResource
13from .move_resource import TomoMoveResource
14from .image_resource import TomoImageResource
15from .scan_info_resource import TomoScanInfoResource
16from .slice_reconstruction_resource import SliceReconstructionResource
17from .datatype import DetectorLiveStorage
18from .datatype import SampleStageMetadata
19from .datatype import ScanInfoStorage
20from .scan_listener import TomoScanListener
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# YAML description of the "tomovis" proxy. It is parsed in
# `TomoComponent._setup_tomovis_proxy`, where the placeholder `target` value
# is overwritten with the configured tomovis URL before the description is
# handed to the proxy component.
TOMOVIS_PROXY_TEMPLATE = """
name: tomovis
target: TO_BE_MODIFIED
openapi: res://openapi/tomovis_v1.json
routes:
  - name: image_tiling/
  - name: image_tiling/<string:resource_id>
    methods: [get, delete]
  - name: h5grove/attr/
  - name: h5grove/data/
  - name: h5grove/meta/
"""
class TomoComponent(Component, DCUtilsMixin):
    """Tomo detector component.

    The tomo detector component watches scans and captures data in order to
    process data correction on the fly.

    For now it captures dark and flat from tomo scans, and provides an API to
    retrieve a flat field correction.
    """

    def __init__(self, *args, **kwargs):
        """Create the internal storages, then initialise the base classes.

        All positional and keyword arguments are forwarded untouched to the
        parent constructor.
        """
        # NOTE(review): the state is created *before* the parent constructor
        # runs; presumably the parent may already call back into this object
        # (e.g. `setup`) -- confirm before reordering.
        self._scan_sources = []
        self._detectors: typing.Dict[str, DetectorLiveStorage] = {}
        self._scaninfos: ScanInfoStorage = ScanInfoStorage()
        self._tomo_config = None
        self._exported_config = {}
        self._scanlisteners = []
        self._slice_reconstruction_triggers = None
        self._last_delta_beta = None
        super().__init__(*args, **kwargs)

    def after_all_setup(self, components):
        """Called after the setup of the whole components.

        Sets up the optional tomovis proxy and exports the optional `avatar`
        configuration value.
        """
        self._setup_tomovis_proxy(components)
        avatar = self._config.get("avatar")
        if avatar is not None:
            self._exported_config["avatar"] = avatar

    def get_export_config(self):
        """Get exported config values from components."""
        return self._exported_config

    def _setup_tomovis_proxy(self, components):
        """Export the tomovis URL and, if requested, register a proxy for it.

        Reads the `tomovis` section of the component configuration:

        - `url`: exported as `tomovis_url` when defined.
        - `create_proxy`: when true, the `proxy` component is asked to set up
          a proxy described by `TOMOVIS_PROXY_TEMPLATE`, targeting `url`.

        Raises:
            RuntimeError: If a proxy is requested but no `proxy` component is
                available.
        """
        config = self._config.get("tomovis", {})
        url = config.get("url")
        if url is not None:
            self._exported_config["tomovis_url"] = url

        if not config.get("create_proxy", False):
            return

        proxy = components.get_component("proxy")
        if proxy is None:
            raise RuntimeError(
                "Tomovis setup as proxy but no proxy component configured"
            )
        self._exported_config["tomovis_proxy_url"] = "/api/proxy/tomovis"
        description = YAML().load(TOMOVIS_PROXY_TEMPLATE)
        # NOTE(review): `url` can still be None here when `create_proxy` is
        # set without a `url` -- confirm the proxy component tolerates that.
        description["target"] = url
        proxy.setup_proxy(description)

    @property
    def slice_reconstruction_triggers(self) -> typing.List[str]:
        """Triggers read from the `slice_reconstruction_triggers` config key.

        `setup` defaults it to `["terminated"]`; it is None before `setup`
        has run.
        """
        return self._slice_reconstruction_triggers

    @property
    def scan_sources(self):
        """Scan sources registered by `setup`."""
        return self._scan_sources

    @property
    def min_refresh_period(self) -> float:
        """Minimal image refresh period done during scan.

        Read from the `min_refresh_period` config key, defaulting to 1.
        """
        return self._config.get("min_refresh_period", 1)

    @property
    def last_delta_beta(self) -> typing.Optional[float]:
        """Last delta beta value used, or None if none was stored yet."""
        return self._last_delta_beta

    def set_last_delta_beta(self, delta_beta: float):
        """Store the last delta beta value used."""
        self._last_delta_beta = delta_beta

    @property
    def tomovis_uri(self) -> typing.Optional[str]:
        """Tomovis URI service if defined, else None."""
        config = self._config.get("tomovis", {})
        return config.get("url")

    def get_tomo_config_name(self) -> str:
        """Returns the name of the tomo BLISS object used by the application."""
        return self._tomo_config.name

    def setup(self, *args, **kwargs):
        """Register the configured scan sources and the REST routes.

        Every entry of the `sources` configuration must be of type `bliss`.

        Raises:
            TypeError: If a source is not of type `bliss`.
            RuntimeError: If a bliss source does not resolve to a tomo config.
        """
        triggers = self._config.get("slice_reconstruction_triggers", None)
        if triggers is None:
            triggers = ["terminated"]
        self._slice_reconstruction_triggers = triggers

        for source in self._config["sources"]:
            if source["type"] != "bliss":
                raise TypeError("Only bliss source is available for now")
            self._setup_bliss_source(source)

        self.register_route(TomoDetectorsResource, "/detectors")
        self.register_route(TomoScanInfoResource, "/scaninfo")
        self.register_route(TomoImageResource, "/data")
        self.register_route(TomoMoveResource, "/move")
        self.register_route(SliceReconstructionResource, "/slice_reconstruction")

    @staticmethod
    def _get_bliss_device_from_config(source, name):
        """Resolve the BLISS object referenced by `source[name]`.

        Returns None when the key is absent. The special value
        `ACTIVE_TOMOCONFIG` is resolved through `tomo.globals`; any other
        value is looked up in the BLISS static configuration.
        """
        device_name = source.get(name)
        if device_name is None:
            return None

        if device_name == "ACTIVE_TOMOCONFIG":
            from tomo.globals import ACTIVE_TOMOCONFIG

            return ACTIVE_TOMOCONFIG.deref_active_object()

        from bliss.config.static import get_config

        return get_config().get(device_name)

    def _setup_bliss_source(self, source):
        """Register the detectors and the scan listener of one bliss source."""
        self._tomo_config = self._get_bliss_device_from_config(source, "tomo_config")
        if self._tomo_config is None:
            # Fail with an explicit message instead of an AttributeError on
            # the `.detectors` access just below.
            raise RuntimeError(
                "No tomo config found for the bliss source "
                "(check the 'tomo_config' key)"
            )

        for tomo_detector in self._tomo_config.detectors.detectors:
            # Name of the lima device inside BLISS
            detector_id = tomo_detector.detector.name
            logger.info("Register BLISS detector: %s", detector_id)
            self._detectors[detector_id] = DetectorLiveStorage(
                detector_id=detector_id,
                tomo_detector_id=tomo_detector.name,
                detector_node_name=f"{detector_id}:image",
            )

        scan_source_class = self._get_scan_source_class()
        src = scan_source_class(self._config, app=self._app, source_config=source)

        scanlistener = TomoScanListener(self, src)
        src.watch_new_scan(scanlistener.new_scan)
        src.watch_end_scan(scanlistener.end_scan)
        src.watch_new_data(scanlistener.new_data)
        src.watch_new_0d_data(scanlistener.new_0d_data)

        self._scan_sources.append(src)
        self._scanlisteners.append(scanlistener)

        logger.debug("Registered Bliss scan source")

    def _get_scan_source_class(self):
        """Select the scan source class matching the installed BLISS version.

        FIXME: This should be dropped
        """
        import bliss.release
        from packaging.version import Version

        if Version(bliss.release.version) < Version("2.0.dev"):
            # Use old bliss connector
            from daiquiri.core.hardware.bliss.scans import BlissScans

            return BlissScans

        # Use blissdata anyway
        from daiquiri.core.hardware.blissdata.scans import BlissdataScans

        return BlissdataScans

    def get_tomo_config(self):
        """Returns the tomo config object."""
        return self._tomo_config

    def get_sample_stage_meta(self, detector_name) -> SampleStageMetadata:
        """Returns the sample stage state.

        This is a workaround for now, it would be much better to retrieve it
        from the scan if possible.

        Which is probably part of positioners for ct.

        Arguments:
            detector_name: Name of the detector the metadata is requested
                for. The pixel size is only filled when it matches the active
                tomo detector.
        """

        def quantity_from_motor(motor):
            """Motor position as a quantity, or None without motor."""
            if motor is None:
                return None
            return pint.Quantity(motor.position, motor.unit)

        def quantity_or_none(value, unit):
            """Value as a quantity in `unit`, or None without value."""
            if value is None:
                return None
            return pint.Quantity(value, unit)

        tomo_config = self._tomo_config

        active_detector = None
        if tomo_config is not None and tomo_config.detectors is not None:
            active_detector = tomo_config.detectors.active_detector

        pixel_size = None
        if active_detector and active_detector.detector.name == detector_name:
            # For now there is no way to reach the pixel size from another
            # detector than the active one
            ps = active_detector.sample_pixel_size
            if ps:
                pixel_size = pint.Quantity(ps, "um")

        # NOTE(review): unlike the detector lookup above, this access is not
        # guarded against a missing tomo config -- confirm callers only use
        # this method after `setup`.
        sample_stage = tomo_config.sample_stage
        detector_center = sample_stage.detector_center

        return SampleStageMetadata(
            sy=quantity_from_motor(sample_stage.y_axis),
            sz=quantity_from_motor(sample_stage.z_axis),
            sampy=quantity_from_motor(sample_stage.sample_y_axis),
            somega=quantity_from_motor(sample_stage.rotation_axis),
            detcy=quantity_or_none(detector_center[0], "mm"),
            detcz=quantity_or_none(detector_center[1], "mm"),
            pixel_size=pixel_size,
        )

    def move(
        self,
        sy: typing.Optional[pint.Quantity] = None,
        sz: typing.Optional[pint.Quantity] = None,
        sampx: typing.Optional[pint.Quantity] = None,
        sampy: typing.Optional[pint.Quantity] = None,
        sampu: typing.Optional[pint.Quantity] = None,
        sampv: typing.Optional[pint.Quantity] = None,
        relative: bool = False,
    ):
        """Move the sample stage motors.

        If multiple moves are requested, all the commands are sent together
        through a single BLISS motor group.

        Arguments:
            sy, sz, sampx, sampy, sampu, sampv: Requested position for the
                corresponding axis of the tomo config, or None to leave that
                axis untouched.
            relative: If true, the move is relative

        Raises:
            RuntimeError: If a value is expressed in `css_pixel`, or if a
                move is requested on an axis which is not configured.
        """

        def normalize_value_to_motor_unit(motor, value):
            if value.units == "css_pixel":
                # By default pint provides a conversion from px to metric system
                # TODO this have to be removed
                raise RuntimeError("Can't convert from pixel to length")
            return value.to(motor.unit).magnitude

        tomo_config = self._tomo_config
        requested = (
            ("sy", tomo_config.y_axis, sy),
            ("sz", tomo_config.z_axis, sz),
            ("sampx", tomo_config.sample_x_axis, sampx),
            ("sampy", tomo_config.sample_y_axis, sampy),
            ("sampu", tomo_config.sample_u_axis, sampu),
            ("sampv", tomo_config.sample_v_axis, sampv),
        )

        # Built entry by entry rather than as a dict literal keyed by motor:
        # unconfigured axes (None) would otherwise share the same key and
        # silently overwrite each other's requested value.
        trajectory = {}
        for arg_name, motor, value in requested:
            if value is None:
                continue
            if motor is None:
                raise RuntimeError(
                    f"Can't move '{arg_name}': no such axis in the tomo config"
                )
            trajectory[motor] = normalize_value_to_motor_unit(motor, value)

        if not trajectory:
            return

        from bliss.common.motor_group import Group

        group = Group(*trajectory.keys())
        group.move(trajectory, wait=True, relative=relative)

    def get_detector(self, detector_id: str) -> typing.Optional[DetectorLiveStorage]:
        """Get the detector registered under `detector_id`, or None."""
        return self._detectors.get(detector_id)

    def get_scaninfos(self):
        """Returns the state of the actual scans."""
        return self._scaninfos

    def get_detectors(self) -> typing.List[DetectorLiveStorage]:
        """Get available detectors.

        A new list is returned, so it is a snapshot which supports indexing
        and is not affected by later registrations.
        """
        return list(self._detectors.values())