Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/tomo/tilingscanwithroi.py: 0%

131 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4from __future__ import annotations 

5 

6import logging 

7from marshmallow import fields, validate 

8from marshmallow import validates_schema, ValidationError 

9from daiquiri.core.schema.validators import OneOf 

10from daiquiri.core.components import ( 

11 ComponentActor, 

12 ComponentActorSchema, 

13) 

14 

15from tomo.sequence.tiling import tiling 

16from tomo.sequence.tiling import tiling_estimation_time 

17from . import tomo_helper 

18 

19try: 

20 from tomo.globals import ACTIVE_TOMOCONFIG 

21except ImportError: 

22 ACTIVE_TOMOCONFIG = None 

23 

24_logger = logging.getLogger(__name__) 

25 

26 

class TilingscanwithroiSchema(ComponentActorSchema):
    """Form schema for the parameters of the tomo tiling scan.

    Besides declaring the fields and their UI layout (see ``Meta``), the
    schema provides:

    - ``schema_validate``: cross-field validation of the scan type against
      the lateral motor mode,
    - ``calculated``: values derived from the current form content
      (initial exposure time, pixel blur),
    - ``time_estimate``: estimated duration of the scan, delegated to
      ``tiling_estimation_time``.
    """

    sampleid = fields.Int(required=True)

    scan_type = OneOf(
        ["Continuous", "Step"],
        dump_default="Continuous",
        metadata={"title": "Scan type"},
    )
    speed_ratio = fields.Float(
        validate=validate.Range(min=0.0, max=1.0),
        dump_default=1.0,
        metadata={
            "title": "Stable/speed",
            "multipleOf": 0.01,
        },
    )
    # FIXME: Units have to be read from the motors
    # FIXME: Motor names have to be read from the motor
    start_x = fields.Float(
        validate=validate.Range(min=-2000, max=2000),
        dump_default=-250,
        metadata={"title": "Start", "unit": "㎜"},
    )
    end_x = fields.Float(
        validate=validate.Range(min=-2000, max=2000),
        dump_default=250,
        metadata={"title": "Stop", "unit": "㎜"},
    )
    start_y = fields.Float(
        validate=validate.Range(min=-2000, max=2000),
        dump_default=-250,
        metadata={"title": "Start", "unit": "㎜"},
    )
    end_y = fields.Float(
        validate=validate.Range(min=-2000, max=2000),
        dump_default=250,
        metadata={"title": "Stop", "unit": "㎜"},
    )
    start_z = fields.Float(
        validate=validate.Range(min=-2000, max=2000),
        dump_default=550,
        metadata={"title": "Start", "unit": "㎜"},
    )
    end_z = fields.Float(
        validate=validate.Range(min=-2000, max=2000),
        dump_default=-550,
        metadata={"title": "Stop", "unit": "㎜"},
    )
    # No static default: when left empty, `_calculated` proposes a value.
    expo_time = fields.Float(
        validate=validate.Range(min=0.001, max=10),
        dump_default=None,
        metadata={"title": "Exposure time", "unit": "s"},
    )
    sleep_time = fields.Float(
        validate=validate.Range(min=0.0, max=10),
        dump_default=0,
        metadata={"title": "Sleep time", "unit": "s"},
    )
    n_dark = fields.Int(
        dump_default=1,
        validate=validate.Range(min=0, max=999999),
        metadata={"title": "Nb darks"},
    )
    n_flat = fields.Int(
        dump_default=1,
        validate=validate.Range(min=0, max=999999),
        metadata={"title": "Nb flats"},
    )
    lateral_motor_mode = OneOf(
        ["over_rot", "under_rot"],
        dump_default="over_rot",
        metadata={
            "title": "Lateral motor",
        },
    )
    # Read-only in the form: the value is filled in by `_calculated`.
    pixel_blur = fields.Float(
        dump_default=0,
        metadata={
            "title": "Pixel blur",
            "description": "Displacement during the expo time in pixel",
            "unit": "px",
            "readOnly": True,
        },
    )
    restore_motor_positions = fields.Bool(
        metadata={
            "title": "Move back motors",
            "description": "Move back motors at their initial position when the scan is terminated",
        },
        dump_default=True,
    )
    enqueue = fields.Bool(
        dump_default=True,
        metadata={
            "title": "Queue Scan",
        },
    )

    class Meta:
        # Order in which the fields are listed in the form.
        uiorder = [
            "sampleid",
            "scan_type",
            "speed_ratio",
            "pixel_blur",
            "expo_time",
            "sleep_time",
            "n_dark",
            "n_flat",
            "lateral_motor_mode",
            "restore_motor_positions",
            "start_x",
            "end_x",
            "start_y",
            "end_y",
            "start_z",
            "end_z",
            "enqueue",
        ]
        # Grouping of the fields; each dict is one titled group.
        uigroups = [
            "sampleid",
            "scan_type",
            "speed_ratio",
            "pixel_blur",
            {
                "Options": [
                    "expo_time",
                    "sleep_time",
                    "n_dark",
                    "n_flat",
                    "lateral_motor_mode",
                    "restore_motor_positions",
                ],
                "ui:minwidth": 12,
            },
            {
                "Y-motor (side)": ["start_x", "end_x"],
                "ui:minwidth": 6,
            },
            {
                "Y-motor (front)": ["start_y", "end_y"],
                "ui:minwidth": 6,
            },
            {
                "Z-motor": ["start_z", "end_z"],
                "ui:minwidth": 6,
            },
            "enqueue",
        ]
        uischema = {
            "sampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "speed_ratio": {"ui:widget": "range"},
        }

    @validates_schema
    def schema_validate(self, data, **kwargs):
        """Reject a continuous scan whose lateral motor is not under the rotation.

        Neither ``scan_type`` nor ``lateral_motor_mode`` is a required field,
        so both are read with ``.get``: an omitted key must produce a
        validation result, not a ``KeyError``. A continuous scan without an
        explicit ``"under_rot"`` lateral motor mode is rejected.

        Raises:
            ValidationError: In continuous mode when ``lateral_motor_mode``
                is anything but ``"under_rot"``.
        """
        if data.get("scan_type") != "Continuous":
            return
        if data.get("lateral_motor_mode") != "under_rot":
            raise ValidationError(
                "In continuous mode, the lateral motor have ot be under the rot"
            )

    def calculated(self, data):
        """Return the values derived from the form content `data`.

        Best-effort wrapper around `_calculated`: any failure is logged
        with its traceback and ``None`` is returned instead of raising.
        """
        try:
            return self._calculated(**data)
        except Exception:
            _logger.error("Error while calculated", exc_info=True)
            return None

    def _calculated(
        self,
        expo_time: float | None = None,
        scan_type: str | None = None,
        speed_ratio: float | None = None,
        **kwargs,
    ):
        """Compute the values derived from the current form parameters.

        The method is called with the form content expanded as keyword
        arguments; parameters not listed below are accepted and ignored.

        Arguments:
            expo_time: Exposure time from the form, or None when not set yet
            scan_type: Either "Continuous" or "Step"
            speed_ratio: Ratio used to interpolate the continuous velocity
                between its minimum and the maximum velocity of the z-axis

        Returns:
            A dictionary which may contain:

            - "expo_time": only when the input `expo_time` was None; taken
              from the tomo imaging device if there is one, else 1.0
            - "pixel_blur": 0.0 for a step scan, else the displacement in
              pixels during one exposure at the continuous velocity

        Raises:
            RuntimeError: If no active tomo config is selected
        """
        result = {}

        tomo_config = tomo_helper.get_active_tomo_config()
        if tomo_config is None:
            raise RuntimeError("No ACTIVE_TOMOCONFIG selected")

        if expo_time is None:
            # Trick to feed the initial expo time based on the tomo imaging device
            imaging = tomo_config.tomo_imaging
            if imaging is not None:
                expo_time = imaging.exposure_time
            else:
                expo_time = 1.0
            result["expo_time"] = expo_time

        if scan_type == "Step":
            # No motion during the exposure in step mode
            result["pixel_blur"] = 0.0
        elif scan_type == "Continuous" and speed_ratio is not None:
            # The imaging device may itself report no exposure time
            if expo_time is not None:
                tomo_detector = tomo_config.detectors.active_detector
                px = tomo_detector.sample_pixel_size

                vmax = tomo_config.z_axis.velocity
                # Velocity for a displacement of one pixel per exposure.
                # NOTE(review): the factor 1000 presumably converts the pixel
                # size unit to the motor unit - confirm.
                v1px = 1000 * px / expo_time
                # Lower bound: the one-pixel velocity when reachable, else
                # 10% of the maximum velocity
                vmin = v1px if v1px < vmax else vmax * 0.1
                velocity = vmin + (vmax - vmin) * speed_ratio

                result["pixel_blur"] = (expo_time * velocity) / (1000 * px)

        return result

    def time_estimate(self, data):
        """Return the estimated duration of the scan described by `data`.

        `data` is updated in place with the values from `_calculated`
        (for example the proposed exposure time) before the estimation.

        Returns:
            The value returned by `tiling_estimation_time`, or 0 when a
            mandatory parameter is missing from `data` or the estimation
            itself fails (the error is logged).

        Raises:
            RuntimeError: If no active tomo config or no active detector
                is selected
        """
        result = self._calculated(**data)
        data.update(result)

        # A plain `x is not None and x.deref()` would yield False (not None)
        # when the tomo package is unavailable, defeating the check below.
        tomo_config = (
            ACTIVE_TOMOCONFIG.deref_active_object()
            if ACTIVE_TOMOCONFIG is not None
            else None
        )
        if tomo_config is None:
            raise RuntimeError("No ACTIVE_TOMOCONFIG selected")

        tomo_det = tomo_config.detectors.active_detector
        tomo_cam = tomo_det.detector if tomo_det is not None else None
        if tomo_cam is None:
            raise RuntimeError(f"No active detector selected in `{tomo_config.name}`")

        try:
            start_x = data["start_x"]
            end_x = data["end_x"]
            start_y = data["start_y"]
            end_y = data["end_y"]
            start_z = data["start_z"]
            end_z = data["end_z"]
            expo_time = data["expo_time"]
            sleep_time = data.get("sleep_time", 0)  # FIXME: It can be uninitialized...
            n_dark = data["n_dark"]
            n_flat = data["n_flat"]
            lateral_motor_mode = data["lateral_motor_mode"]
            restore_motor_positions = data["restore_motor_positions"]
            scan_type = data["scan_type"]
            speed_ratio = data["speed_ratio"]
        except KeyError:
            # Incomplete form: no estimation can be given yet
            _logger.debug("Error while reading data", exc_info=True)
            return 0

        try:
            continuous = scan_type == "Continuous"
            return tiling_estimation_time(
                start_x,
                end_x,
                start_y,
                end_y,
                start_z,
                end_z,
                tomo_cam,
                expo_time=expo_time,
                sleep_time=sleep_time,
                n_dark=n_dark,
                n_flat=n_flat,
                lateral_motor_mode=lateral_motor_mode,
                restore_motor_positions=restore_motor_positions,
                continuous=continuous,
                tomoconfig=tomo_config,
                speed_ratio=speed_ratio * 1.0,
            )
        except Exception:
            _logger.error("Error while computing estimation", exc_info=True)
            return 0

