Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/scans.py: 60%

194 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import time 

4from marshmallow import fields 

5from PIL import Image 

6import numpy 

7 

8from daiquiri.core import marshal 

9from daiquiri.core.logging import log 

10from daiquiri.core.components import Component, ComponentResource 

11from daiquiri.core.schema import ErrorSchema 

12from daiquiri.core.schema.metadata import paginated 

13from daiquiri.core.schema.components.scan import ( 

14 ScanSchema, 

15 ScanStatusSchema, 

16 ScanDataSchema, 

17 ScanSpectraSchema, 

18) 

19from daiquiri.core.schema.components.h5grove import ValueField 

20from daiquiri.core.responses import image_response, gzipped 

21from daiquiri.core.utils import make_json_safe, worker 

22from daiquiri.core.utils import imageutils 

23from daiquiri.core.utils import arrayutils 

24 

25import logging 

26 

27logger = logging.getLogger(__name__) 

28 

29 

class ScansResource(ComponentResource):
    """Resource for the scan collection; the response is marshalled as a paginated list."""

    @marshal(out=[[200, paginated(ScanSchema), "List of scans info"]], paged=True)
    def get(self, **kwargs):
        """Get a list of all scans and their info"""
        # All marshalled arguments are forwarded untouched to the parent component.
        scans = self._parent.get_scans(**kwargs)
        return scans, 200

35 

36 

class ScanStatusResource(ComponentResource):
    """Resource exposing the scan status held by the parent component."""

    @marshal(out=[[200, ScanStatusSchema(), "Scan status"]])
    def get(self, **kwargs):
        """Get current scan status

        i.e. if there is a running scan
        """
        # The status is returned as-is, without an explicit HTTP status code.
        status = self._parent.get_scan_status()
        return status

47 

48 

class ScanResource(ComponentResource):
    """Resource returning the info of a single scan selected by its id."""

    @marshal(
        out=[
            # This endpoint returns one scan, not a list.
            [200, ScanSchema(), "Scan info"],
            [404, ErrorSchema(), "Scan not found"],
        ]
    )
    def get(self, scanid, **kwargs):
        """Get info for a specific scan"""
        scan = self._parent.get_scans(scanid=scanid)
        if not scan:
            # A falsy result from the parent is reported as a missing scan.
            return {"error": "No such scan"}, 404
        return scan, 200

63 

64 

class ScanDataResource(ComponentResource):
    """Resource returning the data of one scan, gzipped."""

    @marshal(
        inp={"scalars": fields.List(fields.Str())},
        out=[
            [200, ScanDataSchema(), "Scan data"],
            [404, ErrorSchema(), "Scan not found"],
        ],
        paged=True,
    )
    def get(self, scanid, **kwargs):
        """Get the data for a specific scan"""
        payload = self._parent.get_scan_data(scanid=scanid, **kwargs)
        if not payload:
            return {"error": "No such scan"}, 404

        # Compression is handed to `worker` instead of being run inline.
        return worker(lambda: gzipped(payload))

85 

86 

