Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scans/sequence_tomo_scan.py: 12%
158 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4import logging
5import typing
6import time
7import datetime
8import os.path
9from ..datatype import ScanInfo
10from ..datatype import Sinogram
11from ..datatype import TomoScan
12from .tomo_scan import IncompatibleScanData
15logger = logging.getLogger(__name__)
18class SequenceTomoScan(TomoScan):
19 def _init(self, metadata):
20 info = self._create_tomo_sequence_scan(self.scanid, metadata)
21 if info is None:
22 raise IncompatibleScanData("Not a tomo scan")
23 self._info = info
24 self._slice_reconstruction_already_requested = False
26 def _create_tomo_sequence_scan(self, scanid, metadata):
27 is_sequence = metadata.get("is_scan_sequence", metadata.get("is-scan-sequence"))
28 if not is_sequence:
29 return None
31 technique = metadata.get("technique", {})
32 detectors = technique.get("detector", {})
33 detectors = list(detectors.keys())
34 if len(detectors) != 1:
35 return None
36 detector = detectors[0]
38 channels = metadata.get("channels", {})
39 sinogram_chan = channels.get("sinogram")
40 rotation_chan = channels.get("rotation")
41 translation_chan = channels.get("translation")
42 if sinogram_chan and rotation_chan and translation_chan:
43 try:
44 expected_nb_points = translation_chan["points"]
45 translation_range = translation_chan["start"], translation_chan["stop"]
46 translation_axis_points = translation_chan.get(
47 "axis_points", translation_chan.get("axis-points")
48 )
49 rotation_range = rotation_chan["start"], rotation_chan["stop"]
50 rotation_axis_points = rotation_chan.get(
51 "axis_points", rotation_chan.get("axis-points")
52 )
53 except Exception:
54 logger.error("Error while reading the sinogram meta", exc_info=True)
55 logger.error("Sinogram ignored")
56 sinogram = None
57 else:
58 sinogram = Sinogram(
59 expected_nb_points=expected_nb_points,
60 rotation_range=rotation_range,
61 translation_range=translation_range,
62 translation_axis_points=translation_axis_points,
63 rotation_axis_points=rotation_axis_points,
64 )
65 else:
66 sinogram = None
68 last_emit = None
70 def on_sinogram_updated(nbpoints):
71 nonlocal last_emit
72 current = time.time()
73 if last_emit is None or (current - last_emit) > 1:
74 # Reduce the emit frequency
75 last_emit = current
76 self.component.emit(
77 "update_scan_info_sinogram",
78 {"id": "lastgroup", "scanid": scanid, "actualnbpoints": nbpoints},
79 )
81 if sinogram:
82 sinogram.connect_sinogram_updated(on_sinogram_updated)
84 def on_active_subscan_updated(active_subscan):
85 self.component.emit(
86 "update_scan_info_subscan",
87 {"id": "lastgroup", "scanid": scanid, "activesubscan": active_subscan},
88 )
90 daiquiri_meta = metadata.get("daiquiri", {})
91 if len(daiquiri_meta) == 0:
92 logger.warning("The scan sequence does not contain any daiquiri metadata")
93 datacollectionid = daiquiri_meta.get("datacollectionid")
94 datacollectiongroupid = daiquiri_meta.get("datacollectiongroupid")
95 sampleid = daiquiri_meta.get("sampleid")
96 sessionid = daiquiri_meta.get("sessionid")
97 start_time = datetime.datetime.fromisoformat(metadata["start_time"])
99 def read_subscans(technique):
100 subscans = technique.get("subscans", None)
101 if subscans is None:
102 # Assume there is no information
103 return None
104 try:
105 result = [(int(k[4:]), v["type"]) for k, v in subscans.items()]
106 result = sorted(result)
107 result = [i[1] for i in result]
108 return result
109 except Exception:
110 logger.error("Error while parsing subscan info", exc_info=True)
111 return []
113 subscans = read_subscans(technique)
115 # This is not anymore available in BLISS 2.0
116 # FIXME: The API have to be adapted
117 state = metadata.get("state", "RUNNING")
118 info = ScanInfo(
119 scan_id=scanid,
120 group_id=None,
121 state=state,
122 detector_id=detector,
123 sinogram=sinogram,
124 data_collection_id=datacollectionid,
125 data_collection_group_id=datacollectiongroupid,
126 sample_id=sampleid,
127 session_id=sessionid,
128 subscans=subscans,
129 start_time=start_time,
130 )
131 info.connect_active_subscan_updated(on_active_subscan_updated)
132 return info
134 def to_rest(self) -> dict:
135 return self._info.to_rest()
137 def on_scan_started(self, metadata):
138 monitored_sequence = self._info
140 if monitored_sequence.data_collection_id is None:
141 has_daiquiri_info = (
142 monitored_sequence.session_id is not None
143 and monitored_sequence.sample_id is not None
144 )
145 if not has_daiquiri_info:
146 # FIXME: this have to be handled at some point, to me it's critical
147 logger.warning("The scan can't be registered inside Daiquiri")
148 monitored_sequence.handle_datacollection = False
149 else:
150 kwargs = {}
151 kwargs["sessionid"] = monitored_sequence.session_id
152 kwargs["datacollectionnumber"] = self.scanid
153 if monitored_sequence.data_collection_group_id is not None:
154 kwargs[
155 "datacollectiongroupid"
156 ] = monitored_sequence.data_collection_group_id
157 # FIXME: This is definitaly wrong if launched from BLISS
158 kwargs["imagecontainersubpath"] = "1.1/measurement"
159 kwargs["experimenttype"] = "Tomo"
160 kwargs["starttime"] = monitored_sequence.start_time
161 if monitored_sequence.sample_id:
162 kwargs["sampleid"] = monitored_sequence.sample_id
164 filename = metadata["filename"]
165 kwargs["imagedirectory"] = os.path.dirname(filename)
166 kwargs["filetemplate"] = os.path.basename(filename)
167 dc = self.component._metadata.add_datacollection(**kwargs)
168 datacollectionid = dc["datacollectionid"]
169 stomp = self.component._stomp
170 if stomp is not None:
171 stomp.send_event(datacollectionid, "start")
172 celery = self.component.get_component("celery")
173 if celery:
174 celery.send_event(datacollectionid, "start")
175 monitored_sequence.data_collection_id = datacollectionid
176 monitored_sequence.handle_datacollection = True
178 scaninfos = self.component.get_scaninfos()
179 scaninfos.last_group = self
181 self.component.emit(
182 "update_scan_info",
183 {"id": "lastgroup", "data": self.to_rest()},
184 )
186 def on_subscan_started(self, scanid, scan: typing.Optional[TomoScan]):
187 monitored_sequence = self._info
188 monitored_sequence.increment_active_subscan()
189 if scan is not None:
190 try:
191 tomo_scan = scan.info.tomo_scan
192 except Exception:
193 logger.error("Unsupported scan", exc_info=True)
194 tomo_scan = None
195 if tomo_scan == "return":
196 self._request_slice_reconstruction_once("return")
198 def on_0d_data_received(self, channel_name, channel_size):
199 self._info.channel_updated(channel_name, channel_size)
201 def on_scan_terminated(self, metadata):
202 monitored_sequence = self._info
203 state = self._get_daiquiri_scan_state(metadata)
204 monitored_sequence.state = state
206 if monitored_sequence.handle_datacollection is not None:
207 if monitored_sequence.data_collection_id is not None:
208 self.component._metadata.update_datacollection(
209 datacollectionid=monitored_sequence.data_collection_id,
210 no_context=True,
211 endtime=datetime.datetime.now(),
212 runstatus="Successful",
213 )
215 celery = self.component.get_component("celery")
216 if celery:
217 celery.send_event(monitored_sequence.data_collection_id, "end")
219 sinogram = monitored_sequence.sinogram
220 if sinogram:
221 self.component.emit(
222 "update_scan_info_sinogram",
223 {
224 "id": "lastgroup",
225 "scanid": self.scanid,
226 "actualnbpoints": sinogram.actual_nb_points,
227 },
228 )
229 self.component.emit(
230 "update_scan_info",
231 {"id": "lastgroup", "data": self.to_rest()},
232 )
233 self._request_slice_reconstruction_once("terminated")
235 def _request_slice_reconstruction_once(self, event):
236 """Request slice reconstruction if it was not already done."""
237 if self._slice_reconstruction_already_requested:
238 return
239 if event not in self.component.slice_reconstruction_triggers:
240 return
241 self._slice_reconstruction_already_requested = True
243 monitored_sequence = self._info
244 if monitored_sequence.data_collection_id is not None:
245 if monitored_sequence.sinogram is not None:
246 parameters = {}
247 deltabeta = self.component.last_delta_beta
248 if deltabeta is not None:
249 parameters["deltabeta"] = deltabeta
250 celery = self.component.get_component("celery")
251 if celery:
252 celery.execute_graph(
253 "tomo-sinogram-reconstruction",
254 monitored_sequence.data_collection_id,
255 parameters=parameters,
256 )