Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scans/sequence_tomo_scan.py: 12%

158 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4import logging 

5import typing 

6import time 

7import datetime 

8import os.path 

9from ..datatype import ScanInfo 

10from ..datatype import Sinogram 

11from ..datatype import TomoScan 

12from .tomo_scan import IncompatibleScanData 

13 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18class SequenceTomoScan(TomoScan): 

19 def _init(self, metadata): 

20 info = self._create_tomo_sequence_scan(self.scanid, metadata) 

21 if info is None: 

22 raise IncompatibleScanData("Not a tomo scan") 

23 self._info = info 

24 self._slice_reconstruction_already_requested = False 

25 

26 def _create_tomo_sequence_scan(self, scanid, metadata): 

27 is_sequence = metadata.get("is_scan_sequence", metadata.get("is-scan-sequence")) 

28 if not is_sequence: 

29 return None 

30 

31 technique = metadata.get("technique", {}) 

32 detectors = technique.get("detector", {}) 

33 detectors = list(detectors.keys()) 

34 if len(detectors) != 1: 

35 return None 

36 detector = detectors[0] 

37 

38 channels = metadata.get("channels", {}) 

39 sinogram_chan = channels.get("sinogram") 

40 rotation_chan = channels.get("rotation") 

41 translation_chan = channels.get("translation") 

42 if sinogram_chan and rotation_chan and translation_chan: 

43 try: 

44 expected_nb_points = translation_chan["points"] 

45 translation_range = translation_chan["start"], translation_chan["stop"] 

46 translation_axis_points = translation_chan.get( 

47 "axis_points", translation_chan.get("axis-points") 

48 ) 

49 rotation_range = rotation_chan["start"], rotation_chan["stop"] 

50 rotation_axis_points = rotation_chan.get( 

51 "axis_points", rotation_chan.get("axis-points") 

52 ) 

53 except Exception: 

54 logger.error("Error while reading the sinogram meta", exc_info=True) 

55 logger.error("Sinogram ignored") 

56 sinogram = None 

57 else: 

58 sinogram = Sinogram( 

59 expected_nb_points=expected_nb_points, 

60 rotation_range=rotation_range, 

61 translation_range=translation_range, 

62 translation_axis_points=translation_axis_points, 

63 rotation_axis_points=rotation_axis_points, 

64 ) 

65 else: 

66 sinogram = None 

67 

68 last_emit = None 

69 

70 def on_sinogram_updated(nbpoints): 

71 nonlocal last_emit 

72 current = time.time() 

73 if last_emit is None or (current - last_emit) > 1: 

74 # Reduce the emit frequency 

75 last_emit = current 

76 self.component.emit( 

77 "update_scan_info_sinogram", 

78 {"id": "lastgroup", "scanid": scanid, "actualnbpoints": nbpoints}, 

79 ) 

80 

81 if sinogram: 

82 sinogram.connect_sinogram_updated(on_sinogram_updated) 

83 

84 def on_active_subscan_updated(active_subscan): 

85 self.component.emit( 

86 "update_scan_info_subscan", 

87 {"id": "lastgroup", "scanid": scanid, "activesubscan": active_subscan}, 

88 ) 

89 

90 daiquiri_meta = metadata.get("daiquiri", {}) 

91 if len(daiquiri_meta) == 0: 

92 logger.warning("The scan sequence does not contain any daiquiri metadata") 

93 datacollectionid = daiquiri_meta.get("datacollectionid") 

94 datacollectiongroupid = daiquiri_meta.get("datacollectiongroupid") 

95 sampleid = daiquiri_meta.get("sampleid") 

96 sessionid = daiquiri_meta.get("sessionid") 

97 start_time = datetime.datetime.fromisoformat(metadata["start_time"]) 

98 

99 def read_subscans(technique): 

100 subscans = technique.get("subscans", None) 

101 if subscans is None: 

102 # Assume there is no information 

103 return None 

104 try: 

105 result = [(int(k[4:]), v["type"]) for k, v in subscans.items()] 

106 result = sorted(result) 

107 result = [i[1] for i in result] 

108 return result 

109 except Exception: 

110 logger.error("Error while parsing subscan info", exc_info=True) 

111 return [] 

112 

113 subscans = read_subscans(technique) 

114 

115 # This is not anymore available in BLISS 2.0 

116 # FIXME: The API have to be adapted 

117 state = metadata.get("state", "RUNNING") 

118 info = ScanInfo( 

119 scan_id=scanid, 

120 group_id=None, 

121 state=state, 

122 detector_id=detector, 

123 sinogram=sinogram, 

124 data_collection_id=datacollectionid, 

125 data_collection_group_id=datacollectiongroupid, 

126 sample_id=sampleid, 

127 session_id=sessionid, 

128 subscans=subscans, 

129 start_time=start_time, 

130 ) 

131 info.connect_active_subscan_updated(on_active_subscan_updated) 

132 return info 

133 

134 def to_rest(self) -> dict: 