class ScanDataBinaryResource(ComponentResource):
    """Resource returning a single channel of a scan as a gzipped array."""

    @staticmethod
    def _is_supported_image_selection(selection) -> bool:
        """Tell whether `selection` addresses exactly one image.

        The first item must be an integer index, or a slice with explicit
        bounds covering a single frame (`stop == start + 1`). Anything else,
        including open-ended slices such as `:` or `3:`, is rejected instead
        of failing on `None` arithmetic or a missing attribute.
        """
        if selection is None or len(selection) == 0:
            return False
        first = selection[0]
        if isinstance(first, int):
            return True
        if not isinstance(first, slice):
            return False
        if first.start is None or first.stop is None:
            return False
        return first.start == first.stop - 1

    @marshal(
        inp={
            "channel": fields.Str(required=True),
            "dtype": fields.Str(),
            "selection": fields.Str(),
        },
        out=[
            [200, ValueField(), "Get scalar data from a scan in binary format"],
            [404, ErrorSchema(), "Scan not found"],
        ],
    )
    def get(self, scanid, channel, dtype=None, selection=None, **kwargs):
        """Get all the data for a specific scan scalar in binary format

        Arguments:
            scanid: Identifier of the scan
            channel: Name of the data channel
            dtype: One of "origin" or "safe" or numpy string like "float32" or "<f16"
            selection: A string representation to slice the data (`1,2::2`)

        Raises:
            RuntimeError: If the channel has a 2D shape and `selection` does
                not designate exactly one image.
        """
        selection = arrayutils.parse_slice(selection) if selection else None

        scan_info = self._parent.get_scan_data(
            scanid=scanid, json_safe=False, scalars=[channel], **kwargs
        )
        if not scan_info:
            return {"error": "No such scan"}, 404

        # Look the channel up defensively: an unknown channel must yield a 404
        # rather than a KeyError.
        channel_info = scan_info["data"].get(channel)
        if channel_info is None:
            return {"error": f"No such scalar '{channel}'"}, 404

        if len(channel_info["shape"]) == 2:
            # Channels with a 2D shape are read one image at a time through
            # the image API.
            if not self._is_supported_image_selection(selection):
                raise RuntimeError(
                    f"Such selection selection={selection} is not supported by the actual image API"
                )
            image_no = selection[0]
            if not isinstance(image_no, int):
                image_no = image_no.start
            array = self._parent.get_scan_image(
                scanid=scanid, node_name=channel, image_no=image_no
            )
            if array is None:
                # The parent returns None when it has no image to give.
                return {"error": "No such image"}, 404
            if len(selection) > 1:
                # The remaining items of the selection apply inside the image.
                array = array[selection[1:]]
        else:
            # Request the whole channel in a single page.
            scan = self._parent.get_scan_data(
                scanid=scanid,
                json_safe=False,
                scalars=[channel],
                page=1,
                per_page=channel_info["size"],
                **kwargs,
            )
            if not scan:
                return {"error": "No such scan"}, 404

            data = scan["data"].get(channel)
            if data is None:
                return {"error": f"No such scalar '{channel}'"}, 404
            array = data["data"]

            if selection is not None:
                array = array[selection]

        if dtype in ["origin", None]:
            pass
        elif dtype == "safe":
            safe_dtype = arrayutils.to_safe_js_dtype(array.dtype)
            array = array.astype(safe_dtype)
        else:
            array = array.astype(dtype)

        def gzip():
            return gzipped(array)

        return worker(gzip)

179 

180 

class ScanSpectraResource(ComponentResource):
    """Resource returning the spectra of one scan at a requested point."""

    @marshal(
        inp={
            "point": fields.Int(
                required=True,
                metadata={"description": "The point to return a spectrum for"},
            )
        },
        out=[
            [200, ScanSpectraSchema(), "Scan spectra"],
            [404, ErrorSchema(), "Scan not found"],
        ],
    )
    def get(self, scanid, **kwargs):
        """Get the spectra for a specific scan"""
        point = kwargs["point"]
        spectra = self._parent.get_scan_spectra(scanid=scanid, point=point)
        # Only a `None` result is treated as "not found".
        if spectra is None:
            return {"error": "No such scan"}, 404
        return spectra, 200

201 

202 

