Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scans/standard_tomo_scan.py: 16%

128 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4import logging 

5import typing 

6from .tomo_scan import TomoScan 

7from .tomo_scan import IncompatibleScanData 

8from ..datatype import DetectorLiveStorage 

9from ..datatype import MonitoredScan 

10from ..datatype import Projection 

11 

12 

13logger = logging.getLogger(__name__) 

14 

15 

16class StandardTomoScan(TomoScan): 

17 def _init(self, metadata): 

18 info = self._create_tomo_scan(self.scanid, metadata) 

19 if info is None: 

20 raise IncompatibleScanData("Not a tomo scan") 

21 self._info = info 

22 

23 @property 

24 def info(self): 

25 return self._info 

26 

27 def _get_involved_detectors(self, metadata) -> typing.List[DetectorLiveStorage]: 

28 """Returns the list of involved detectors. 

29 

30 Empty if none. 

31 """ 

32 channels = metadata.get("channels", None) 

33 if channels is None: 

34 logger.error( 

35 "Unexpected scan content. Channels field is mandatory. BLISS scan specification have changed." 

36 ) 

37 return [] 

38 result = [] 

39 for detector in self.component.get_detectors(): 

40 node_name = detector.detector_node_name 

41 if node_name in channels: 

42 result.append(detector) 

43 return result 

44 

45 def _create_tomo_scan(self, scanid, metadata) -> typing.Optional[MonitoredScan]: 

46 """Create a scan structure from metadata. 

47 

48 Else None, if the scan is not supported (not a tomo scan) 

49 """ 

50 technique = metadata.get("technique", {}) 

51 

52 detectors = self._get_involved_detectors(metadata) 

53 if len(detectors) == 0: 

54 # No need to monitor that scan 

55 return None 

56 

57 def read_exposure_time_in_second(meta): 

58 value = meta["exposure_time"] 

59 unit = scan.get("exposure_time@units", "s") 

60 coefs = {"s": 1, "ms": 0.001} 

61 if unit not in coefs: 

62 raise RuntimeError(f"Unsupported exposure_time unit '{unit}' in scan") 

63 return value * coefs[unit] 

64 

65 if "dark" in technique: 

66 scan = technique["dark"] 

67 tomo_scan = "dark" 

68 nb_frames = scan["dark_n"] 

69 exposure_time = read_exposure_time_in_second(scan) 

70 elif "flat" in technique: 

71 scan = technique["flat"] 

72 tomo_scan = "flat" 

73 nb_frames = scan["flat_n"] 

74 exposure_time = read_exposure_time_in_second(scan) 

75 elif "proj" in technique: 

76 scan = technique["proj"] 

77 tomo_scan = "projection" 

78 nb_frames = scan["proj_n"] 

79 exposure_time = read_exposure_time_in_second(scan) 

80 elif "scan" in technique and "tomo_n" in technique["scan"]: 

81 scan = technique["scan"] 

82 tomo_scan = "projection" 

83 nb_frames = scan["tomo_n"] 

84 exposure_time = scan["exposure_time"] / 1000 

85 else: 

86 image_key = technique.get("image_key", None) 

87 if image_key == -1: 

88 tomo_scan = "return" 

89 else: 

90 tomo_scan = None 

91 # NOTE: BLISS 1.9 expose mesh scan npoints as numpy int64 

92 # cast it to avoid further json serialization fail 

93 nb_frames = int(metadata.get("npoints")) 

94 exposure_time = metadata.get("count_time") 

95 

96 scan = MonitoredScan( 

97 scan_id=scanid, 

98 detectors=detectors, 

99 nb_frames=nb_frames, 

100 tomo_scan=tomo_scan, 

101 exposure_time=exposure_time, 

102 ) 

103 return scan 

104 

105 def on_scan_started(self, metadata): 

106 monitored_scan = self._info 

107 

108 detector_ids = [d.detector_id for d in monitored_scan.detectors] 

109 node_names = [d.detector_node_name for d in monitored_scan.detectors] 

110 self.component.emit( 

111 "new_scan", 

112 { 

113 "scanid": self.scanid, 

114 "detector_ids": detector_ids, 

115 "node_names": node_names, 

116 "frame_no": monitored_scan.nb_frames - 1, 

117 }, 

118 ) 

119 

120 def on_data_received( 

121 self, master, progress, channel_name, channel_size, channel_progress 

122 ): 

123 

124 monitored_scan = self._info 

125 

126 # FIXME: This could be mapped to speed up the check 

127 detectors = [ 

128 d for d in monitored_scan.detectors if d.detector_node_name == channel_name 

129 ] 

130 if len(detectors) != 0: 

131 assert len(detectors) == 1, "Problem on the internal logic" # nosec 

132 self._new_data_frame( 

133 monitored_scan, detectors[0], channel_size - 1, channel_progress 

134 ) 

135 

136 def _new_data_frame( 

137 self, monitored_scan, detector, channel_index, channel_progress 

138 ): 

139 """Triggered when a new frame was received""" 

140 

141 monitored_scan.frame_id_received[detector] = channel_index 

142 

143 if monitored_scan.tomo_scan not in ["dark", "flat"]: 

144 if detector.update_emit_time(self.component.min_refresh_period): 

145 monitored_scan.frame_id_sent[detector] = channel_index 

