Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scans/standard_tomo_scan.py: 16%
128 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4import logging
5import typing
6from .tomo_scan import TomoScan
7from .tomo_scan import IncompatibleScanData
8from ..datatype import DetectorLiveStorage
9from ..datatype import MonitoredScan
10from ..datatype import Projection
13logger = logging.getLogger(__name__)
class StandardTomoScan(TomoScan):
    """Scan handler which monitors a scan described by its metadata.

    At initialisation a `MonitoredScan` is built from the scan metadata. While
    the scan runs, the handler emits the component events `new_scan`,
    `new_data`, `new_dark`, `new_flat` and `end_scan`, and stores the latest
    dark / flat / projection frame on each involved detector.
    """

    # (key in the "technique" metadata, resulting tomo_scan kind, key holding
    # the number of frames). The order is significant: the first key found in
    # the technique metadata wins.
    _TECHNIQUE_SECTIONS = (
        ("dark", "dark", "dark_n"),
        ("flat", "flat", "flat_n"),
        ("proj", "projection", "proj_n"),
    )

    # Multiplier converting a supported exposure time unit into seconds.
    _EXPOSURE_TIME_COEFS = {"s": 1, "ms": 0.001}

    def _init(self, metadata):
        """Build and store the monitored scan description.

        Raises:
            IncompatibleScanData: If no scan structure could be created from
                `metadata`.
        """
        info = self._create_tomo_scan(self.scanid, metadata)
        if info is None:
            raise IncompatibleScanData("Not a tomo scan")
        self._info = info

    @property
    def info(self):
        """The `MonitoredScan` created at initialisation."""
        return self._info

    def _get_involved_detectors(self, metadata) -> typing.List[DetectorLiveStorage]:
        """Returns the list of involved detectors.

        A detector of the component is involved when its node name is part of
        the `channels` field of the scan metadata.

        Empty if none, or if the `channels` field is missing.
        """
        channels = metadata.get("channels", None)
        if channels is None:
            logger.error(
                "Unexpected scan content. Channels field is mandatory. BLISS scan specification have changed."
            )
            return []
        return [
            detector
            for detector in self.component.get_detectors()
            if detector.detector_node_name in channels
        ]

    @classmethod
    def _read_exposure_time_in_second(cls, meta):
        """Return the exposure time stored in `meta`, converted to seconds.

        The unit is read from the `exposure_time@units` key of the very same
        mapping and defaults to seconds when absent.

        Raises:
            KeyError: If `meta` has no `exposure_time` entry.
            RuntimeError: If the unit is not one of the supported ones.
        """
        value = meta["exposure_time"]
        unit = meta.get("exposure_time@units", "s")
        coefs = cls._EXPOSURE_TIME_COEFS
        if unit not in coefs:
            raise RuntimeError(f"Unsupported exposure_time unit '{unit}' in scan")
        return value * coefs[unit]

    def _describe_acquisition(self, technique, metadata):
        """Return `(tomo_scan, nb_frames, exposure_time)` for a scan.

        `technique` is the `technique` section of the scan metadata, and
        `metadata` the whole scan metadata (used as fallback).
        """
        for section, tomo_scan, frames_key in self._TECHNIQUE_SECTIONS:
            if section in technique:
                section_meta = technique[section]
                nb_frames = section_meta[frames_key]
                exposure_time = self._read_exposure_time_in_second(section_meta)
                return tomo_scan, nb_frames, exposure_time

        if "scan" in technique and "tomo_n" in technique["scan"]:
            section_meta = technique["scan"]
            # NOTE(review): this branch ignores `exposure_time@units` and
            # always divides by 1000 - presumably the value is in
            # milliseconds here; confirm against the scan producer.
            return (
                "projection",
                section_meta["tomo_n"],
                section_meta["exposure_time"] / 1000,
            )

        # No dedicated tomo section: fall back to the generic scan metadata.
        image_key = technique.get("image_key", None)
        tomo_scan = "return" if image_key == -1 else None
        # NOTE: BLISS 1.9 expose mesh scan npoints as numpy int64
        # cast it to avoid further json serialization fail
        nb_frames = int(metadata.get("npoints"))
        exposure_time = metadata.get("count_time")
        return tomo_scan, nb_frames, exposure_time

    def _create_tomo_scan(self, scanid, metadata) -> typing.Optional[MonitoredScan]:
        """Create a scan structure from metadata.

        Else None, if the scan is not supported (no detector of the component
        is involved in the scan).
        """
        technique = metadata.get("technique", {})

        detectors = self._get_involved_detectors(metadata)
        if not detectors:
            # No need to monitor that scan
            return None

        tomo_scan, nb_frames, exposure_time = self._describe_acquisition(
            technique, metadata
        )
        return MonitoredScan(
            scan_id=scanid,
            detectors=detectors,
            nb_frames=nb_frames,
            tomo_scan=tomo_scan,
            exposure_time=exposure_time,
        )

    def _emit_scan_event(self, event_name, monitored_scan):
        """Emit a scan life-cycle event listing the involved detectors."""
        detectors = monitored_scan.detectors
        self.component.emit(
            event_name,
            {
                "scanid": self.scanid,
                "detector_ids": [d.detector_id for d in detectors],
                "node_names": [d.detector_node_name for d in detectors],
                "frame_no": monitored_scan.nb_frames - 1,
            },
        )

    def _get_frame_or_log(self, monitored_scan, detector, frame_id):
        """Fetch a frame of the scan from the source.

        Returns the frame data, or None (after logging an error) when the
        source returned nothing.
        """
        data = self.source.get_scan_image(
            monitored_scan.scan_id, detector.detector_node_name, frame_id
        )
        if data is None:
            # FIXME: Could it be retrieved later?
            # Not for ct, could be for sct
            logger.error(
                "Data from scan %s frame %s was None", monitored_scan.scan_id, frame_id
            )
        return data

    def on_scan_started(self, metadata):
        """Emit the `new_scan` event for the monitored scan."""
        self._emit_scan_event("new_scan", self._info)

    def on_data_received(
        self, master, progress, channel_name, channel_size, channel_progress
    ):
        """Handle new data on a channel.

        Only channels matching the node name of a monitored detector are
        processed; `channel_size - 1` is forwarded as the index of the newest
        frame.
        """
        monitored_scan = self._info

        # FIXME: This could be mapped to speed up the check
        detectors = [
            d for d in monitored_scan.detectors if d.detector_node_name == channel_name
        ]
        if not detectors:
            return
        assert len(detectors) == 1, "Problem on the internal logic"  # nosec
        self._new_data_frame(
            monitored_scan, detectors[0], channel_size - 1, channel_progress
        )

    def _new_data_frame(
        self, monitored_scan, detector, channel_index, channel_progress
    ):
        """Triggered when a new frame was received"""
        monitored_scan.frame_id_received[detector] = channel_index

        # Dark and flat frames are only processed when the scan terminates.
        if monitored_scan.tomo_scan in ("dark", "flat"):
            return

        # NOTE(review): update_emit_time presumably tells whether enough time
        # elapsed since the last emission - confirm in DetectorLiveStorage.
        if detector.update_emit_time(self.component.min_refresh_period):
            monitored_scan.frame_id_sent[detector] = channel_index
            self._proj_was_taken(
                monitored_scan, detector, channel_index, channel_progress
            )

    def on_scan_terminated(self, metadata):
        """Process the final frame of each detector, then emit `end_scan`."""
        monitored_scan = self._info
        tomo_scan = monitored_scan.tomo_scan

        if tomo_scan == "dark":
            for detector in monitored_scan.detectors:
                self._dark_was_taken(monitored_scan, detector)
        elif tomo_scan == "flat":
            for detector in monitored_scan.detectors:
                self._flat_was_taken(monitored_scan, detector)
        else:
            for detector in monitored_scan.detectors:
                newer_frame_id = monitored_scan.newer_received_frame_id(detector)
                if newer_frame_id is None:
                    continue
                detector.update_emit_time(None)
                self._proj_was_taken(
                    monitored_scan,
                    detector,
                    frame_id=newer_frame_id,
                    progress=100,
                )

        self._emit_scan_event("end_scan", monitored_scan)

    def _dark_was_taken(
        self, monitored_scan: MonitoredScan, detector: DetectorLiveStorage
    ):
        """Triggered at the end of a scan, when a dark was taken"""
        # The last frame of the scan is the one kept as dark.
        frame_id = monitored_scan.nb_frames - 1
        data = self._get_frame_or_log(monitored_scan, detector, frame_id)
        if data is None:
            return

        detector.invalidate_proj(data)
        detector.dark = Projection(
            data=data,
            exposure_time=monitored_scan.exposure_time,
            scan_id=monitored_scan.scan_id,
            frame_no=frame_id,
        )
        self.component.emit(
            "new_dark",
            {
                "scanid": monitored_scan.scan_id,
                "detector_id": detector.detector_id,
                "node_name": detector.detector_node_name,
                "has_flat": detector.flat is not None,
                "has_proj": detector.proj is not None,
            },
        )

    def _flat_was_taken(
        self, monitored_scan: MonitoredScan, detector: DetectorLiveStorage
    ):
        """Triggered at the end of a scan, when a flat was taken"""
        # The last frame of the scan is the one kept as flat.
        frame_id = monitored_scan.nb_frames - 1
        data = self._get_frame_or_log(monitored_scan, detector, frame_id)
        if data is None:
            return

        detector.invalidate_proj(data)
        detector.flat = Projection(
            data=data,
            exposure_time=monitored_scan.exposure_time,
            scan_id=monitored_scan.scan_id,
            frame_no=frame_id,
        )
        self.component.emit(
            "new_flat",
            {
                "scanid": monitored_scan.scan_id,
                "detector_id": detector.detector_id,
                "node_name": detector.detector_node_name,
                "has_dark": detector.dark is not None,
                "has_proj": detector.proj is not None,
            },
        )

    def _proj_was_taken(
        self,
        monitored_scan: MonitoredScan,
        detector: DetectorLiveStorage,
        frame_id,
        progress,
    ):
        """Store frame `frame_id` as the detector projection and emit `new_data`.

        Nothing is stored nor emitted when the frame could not be fetched.
        """
        data = self._get_frame_or_log(monitored_scan, detector, frame_id)
        if data is None:
            return

        sample_stage_meta = self.component.get_sample_stage_meta(detector.detector_id)
        detector.invalidate_proj(data)
        detector.proj = Projection(
            data=data,
            exposure_time=monitored_scan.exposure_time,
            scan_id=monitored_scan.scan_id,
            frame_no=frame_id,
            sample_stage_meta=sample_stage_meta,
        )
        self.component.emit(
            "new_data",
            {
                "scanid": monitored_scan.scan_id,
                "detector_id": detector.detector_id,
                "node_name": detector.detector_node_name,
                "progress": progress,
                "frame_no": frame_id,
                "has_dark": detector.dark is not None,
                "has_flat": detector.flat is not None,
            },
        )