Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/datatype.py: 67%
210 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4from __future__ import annotations
5import numpy
6import logging
7import dataclasses
8import time
9import datetime
10import pint
11from .scans.tomo_scan import TomoScan
14logger = logging.getLogger(__name__)
17@dataclasses.dataclass
18class SampleStageMetadata:
19 """Store information relative to the sample stage"""
21 sz: pint.Quantity
23 sy: pint.Quantity
25 sampy: pint.Quantity
27 somega: pint.Quantity
28 """We have to think about if we need user+dial"""
30 detcy: pint.Quantity
31 """Detector center in the sample stage referencial"""
33 detcz: pint.Quantity
34 """Detector center in the sample stage referencial"""
36 pixel_size: pint.Quantity
39@dataclasses.dataclass
40class Sinogram:
41 expected_nb_points: int
42 """Expected number of points in the sinogram"""
44 rotation_range: tuple[float, float] | None
45 """Range on the rotation axis"""
47 rotation_axis_points: int
48 """Number of points in the rotation axis"""
50 translation_range: tuple[float, float] | None
51 """Range on the translation axis"""
53 translation_axis_points: int
54 """Number of points in the translation axis"""
56 _actual_nb_points: int = 0
57 """Actual number of points in the sinogram"""
59 _actual_channel_points = [0, 0, 0]
61 _sinogram_updated = None
63 @property
64 def actual_nb_points(self):
65 return self._actual_nb_points
67 def connect_sinogram_updated(self, callback):
68 assert self._sinogram_updated is None # nosec
69 self._sinogram_updated = callback
71 def channel_updated(self, channel_name, channel_size):
72 channel_names = ["sinogram", "rotation", "translation"]
73 try:
74 index = channel_names.index(channel_name)
75 except ValueError:
76 return
77 self._actual_channel_points[index] = channel_size
78 nb = min(self._actual_channel_points)
79 if nb <= self._actual_nb_points:
80 return
81 self._actual_nb_points = nb
82 if self._sinogram_updated:
83 self._sinogram_updated(nb)
85 def to_rest(self):
86 result = {
87 "actualnbpoints": self._actual_nb_points,
88 "expectednbpoints": self.expected_nb_points,
89 "rotationrange": self.rotation_range,
90 "translationrange": self.translation_range,
91 "rotationaxispoints": self.rotation_axis_points,
92 "translationaxispoints": self.translation_axis_points,
93 }
94 return result
97@dataclasses.dataclass
98class ScanInfo:
99 scan_id: int
100 """Daiquiri identifier of the scan"""
102 group_id: int | None
103 """Daiquiri identifier of the group scan, if one"""
105 start_time: datetime.datetime | None = None
106 """Start time of this scan"""
108 frame_no: int | None = None
109 """Identifier of the frame in this scan"""
111 state: str | None = None
112 """State of the scan
114 Which is the name if state from `bliss.scanning.scan.ScanState`
115 """
117 detector_id: str | None = None
118 """Name id of the detector"""
120 sinogram: Sinogram | None = None
121 """Dedicated information related to the sinogram"""
123 handle_datacollection: bool = False
124 """If true the tomo component have to handle the live cycle of the datacollection"""
126 data_collection_id: int | None = None
127 """Data collection id of this scan if some from Daiquiri"""
129 data_collection_group_id: int | None = None
130 """Data collection group id of this scan if some from Daiquiri"""
132 sample_id: int | None = None
133 """Sample id from Daiquiri"""
135 session_id: int | None = None
136 """session id from Daiquiri"""
138 subscans: list[str] | None = None
139 """List containing the expected sequence of scans with dedicated roles"""
141 active_subscan = -1
142 """Index of the active subscan"""
144 _active_subscan_updated = None
146 def connect_active_subscan_updated(self, callback):
147 assert self._active_subscan_updated is None # nosec
148 self._active_subscan_updated = callback
150 def increment_active_subscan(self):
151 self.active_subscan += 1
152 if self._active_subscan_updated is not None:
153 self._active_subscan_updated(self.active_subscan)
155 def channel_updated(self, channel_name, channel_size):
156 if self.sinogram is not None:
157 self.sinogram.channel_updated(channel_name, channel_size)
159 def to_rest(self):
160 state = self.state
161 if not isinstance(state, str):
162 state = "UNKNOWN"
164 result = {
165 "scanid": self.scan_id,
166 "frame_no": self.frame_no,
167 "groupid": self.group_id,
168 "detectorid": self.detector_id,
169 "state": state,
170 "datacollectionid": self.data_collection_id,
171 "datacollectiongroupid": self.data_collection_group_id,
172 "subscans": self.subscans,
173 "activesubscan": self.active_subscan,
174 }
175 sinogram = self.sinogram
176 if sinogram is not None:
177 result["sinogram"] = sinogram.to_rest()
178 return result
181@dataclasses.dataclass
182class Projection:
183 """Store information from a single projection"""
185 data: numpy.NdArray | None
186 """Raw data from Lima detector"""
188 scan_id: int
189 """Daiquiri identifier of the scan"""
191 frame_no: int
192 """Identifier of the frame in this scan"""
194 exposure_time: float
195 """Exposure time used by the actual proj."""
197 sample_stage_meta: SampleStageMetadata | None = None
198 """Metadata saved from the sample stage at the time of this projection"""
200 _normalized: numpy.NdArray | None = None
201 """Normalized data by exposure time"""
203 @property
204 def normalized(self):
205 if self._normalized is None:
206 self._normalized = self.data.astype(numpy.float32) / self.exposure_time
207 return self._normalized
210@dataclasses.dataclass
211class DetectorLiveStorage:
212 """Store data relative to a detector during the live of the server"""
214 detector_id: str
215 """Identifier of the Lima detector"""
217 tomo_detector_id: str
218 """Identifier of the tomo detector BLISS controller"""
220 detector_node_name: str
221 """Identifier of the detector channel
223 In BLISS theoretically this name could be changed. Let assume it's fixed.
224 """
226 proj: Projection = None
227 """
228 Hold data used a projection (not a dark, not a flat)
229 """
231 flat: Projection = None
232 """
233 Hold data used as flat for flat field correction.
234 """
236 dark: Projection = None
237 """
238 Image used as dark for flat field correction.
240 Float 2D data normalized by integration time."""
242 last_emit_time = 0
243 """Time of the last emit frame through client"""
245 def __hash__(self):
246 return self.detector_id.__hash__()
248 def invalidate_proj(self, data: numpy.NdArray):
249 """Invalidate every proj which does not match the new data size"""
250 shape = data.shape
251 if self.dark is not None:
252 if self.dark.data.shape != shape:
253 self.dark = None
254 if self.flat is not None:
255 if self.flat.data.shape != shape:
256 self.flat = None
257 if self.proj is not None:
258 if self.proj.data.shape != shape:
259 self.proj = None
261 def update_emit_time(self, min_delay):
262 """Update the emit time if min_delay is respected.
264 Returns True if a new emit can be done.
266 Arguments:
267 min_delay: Minimal delay between 2 signals. If None, there is
268 no limitation.
269 """
270 now = time.time()
271 if min_delay is not None:
272 if (now - self.last_emit_time) < min_delay:
273 return False
274 self.last_emit_time = now
275 return True
277 def to_rest(self):
278 result = {
279 "detector_id": self.detector_id,
280 "tomo_detector_id": self.tomo_detector_id,
281 "node_name": self.detector_node_name,
282 }
283 if self.dark is not None:
284 result["dark"] = 1
285 if self.flat is not None:
286 result["flat"] = 1
287 if self.proj is not None:
288 result["data"] = {
289 "scanid": self.proj.scan_id,
290 "frame_no": self.proj.frame_no,
291 }
292 return result
295@dataclasses.dataclass
296class ScanInfoStorage:
297 last_group: TomoScan = None
298 last_flat: TomoScan = None
299 last_dark: TomoScan = None
300 last_ref: TomoScan = None
301 last_proj: TomoScan = None
302 last_tiling: TomoScan = None
305@dataclasses.dataclass
306class MonitoredScan:
307 """Store data relative to a specific scan"""
309 scan_id: int
310 """Identifier of the scan"""
312 exposure_time: float
313 """Exposure time for a single frame in second"""
315 nb_frames: int
316 """Number of frames taken by this scan"""
318 tomo_scan: str
319 """Content of the tomo_scan field"""
321 detectors: list[DetectorLiveStorage]
322 """Node names containing tomo image"""
324 last_frame_no_received: dict[DetectorLiveStorage, int] | None = None
325 """Frames received from the scan"""
327 frame_id_received: dict[DetectorLiveStorage, int] | None = None
328 """Frames received from the scan"""
330 frame_id_sent: dict[DetectorLiveStorage, int] | None = None
331 """Frames sent to the client as event"""
333 def __post_init__(self):
334 self.last_frame_no_received = {}
335 self.frame_id_received = {}
336 self.frame_id_sent = {}
338 def newer_received_frame_id(self, detector):
339 """Returns the newer frame than the one already emitted, if one
341 Else returns None
342 """
343 received = self.frame_id_received.get(detector, -1)
344 if received == -1:
345 return None
346 if received <= self.frame_id_sent.get(detector, -1):
347 return None
348 return received