Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/component.py: 26%

162 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4import logging 

5import typing 

6import pint 

7from ruamel.yaml import YAML 

8 

9from daiquiri.core.components import Component 

10from daiquiri.core.components.dcutilsmixin import DCUtilsMixin 

11 

12from .detectors_resource import TomoDetectorsResource 

13from .move_resource import TomoMoveResource 

14from .image_resource import TomoImageResource 

15from .scan_info_resource import TomoScanInfoResource 

16from .slice_reconstruction_resource import SliceReconstructionResource 

17from .datatype import DetectorLiveStorage 

18from .datatype import SampleStageMetadata 

19from .datatype import ScanInfoStorage 

20from .scan_listener import TomoScanListener 

21 

22 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# YAML description of the proxy which exposes the tomovis service.
# It is parsed by `TomoComponent._setup_tomovis_proxy`, which replaces the
# `target` placeholder (`TO_BE_MODIFIED`) with the configured tomovis URL
# before handing the description over to the proxy component.
# NOTE(review): the nesting of the `routes` entries was reconstructed from a
# whitespace-collapsed listing -- confirm it against the original module.
TOMOVIS_PROXY_TEMPLATE = """
name: tomovis
target: TO_BE_MODIFIED
openapi: res://openapi/tomovis_v1.json
routes:
  - name: image_tiling/
  - name: image_tiling/<string:resource_id>
    methods: [get, delete]
  - name: h5grove/attr/
  - name: h5grove/data/
  - name: h5grove/meta/
"""

38 

39 

