Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/tomo/tilingscanwithroi.py: 0%
131 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4from __future__ import annotations
6import logging
7from marshmallow import fields, validate
8from marshmallow import validates_schema, ValidationError
9from daiquiri.core.schema.validators import OneOf
10from daiquiri.core.components import (
11 ComponentActor,
12 ComponentActorSchema,
13)
15from tomo.sequence.tiling import tiling
16from tomo.sequence.tiling import tiling_estimation_time
17from . import tomo_helper
19try:
20 from tomo.globals import ACTIVE_TOMOCONFIG
21except ImportError:
22 ACTIVE_TOMOCONFIG = None
24_logger = logging.getLogger(__name__)
27class TilingscanwithroiSchema(ComponentActorSchema):
28 sampleid = fields.Int(required=True)
30 scan_type = OneOf(
31 ["Continuous", "Step"],
32 dump_default="Continuous",
33 metadata={"title": "Scan type"},
34 )
35 speed_ratio = fields.Float(
36 validate=validate.Range(min=0.0, max=1.0),
37 dump_default=1.0,
38 metadata={
39 "title": "Stable/speed",
40 "multipleOf": 0.01,
41 },
42 )
43 # FIXME: Units have to be read from the motors
44 # FIXME: Motor names have to be read from the motor
45 start_x = fields.Float(
46 validate=validate.Range(min=-2000, max=2000),
47 dump_default=-250,
48 metadata={"title": "Start", "unit": "㎜"},
49 )
50 end_x = fields.Float(
51 validate=validate.Range(min=-2000, max=2000),
52 dump_default=250,
53 metadata={"title": "Stop", "unit": "㎜"},
54 )
55 start_y = fields.Float(
56 validate=validate.Range(min=-2000, max=2000),
57 dump_default=-250,
58 metadata={"title": "Start", "unit": "㎜"},
59 )
60 end_y = fields.Float(
61 validate=validate.Range(min=-2000, max=2000),
62 dump_default=250,
63 metadata={"title": "Stop", "unit": "㎜"},
64 )
65 start_z = fields.Float(
66 validate=validate.Range(min=-2000, max=2000),
67 dump_default=550,
68 metadata={"title": "Start", "unit": "㎜"},
69 )
70 end_z = fields.Float(
71 validate=validate.Range(min=-2000, max=2000),
72 dump_default=-550,
73 metadata={"title": "Stop", "unit": "㎜"},
74 )
75 expo_time = fields.Float(
76 validate=validate.Range(min=0.001, max=10),
77 dump_default=None,
78 metadata={"title": "Exposure time", "unit": "s"},
79 )
80 sleep_time = fields.Float(
81 validate=validate.Range(min=0.0, max=10),
82 dump_default=0,
83 metadata={"title": "Sleep time", "unit": "s"},
84 )
85 n_dark = fields.Int(
86 dump_default=1,
87 validate=validate.Range(min=0, max=999999),
88 metadata={"title": "Nb darks"},
89 )
90 n_flat = fields.Int(
91 dump_default=1,
92 validate=validate.Range(min=0, max=999999),
93 metadata={"title": "Nb flats"},
94 )
95 lateral_motor_mode = OneOf(
96 ["over_rot", "under_rot"],
97 dump_default="over_rot",
98 metadata={
99 "title": "Lateral motor",
100 },
101 )
102 pixel_blur = fields.Float(
103 dump_default=0,
104 metadata={
105 "title": "Pixel blur",
106 "description": "Displacement during the expo time in pixel",
107 "unit": "px",
108 "readOnly": True,
109 },
110 )
111 restore_motor_positions = fields.Bool(
112 metadata={
113 "title": "Move back motors",
114 "description": "Move back motors at their initial position when the scan is terminated",
115 },
116 dump_default=True,
117 )
118 enqueue = fields.Bool(
119 dump_default=True,
120 metadata={
121 "title": "Queue Scan",
122 },
123 )
125 class Meta:
126 uiorder = [
127 "sampleid",
128 "scan_type",
129 "speed_ratio",
130 "pixel_blur",
131 "expo_time",
132 "sleep_time",
133 "n_dark",
134 "n_flat",
135 "lateral_motor_mode",
136 "restore_motor_positions",
137 "start_x",
138 "end_x",
139 "start_y",
140 "end_y",
141 "start_z",
142 "end_z",
143 "enqueue",
144 ]
145 uigroups = [
146 "sampleid",
147 "scan_type",
148 "speed_ratio",
149 "pixel_blur",
150 {
151 "Options": [
152 "expo_time",
153 "sleep_time",
154 "n_dark",
155 "n_flat",
156 "lateral_motor_mode",
157 "restore_motor_positions",
158 ],
159 "ui:minwidth": 12,
160 },
161 {
162 "Y-motor (side)": ["start_x", "end_x"],
163 "ui:minwidth": 6,
164 },
165 {
166 "Y-motor (front)": ["start_y", "end_y"],
167 "ui:minwidth": 6,
168 },
169 {
170 "Z-motor": ["start_z", "end_z"],
171 "ui:minwidth": 6,
172 },
173 "enqueue",
174 ]
175 uischema = {
176 "sampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
177 "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
178 "speed_ratio": {"ui:widget": "range"},
179 }
181 @validates_schema
182 def schema_validate(self, data, **kwargs):
183 if data["scan_type"] == "Continuous":
184 if data["lateral_motor_mode"] != "under_rot":
185 raise ValidationError(
186 "In continuous mode, the lateral motor have ot be under the rot"
187 )
189 def calculated(self, data):
190 try:
191 return self._calculated(**data)
192 except Exception:
193 _logger.error("Error while calculated", exc_info=True)
195 def _calculated(
196 self,
197 expo_time: float | None = None,
198 scan_type: str = None,
199 speed_ratio: float = None,
200 **kwargs,
201 ):
202 """Returns the calculated values
204 Arguments:
205 data: Dictionary containing the actual parameters of the form
206 """
207 result = {}
209 tomo_config = tomo_helper.get_active_tomo_config()
210 if tomo_config is None:
211 raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
213 if expo_time is None:
214 # Trick to feed the initial expo time based on the tomo imaging device
215 imaging = tomo_config.tomo_imaging
216 if imaging is not None:
217 expo_time = imaging.exposure_time
218 else:
219 expo_time = 1.0
220 result["expo_time"] = expo_time
222 if scan_type == "Step":
223 result["pixel_blur"] = 0.0
224 elif scan_type == "Continuous" and speed_ratio is not None:
225 if expo_time is not None:
226 tomo_detector = tomo_config.detectors.active_detector
227 px = tomo_detector.sample_pixel_size
229 def get_continuous_velocity(z_axis):
230 vmax = z_axis.velocity
231 v1px = 1000 * px / expo_time
232 if v1px < vmax:
233 vmin = v1px
234 else:
235 vmin = vmax * 0.1
236 return vmin + (vmax - vmin) * speed_ratio
238 v = get_continuous_velocity(tomo_config.z_axis)
239 pixel_blur = (expo_time * v) / (1000 * px)
240 result["pixel_blur"] = pixel_blur
242 return result
244 def time_estimate(self, data):
245 result = self._calculated(**data)
246 data.update(result)
247 tomo_config = (
248 ACTIVE_TOMOCONFIG is not None and ACTIVE_TOMOCONFIG.deref_active_object()
249 )
250 if tomo_config is None:
251 raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
252 tomo_det = tomo_config.detectors.active_detector
253 if tomo_det is not None:
254 tomo_cam = tomo_det.detector
255 else:
256 tomo_cam = None
258 if tomo_cam is None:
259 raise RuntimeError(f"No active detector selected in `{tomo_config.name}`")
260 try:
261 start_x = data["start_x"]
262 end_x = data["end_x"]
263 start_y = data["start_y"]
264 end_y = data["end_y"]
265 start_z = data["start_z"]
266 end_z = data["end_z"]
267 expo_time = data["expo_time"]
268 sleep_time = data.get("sleep_time", 0) # FIXME: It can be uninitialized...
269 n_dark = data["n_dark"]
270 n_flat = data["n_flat"]
271 lateral_motor_mode = data["lateral_motor_mode"]
272 restore_motor_positions = data["restore_motor_positions"]
273 scan_type = data["scan_type"]
274 speed_ratio = data["speed_ratio"]
275 except Exception:
276 _logger.debug("Error while reading data", exc_info=True)
277 return 0
278 try:
279 continuous = scan_type == "Continuous"
280 return tiling_estimation_time(
281 start_x,
282 end_x,
283 start_y,
284 end_y,
285 start_z,
286 end_z,
287 tomo_cam,
288 expo_time=expo_time,
289 sleep_time=sleep_time,
290 n_dark=n_dark,
291 n_flat=n_flat,
292 lateral_motor_mode=lateral_motor_mode,
293 restore_motor_positions=restore_motor_positions,
294 continuous=continuous,
295 tomoconfig=tomo_config,
296 speed_ratio=speed_ratio * 1.0,
297 )
298 except Exception:
299 _logger.error("Error while computing estimation", exc_info=True)
300 return 0
303class TilingscanwithroiActor(ComponentActor):
304 schema = TilingscanwithroiSchema
305 name = "[tomo] tiling scan"
307 metatype = "tomo"
309 def method(
310 self,
311 start_x: float,
312 end_x: float,
313 start_y: float,
314 end_y: float,
315 start_z: float,
316 end_z: float,
317 expo_time: float,
318 sleep_time: float,
319 n_dark: int,
320 n_flat: int,
321 lateral_motor_mode: str,
322 scan_type: str,
323 restore_motor_positions: bool,
324 speed_ratio: float,
325 before_scan_starts,
326 update_datacollection,
327 **kwargs,
328 ):
329 tomo_config = (
330 ACTIVE_TOMOCONFIG is not None and ACTIVE_TOMOCONFIG.deref_active_object()
331 )
332 if tomo_config is None:
333 raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
334 tomo_det = tomo_config.detectors.active_detector
335 if tomo_det is not None:
336 tomo_cam = tomo_det.detector
337 else:
338 tomo_cam = None
340 if tomo_cam is None:
341 raise RuntimeError(f"No active detector selected in `{tomo_config.name}`")
343 tomo_det.sync_hard()
344 if not tomo_det.is_online:
345 raise RuntimeError(f"{tomo_cam.name} is not online")
346 if not tomo_det.check_ready_for_acquisition():
347 raise RuntimeError(f"{tomo_cam.name} is not ready")
349 continuous = scan_type == "Continuous"
350 tiling(
351 start_x,
352 end_x,
353 start_y,
354 end_y,
355 start_z,
356 end_z,
357 tomo_cam,
358 expo_time=expo_time,
359 sleep_time=sleep_time,
360 n_dark=n_dark,
361 n_flat=n_flat,
362 lateral_motor_mode=lateral_motor_mode,
363 continuous=continuous,
364 speed_ratio=speed_ratio,
365 restore_motor_positions=restore_motor_positions,
366 tomoconfig=tomo_config,
367 )