Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/datatype.py: 67%

210 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4from __future__ import annotations 

5import numpy 

6import logging 

7import dataclasses 

8import time 

9import datetime 

10import pint 

11from .scans.tomo_scan import TomoScan 

12 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17@dataclasses.dataclass 

18class SampleStageMetadata: 

19 """Store information relative to the sample stage""" 

20 

21 sz: pint.Quantity 

22 

23 sy: pint.Quantity 

24 

25 sampy: pint.Quantity 

26 

27 somega: pint.Quantity 

28 """We have to think about if we need user+dial""" 

29 

30 detcy: pint.Quantity 

31 """Detector center in the sample stage referencial""" 

32 

33 detcz: pint.Quantity 

34 """Detector center in the sample stage referencial""" 

35 

36 pixel_size: pint.Quantity 

37 

38 

39@dataclasses.dataclass 

40class Sinogram: 

41 expected_nb_points: int 

42 """Expected number of points in the sinogram""" 

43 

44 rotation_range: tuple[float, float] | None 

45 """Range on the rotation axis""" 

46 

47 rotation_axis_points: int 

48 """Number of points in the rotation axis""" 

49 

50 translation_range: tuple[float, float] | None 

51 """Range on the translation axis""" 

52 

53 translation_axis_points: int 

54 """Number of points in the translation axis""" 

55 

56 _actual_nb_points: int = 0 

57 """Actual number of points in the sinogram""" 

58 

59 _actual_channel_points = [0, 0, 0] 

60 

61 _sinogram_updated = None 

62 

63 @property 

64 def actual_nb_points(self): 

65 return self._actual_nb_points 

66 

67 def connect_sinogram_updated(self, callback): 

68 assert self._sinogram_updated is None # nosec 

69 self._sinogram_updated = callback 

70 

71 def channel_updated(self, channel_name, channel_size): 

72 channel_names = ["sinogram", "rotation", "translation"] 

73 try: 

74 index = channel_names.index(channel_name) 

75 except ValueError: 

76 return 

77 self._actual_channel_points[index] = channel_size 

78 nb = min(self._actual_channel_points) 

79 if nb <= self._actual_nb_points: 

80 return 

81 self._actual_nb_points = nb 

82 if self._sinogram_updated: 

83 self._sinogram_updated(nb) 

84 

85 def to_rest(self): 

86 result = { 

87 "actualnbpoints": self._actual_nb_points, 

88 "expectednbpoints": self.expected_nb_points, 

89 "rotationrange": self.rotation_range, 

90 "translationrange": self.translation_range, 

91 "rotationaxispoints": self.rotation_axis_points, 

92 "translationaxispoints": self.translation_axis_points, 

93 } 

94 return result 

95 

96 

97@dataclasses.dataclass 

98class ScanInfo: 

99 scan_id: int 

100 """Daiquiri identifier of the scan""" 

101 

102 group_id: int | None 

103 """Daiquiri identifier of the group scan, if one""" 

104 

105 start_time: datetime.datetime | None = None 

106 """Start time of this scan""" 

107 

108 frame_no: int | None = None 

109 """Identifier of the frame in this scan""" 

110 

111 state: str | None = None 

112 """State of the scan 

113 

114 Which is the name if state from `bliss.scanning.scan.ScanState` 

115 """ 

116 

117 detector_id: str | None = None 

118 """Name id of the detector""" 

119 

120 sinogram: Sinogram | None = None 

121 """Dedicated information related to the sinogram""" 

122 

123 handle_datacollection: bool = False 

124 """If true the tomo component have to handle the live cycle of the datacollection""" 

125 

126 data_collection_id: int | None = None 

127 """Data collection id of this scan if some from Daiquiri""" 

128 

129 data_collection_group_id: int | None = None 

130 """Data collection group id of this scan if some from Daiquiri""" 

131 

132 sample_id: int | None = None 

133 """Sample id from Daiquiri""" 

134 

135 session_id: int | None = None 

136 """session id from Daiquiri""" 

137 

138 subscans: list[str] | None = None 

139 """List containing the expected sequence of scans with dedicated roles""" 

140 

141 active_subscan = -1 

142 """Index of the active subscan""" 

143 

144 _active_subscan_updated = None 

145 

146 def connect_active_subscan_updated(self, callback): 

147 assert self._active_subscan_updated is None # nosec 

148 self._active_subscan_updated = callback 

149 

150 def increment_active_subscan(self): 

151 self.active_subscan += 1 

152 if self._active_subscan_updated is not None: 

153 self._active_subscan_updated(self.active_subscan) 

154 

155 def channel_updated(self, channel_name, channel_size): 

156 if self.sinogram is not None: 

157 self.sinogram.channel_updated(channel_name, channel_size) 

158 

159 def to_rest(self): 

160 state = self.state 

161 if not isinstance(state, str): 

162 state = "UNKNOWN" 

163 

164 result = { 

165 "scanid": self.scan_id, 

166 "frame_no": self.frame_no, 

167 "groupid": self.group_id, 

168 "detectorid": self.detector_id, 

169 "state": state, 

170 "datacollectionid": self.data_collection_id, 

171 "datacollectiongroupid": self.data_collection_group_id, 

172 "subscans": self.subscans, 

173 "activesubscan": self.active_subscan, 

174 } 

