Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/component.py: 26%

163 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4from __future__ import annotations 

5import logging 

6import typing 

7import pint 

8from ruamel.yaml import YAML 

9 

10from daiquiri.core.components import Component 

11from daiquiri.core.components.dcutilsmixin import DCUtilsMixin 

12 

13from .detectors_resource import TomoDetectorsResource 

14from .move_resource import TomoMoveResource 

15from .image_resource import TomoImageResource 

16from .scan_info_resource import TomoScanInfoResource 

17from .slice_reconstruction_resource import SliceReconstructionResource 

18from .datatype import DetectorLiveStorage 

19from .datatype import SampleStageMetadata 

20from .datatype import ScanInfoStorage 

21from .scan_listener import TomoScanListener 

22 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27TOMOVIS_PROXY_TEMPLATE = """ 

28name: tomovis 

29target: TO_BE_MODIFIED 

30openapi: res://openapi/tomovis_v1.json 

31routes: 

32 - name: image_tiling/ 

33 - name: image_tiling/<string:resource_id> 

34 methods: [get, delete] 

35 - name: h5grove/attr/ 

36 - name: h5grove/data/ 

37 - name: h5grove/meta/ 

38""" 

39 

40 

41class TomoComponent(Component, DCUtilsMixin): 

42 """Tomo detector component. 

43 

44 The tomo detector component watch scans and capture data in order to process 

45 data correction on the fly. 

46 

47 For now it captures dark and flat from tomo scans. And provide an API to 

48 retrieve a flat field correction. 

49 """ 

50 

51 def __init__(self, *args, **kwargs): 

52 self._scan_sources = [] 

53 self._detectors: dict[str, DetectorLiveStorage] = {} 

54 self._scaninfos: ScanInfoStorage = ScanInfoStorage() 

55 self._tomo_config = None 

56 self._exported_config = {} 

57 self._scanlisteners = [] 

58 self._slice_reconstruction_triggers = None 

59 self._last_delta_beta = None 

60 super(TomoComponent, self).__init__(*args, **kwargs) 

61 

62 def after_all_setup(self, components): 

63 """Called after the setup of the whole components""" 

64 self._setup_tomovis_proxy(components) 

65 avatar = self._config.get("avatar") 

66 if avatar is not None: 

67 self._exported_config["avatar"] = avatar 

68 

69 def get_export_config(self): 

70 """Get exported config values from components""" 

71 return self._exported_config 

72 

73 def _setup_tomovis_proxy(self, components): 

74 config = self._config.get("tomovis", {}) 

75 url = config.get("url") 

76 if url is not None: 

77 self._exported_config["tomovis_url"] = url 

78 create_proxy = config.get("create_proxy", False) 

79 if create_proxy: 

80 proxy = components.get_component("proxy") 

81 if proxy is None: 

82 raise RuntimeError( 

83 "Tomovis setup as proxy but no proxy component configured" 

84 ) 

85 self._exported_config["tomovis_proxy_url"] = "/api/proxy/tomovis" 

86 yaml = YAML() 

87 description = yaml.load(TOMOVIS_PROXY_TEMPLATE) 

88 description["target"] = url 

89 proxy.setup_proxy(description) 

90 

91 @property 

92 def slice_reconstruction_triggers(self) -> list[str]: 

93 return self._slice_reconstruction_triggers 

94 

95 @property 

96 def scan_sources(self): 

97 return self._scan_sources 

98 

99 @property 

100 def min_refresh_period(self) -> float: 

101 """Minimal image refresh period done during scan""" 

102 return self._config.get("min_refresh_period", 1) 

103 

104 @property 

105 def last_delta_beta(self) -> typing.Optional[float]: 

106 """Last delta beta value used""" 

107 return self._last_delta_beta 

108 

109 def set_last_delta_beta(self, delta_beta: float): 

110 self._last_delta_beta = delta_beta 

111 

112 @property 

113 def tomovis_uri(self) -> typing.Optional[str]: 

114 """Tomovis URI service if defined, else None""" 

115 config = self._config.get("tomovis", {}) 

116 return config.get("url") 

117 

118 def get_tomo_config_name(self) -> str: 

119 """Returns the name of the tomo BLISS object used by the application""" 

120 return self._tomo_config.name 

121 

122 def setup(self, *args, **kwargs): 

123 self._slice_reconstruction_triggers = self._config.get( 

124 "slice_reconstruction_triggers", None 

125 ) 

126 if self._slice_reconstruction_triggers is None: 

127 self._slice_reconstruction_triggers = ["terminated"] 

128 

129 for source in self._config["sources"]: 

130 if source["type"] == "bliss": 

131 

132 def get_bliss_device_from_config(source, name): 

133 device_name = source.get(name) 

134 if device_name is None: 

135 return None 

136 

137 if device_name == "ACTIVE_TOMOCONFIG": 

138 from tomo.globals import ACTIVE_TOMOCONFIG 

139 

140 return ACTIVE_TOMOCONFIG.deref_active_object() 

141 

142 from bliss.config.static import get_config 

143 

144 config = get_config() 

145 return config.get(device_name) 

146 

147 self._tomo_config = get_bliss_device_from_config(source, "tomo_config") 

148 

149 tomo_detectors = self._tomo_config.detectors 

150 for tomo_detector in tomo_detectors.detectors: 

151 # Name of the lima device inside BLISS 

152 detector_id = tomo_detector.detector.name 

153 logger.info("Register BLISS detector: %s", detector_id) 

154 self._detectors[detector_id] = DetectorLiveStorage( 

155 detector_id=detector_id, 

156 tomo_detector_id=tomo_detector.name, 

157 detector_node_name=f"{detector_id}:image", 

158 ) 

