Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/component.py: 26%
163 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4from __future__ import annotations
5import logging
6import typing
7import pint
8from ruamel.yaml import YAML
10from daiquiri.core.components import Component
11from daiquiri.core.components.dcutilsmixin import DCUtilsMixin
13from .detectors_resource import TomoDetectorsResource
14from .move_resource import TomoMoveResource
15from .image_resource import TomoImageResource
16from .scan_info_resource import TomoScanInfoResource
17from .slice_reconstruction_resource import SliceReconstructionResource
18from .datatype import DetectorLiveStorage
19from .datatype import SampleStageMetadata
20from .datatype import ScanInfoStorage
21from .scan_listener import TomoScanListener
24logger = logging.getLogger(__name__)
27TOMOVIS_PROXY_TEMPLATE = """
28name: tomovis
29target: TO_BE_MODIFIED
30openapi: res://openapi/tomovis_v1.json
31routes:
32 - name: image_tiling/
33 - name: image_tiling/<string:resource_id>
34 methods: [get, delete]
35 - name: h5grove/attr/
36 - name: h5grove/data/
37 - name: h5grove/meta/
38"""
41class TomoComponent(Component, DCUtilsMixin):
42 """Tomo detector component.
44 The tomo detector component watch scans and capture data in order to process
45 data correction on the fly.
47 For now it captures dark and flat from tomo scans. And provide an API to
48 retrieve a flat field correction.
49 """
51 def __init__(self, *args, **kwargs):
52 self._scan_sources = []
53 self._detectors: dict[str, DetectorLiveStorage] = {}
54 self._scaninfos: ScanInfoStorage = ScanInfoStorage()
55 self._tomo_config = None
56 self._exported_config = {}
57 self._scanlisteners = []
58 self._slice_reconstruction_triggers = None
59 self._last_delta_beta = None
60 super(TomoComponent, self).__init__(*args, **kwargs)
62 def after_all_setup(self, components):
63 """Called after the setup of the whole components"""
64 self._setup_tomovis_proxy(components)
65 avatar = self._config.get("avatar")
66 if avatar is not None:
67 self._exported_config["avatar"] = avatar
69 def get_export_config(self):
70 """Get exported config values from components"""
71 return self._exported_config
73 def _setup_tomovis_proxy(self, components):
74 config = self._config.get("tomovis", {})
75 url = config.get("url")
76 if url is not None:
77 self._exported_config["tomovis_url"] = url
78 create_proxy = config.get("create_proxy", False)
79 if create_proxy:
80 proxy = components.get_component("proxy")
81 if proxy is None:
82 raise RuntimeError(
83 "Tomovis setup as proxy but no proxy component configured"
84 )
85 self._exported_config["tomovis_proxy_url"] = "/api/proxy/tomovis"
86 yaml = YAML()
87 description = yaml.load(TOMOVIS_PROXY_TEMPLATE)
88 description["target"] = url
89 proxy.setup_proxy(description)
91 @property
92 def slice_reconstruction_triggers(self) -> list[str]:
93 return self._slice_reconstruction_triggers
95 @property
96 def scan_sources(self):
97 return self._scan_sources
99 @property
100 def min_refresh_period(self) -> float:
101 """Minimal image refresh period done during scan"""
102 return self._config.get("min_refresh_period", 1)
104 @property
105 def last_delta_beta(self) -> typing.Optional[float]:
106 """Last delta beta value used"""
107 return self._last_delta_beta
109 def set_last_delta_beta(self, delta_beta: float):
110 self._last_delta_beta = delta_beta
112 @property
113 def tomovis_uri(self) -> typing.Optional[str]:
114 """Tomovis URI service if defined, else None"""
115 config = self._config.get("tomovis", {})
116 return config.get("url")
118 def get_tomo_config_name(self) -> str:
119 """Returns the name of the tomo BLISS object used by the application"""
120 return self._tomo_config.name
122 def setup(self, *args, **kwargs):
123 self._slice_reconstruction_triggers = self._config.get(
124 "slice_reconstruction_triggers", None
125 )
126 if self._slice_reconstruction_triggers is None:
127 self._slice_reconstruction_triggers = ["terminated"]
129 for source in self._config["sources"]:
130 if source["type"] == "bliss":
132 def get_bliss_device_from_config(source, name):
133 device_name = source.get(name)
134 if device_name is None:
135 return None
137 if device_name == "ACTIVE_TOMOCONFIG":
138 from tomo.globals import ACTIVE_TOMOCONFIG
140 return ACTIVE_TOMOCONFIG.deref_active_object()
142 from bliss.config.static import get_config
144 config = get_config()
145 return config.get(device_name)
147 self._tomo_config = get_bliss_device_from_config(source, "tomo_config")
149 tomo_detectors = self._tomo_config.detectors
150 for tomo_detector in tomo_detectors.detectors:
151 # Name of the lima device inside BLISS
152 detector_id = tomo_detector.detector.name
153 logger.info("Register BLISS detector: %s", detector_id)
154 self._detectors[detector_id] = DetectorLiveStorage(
155 detector_id=detector_id,
156 tomo_detector_id=tomo_detector.name,
157 detector_node_name=f"{detector_id}:image",
158 )
160 scan_source_class = self._get_scan_source_class()
161 src = scan_source_class(
162 self._config, app=self._app, source_config=source
163 )
165 scanlistener = TomoScanListener(self, src)
166 src.watch_new_scan(scanlistener.new_scan)
167 src.watch_end_scan(scanlistener.end_scan)
168 src.watch_new_data(scanlistener.new_data)
169 src.watch_new_0d_data(scanlistener.new_0d_data)
171 self._scan_sources.append(src)
172 self._scanlisteners.append(scanlistener)
174 logger.debug("Registered Bliss scan source")
175 else:
176 raise TypeError("Only bliss source is available for now")
178 self.register_route(TomoDetectorsResource, "/detectors")
179 self.register_route(TomoScanInfoResource, "/scaninfo")
180 self.register_route(TomoImageResource, "/data")
181 self.register_route(TomoMoveResource, "/move")
182 self.register_route(SliceReconstructionResource, "/slice_reconstruction")
184 def _get_scan_source_class(self):
185 """
186 FIXME: This should be dropped
187 """
188 import bliss.release
189 from packaging.version import Version
191 if Version(bliss.release.version) < Version("2.0.dev"):
192 # Use old bliss connector
193 from daiquiri.core.hardware.bliss.scans import BlissScans
195 return BlissScans
196 else:
197 # Use blissdata anyway
198 from daiquiri.core.hardware.blissdata.scans import (
199 BlissdataScans,
200 )
202 return BlissdataScans
204 def get_tomo_config(self):
205 """Returns the tomo config object"""
206 return self._tomo_config
208 def get_sample_stage_meta(self, detector_name) -> SampleStageMetadata:
209 """Returns the sample stage state
211 This is a workaround for now, it would be much better to retrieve it
212 from the scan if possible.
214 Which is probably part of positioners for ct
215 """
217 def quantity_from_motor(motor):
218 if motor is None:
219 return None
220 return pint.Quantity(motor.position, motor.unit)
222 def quantity_or_none(value, unit):
223 if value is None:
224 return None
225 return pint.Quantity(value, unit)
227 pixel_size = None
229 def get_active_tomo_detector():
230 if self._tomo_config is None:
231 return None
232 if self._tomo_config.detectors is None:
233 return None
234 tomo_detector = self._tomo_config.detectors.active_detector
235 return tomo_detector
237 detector = get_active_tomo_detector()
238 if detector and detector.detector.name == detector_name:
239 # For now there is no way to reach the pixel size from another
240 # detector than the active one
241 ps = detector.sample_pixel_size
242 if ps:
243 pixel_size = pint.Quantity(ps, "um")
245 tomo_config = self._tomo_config
246 sample_stage = tomo_config.sample_stage
247 detector_center = sample_stage.detector_center
249 return SampleStageMetadata(
250 sy=quantity_from_motor(sample_stage.y_axis),
251 sz=quantity_from_motor(sample_stage.z_axis),
252 sampy=quantity_from_motor(sample_stage.sample_y_axis),
253 somega=quantity_from_motor(sample_stage.rotation_axis),
254 detcy=quantity_or_none(detector_center[0], "mm"),
255 detcz=quantity_or_none(detector_center[1], "mm"),
256 pixel_size=pixel_size,
257 )
259 def move(
260 self,
261 sy: pint.Quantity = None,
262 sz: pint.Quantity = None,
263 sampx: pint.Quantity = None,
264 sampy: pint.Quantity = None,
265 sampu: pint.Quantity = None,
266 sampv: pint.Quantity = None,
267 relative: bool = False,
268 ):
269 """Move the sample stage motors.
271 If multiple moves are requested, all the command are send in parallel
273 Arguments:
274 relative: If true, the move is relative
275 """
277 def normalize_value_to_motor_unit(motor, value):
278 if value.units == "css_pixel":
279 # By default pint provides a conversion from px to metric system
280 # TODO this have to be removed
281 raise RuntimeError("Can't convert from pixel to length")
282 return value.to(motor.unit).magnitude
284 trajectory = {
285 self._tomo_config.y_axis: sy,
286 self._tomo_config.z_axis: sz,
287 self._tomo_config.sample_x_axis: sampx,
288 self._tomo_config.sample_y_axis: sampy,
289 self._tomo_config.sample_u_axis: sampu,
290 self._tomo_config.sample_v_axis: sampv,
291 }
293 trajectory = {
294 k: normalize_value_to_motor_unit(k, v)
295 for k, v in trajectory.items()
296 if v is not None
297 }
299 if len(trajectory) == 0:
300 return
302 from bliss.common.motor_group import Group
304 group = Group(*trajectory.keys())
305 group.move(trajectory, wait=True, relative=relative)
307 def get_detector(self, detector_id: str) -> typing.Optional[DetectorLiveStorage]:
308 """Get available detectors"""
309 return self._detectors.get(detector_id)
311 def get_scaninfos(self):
312 """Returns the state of the actual scans"""
313 return self._scaninfos
315 def get_detectors(self) -> list[DetectorLiveStorage]:
316 """Get available detectors"""
317 return self._detectors.values()