175 sinogram = self.sinogram 

176 if sinogram is not None: 

177 result["sinogram"] = sinogram.to_rest() 

178 return result 

179 

180 

181@dataclasses.dataclass 

182class Projection: 

183 """Store information from a single projection""" 

184 

185 data: numpy.NdArray | None 

186 """Raw data from Lima detector""" 

187 

188 scan_id: int 

189 """Daiquiri identifier of the scan""" 

190 

191 frame_no: int 

192 """Identifier of the frame in this scan""" 

193 

194 exposure_time: float 

195 """Exposure time used by the actual proj.""" 

196 

197 sample_stage_meta: SampleStageMetadata | None = None 

198 """Metadata saved from the sample stage at the time of this projection""" 

199 

200 _normalized: numpy.NdArray | None = None 

201 """Normalized data by exposure time""" 

202 

203 @property 

204 def normalized(self): 

205 if self._normalized is None: 

206 self._normalized = self.data.astype(numpy.float32) / self.exposure_time 

207 return self._normalized 

208 

209 

210@dataclasses.dataclass 

211class DetectorLiveStorage: 

212 """Store data relative to a detector during the live of the server""" 

213 

214 detector_id: str 

215 """Identifier of the Lima detector""" 

216 

217 tomo_detector_id: str 

218 """Identifier of the tomo detector BLISS controller""" 

219 

220 detector_node_name: str 

221 """Identifier of the detector channel 

222 

223 In BLISS theoretically this name could be changed. Let assume it's fixed. 

224 """ 

225 

226 proj: Projection = None 

227 """ 

228 Hold data used a projection (not a dark, not a flat) 

229 """ 

230 

231 flat: Projection = None 

232 """ 

233 Hold data used as flat for flat field correction. 

234 """ 

235 

236 dark: Projection = None 

237 """ 

238 Image used as dark for flat field correction. 

239 

240 Float 2D data normalized by integration time.""" 

241 

242 last_emit_time = 0 

243 """Time of the last emit frame through client""" 

244 

245 def __hash__(self): 

246 return self.detector_id.__hash__() 

247 

248 def invalidate_proj(self, data: numpy.NdArray): 

249 """Invalidate every proj which does not match the new data size""" 

250 shape = data.shape 

251 if self.dark is not None: 

252 if self.dark.data.shape != shape: 

253 self.dark = None 

254 if self.flat is not None: 

255 if self.flat.data.shape != shape: 

256 self.flat = None 

257 if self.proj is not None: 

258 if self.proj.data.shape != shape: 

259 self.proj = None 

260 

261 def update_emit_time(self, min_delay): 

262 """Update the emit time if min_delay is respected. 

263 

264 Returns True if a new emit can be done. 

265 

266 Arguments: 

267 min_delay: Minimal delay between 2 signals. If None, there is 

268 no limitation. 

269 """ 

270 now = time.time() 

271 if min_delay is not None: 

272 if (now - self.last_emit_time) < min_delay: 

273 return False 

274 self.last_emit_time = now 

275 return True 

276 

277 def to_rest(self): 

278 result = { 

279 "detector_id": self.detector_id, 

280 "tomo_detector_id": self.tomo_detector_id, 

281 "node_name": self.detector_node_name, 

282 } 

283 if self.dark is not None: 

284 result["dark"] = 1 

285 if self.flat is not None: 

286 result["flat"] = 1 

287 if self.proj is not None: 

288 result["data"] = { 

289 "scanid": self.proj.scan_id, 

290 "frame_no": self.proj.frame_no, 

291 } 

292 return result 

293 

294 

295@dataclasses.dataclass 

296class ScanInfoStorage: 

297 last_group: TomoScan = None 

298 last_flat: TomoScan = None 

299 last_dark: TomoScan = None 

300 last_ref: TomoScan = None 

301 last_proj: TomoScan = None 

302 last_tiling: TomoScan = None 

303 

304 

305@dataclasses.dataclass 

306class MonitoredScan: 

307 """Store data relative to a specific scan""" 

308 

309 scan_id: int 

310 """Identifier of the scan""" 

311 

312 exposure_time: float 

313 """Exposure time for a single frame in second""" 

314 

315 nb_frames: int 

316 """Number of frames taken by this scan""" 

317 

318 tomo_scan: str 

319 """Content of the tomo_scan field""" 

320 

321 detectors: list[DetectorLiveStorage] 

322 """Node names containing tomo image""" 

323 

324 last_frame_no_received: dict[DetectorLiveStorage, int] | None = None 

325 """Frames received from the scan""" 

326 

327 frame_id_received: dict[DetectorLiveStorage, int] | None = None 

328 """Frames received from the scan""" 

329 

330 frame_id_sent: dict[DetectorLiveStorage, int] | None = None 

331 """Frames sent to the client as event""" 

332 

333 def __post_init__(self): 

334 self.last_frame_no_received = {} 

335 self.frame_id_received = {} 

336 self.frame_id_sent = {} 

337 

338 def newer_received_frame_id(self, detector): 

339 """Returns the newer frame than the one already emitted, if one 

340 

341 Else returns None 

342 """ 

343 received = self.frame_id_received.get(detector, -1) 

344 if received == -1: 

345 return None 

346 if received <= self.frame_id_sent.get(detector, -1): 

347 return None 

348 return received