class ScanImageResource(ComponentResource):
    """Resource returning one image of a scan, rendered or as raw data."""

    @marshal(
        inp={
            "node_name": fields.Str(
                metadata={"description": "The scan node name to get images from"}
            ),
            "image_no": fields.Int(
                metadata={"description": "The image number to load"}
            ),
            "raw": fields.Bool(
                metadata={
                    "description": "Return the raw data rather than an image (gzipped)"
                }
            ),
            "norm": fields.String(
                metadata={
                    "description": "Normalization of the image, can be 'linear', 'log', 'arcsinh', 'sqrt'"
                }
            ),
            "autoscale": fields.String(
                metadata={
                    "description": "Autoscale for the domain of the image, can be 'none', 'minmax', 'stddev3'"
                }
            ),
            "lut": fields.String(
                metadata={
                    "description": "LUT for the colors, can be 'gray', 'gray_r', 'viridis', 'cividis'"
                }
            ),
        },
        out=[
            # [200, ScanDataSchema(), 'Scan data'],
            # One [code, schema, description] triple per response, matching
            # the other resources of this module.
            [400, ErrorSchema(), "Error retrieving image"],
            [404, ErrorSchema(), "Scan not found"],
        ],
    )
    def get(
        self,
        scanid,
        node_name,
        image_no,
        raw=False,
        norm="linear",
        autoscale="none",
        lut="gray",
        **kwargs,
    ):
        """Get the image for a specific scan"""
        try:
            arr = self._parent.get_scan_image(
                scanid=scanid, node_name=node_name, image_no=image_no
            )
        except Exception as e:
            # Any failure while fetching the image is reported to the client
            # with its message.
            return {"error": str(e)}, 400

        if arr is None:
            return {"error": "No such image"}, 404

        def generate():
            # Builds the response body; only reads the enclosing variables.
            if raw:
                flat = arr.flatten()
                return gzipped(
                    make_json_safe(
                        {
                            "domain": [numpy.amin(flat), numpy.amax(flat)],
                            "shape": arr.shape,
                            "data": flat,
                        }
                    )
                )

            rendered = imageutils.array_to_image(arr, autoscale, norm, lut)
            im = Image.fromarray(rendered)
            if im.mode != "RGB":
                im = im.convert("RGB")

            return image_response(im)

        return worker(generate)

289 

290 