301 

302 

class TilingscanwithroiActor(ComponentActor):
    """Actor running a tomo tiling scan with the active tomo configuration."""

    schema = TilingscanwithroiSchema
    name = "[tomo] tiling scan"

    metatype = "tomo"

    def method(
        self,
        start_x: float,
        end_x: float,
        start_y: float,
        end_y: float,
        start_z: float,
        end_z: float,
        expo_time: float,
        sleep_time: float,
        n_dark: int,
        n_flat: int,
        lateral_motor_mode: str,
        scan_type: str,
        restore_motor_positions: bool,
        speed_ratio: float,
        before_scan_starts,
        update_datacollection,
        **kwargs,
    ):
        """Check the active detector then run the `tiling` sequence.

        The scan parameters are forwarded to `tiling` as they are, except
        `scan_type` which is converted into the boolean `continuous`
        (true for ``"Continuous"``). `before_scan_starts`,
        `update_datacollection` and the extra keyword arguments are
        accepted but not used here.

        Raises:
            RuntimeError: If no active tomo config or no active detector is
                selected, or if the detector is not online or not ready
                for acquisition
        """
        # A plain `x is not None and x.deref()` would yield False (not None)
        # when the tomo package is unavailable, defeating the check below.
        tomo_config = (
            ACTIVE_TOMOCONFIG.deref_active_object()
            if ACTIVE_TOMOCONFIG is not None
            else None
        )
        if tomo_config is None:
            raise RuntimeError("No ACTIVE_TOMOCONFIG selected")

        tomo_det = tomo_config.detectors.active_detector
        tomo_cam = tomo_det.detector if tomo_det is not None else None
        if tomo_cam is None:
            raise RuntimeError(f"No active detector selected in `{tomo_config.name}`")

        # Refresh the detector state before checking that it is usable
        tomo_det.sync_hard()
        if not tomo_det.is_online:
            raise RuntimeError(f"{tomo_cam.name} is not online")
        if not tomo_det.check_ready_for_acquisition():
            raise RuntimeError(f"{tomo_cam.name} is not ready")

        continuous = scan_type == "Continuous"
        tiling(
            start_x,
            end_x,
            start_y,
            end_y,
            start_z,
            end_z,
            tomo_cam,
            expo_time=expo_time,
            sleep_time=sleep_time,
            n_dark=n_dark,
            n_flat=n_flat,
            lateral_motor_mode=lateral_motor_mode,
            continuous=continuous,
            speed_ratio=speed_ratio,
            restore_motor_positions=restore_motor_positions,
            tomoconfig=tomo_config,
        )