146 self._proj_was_taken( 

147 monitored_scan, detector, channel_index, channel_progress 

148 ) 

149 

150 def on_scan_terminated(self, metadata): 

151 monitored_scan = self._info 

152 

153 if monitored_scan.tomo_scan == "dark": 

154 for detector in monitored_scan.detectors: 

155 self._dark_was_taken(monitored_scan, detector) 

156 elif monitored_scan.tomo_scan == "flat": 

157 for detector in monitored_scan.detectors: 

158 self._flat_was_taken(monitored_scan, detector) 

159 else: 

160 for detector in monitored_scan.detectors: 

161 newer_frame_id = monitored_scan.newer_received_frame_id(detector) 

162 if newer_frame_id is not None: 

163 detector.update_emit_time(None) 

164 self._proj_was_taken( 

165 monitored_scan, 

166 detector, 

167 frame_id=newer_frame_id, 

168 progress=100, 

169 ) 

170 

171 detector_ids = [d.detector_id for d in monitored_scan.detectors] 

172 node_names = [d.detector_node_name for d in monitored_scan.detectors] 

173 self.component.emit( 

174 "end_scan", 

175 { 

176 "scanid": self.scanid, 

177 "detector_ids": detector_ids, 

178 "node_names": node_names, 

179 "frame_no": monitored_scan.nb_frames - 1, 

180 }, 

181 ) 

182 

183 def _dark_was_taken( 

184 self, monitored_scan: MonitoredScan, detector: DetectorLiveStorage 

185 ): 

186 """Triggered at the end of a scan, when a dark was taken""" 

187 frame_id = monitored_scan.nb_frames - 1 

188 data = self.source.get_scan_image( 

189 monitored_scan.scan_id, detector.detector_node_name, frame_id 

190 ) 

191 if data is None: 

192 # FIXME: Could it be retrieved later? 

193 # Not for ct, could be for sct 

194 logger.error( 

195 "Data from scan %s frame %s was None", monitored_scan.scan_id, frame_id 

196 ) 

197 return 

198 detector.invalidate_proj(data) 

199 detector.dark = Projection( 

200 data=data, 

201 exposure_time=monitored_scan.exposure_time, 

202 scan_id=monitored_scan.scan_id, 

203 frame_no=frame_id, 

204 ) 

205 self.component.emit( 

206 "new_dark", 

207 { 

208 "scanid": monitored_scan.scan_id, 

209 "detector_id": detector.detector_id, 

210 "node_name": detector.detector_node_name, 

211 "has_flat": detector.flat is not None, 

212 "has_proj": detector.proj is not None, 

213 }, 

214 ) 

215 

216 def _flat_was_taken( 

217 self, monitored_scan: MonitoredScan, detector: DetectorLiveStorage 

218 ): 

219 """Triggered at the end of a scan, when a flat was taken""" 

220 frame_id = monitored_scan.nb_frames - 1 

221 data = self.source.get_scan_image( 

222 monitored_scan.scan_id, detector.detector_node_name, frame_id 

223 ) 

224 if data is None: 

225 # FIXME: Could it be retrieved later? 

226 # Not for ct, could be for sct 

227 logger.error( 

228 "Data from scan %s frame %s was None", monitored_scan.scan_id, frame_id 

229 ) 

230 return 

231 

232 detector.invalidate_proj(data) 

233 detector.flat = Projection( 

234 data=data, 

235 exposure_time=monitored_scan.exposure_time, 

236 scan_id=monitored_scan.scan_id, 

237 frame_no=frame_id, 

238 ) 

239 self.component.emit( 

240 "new_flat", 

241 { 

242 "scanid": monitored_scan.scan_id, 

243 "detector_id": detector.detector_id, 

244 "node_name": detector.detector_node_name, 

245 "has_dark": detector.dark is not None, 

246 "has_proj": detector.proj is not None, 

247 }, 

248 ) 

249 

250 def _proj_was_taken( 

251 self, 

252 monitored_scan: MonitoredScan, 

253 detector: DetectorLiveStorage, 

254 frame_id, 

255 progress, 

256 ): 

257 data = self.source.get_scan_image( 

258 monitored_scan.scan_id, detector.detector_node_name, frame_id 

259 ) 

260 if data is None: 

261 # FIXME: Could it be retrieved later? 

262 # Not for ct, could be for sct 

263 logger.error( 

264 "Data from scan %s frame %s was None", monitored_scan.scan_id, frame_id 

265 ) 

266 return 

267 

268 sample_stage_meta = self.component.get_sample_stage_meta(detector.detector_id) 

269 detector.invalidate_proj(data) 

270 detector.proj = Projection( 

271 data=data, 

272 exposure_time=monitored_scan.exposure_time, 

273 scan_id=monitored_scan.scan_id, 

274 frame_no=frame_id, 

275 sample_stage_meta=sample_stage_meta, 

276 ) 

277 self.component.emit( 

278 "new_data", 

279 { 

280 "scanid": monitored_scan.scan_id, 

281 "detector_id": detector.detector_id, 

282 "node_name": detector.detector_node_name, 

283 "progress": progress, 

284 "frame_no": frame_id, 

285 "has_dark": detector.dark is not None, 

286 "has_flat": detector.flat is not None, 

287 }, 

288 )