Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/tomo/tilingscanwithroi.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import logging 

4from marshmallow import fields, validate 

5from marshmallow import validates_schema, ValidationError 

6from daiquiri.core.schema.validators import OneOf 

7from daiquiri.core.components import ( 

8 ComponentActor, 

9 ComponentActorSchema, 

10) 

11 

12from tomo.sequence.tiling import tiling 

13from tomo.sequence.tiling import tiling_estimation_time 

14 

15try: 

16 from tomo.globals import ACTIVE_TOMOCONFIG 

17except ImportError: 

18 ACTIVE_TOMOCONFIG = None 

19 

20_logger = logging.getLogger(__name__) 

21 

22 

23class TilingscanwithroiSchema(ComponentActorSchema): 

24 sampleid = fields.Int(required=True) 

25 

26 scan_type = OneOf( 

27 ["Continuous", "Step"], 

28 dump_default="Continuous", 

29 metadata={"title": "Scan type"}, 

30 ) 

31 # FIXME: Units have to be read from the motors 

32 # FIXME: Motor names have to be read from the motor 

33 start_x = fields.Float( 

34 validate=validate.Range(min=-2000, max=2000), 

35 dump_default=-250, 

36 metadata={"title": "Start", "unit": "㎜"}, 

37 ) 

38 end_x = fields.Float( 

39 validate=validate.Range(min=-2000, max=2000), 

40 dump_default=250, 

41 metadata={"title": "Stop", "unit": "㎜"}, 

42 ) 

43 start_y = fields.Float( 

44 validate=validate.Range(min=-2000, max=2000), 

45 dump_default=-250, 

46 metadata={"title": "Start", "unit": "㎜"}, 

47 ) 

48 end_y = fields.Float( 

49 validate=validate.Range(min=-2000, max=2000), 

50 dump_default=250, 

51 metadata={"title": "Stop", "unit": "㎜"}, 

52 ) 

53 start_z = fields.Float( 

54 validate=validate.Range(min=-2000, max=2000), 

55 dump_default=550, 

56 metadata={"title": "Start", "unit": "㎜"}, 

57 ) 

58 end_z = fields.Float( 

59 validate=validate.Range(min=-2000, max=2000), 

60 dump_default=-550, 

61 metadata={"title": "Stop", "unit": "㎜"}, 

62 ) 

63 expo_time = fields.Float( 

64 validate=validate.Range(min=0.001, max=10), 

65 dump_default=None, 

66 metadata={"title": "Exposure time", "unit": "s"}, 

67 ) 

68 sleep_time = fields.Float( 

69 validate=validate.Range(min=0.0, max=10), 

70 dump_default=0, 

71 metadata={"title": "Sleep time", "unit": "s"}, 

72 ) 

73 n_dark = fields.Int( 

74 dump_default=1, 

75 validate=validate.Range(min=0, max=999999), 

76 metadata={"title": "Nb darks"}, 

77 ) 

78 n_flat = fields.Int( 

79 dump_default=1, 

80 validate=validate.Range(min=0, max=999999), 

81 metadata={"title": "Nb flats"}, 

82 ) 

83 lateral_motor_mode = OneOf( 

84 ["over_rot", "under_rot"], 

85 dump_default="over_rot", 

86 metadata={ 

87 "title": "Lateral motor", 

88 }, 

89 ) 

90 restore_motor_positions = fields.Bool( 

91 metadata={ 

92 "title": "Move back motors", 

93 "description": "Move back motors at their initial position when the scan is terminated", 

94 }, 

95 dump_default=True, 

96 ) 

97 enqueue = fields.Bool( 

98 dump_default=True, 

99 metadata={ 

100 "title": "Queue Scan", 

101 }, 

102 ) 

103 

104 class Meta: 

105 uiorder = [ 

106 "sampleid", 

107 "scan_type", 

108 "expo_time", 

109 "sleep_time", 

110 "n_dark", 

111 "n_flat", 

112 "lateral_motor_mode", 

113 "restore_motor_positions", 

114 "start_x", 

115 "end_x", 

116 "start_y", 

117 "end_y", 

118 "start_z", 

119 "end_z", 

120 "enqueue", 

121 ] 

122 uigroups = [ 

123 "sampleid", 

124 "scan_type", 

125 { 

126 "Options": [ 

127 "expo_time", 

128 "sleep_time", 

129 "n_dark", 

130 "n_flat", 

131 "lateral_motor_mode", 

132 "restore_motor_positions", 

133 ], 

134 "ui:minwidth": 12, 

135 }, 

136 { 

137 "Y-motor (side)": ["start_x", "end_x"], 

138 "ui:minwidth": 6, 

139 }, 

140 { 

141 "Y-motor (front)": ["start_y", "end_y"], 

142 "ui:minwidth": 6, 

143 }, 

144 { 

145 "Z-motor": ["start_z", "end_z"], 

146 "ui:minwidth": 6, 

147 }, 

148 "enqueue", 

149 ] 

150 uischema = { 

151 "sampleid": {"classNames": "hidden-row", "ui:widget": "hidden"}, 

152 "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"}, 

153 } 

154 

155 @validates_schema 

156 def schema_validate(self, data, **kwargs): 

157 if data["scan_type"] == "Continuous": 

