Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/tomo/zseriesscanwithroi.py: 0%

184 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4from __future__ import annotations 

5 

6import gevent 

7import logging 

8import numpy 

9from marshmallow import fields, validate 

10 

11from bliss.common.standard import mv 

12from tomo.zseries import ZSeries 

13from . import tomo_helper 

14 

15from daiquiri.core.schema.validators import OneOf 

16from daiquiri.core.components import ( 

17 ComponentActor, 

18 ComponentActorSchema, 

19 ComponentActorKilled, 

20) 

21 

22_logger = logging.getLogger(__name__) 

23 

24 

class ZseriesscanwithroiSchema(ComponentActorSchema):
    """Form schema for a z-series tomo scan driven by a region of interest.

    Besides declaring the form fields, the schema derives a few values from
    the user input (see :meth:`calculated`): the default exposure time, the
    number of scans, the step between two scans and the resulting overlap.
    """

    # --- Data policy -------------------------------------------------------
    sampleid = fields.Int(
        required=True,
        metadata={"title": "Collection"},
    )
    dataset_name = fields.Str(
        required=True,
        metadata={"title": "Dataset"},
    )

    # --- Acquisition mode --------------------------------------------------
    scan_type = OneOf(
        ["Continuous", "Step"],
        dump_default="Continuous",
        metadata={"title": "Scan type"},
    )

    # --- Z-series description ----------------------------------------------
    # Read-only fields are not typed by the user: they are either filled by
    # `calculated()` (nb_scans, z_step, z_overlap) or, presumably, by the
    # client when the ROI is defined (z_start, z_stop, z_height, z_direction,
    # x, y) -- TODO confirm against the client code.
    nb_scans = fields.Int(
        dump_default=None,
        allow_none=True,
        # NOTE(review): range validation is disabled here; the reason is not
        # visible in this file.
        # validate=validate.Range(min=1, max=999999),
        metadata={"title": "Nb scans", "readOnly": True},
    )
    z_start = fields.Float(
        dump_default=None,
        allow_none=True,
        metadata={"title": "Z-start", "unit": "mm", "readOnly": True},
    )
    z_stop = fields.Float(
        dump_default=None,
        allow_none=True,
        metadata={"title": "Z-stop", "unit": "mm", "readOnly": True},
    )
    z_height = fields.Float(
        dump_default=None,
        allow_none=True,
        validate=validate.Range(min=0, max=999999),
        metadata={"title": "Height", "unit": "mm", "readOnly": True},
    )
    # Optional user-imposed step; when set it takes precedence over the
    # overlap-based computation of nb_scans and z_step.
    z_fixel_step = fields.Float(
        dump_default=None,
        allow_none=True,
        validate=validate.Range(min=0, max=99999),
        metadata={"title": "Fixel step", "unit": "mm"},
    )
    z_step = fields.Float(
        dump_default=None,
        allow_none=True,
        metadata={"title": "Z step", "unit": "mm", "readOnly": True},
    )
    z_direction = OneOf(
        ["Up", "Down"],
        dump_default=None,
        allow_none=True,
        metadata={"title": "Direction", "readOnly": True},
    )
    z_min_overlap = fields.Float(
        dump_default=10,
        validate=validate.Range(min=0, max=99.99999),
        metadata={"title": "Min overlap", "unit": "%"},
    )
    z_overlap = fields.Float(
        dump_default=None,
        allow_none=True,
        validate=validate.Range(min=0, max=99.99999),
        required=False,
        metadata={"title": "Overlap", "unit": "%", "readOnly": True},
    )

    # --- Tomo parameters ---------------------------------------------------
    start_pos = fields.Float(
        dump_default=0,
        metadata={"title": "Start angle", "unit": "deg"},
    )
    range = fields.Float(
        dump_default=360,
        metadata={"title": "Range angle", "unit": "deg"},
    )
    tomo_n = fields.Int(
        dump_default=360,
        validate=validate.Range(min=1, max=999999),
        metadata={"title": "Nb steps"},
    )
    n_dark = fields.Int(
        dump_default=1,
        validate=validate.Range(min=0, max=999999),
        metadata={"title": "Nb darks"},
    )
    n_flat = fields.Int(
        dump_default=1,
        validate=validate.Range(min=0, max=999999),
        metadata={"title": "Nb flats"},
    )
    # No static default: `calculated()` feeds it from the active tomo config
    # while it is still None.
    expo_time = fields.Float(
        validate=validate.Range(min=0.001, max=10),
        dump_default=None,
        metadata={"title": "Exposure time", "unit": "s"},
    )
    use_sinogram = fields.Bool(
        dump_default=True,
        metadata={
            "title": "Sinogram",
            "description": "Generate or not a sinogram during the scan",
        },
    )
    axis_displacement = fields.Int(
        dump_default=0,
        validate=validate.Range(min=-300, max=300),
        metadata={
            "title": "Axis displacement",
            "unit": "%",
            "description": "0% = centered, 100% = half detector width",
        },
    )
    comment = fields.Str(
        dump_default="",
        metadata={
            "title": "Comment",
        },
    )

    # --- Positioning -------------------------------------------------------
    x = fields.Float(
        dump_default=0,
        allow_none=True,
        metadata={"title": "X", "unit": "mm", "readOnly": True},
    )
    y = fields.Float(
        dump_default=0,
        allow_none=True,
        metadata={"title": "Y", "unit": "mm", "readOnly": True},
    )
    enqueue = fields.Bool(
        dump_default=True,
        metadata={
            "title": "Queue Scan",
        },
    )

    class Meta:
        # Order in which the fields are listed
        uiorder = [
            "sampleid",
            "dataset_name",
            "scan_type",
            "expo_time",
            "nb_scans",
            "z_start",
            "z_stop",
            "z_step",
            "z_min_overlap",
            "z_overlap",
            "z_height",
            "z_direction",
            "z_fixel_step",
            "start_pos",
            "range",
            "tomo_n",
            "use_sinogram",
            "n_dark",
            "n_flat",
            "axis_displacement",
            "comment",
            "x",
            "y",
            "enqueue",
        ]
        # Grouping of the fields in the form; plain strings are fields
        # displayed outside of any group
        uigroups = [
            {"Data policy": ["sampleid", "dataset_name"], "ui:minwidth": 12},
            "scan_type",
            {
                "Main": ["expo_time", "start_pos", "range", "tomo_n"],
                "ui:minwidth": 12,
            },
            {
                "Z-series": [
                    "z_height",
                    "nb_scans",
                    "z_fixel_step",
                    "z_step",
                    "z_min_overlap",
                    "z_overlap",
                ],
                "ui:minwidth": 6,
            },
            {"Options": ["use_sinogram", "n_dark", "n_flat"], "ui:minwidth": 12},
            {
                "Half acquisition": ["axis_displacement"],
                "ui:minwidth": 12,
            },
            "comment",
            {
                "Positioning": ["x", "y", "z_start", "z_stop"],
                "ui:minwidth": 6,
            },
            "enqueue",
            "z_direction",
        ]
        # Per-field widget overrides; `enqueue` and `z_direction` are part of
        # the form data but hidden from the user
        uischema = {
            "sampleid": {"ui:widget": "SampleId"},
            "dataset_name": {"ui:widget": "DatasetName"},
            "comment": {"ui:widget": "textarea", "ui:options": {"rows": 3}},
            "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "z_direction": {"classNames": "hidden-row", "ui:widget": "hidden"},
        }

    def calculated(self, data):
        """Return the values derived from the current form content.

        Arguments:
            data: Dictionary containing the actual parameters of the form.
                It is expanded as keyword arguments of `_calculated`, so it
                has to contain at least `z_direction`.
        """
        return self._calculated(**data)

    def _calculated(
        self,
        z_direction: str,
        expo_time: float | None = None,
        z_height: None | float = None,
        z_fixel_step: None | float = None,
        z_min_overlap: float | None = None,
        **kwargs,
    ):
        """Compute the derived values from the relevant form parameters.

        Arguments:
            z_direction: "Up" or "Down", gives the sign of the z step
            expo_time: Exposure time typed by the user, if any
            z_height: Height to cover with the series, in mm
            z_fixel_step: Optional step imposed by the user, in mm
            z_min_overlap: Minimal overlap between two scans, in percent
            kwargs: Other form parameters, ignored here

        Returns:
            A dictionary with the keys which could be computed, among
            `expo_time`, `nb_scans`, `z_step` and `z_overlap`.
        """
        result: dict[str, object] = {}

        if expo_time is None:
            result["expo_time"] = self.calculated_expo_time()
        vfov = self.calculated_vfov()
        nb_scans = self.calculated_nb_scans(
            z_fixel_step=z_fixel_step,
            z_height=z_height,
            z_min_overlap=z_min_overlap,
            vfov=vfov,
        )
        # NOTE(review): z_step and z_overlap are assumed to be updated only
        # when nb_scans could be computed -- confirm the nesting against the
        # upstream file.
        if nb_scans is not None:
            result["nb_scans"] = nb_scans
            result["z_step"] = self.calculated_z_step(
                z_fixel_step=z_fixel_step,
                z_height=z_height,
                nb_scans=nb_scans,
                vfov=vfov,
                z_direction=z_direction,
            )
            result["z_overlap"] = self.calculated_z_overlap(
                z_height=z_height, nb_scans=nb_scans, vfov=vfov
            )
        return result

    def calculated_expo_time(self) -> float:
        """Return the exposure time used to initialize the form.

        Returns:
            The exposure time of the tomo imaging device, or 1.0 when the
            active tomo config has no imaging device.

        Raises:
            RuntimeError: If no tomo config is active.
        """
        # Trick to feed the initial expo time based on the tomo imaging device
        tomo_config = tomo_helper.get_active_tomo_config()
        if tomo_config is None:
            raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
        imaging = tomo_config.tomo_imaging
        if imaging is None:
            return 1.0
        return imaging.exposure_time

    def calculated_vfov(self) -> None | float:
        """Vertical field of view in millimeter.

        Returns:
            The pixel size on the sample multiplied by the second element of
            the detector size, or None when there is no active detector or
            when its pixel size or its size is unknown.

        Raises:
            RuntimeError: If no tomo config is active.
        """
        tomo_config = tomo_helper.get_active_tomo_config()
        if tomo_config is None:
            # Same error as `calculated_expo_time` instead of an obscure
            # AttributeError on None
            raise RuntimeError("No ACTIVE_TOMOCONFIG selected")
        tomocam = tomo_config.detectors.active_detector
        if tomocam is None:
            return None
        px = tomocam.sample_pixel_size
        if px is None or px == 0:
            return None
        size = tomocam.actual_size
        if size is None:
            return None
        # presumably px is in micrometer and size is (width, height) in
        # pixels, hence the 0.001 factor to get millimeter -- TODO confirm
        return px * size[1] * 0.001

    def calculated_nb_scans(
        self,
        z_fixel_step: float | None,
        z_height: None | float,
        vfov: None | float,
        z_min_overlap: float | None,
    ) -> None | int:
        """Return the number of scans needed to cover `z_height`.

        With a fixed step:

            n = ceil((height + vfov) / fixed_step)

        Else the smallest `n` respecting the minimal overlap:

            n > height / (vfov * (1 - min_overlap)) - min_overlap / (1 - min_overlap)

        Returns:
            The number of scans (at least 1), or None when it can't be
            computed: missing input, null fixed step, null field of view
            or minimal overlap of 100% or more.
        """
        if z_height is None or z_min_overlap is None or vfov is None:
            return None

        if z_fixel_step is not None:
            if z_fixel_step == 0:
                # Accepted by the field validator (min=0), but the height
                # can't be covered with a null step
                return None
            n = (z_height + vfov) / z_fixel_step
            return max(1, int(numpy.ceil(n)))

        min_overlap = z_min_overlap * 0.01
        if vfov == 0 or min_overlap >= 1:
            # Would divide by zero (or give a meaningless result)
            return None
        n = z_height / (vfov * (1 - min_overlap)) - min_overlap / (1 - min_overlap)
        return max(1, int(numpy.ceil(n)))

    def calculated_z_overlap(
        self,
        z_height: None | float,
        vfov: None | float,
        nb_scans: int | None,
    ) -> None | float:
        """Return the overlap between two consecutive scans, in percent.

            overlap = (vfov * n - height) / (vfov * (n - 1))

        Returns:
            The overlap in percent, 0 for a single scan, or None when an
            input is missing or when the field of view is null.
        """
        if z_height is None or vfov is None or nb_scans is None:
            return None
        if nb_scans == 1:
            # Nothing to overlap with
            return 0
        if vfov == 0:
            # Would divide by zero
            return None
        return (vfov * nb_scans - z_height) / (vfov * (nb_scans - 1)) * 100

    def calculated_z_step(
        self,
        z_fixel_step: float | None,
        z_height: float | None,
        vfov: float | None,
        nb_scans: int | None,
        z_direction: str,
    ) -> float | None:
        """Return the signed displacement between two consecutive scans.

        The step is negative for the "Up" direction and positive for "Down".
        A fixed step is used as it is, else the step is chosen such that
        `nb_scans` fields of view spread evenly over `z_height`.

        Returns:
            The signed step, 0 for a single scan, or None when an input
            needed by the computation is missing.

        Raises:
            ValueError: If `z_direction` is neither "Up" nor "Down".
        """
        if z_direction == "Up":
            direction = -1
        elif z_direction == "Down":
            direction = 1
        else:
            raise ValueError(f"Unexpected value for z_direction. Found: {z_direction}")

        if z_fixel_step is not None:
            return direction * z_fixel_step
        if z_height is None or vfov is None or nb_scans is None:
            return None
        if nb_scans == 1:
            return 0
        step = (z_height - vfov) / (nb_scans - 1)
        return direction * step

    def time_estimate(self, data):
        """Return the time estimate given by the actor for this form content.

        Any failure is logged with its traceback and reported as None, so a
        broken estimation never breaks the form.
        """
        try:
            actor = self.get_actor()
            return actor.time_estimate(**data)
        except Exception:
            _logger.exception("Error while reading time estimate")
            return None