class Scans(Component):
    """Scan Component

    The scan component loads a scan source as defined in scans.yml, and then provides
    access to this data via a flask resource.

    It also subscribes to the scan source's new scan, new data, and scan end watchers,
    and emits these changes via socketio
    """

    _config_export = ["mca"]

    def setup(self, *args, **kwargs):
        """Create the configured scan sources and register the REST routes."""
        # Timestamp (monotonic clock) of the last emitted "new_data" event.
        # Starts at -inf so the very first event is never debounced.
        self._last_new_data = float("-inf")
        self._scan_sources = []
        self._scan_status = {"scanid": None, "progress": 0}

        for source in self._config["sources"]:
            src = self._create_source_from_config(source, self._config)
            src.watch_new_scan(self._new_scan)
            src.watch_end_scan(self._end_scan)
            src.watch_new_data(self._new_data)
            self._scan_sources.append(src)

        self.register_route(ScansResource, "")
        self.register_route(ScanStatusResource, "/status")
        self.register_route(ScanResource, "/<int:scanid>")
        self.register_route(ScanDataResource, "/data/<int:scanid>")
        self.register_route(ScanDataBinaryResource, "/data/binary/<int:scanid>")
        self.register_route(ScanSpectraResource, "/spectra/<int:scanid>")
        self.register_route(ScanImageResource, "/image/<int:scanid>")

    def close(self):
        """Clean up the service at the end.

        After this call, the component should not be accessed anymore
        """
        for s in self._scan_sources:
            s.close()
        self._scan_sources = []

    def _create_source_from_config(self, source_config: dict, config: dict):
        """Instantiate the scan connector matching `source_config["type"]`.

        Args:
            source_config (dict): Configuration of this source; its "type" key
                selects the connector ("bliss" or "blissdata")
            config (dict): The component configuration, passed to the connector

        Returns:
            The created scan source

        Raises:
            ValueError: If the source type is not supported
        """
        source_type = source_config["type"]
        if source_type == "bliss":
            import bliss.release
            from packaging.version import Version

            if Version(bliss.release.version) < Version("2.0.dev"):
                # Use old bliss connector
                from daiquiri.core.hardware.bliss.scans import BlissScans

                scan_connector_class = BlissScans
            else:
                # Use blissdata anyway
                from daiquiri.core.hardware.blissdata.scans import BlissdataScans

                scan_connector_class = BlissdataScans
        elif source_type == "blissdata":
            from daiquiri.core.hardware.blissdata.scans import BlissdataScans

            scan_connector_class = BlissdataScans
        else:
            # Report the offending configured value, not the builtin `type`.
            raise ValueError(f"Unsupported {source_type} scan component type")

        src = scan_connector_class(config, app=self._app, source_config=source_config)
        logger.debug("Registered %s scan source", source_type)
        return src

    def get_scans(self, scanid=None, **kwargs):
        """Get scans from a scan source

        Only the first configured source is queried.

        Args:
            scanid (int): A specific scanid to return

        Returns:
            scans (dict): A dict of total => number of scans, scans => paginated list of scan infos if scanid is None
            scan (dict): A scan dict if scanid is not None
        """
        scans = {}
        if self._scan_sources:
            scans = self._scan_sources[0].get_scans(scanid=scanid, **kwargs)

        return scans

    def get_scan_data(self, scanid, **kwargs):
        """Get the data for a scan

        Only the first configured source is queried; extra keyword arguments
        are forwarded to it.

        Args:
            scanid (int): The scanid

        Returns:
            data (dict): Whatever the source returns, or an empty dict when
                no source is configured
        """
        scan = {}
        if self._scan_sources:
            scan = self._scan_sources[0].get_scan_data(scanid=scanid, **kwargs)

        return scan

    def get_scan_spectra(self, scanid, point=0, allpoints=False):
        """Get the spectra for a scan

        Only the first configured source is queried.

        Args:
            scanid (int): The scanid
            point (int): The point to return
            allpoints (bool): Return all available points

        Returns:
            data (dict): Whatever the source returns, or an empty dict when
                no source is configured
        """
        scan = {}
        if self._scan_sources:
            scan = self._scan_sources[0].get_scan_spectra(
                scanid=scanid, point=point, allpoints=allpoints
            )

        return scan

    def get_scan_image(self, scanid, node_name, image_no):
        """Get an image of a scan

        Only the first configured source is queried.

        Args:
            scanid (int): The scanid
            node_name (str): The node to return data for
            image_no (int): The image to return

        Returns:
            Whatever the source returns, or None when no source is configured
        """
        image = None
        if self._scan_sources:
            image = self._scan_sources[0].get_scan_image(
                scanid=scanid, node_name=node_name, image_no=image_no
            )

        return image

    def get_scan_status(self):
        """Return the status dict, holding the keys `scanid` and `progress`."""
        return self._scan_status

    def _new_scan(self, scanid, type, title, metadata):
        """Watcher callback: record the started scan and notify clients.

        `metadata` is accepted but not used here.
        """
        self._scan_status["scanid"] = scanid
        # Start from zero so the status does not show the progress of the
        # previous scan until the first data event arrives.
        self._scan_status["progress"] = 0

        log.get("user").info(
            f"New scan started with id {scanid} and title {title}", type="scan"
        )
        self.emit(
            "new_scan",
            {"scanid": scanid, "type": type, "title": title},
        )

    def _end_scan(self, scanid, metadata):
        """Watcher callback: clear the running scan and notify clients.

        `metadata` is accepted but not used here.
        """
        if self._scan_status["scanid"] == scanid:
            self._scan_status["scanid"] = None

        log.get("user").info(f"Scan {scanid} finished", type="scan")
        self.emit("end_scan", {"scanid": scanid})

    def _new_data(
        self, scanid, master, progress, channel_name, channel_size, channel_progress
    ):
        """Watcher callback: track progress and emit a debounced "new_data" event.

        The stored progress is always updated for the running scan, but the
        event itself is emitted at most once per second. The `channel_*`
        arguments are accepted but not used here.
        """
        if self._scan_status["scanid"] == scanid:
            self._scan_status["progress"] = progress

        # A monotonic clock keeps the debounce immune to wall-clock changes.
        now = time.monotonic()
        if (now - self._last_new_data) < 1:
            return

        self._last_new_data = now

        self.emit(
            "new_data",
            {"scanid": scanid, "channel": master, "progress": progress},
        )