class TomoComponent(Component, DCUtilsMixin):
    """Tomo detector component.

    The tomo detector component watches scans and captures data in order to
    process data correction on the fly.

    For now it captures dark and flat from tomo scans, and provides an API to
    retrieve a flat field correction.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the component state, then delegate to the base classes.

        The attributes are created *before* calling the parent constructor.
        NOTE(review): presumably the parent constructor may already call
        `setup()`, which needs them -- confirm before reordering.
        """
        self._scan_sources = []
        self._detectors: typing.Dict[str, DetectorLiveStorage] = {}
        self._scaninfos: ScanInfoStorage = ScanInfoStorage()
        self._tomo_config = None
        self._exported_config = {}
        self._scanlisteners = []
        self._slice_reconstruction_triggers = None
        self._last_delta_beta = None
        super().__init__(*args, **kwargs)

    def after_all_setup(self, components):
        """Called after the setup of the whole components.

        Sets up the optional tomovis proxy and exports the `avatar` config
        value when it is defined.
        """
        self._setup_tomovis_proxy(components)
        avatar = self._config.get("avatar")
        if avatar is not None:
            self._exported_config["avatar"] = avatar

    def get_export_config(self):
        """Get exported config values from components."""
        return self._exported_config

    def _setup_tomovis_proxy(self, components):
        """Export the tomovis URL and, if requested, register a proxy to it.

        Reads the `tomovis` section of the component config:

        - `url`: exported as `tomovis_url` when defined.
        - `create_proxy`: when true, a proxy described by
          `TOMOVIS_PROXY_TEMPLATE` and targeting `url` is registered on the
          `proxy` component, and `tomovis_proxy_url` is exported.

        Raises:
            RuntimeError: If a proxy is requested but no `proxy` component is
                available, or if no `url` is configured to proxy to.
        """
        tomovis_config = self._config.get("tomovis", {})
        url = tomovis_config.get("url")
        if url is not None:
            self._exported_config["tomovis_url"] = url

        if not tomovis_config.get("create_proxy", False):
            return

        proxy = components.get_component("proxy")
        if proxy is None:
            raise RuntimeError(
                "Tomovis setup as proxy but no proxy component configured"
            )
        if url is None:
            # Without this check the proxy would be registered with
            # `target: None`, which can not be a valid proxy target.
            raise RuntimeError("Tomovis setup as proxy but no tomovis url configured")

        self._exported_config["tomovis_proxy_url"] = "/api/proxy/tomovis"
        description = YAML().load(TOMOVIS_PROXY_TEMPLATE)
        description["target"] = url
        proxy.setup_proxy(description)

    @property
    def slice_reconstruction_triggers(self) -> typing.Optional[typing.List[str]]:
        """Configured slice reconstruction triggers (None until `setup` ran)."""
        return self._slice_reconstruction_triggers

    @property
    def scan_sources(self):
        """Scan sources created by `setup`, in configuration order."""
        return self._scan_sources

    @property
    def min_refresh_period(self) -> float:
        """Minimal image refresh period done during scan.

        Read from the `min_refresh_period` config key, defaults to 1.
        """
        return self._config.get("min_refresh_period", 1)

    @property
    def last_delta_beta(self) -> typing.Optional[float]:
        """Last delta beta value used."""
        return self._last_delta_beta

    def set_last_delta_beta(self, delta_beta: float):
        """Store the last delta beta value used."""
        self._last_delta_beta = delta_beta

    @property
    def tomovis_uri(self) -> typing.Optional[str]:
        """Tomovis URI service if defined, else None."""
        return self._config.get("tomovis", {}).get("url")

    def get_tomo_config_name(self) -> str:
        """Returns the name of the tomo BLISS object used by the application."""
        return self._tomo_config.name

    def setup(self, *args, **kwargs):
        """Read the configuration, connect the scan sources, register routes.

        The `slice_reconstruction_triggers` config key defaults to
        `["terminated"]` when it is missing or None.

        Raises:
            TypeError: If a configured source is not of type `bliss`.
        """
        triggers = self._config.get("slice_reconstruction_triggers", None)
        self._slice_reconstruction_triggers = (
            ["terminated"] if triggers is None else triggers
        )

        for source in self._config["sources"]:
            if source["type"] != "bliss":
                raise TypeError("Only bliss source is available for now")
            self._setup_bliss_source(source)

        self.register_route(TomoDetectorsResource, "/detectors")
        self.register_route(TomoScanInfoResource, "/scaninfo")
        self.register_route(TomoImageResource, "/data")
        self.register_route(TomoMoveResource, "/move")
        self.register_route(SliceReconstructionResource, "/slice_reconstruction")

    @staticmethod
    def _get_bliss_device_from_config(source, name):
        """Resolve the BLISS object referenced by the key `name` of `source`.

        Returns None when the key is not set. The special value
        `ACTIVE_TOMOCONFIG` is resolved through `tomo.globals`, any other
        value is looked up in the BLISS static configuration.
        """
        device_name = source.get(name)
        if device_name is None:
            return None

        if device_name == "ACTIVE_TOMOCONFIG":
            # Imported lazily, only when this feature is used
            from tomo.globals import ACTIVE_TOMOCONFIG

            return ACTIVE_TOMOCONFIG.deref_active_object()

        from bliss.config.static import get_config

        return get_config().get(device_name)

    def _setup_bliss_source(self, source):
        """Register the detectors and the scan listener of one bliss source."""
        self._tomo_config = self._get_bliss_device_from_config(source, "tomo_config")

        for tomo_detector in self._tomo_config.detectors.detectors:
            # Name of the lima device inside BLISS
            detector_id = tomo_detector.detector.name
            logger.info("Register BLISS detector: %s", detector_id)
            self._detectors[detector_id] = DetectorLiveStorage(
                detector_id=detector_id,
                tomo_detector_id=tomo_detector.name,
                detector_node_name=f"{detector_id}:image",
            )

        scan_source_class = self._get_scan_source_class()
        src = scan_source_class(self._config, app=self._app, source_config=source)

        scanlistener = TomoScanListener(self, src)
        src.watch_new_scan(scanlistener.new_scan)
        src.watch_end_scan(scanlistener.end_scan)
        src.watch_new_data(scanlistener.new_data)
        src.watch_new_0d_data(scanlistener.new_0d_data)

        self._scan_sources.append(src)
        self._scanlisteners.append(scanlistener)

        logger.debug("Registered Bliss scan source")

    def _get_scan_source_class(self):
        """Return the scan source class matching the installed BLISS version.

        `BlissScans` is used for BLISS older than `2.0.dev`, else
        `BlissdataScans`.

        FIXME: This should be dropped
        """
        import bliss.release
        from packaging.version import Version

        if Version(bliss.release.version) < Version("2.0.dev"):
            # Use old bliss connector
            from daiquiri.core.hardware.bliss.scans import BlissScans

            return BlissScans

        # Use blissdata anyway
        from daiquiri.core.hardware.blissdata.scans import BlissdataScans

        return BlissdataScans

    def get_tomo_config(self):
        """Returns the tomo config object (None until `setup` ran)."""
        return self._tomo_config

    def get_sample_stage_meta(self, detector_name) -> SampleStageMetadata:
        """Returns the sample stage state.

        This is a workaround for now, it would be much better to retrieve it
        from the scan if possible, which is probably part of the positioners
        for ct.

        Arguments:
            detector_name: Name of the detector the metadata is requested
                for. The pixel size is only filled when this is the active
                tomo detector.
        """

        def quantity_from_motor(motor):
            # An axis which is not configured is reported as None
            if motor is None:
                return None
            return pint.Quantity(motor.position, motor.unit)

        def quantity_or_none(value, unit):
            if value is None:
                return None
            return pint.Quantity(value, unit)

        tomo_config = self._tomo_config

        active_detector = None
        if tomo_config is not None and tomo_config.detectors is not None:
            active_detector = tomo_config.detectors.active_detector

        pixel_size = None
        if active_detector and active_detector.detector.name == detector_name:
            # For now there is no way to reach the pixel size from another
            # detector than the active one
            ps = active_detector.sample_pixel_size
            if ps:
                pixel_size = pint.Quantity(ps, "um")

        # NOTE(review): unlike the detector lookup above, this does not guard
        # against a missing tomo config and fails with AttributeError if
        # called before `setup` -- confirm this is the expected contract.
        sample_stage = tomo_config.sample_stage
        detector_center = sample_stage.detector_center

        return SampleStageMetadata(
            sy=quantity_from_motor(sample_stage.y_axis),
            sz=quantity_from_motor(sample_stage.z_axis),
            sampy=quantity_from_motor(sample_stage.sample_y_axis),
            somega=quantity_from_motor(sample_stage.rotation_axis),
            detcy=quantity_or_none(detector_center[0], "mm"),
            detcz=quantity_or_none(detector_center[1], "mm"),
            pixel_size=pixel_size,
        )

    def move(
        self,
        sy: typing.Optional[pint.Quantity] = None,
        sz: typing.Optional[pint.Quantity] = None,
        sampx: typing.Optional[pint.Quantity] = None,
        sampy: typing.Optional[pint.Quantity] = None,
        sampu: typing.Optional[pint.Quantity] = None,
        sampv: typing.Optional[pint.Quantity] = None,
        relative: bool = False,
    ):
        """Move the sample stage motors.

        If multiple moves are requested, all the commands are sent in
        parallel, and the call waits for the end of the motion. Nothing is
        done if no position is requested.

        Arguments:
            sy, sz, sampx, sampy, sampu, sampv: Requested position for the
                corresponding axis, converted to the unit of the motor.
                None to leave the axis untouched.
            relative: If true, the move is relative

        Raises:
            RuntimeError: If a position is expressed in `css_pixel`, or if a
                position is requested for an axis without a motor.
        """

        def normalize_value_to_motor_unit(motor, value):
            if value.units == "css_pixel":
                # By default pint provides a conversion from px to metric system
                # TODO this have to be removed
                raise RuntimeError("Can't convert from pixel to length")
            return value.to(motor.unit).magnitude

        # NOTE(review): `get_sample_stage_meta` reads the axes from
        # `tomo_config.sample_stage`, here they are read from the tomo config
        # itself -- confirm both expose the same motors.
        tomo_config = self._tomo_config
        requests = (
            ("sy", tomo_config.y_axis, sy),
            ("sz", tomo_config.z_axis, sz),
            ("sampx", tomo_config.sample_x_axis, sampx),
            ("sampy", tomo_config.sample_y_axis, sampy),
            ("sampu", tomo_config.sample_u_axis, sampu),
            ("sampv", tomo_config.sample_v_axis, sampv),
        )

        # The requests are not stored in a dict keyed by motor up front:
        # unconfigured axes would all collapse on the same `None` key and a
        # requested position could be silently dropped.
        trajectory = {}
        for name, motor, value in requests:
            if value is None:
                continue
            if motor is None:
                raise RuntimeError(f"No motor is configured for '{name}'")
            trajectory[motor] = normalize_value_to_motor_unit(motor, value)

        if not trajectory:
            return

        from bliss.common.motor_group import Group

        group = Group(*trajectory.keys())
        group.move(trajectory, wait=True, relative=relative)

    def get_detector(self, detector_id: str) -> typing.Optional[DetectorLiveStorage]:
        """Get the detector registered as `detector_id`, else None."""
        return self._detectors.get(detector_id)

    def get_scaninfos(self):
        """Returns the state of the actual scans."""
        return self._scaninfos

    def get_detectors(self) -> typing.List[DetectorLiveStorage]:
        """Get available detectors.

        A new list is returned, so that it stays stable if a detector is
        registered while the result is in use.
        """
        return list(self._detectors.values())