158 if data["lateral_motor_mode"] != "under_rot": 

159 raise ValidationError( 

160 f"In continuous mode, the lateral motor have ot be under the rot" 

161 ) 

162 

163 def calculated(self, data, **kwargs): 

164 """Returns the calculated values 

165 

166 Arguments: 

167 data: Dictionary containing the actual parameters of the form 

168 """ 

169 result = {} 

170 

171 if data.get("expo_time") is None: 

172 # Trick to feed the initial expo time based on the tomo imaging device 

173 tomo_config = ( 

174 ACTIVE_TOMOCONFIG is not None 

175 and ACTIVE_TOMOCONFIG.deref_active_object() 

176 ) 

177 if tomo_config is None: 

178 raise RuntimeError("No ACTIVE_TOMOCONFIG selected") 

179 imaging = tomo_config.tomo_imaging 

180 if imaging is not None: 

181 result["expo_time"] = imaging.exposure_time 

182 else: 

183 result["expo_time"] = 1.0 

184 

185 return result 

186 

187 def time_estimate(self, data): 

188 result = self.calculated(data) 

189 data.update(result) 

190 tomo_config = ( 

191 ACTIVE_TOMOCONFIG is not None and ACTIVE_TOMOCONFIG.deref_active_object() 

192 ) 

193 if tomo_config is None: 

194 raise RuntimeError("No ACTIVE_TOMOCONFIG selected") 

195 tomo_det = tomo_config.detectors.active_detector 

196 if tomo_det is not None: 

197 tomo_cam = tomo_det.detector 

198 else: 

199 tomo_cam = None 

200 

201 if tomo_cam is None: 

202 raise RuntimeError(f"No active detector selected in `{tomo_config.name}`") 

203 try: 

204 start_x = data["start_x"] 

205 end_x = data["end_x"] 

206 start_y = data["start_y"] 

207 end_y = data["end_y"] 

208 start_z = data["start_z"] 

209 end_z = data["end_z"] 

210 expo_time = data["expo_time"] 

211 sleep_time = data.get("sleep_time", 0) # FIXME: It can be uninitialized... 

212 n_dark = data["n_dark"] 

213 n_flat = data["n_flat"] 

214 lateral_motor_mode = data["lateral_motor_mode"] 

215 restore_motor_positions = data["restore_motor_positions"] 

216 scan_type = data["scan_type"] 

217 except Exception: 

218 _logger.debug("Error while reading data", exc_info=True) 

219 return 0 

220 try: 

221 continuous = scan_type == "Continuous" 

222 return tiling_estimation_time( 

223 start_x, 

224 end_x, 

225 start_y, 

226 end_y, 

227 start_z, 

228 end_z, 

229 tomo_cam, 

230 expo_time=expo_time, 

231 sleep_time=sleep_time, 

232 n_dark=n_dark, 

233 n_flat=n_flat, 

234 lateral_motor_mode=lateral_motor_mode, 

235 restore_motor_positions=restore_motor_positions, 

236 continuous=continuous, 

237 tomoconfig=tomo_config, 

238 ) 

239 except Exception: 

240 _logger.error("Error while computing estimation", exc_info=True) 

241 return 0 

242 

243 

244class TilingscanwithroiActor(ComponentActor): 

245 schema = TilingscanwithroiSchema 

246 name = "[tomo] tiling scan" 

247 

248 metatype = "tomo" 

249 

250 def method( 

251 self, 

252 start_x, 

253 end_x, 

254 start_y, 

255 end_y, 

256 start_z, 

257 end_z, 

258 expo_time, 

259 sleep_time, 

260 n_dark, 

261 n_flat, 

262 lateral_motor_mode, 

263 scan_type: str, 

264 restore_motor_positions, 

265 before_scan_starts, 

266 update_datacollection, 

267 **kwargs, 

268 ): 

269 tomo_config = ( 

270 ACTIVE_TOMOCONFIG is not None and ACTIVE_TOMOCONFIG.deref_active_object() 

271 ) 

272 if tomo_config is None: 

273 raise RuntimeError("No ACTIVE_TOMOCONFIG selected") 

274 tomo_det = tomo_config.detectors.active_detector 

275 if tomo_det is not None: 

276 tomo_cam = tomo_det.detector 

277 else: 

278 tomo_cam = None 

279 

280 if tomo_cam is None: 

281 raise RuntimeError(f"No active detector selected in `{tomo_config.name}`") 

282 

283 tomo_det.sync_hard() 

284 if not tomo_det.is_online: 

285 raise RuntimeError(f"{tomo_cam.name} is not online") 

286 if not tomo_det.check_ready_for_acquisition(): 

287 raise RuntimeError(f"{tomo_cam.name} is not ready") 

288 

289 continuous = scan_type == "Continuous" 

290 tiling( 

291 start_x, 

292 end_x, 

293 start_y, 

294 end_y, 

295 start_z, 

296 end_z, 

297 tomo_cam, 

298 expo_time=expo_time, 

299 sleep_time=sleep_time, 

300 n_dark=n_dark, 

301 n_flat=n_flat, 

302 lateral_motor_mode=lateral_motor_mode, 

303 continuous=continuous, 

304 restore_motor_positions=restore_motor_positions, 

305 tomoconfig=tomo_config, 

306 )