135 return self._info.to_rest() 

136 

137 def on_scan_started(self, metadata): 

138 monitored_sequence = self._info 

139 

140 if monitored_sequence.data_collection_id is None: 

141 has_daiquiri_info = ( 

142 monitored_sequence.session_id is not None 

143 and monitored_sequence.sample_id is not None 

144 ) 

145 if not has_daiquiri_info: 

146 # FIXME: this have to be handled at some point, to me it's critical 

147 logger.warning("The scan can't be registered inside Daiquiri") 

148 monitored_sequence.handle_datacollection = False 

149 else: 

150 kwargs = {} 

151 kwargs["sessionid"] = monitored_sequence.session_id 

152 kwargs["datacollectionnumber"] = self.scanid 

153 if monitored_sequence.data_collection_group_id is not None: 

154 kwargs[ 

155 "datacollectiongroupid" 

156 ] = monitored_sequence.data_collection_group_id 

157 # FIXME: This is definitaly wrong if launched from BLISS 

158 kwargs["imagecontainersubpath"] = "1.1/measurement" 

159 kwargs["experimenttype"] = "Tomo" 

160 kwargs["starttime"] = monitored_sequence.start_time 

161 if monitored_sequence.sample_id: 

162 kwargs["sampleid"] = monitored_sequence.sample_id 

163 

164 filename = metadata["filename"] 

165 kwargs["imagedirectory"] = os.path.dirname(filename) 

166 kwargs["filetemplate"] = os.path.basename(filename) 

167 dc = self.component._metadata.add_datacollection(**kwargs) 

168 datacollectionid = dc["datacollectionid"] 

169 stomp = self.component._stomp 

170 if stomp is not None: 

171 stomp.send_event(datacollectionid, "start") 

172 celery = self.component.get_component("celery") 

173 if celery: 

174 celery.send_event(datacollectionid, "start") 

175 monitored_sequence.data_collection_id = datacollectionid 

176 monitored_sequence.handle_datacollection = True 

177 

178 scaninfos = self.component.get_scaninfos() 

179 scaninfos.last_group = self 

180 

181 self.component.emit( 

182 "update_scan_info", 

183 {"id": "lastgroup", "data": self.to_rest()}, 

184 ) 

185 

186 def on_subscan_started(self, scanid, scan: typing.Optional[TomoScan]): 

187 monitored_sequence = self._info 

188 monitored_sequence.increment_active_subscan() 

189 if scan is not None: 

190 try: 

191 tomo_scan = scan.info.tomo_scan 

192 except Exception: 

193 logger.error("Unsupported scan", exc_info=True) 

194 tomo_scan = None 

195 if tomo_scan == "return": 

196 self._request_slice_reconstruction_once("return") 

197 

198 def on_0d_data_received(self, channel_name, channel_size): 

199 self._info.channel_updated(channel_name, channel_size) 

200 

201 def on_scan_terminated(self, metadata): 

202 monitored_sequence = self._info 

203 state = self._get_daiquiri_scan_state(metadata) 

204 monitored_sequence.state = state 

205 

206 if monitored_sequence.handle_datacollection is not None: 

207 if monitored_sequence.data_collection_id is not None: 

208 self.component._metadata.update_datacollection( 

209 datacollectionid=monitored_sequence.data_collection_id, 

210 no_context=True, 

211 endtime=datetime.datetime.now(), 

212 runstatus="Successful", 

213 ) 

214 

215 celery = self.component.get_component("celery") 

216 if celery: 

217 celery.send_event(monitored_sequence.data_collection_id, "end") 

218 

219 sinogram = monitored_sequence.sinogram 

220 if sinogram: 

221 self.component.emit( 

222 "update_scan_info_sinogram", 

223 { 

224 "id": "lastgroup", 

225 "scanid": self.scanid, 

226 "actualnbpoints": sinogram.actual_nb_points, 

227 }, 

228 ) 

229 self.component.emit( 

230 "update_scan_info", 

231 {"id": "lastgroup", "data": self.to_rest()}, 

232 ) 

233 self._request_slice_reconstruction_once("terminated") 

234 

235 def _request_slice_reconstruction_once(self, event): 

236 """Request slice reconstruction if it was not already done.""" 

237 if self._slice_reconstruction_already_requested: 

238 return 

239 if event not in self.component.slice_reconstruction_triggers: 

240 return 

241 self._slice_reconstruction_already_requested = True 

242 

243 monitored_sequence = self._info 

244 if monitored_sequence.data_collection_id is not None: 

245 if monitored_sequence.sinogram is not None: 

246 parameters = {} 

247 deltabeta = self.component.last_delta_beta 

248 if deltabeta is not None: 

249 parameters["deltabeta"] = deltabeta 

250 celery = self.component.get_component("celery") 

251 if celery: 

252 celery.execute_graph( 

253 "tomo-sinogram-reconstruction", 

254 monitored_sequence.data_collection_id, 

255 parameters=parameters, 

256 )