159 

160 scan_source_class = self._get_scan_source_class() 

161 src = scan_source_class( 

162 self._config, app=self._app, source_config=source 

163 ) 

164 

165 scanlistener = TomoScanListener(self, src) 

166 src.watch_new_scan(scanlistener.new_scan) 

167 src.watch_end_scan(scanlistener.end_scan) 

168 src.watch_new_data(scanlistener.new_data) 

169 src.watch_new_0d_data(scanlistener.new_0d_data) 

170 

171 self._scan_sources.append(src) 

172 self._scanlisteners.append(scanlistener) 

173 

174 logger.debug("Registered Bliss scan source") 

175 else: 

176 raise TypeError("Only bliss source is available for now") 

177 

178 self.register_route(TomoDetectorsResource, "/detectors") 

179 self.register_route(TomoScanInfoResource, "/scaninfo") 

180 self.register_route(TomoImageResource, "/data") 

181 self.register_route(TomoMoveResource, "/move") 

182 self.register_route(SliceReconstructionResource, "/slice_reconstruction") 

183 

184 def _get_scan_source_class(self): 

185 """ 

186 FIXME: This should be dropped 

187 """ 

188 import bliss.release 

189 from packaging.version import Version 

190 

191 if Version(bliss.release.version) < Version("2.0.dev"): 

192 # Use old bliss connector 

193 from daiquiri.core.hardware.bliss.scans import BlissScans 

194 

195 return BlissScans 

196 else: 

197 # Use blissdata anyway 

198 from daiquiri.core.hardware.blissdata.scans import ( 

199 BlissdataScans, 

200 ) 

201 

202 return BlissdataScans 

203 

204 def get_tomo_config(self): 

205 """Returns the tomo config object""" 

206 return self._tomo_config 

207 

208 def get_sample_stage_meta(self, detector_name) -> SampleStageMetadata: 

209 """Returns the sample stage state 

210 

211 This is a workaround for now, it would be much better to retrieve it 

212 from the scan if possible. 

213 

214 Which is probably part of positioners for ct 

215 """ 

216 

217 def quantity_from_motor(motor): 

218 if motor is None: 

219 return None 

220 return pint.Quantity(motor.position, motor.unit) 

221 

222 def quantity_or_none(value, unit): 

223 if value is None: 

224 return None 

225 return pint.Quantity(value, unit) 

226 

227 pixel_size = None 

228 

229 def get_active_tomo_detector(): 

230 if self._tomo_config is None: 

231 return None 

232 if self._tomo_config.detectors is None: 

233 return None 

234 tomo_detector = self._tomo_config.detectors.active_detector 

235 return tomo_detector 

236 

237 detector = get_active_tomo_detector() 

238 if detector and detector.detector.name == detector_name: 

239 # For now there is no way to reach the pixel size from another 

240 # detector than the active one 

241 ps = detector.sample_pixel_size 

242 if ps: 

243 pixel_size = pint.Quantity(ps, "um") 

244 

245 tomo_config = self._tomo_config 

246 sample_stage = tomo_config.sample_stage 

247 detector_center = sample_stage.detector_center 

248 

249 return SampleStageMetadata( 

250 sy=quantity_from_motor(sample_stage.y_axis), 

251 sz=quantity_from_motor(sample_stage.z_axis), 

252 sampy=quantity_from_motor(sample_stage.sample_y_axis), 

253 somega=quantity_from_motor(sample_stage.rotation_axis), 

254 detcy=quantity_or_none(detector_center[0], "mm"), 

255 detcz=quantity_or_none(detector_center[1], "mm"), 

256 pixel_size=pixel_size, 

257 ) 

258 

259 def move( 

260 self, 

261 sy: pint.Quantity = None, 

262 sz: pint.Quantity = None, 

263 sampx: pint.Quantity = None, 

264 sampy: pint.Quantity = None, 

265 sampu: pint.Quantity = None, 

266 sampv: pint.Quantity = None, 

267 relative: bool = False, 

268 ): 

269 """Move the sample stage motors. 

270 

271 If multiple moves are requested, all the command are send in parallel 

272 

273 Arguments: 

274 relative: If true, the move is relative 

275 """ 

276 

277 def normalize_value_to_motor_unit(motor, value): 

278 if value.units == "css_pixel": 

279 # By default pint provides a conversion from px to metric system 

280 # TODO this have to be removed 

281 raise RuntimeError("Can't convert from pixel to length") 

282 return value.to(motor.unit).magnitude 

283 

284 trajectory = { 

285 self._tomo_config.y_axis: sy, 

286 self._tomo_config.z_axis: sz, 

287 self._tomo_config.sample_x_axis: sampx, 

288 self._tomo_config.sample_y_axis: sampy, 

289 self._tomo_config.sample_u_axis: sampu, 

290 self._tomo_config.sample_v_axis: sampv, 

291 } 

292 

293 trajectory = { 

294 k: normalize_value_to_motor_unit(k, v) 

295 for k, v in trajectory.items() 

296 if v is not None 

297 } 

298 

299 if len(trajectory) == 0: 

300 return 

301 

302 from bliss.common.motor_group import Group 

303 

304 group = Group(*trajectory.keys()) 

305 group.move(trajectory, wait=True, relative=relative) 

306 

307 def get_detector(self, detector_id: str) -> typing.Optional[DetectorLiveStorage]: 

308 """Get available detectors""" 

309 return self._detectors.get(detector_id) 

310 

311 def get_scaninfos(self): 

312 """Returns the state of the actual scans""" 

313 return self._scaninfos 

314 

315 def get_detectors(self) -> list[DetectorLiveStorage]: 

316 """Get available detectors""" 

317 return self._detectors.values()