366 

367 

class ZseriesscanwithroiActor(ComponentActor):
    """Actor running a z-series of tomo scans at a position given by a ROI."""

    schema = ZseriesscanwithroiSchema
    name = "[tomo] full tomo scan"

    metatype = "tomo"
    saving_args = {"dataset": "{dataset_name}"}

    def _resolve_zseries(self):
        """Return the active tomo config and the z-series sequence to use.

        The sequence is looked up from the `zseries_object` field of the
        actor configuration.

        Raises:
            RuntimeError: If no tomo config is active, or if the actor
                configuration has no `zseries_object` field.
        """
        tomo_config = tomo_helper.get_active_tomo_config()
        if tomo_config is None:
            raise RuntimeError("No ACTIVE_TOMOCONFIG selected")

        actor_config = self.get_config()
        zseries_object_names = actor_config.get("zseries_object")
        if zseries_object_names is None:
            raise RuntimeError(
                "'zseries_object' config field was not setup in this samplescan actor description"
            )

        zseries = tomo_helper.get_sequence_from_name(
            tomo_config, zseries_object_names, ZSeries
        )
        return tomo_config, zseries

    def _apply_main_pars(
        self,
        zseries,
        scan_type,
        use_sinogram,
        n_dark,
        n_flat,
        axis_displacement,
        comment,
    ):
        """Apply to the sequence the parameters shared by estimate and scan."""
        tomo_helper.setup_main_pars(
            zseries,
            scan_type=scan_type,
            use_sinogram=use_sinogram,
            n_dark=n_dark,
            n_flat=n_flat,
            axis_displacement=axis_displacement,
            comment=comment,
        )
        if use_sinogram:
            zseries.pars.dark_flat_for_each_scan = True

    def time_estimate(
        self,
        scan_type: str | None = None,
        start_pos: float | None = None,
        range: float | None = None,
        tomo_n: int | None = None,
        expo_time: float | None = None,
        use_sinogram: bool | None = None,
        n_dark: int | None = None,
        n_flat: int | None = None,
        axis_displacement: float | None = None,
        nb_scans: int | None = None,
        comment: str | None = None,
        z_start: float | None = None,
        z_step: float | None = None,
        **kwargs,
    ) -> float | None:
        """Return the scan time computed by the sequence for these parameters.

        The sequence is set up and prepared like for a real scan, but not
        executed.

        Arguments:
            kwargs: Other form parameters, ignored here

        Returns:
            The scan time reported by the sequence, or None while one of the
            mandatory parameters is still undefined.

        Raises:
            RuntimeError: If no tomo config is active, or if the actor
                configuration has no `zseries_object` field.
        """
        mandatory = (
            scan_type,
            start_pos,
            range,
            tomo_n,
            expo_time,
            use_sinogram,
            n_dark,
            n_flat,
            nb_scans,
            z_start,
            z_step,
        )
        if any(value is None for value in mandatory):
            # The form is not yet fully defined: nothing to estimate
            _logger.debug(
                "No time estimate, the parameters are incomplete "
                "(scan_type=%s, start_pos=%s, range=%s)",
                scan_type,
                start_pos,
                range,
            )
            return None

        _, zseries = self._resolve_zseries()

        self._apply_main_pars(
            zseries,
            scan_type=scan_type,
            use_sinogram=use_sinogram,
            n_dark=n_dark,
            n_flat=n_flat,
            axis_displacement=axis_displacement,
            comment=comment,
        )

        zseries.pars.step_start_pos = z_start
        zseries.pars.start_nb = 1
        zseries.pars.delta_pos = z_step
        zseries.pars.nb_scans = nb_scans
        zseries.pars.start_pos = start_pos
        zseries.pars.range = range
        zseries.pars.tomo_n = tomo_n
        zseries.pars.exposure_time = expo_time
        zseries.prepare()
        # Else it will be considered as already prepared next time
        zseries._prepare_done = False
        return zseries._inpars.scan_time

    def method(
        self,
        scan_type: str,
        dataset_name: str,
        nb_scans: int,
        x: float,
        y: float,
        z_start: float,
        z_step: float,
        z_fixel_step: float | None,
        start_pos: float,
        range: float,
        tomo_n: int,
        expo_time: float,
        use_sinogram: bool,
        n_dark: int,
        n_flat: int,
        axis_displacement: float,
        comment: str,
        before_scan_starts,
        update_datacollection,
        **kwargs,
    ):
        """Move the sample to the requested position and run the z-series.

        Arguments:
            z_fixel_step: Not used by the scan; `z_step` already accounts for
                it. Declared so it does not end up in `kwargs`.
            before_scan_starts: Callback called with this actor once the
                sequence is set up, before anything moves
            update_datacollection: Not used by this actor
            kwargs: Unexpected parameters, reported in the logs

        Raises:
            RuntimeError: If no tomo config is active, or if the actor
                configuration has no `zseries_object` field.
            ComponentActorKilled: If the actor is killed during the scan.
        """
        if kwargs:
            _logger.error("Unused params: %s", kwargs)

        tomo_config, zseries = self._resolve_zseries()

        mg = tomo_helper.create_mg(tomo_config)
        mg.set_active()

        self._apply_main_pars(
            zseries,
            scan_type=scan_type,
            use_sinogram=use_sinogram,
            n_dark=n_dark,
            n_flat=n_flat,
            axis_displacement=axis_displacement,
            comment=comment,
        )

        scan_info = tomo_helper.create_daiquiri_scan_info(
            self, share_data_collection_group=True
        )

        before_scan_starts(self)

        def run():
            # NOTE(review): the scan is assumed to run inside the inhibit
            # block, like the motion -- confirm the nesting against the
            # upstream file.
            with tomo_config.auto_projection.inhibit():
                sample_stage = tomo_config.sample_stage
                yrot = sample_stage.y_axis.position
                # NOTE(review): the same y_axis offset is subtracted from
                # both x and y -- confirm this is intended.
                mv(
                    sample_stage.sample_x_axis,
                    x - yrot,
                    sample_stage.sample_y_axis,
                    y - yrot,
                )
                zseries.basic_scan(
                    collection_name=None,
                    dataset_name=dataset_name,
                    step_start_pos=z_start,
                    delta_pos=z_step,
                    nb_scans=nb_scans,
                    tomo_start_pos=start_pos,
                    tomo_end_pos=start_pos + range,
                    tomo_n=tomo_n,
                    expo_time=expo_time,
                    scan_info=scan_info,
                    trust_data_policy_location=True,
                )

        # The scan runs in its own greenlet so it can be killed when the
        # actor itself is killed while waiting for it
        greenlet = gevent.spawn(run)
        try:
            greenlet.join()
        except ComponentActorKilled:
            greenlet.kill()
            raise

        # join() does not propagate errors: get() re-raises the exception
        # raised by `run`, if any
        greenlet.get()