Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/tomo/tilingscanwithroi.py: 0%
104 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import logging
4from marshmallow import fields, validate
5from marshmallow import validates_schema, ValidationError
6from daiquiri.core.schema.validators import OneOf
7from daiquiri.core.components import (
8 ComponentActor,
9 ComponentActorSchema,
10)
12from tomo.sequence.tiling import tiling
13from tomo.sequence.tiling import tiling_estimation_time
15try:
16 from tomo.globals import ACTIVE_TOMOCONFIG
17except ImportError:
18 ACTIVE_TOMOCONFIG = None
20_logger = logging.getLogger(__name__)
23class TilingscanwithroiSchema(ComponentActorSchema):
24 sampleid = fields.Int(required=True)
26 scan_type = OneOf(
27 ["Continuous", "Step"],
28 dump_default="Continuous",
29 metadata={"title": "Scan type"},
30 )
31 # FIXME: Units have to be read from the motors
32 # FIXME: Motor names have to be read from the motor
33 start_x = fields.Float(
34 validate=validate.Range(min=-2000, max=2000),
35 dump_default=-250,
36 metadata={"title": "Start", "unit": "㎜"},
37 )
38 end_x = fields.Float(
39 validate=validate.Range(min=-2000, max=2000),
40 dump_default=250,
41 metadata={"title": "Stop", "unit": "㎜"},
42 )
43 start_y = fields.Float(
44 validate=validate.Range(min=-2000, max=2000),
45 dump_default=-250,
46 metadata={"title": "Start", "unit": "㎜"},
47 )
48 end_y = fields.Float(
49 validate=validate.Range(min=-2000, max=2000),
50 dump_default=250,
51 metadata={"title": "Stop", "unit": "㎜"},
52 )
53 start_z = fields.Float(
54 validate=validate.Range(min=-2000, max=2000),
55 dump_default=550,
56 metadata={"title": "Start", "unit": "㎜"},
57 )
58 end_z = fields.Float(
59 validate=validate.Range(min=-2000, max=2000),
60 dump_default=-550,
61 metadata={"title": "Stop", "unit": "㎜"},
62 )
63 expo_time = fields.Float(
64 validate=validate.Range(min=0.001, max=10),
65 dump_default=None,
66 metadata={"title": "Exposure time", "unit": "s"},
67 )
68 sleep_time = fields.Float(
69 validate=validate.Range(min=0.0, max=10),
70 dump_default=0,
71 metadata={"title": "Sleep time", "unit": "s"},
72 )
73 n_dark = fields.Int(
74 dump_default=1,
75 validate=validate.Range(min=0, max=999999),
76 metadata={"title": "Nb darks"},
77 )
78 n_flat = fields.Int(
79 dump_default=1,
80 validate=validate.Range(min=0, max=999999),
81 metadata={"title": "Nb flats"},
82 )
83 lateral_motor_mode = OneOf(
84 ["over_rot", "under_rot"],
85 dump_default="over_rot",
86 metadata={
87 "title": "Lateral motor",
88 },
89 )
90 restore_motor_positions = fields.Bool(
91 metadata={
92 "title": "Move back motors",
93 "description": "Move back motors at their initial position when the scan is terminated",
94 },
95 dump_default=True,
96 )
97 enqueue = fields.Bool(
98 dump_default=True,
99 metadata={
100 "title": "Queue Scan",
101 },
102 )
104 class Meta:
105 uiorder = [
106 "sampleid",
107 "scan_type",
108 "expo_time",
109 "sleep_time",
110 "n_dark",
111 "n_flat",
112 "lateral_motor_mode",
113 "restore_motor_positions",
114 "start_x",
115 "end_x",
116 "start_y",
117 "end_y",
118 "start_z",
119 "end_z",
120 "enqueue",
121 ]
122 uigroups = [
123 "sampleid",
124 "scan_type",
125 {
126 "Options": [
127 "expo_time",
128 "sleep_time",
129 "n_dark",
130 "n_flat",
131 "lateral_motor_mode",
132 "restore_motor_positions",
133 ],
134 "ui:minwidth": 12,
135 },
136 {
137 "Y-motor (side)": ["start_x", "end_x"],
138 "ui:minwidth": 6,
139 },
140 {
141 "Y-motor (front)": ["start_y", "end_y"],
142 "ui:minwidth": 6,
143 },
144 {
145 "Z-motor": ["start_z", "end_z"],
146 "ui:minwidth": 6,
147 },
148 "enqueue",
149 ]
150 uischema = {
151 "sampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
152 "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
153 }
155 @validates_schema
156 def schema_validate(self, data, **kwargs):
157 if data["scan_type"] == "Continuous":
158 if data["lateral_motor_mode"] != "under_rot":
159 raise ValidationError(
160 f"In continuous mode, the lateral motor have ot be under the rot"
161 )
163 def calculated(self, data, **kwargs):
164 """Returns the calculated values
166 Arguments:
167 data: Dictionary containing the actual parameters of the form
168 """
169 result = {}
171 if data.get("expo_time") is None:
172 # Trick to feed the initial expo time based on the tomo imaging device
173 tomo_config = (
174 ACTIVE_TOMOCONFIG is not None
175 and ACTIVE_TOMOCONFIG.deref_active_object()
176 )
177 if tomo_config is None:
178 raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
179 imaging = tomo_config.tomo_imaging
180 if imaging is not None:
181 result["expo_time"] = imaging.exposure_time
182 else:
183 result["expo_time"] = 1.0
185 return result
187 def time_estimate(self, data):
188 result = self.calculated(data)
189 data.update(result)
190 tomo_config = (
191 ACTIVE_TOMOCONFIG is not None and ACTIVE_TOMOCONFIG.deref_active_object()
192 )
193 if tomo_config is None:
194 raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
195 tomo_det = tomo_config.detectors.active_detector
196 if tomo_det is not None:
197 tomo_cam = tomo_det.detector
198 else:
199 tomo_cam = None
201 if tomo_cam is None:
202 raise RuntimeError(f"No active detector selected in `{tomo_config.name}`")
203 try:
204 start_x = data["start_x"]
205 end_x = data["end_x"]
206 start_y = data["start_y"]
207 end_y = data["end_y"]
208 start_z = data["start_z"]
209 end_z = data["end_z"]
210 expo_time = data["expo_time"]
211 sleep_time = data.get("sleep_time", 0) # FIXME: It can be uninitialized...
212 n_dark = data["n_dark"]
213 n_flat = data["n_flat"]
214 lateral_motor_mode = data["lateral_motor_mode"]
215 restore_motor_positions = data["restore_motor_positions"]
216 scan_type = data["scan_type"]
217 except Exception:
218 _logger.debug("Error while reading data", exc_info=True)
219 return 0
220 try:
221 continuous = scan_type == "Continuous"
222 return tiling_estimation_time(
223 start_x,
224 end_x,
225 start_y,
226 end_y,
227 start_z,
228 end_z,
229 tomo_cam,
230 expo_time=expo_time,
231 sleep_time=sleep_time,
232 n_dark=n_dark,
233 n_flat=n_flat,
234 lateral_motor_mode=lateral_motor_mode,
235 restore_motor_positions=restore_motor_positions,
236 continuous=continuous,
237 tomoconfig=tomo_config,
238 )
239 except Exception:
240 _logger.error("Error while computing estimation", exc_info=True)
241 return 0
244class TilingscanwithroiActor(ComponentActor):
245 schema = TilingscanwithroiSchema
246 name = "[tomo] tiling scan"
248 metatype = "tomo"
250 def method(
251 self,
252 start_x,
253 end_x,
254 start_y,
255 end_y,
256 start_z,
257 end_z,
258 expo_time,
259 sleep_time,
260 n_dark,
261 n_flat,
262 lateral_motor_mode,
263 scan_type: str,
264 restore_motor_positions,
265 before_scan_starts,
266 update_datacollection,
267 **kwargs,
268 ):
269 tomo_config = (
270 ACTIVE_TOMOCONFIG is not None and ACTIVE_TOMOCONFIG.deref_active_object()
271 )
272 if tomo_config is None:
273 raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
274 tomo_det = tomo_config.detectors.active_detector
275 if tomo_det is not None:
276 tomo_cam = tomo_det.detector
277 else:
278 tomo_cam = None
280 if tomo_cam is None:
281 raise RuntimeError(f"No active detector selected in `{tomo_config.name}`")
283 tomo_det.sync_hard()
284 if not tomo_det.is_online:
285 raise RuntimeError(f"{tomo_cam.name} is not online")
286 if not tomo_det.check_ready_for_acquisition():
287 raise RuntimeError(f"{tomo_cam.name} is not ready")
289 continuous = scan_type == "Continuous"
290 tiling(
291 start_x,
292 end_x,
293 start_y,
294 end_y,
295 start_z,
296 end_z,
297 tomo_cam,
298 expo_time=expo_time,
299 sleep_time=sleep_time,
300 n_dark=n_dark,
301 n_flat=n_flat,
302 lateral_motor_mode=lateral_motor_mode,
303 continuous=continuous,
304 restore_motor_positions=restore_motor_positions,
305 tomoconfig=tomo_config,
306 )