Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/scans.py: 60%
194 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import time
4from marshmallow import fields
5from PIL import Image
6import numpy
8from daiquiri.core import marshal
9from daiquiri.core.logging import log
10from daiquiri.core.components import Component, ComponentResource
11from daiquiri.core.schema import ErrorSchema
12from daiquiri.core.schema.metadata import paginated
13from daiquiri.core.schema.components.scan import (
14 ScanSchema,
15 ScanStatusSchema,
16 ScanDataSchema,
17 ScanSpectraSchema,
18)
19from daiquiri.core.schema.components.h5grove import ValueField
20from daiquiri.core.responses import image_response, gzipped
21from daiquiri.core.utils import make_json_safe, worker
22from daiquiri.core.utils import imageutils
23from daiquiri.core.utils import arrayutils
25import logging
27logger = logging.getLogger(__name__)
class ScansResource(ComponentResource):
    """Resource returning the paginated list of scans known to the component."""

    @marshal(out=[[200, paginated(ScanSchema), "List of scans info"]], paged=True)
    def get(self, **kwargs):
        """Get a list of all scans and their info"""
        # All query arguments are forwarded untouched to the parent component.
        scans = self._parent.get_scans(**kwargs)
        return scans, 200
class ScanStatusResource(ComponentResource):
    """Resource exposing the scan status tracked by the parent component."""

    @marshal(out=[[200, ScanStatusSchema(), "Scan status"]])
    def get(self, **kwargs):
        """Get current scan status

        i.e. if there is a running scan
        """
        # The status is returned as-is, without an explicit HTTP status code.
        status = self._parent.get_scan_status()
        return status
class ScanResource(ComponentResource):
    """Resource returning the info of a single scan identified by its id."""

    @marshal(
        out=[
            [200, ScanSchema(), "List of scans"],
            [404, ErrorSchema(), "Scan not found"],
        ]
    )
    def get(self, scanid, **kwargs):
        """Get info for a specific scan"""
        found = self._parent.get_scans(scanid=scanid)
        # A falsy result from the parent is reported as a missing scan.
        if not found:
            return {"error": "No such scan"}, 404
        return found, 200
class ScanDataResource(ComponentResource):
    """Resource returning the (paginated) data of a scan as a gzipped payload."""

    @marshal(
        inp={"scalars": fields.List(fields.Str())},
        out=[
            [200, ScanDataSchema(), "Scan data"],
            [404, ErrorSchema(), "Scan not found"],
        ],
        paged=True,
    )
    def get(self, scanid, **kwargs):
        """Get the data for a specific scan"""
        scan = self._parent.get_scan_data(scanid=scanid, **kwargs)
        # A falsy result from the parent is reported as a missing scan.
        if not scan:
            return {"error": "No such scan"}, 404

        def gzip():
            # Compression is handed to `worker` instead of being run inline.
            return gzipped(scan)

        return worker(gzip)
class ScanDataBinaryResource(ComponentResource):
    """Resource returning the data of one scan channel in binary (gzipped) form.

    Channels whose reported shape has two dimensions are read through the
    image API of the parent component, one image at a time; any other channel
    is read through the regular scan data API.
    """

    @staticmethod
    def _is_single_image_selection(selection) -> bool:
        """Tell whether the first axis of `selection` designates exactly one image.

        The image API can only load one image per call, so the first entry
        must be an int or a slice with explicit bounds spanning a single index.
        """
        if not selection:
            # None or an empty selection: no image index available
            return False
        first = selection[0]
        if isinstance(first, int):
            return True
        if not isinstance(first, slice):
            return False
        if first.start is None or first.stop is None:
            # Open-ended slices cannot be mapped onto a single image number
            return False
        return first.start == first.stop - 1

    @marshal(
        inp={
            "channel": fields.Str(required=True),
            "dtype": fields.Str(),
            "selection": fields.Str(),
        },
        out=[
            [200, ValueField(), "Get scalar data from a scan in binary format"],
            [404, ErrorSchema(), "Scan not found"],
        ],
    )
    def get(self, scanid, channel, dtype=None, selection=None, **kwargs):
        """Get all the data for a specific scan scalar in binary format

        Arguments:
            scanid: Identifier of the scan
            channel: Name of the data channel
            dtype: One of "origin" or "safe" or numpy string like "float32" or "<f16"
            selection: A string representation to slice the data (`1,2::2`)

        Raises:
            RuntimeError: If the channel holds images and `selection` does not
                designate exactly one image
        """
        selection = arrayutils.parse_slice(selection) if selection else None

        # First request is only used to inspect the channel (shape and size)
        scan_info = self._parent.get_scan_data(
            scanid=scanid, json_safe=False, scalars=[channel], **kwargs
        )
        if not scan_info:
            return {"error": "No such scan"}, 404

        channel_info = scan_info["data"].get(channel)
        if channel_info is None:
            # Report an unknown channel as 404 rather than failing on a KeyError
            return {"error": f"No such scalar '{channel}'"}, 404

        if len(channel_info["shape"]) == 2:
            if not self._is_single_image_selection(selection):
                raise RuntimeError(
                    f"Such selection selection={selection} is not supported by the actual image API"
                )
            image_no = selection[0]
            if not isinstance(image_no, int):
                image_no = image_no.start
            array = self._parent.get_scan_image(
                scanid=scanid, node_name=channel, image_no=image_no
            )
            if array is None:
                # The parent returns None when it has no image to provide
                return {"error": "No such image"}, 404
            if len(selection) > 1:
                # Remaining entries slice inside the loaded image
                array = array[selection[1:]]
        else:
            # Second request fetches the whole channel in a single page
            scan = self._parent.get_scan_data(
                scanid=scanid,
                json_safe=False,
                scalars=[channel],
                page=1,
                per_page=channel_info["size"],
                **kwargs,
            )
            if not scan:
                return {"error": "No such scan"}, 404

            data = scan["data"].get(channel)
            if data is None:
                return {"error": f"No such scalar '{channel}'"}, 404
            array = data["data"]

            if selection is not None:
                array = array[selection]

        if dtype in ["origin", None]:
            pass
        elif dtype == "safe":
            safe_dtype = arrayutils.to_safe_js_dtype(array.dtype)
            array = array.astype(safe_dtype)
        else:
            array = array.astype(dtype)

        def gzip():
            return gzipped(array)

        return worker(gzip)
class ScanSpectraResource(ComponentResource):
    """Resource returning the spectra of a scan at a given point."""

    @marshal(
        inp={
            "point": fields.Int(
                required=True,
                metadata={"description": "The point to return a spectrum for"},
            )
        },
        out=[
            [200, ScanSpectraSchema(), "Scan spectra"],
            [404, ErrorSchema(), "Scan not found"],
        ],
    )
    def get(self, scanid, **kwargs):
        """Get the spectra for a specific scan"""
        point = kwargs["point"]
        spectra = self._parent.get_scan_spectra(scanid=scanid, point=point)
        # Only None means "not found"; other falsy results are still returned.
        if spectra is None:
            return {"error": "No such scan"}, 404
        return spectra, 200
class ScanImageResource(ComponentResource):
    """Resource returning one image of a scan, either rendered or as raw data."""

    @marshal(
        inp={
            "node_name": fields.Str(
                metadata={"description": "The scan node name to get images from"}
            ),
            "image_no": fields.Int(
                metadata={"description": "The image number to load"}
            ),
            "raw": fields.Bool(
                metadata={
                    "description": "Return the raw data rather than an image (gzipped)"
                }
            ),
            "norm": fields.String(
                metadata={
                    "description": "Normalization of the image, can be 'linear', 'log', 'arcsinh', 'sqrt'"
                }
            ),
            "autoscale": fields.String(
                metadata={
                    "description": "Autoscale for the domain of the image, can be 'none', 'minmax', 'stddev3'"
                }
            ),
            "lut": fields.String(
                metadata={
                    "description": "LUT for the colors, can be 'gray', 'gray_r', 'viridis', 'cividis'"
                }
            ),
        },
        out=[
            # No 200 schema: success is either a gzipped payload or an image.
            # Each error code gets its own [code, schema, description] entry.
            [400, ErrorSchema(), "Error retrieving image"],
            [404, ErrorSchema(), "Scan not found"],
        ],
    )
    def get(
        self,
        scanid,
        node_name,
        image_no,
        raw=False,
        norm="linear",
        autoscale="none",
        lut="gray",
        **kwargs,
    ):
        """Get the image for a specific scan"""
        try:
            arr = self._parent.get_scan_image(
                scanid=scanid, node_name=node_name, image_no=image_no
            )
        except Exception as e:
            # Any failure while loading the image is reported to the client
            return {"error": str(e)}, 400

        if arr is None:
            return {"error": "No such image"}, 404

        def generate():
            if raw:
                # Raw mode: flattened data with its value domain and shape
                flat = arr.flatten()
                return gzipped(
                    make_json_safe(
                        {
                            "domain": [numpy.amin(flat), numpy.amax(flat)],
                            "shape": arr.shape,
                            "data": flat,
                        }
                    )
                )

            rendered = imageutils.array_to_image(arr, autoscale, norm, lut)
            im = Image.fromarray(rendered)
            if im.mode != "RGB":
                im = im.convert("RGB")

            return image_response(im)

        return worker(generate)
class Scans(Component):
    """Scan Component

    The scan component loads a scan source as defined in scans.yml, and then provides
    access to this data via a flask resource.

    It also subscribes to the scan sources new scan, new data, and scan end watchers,
    and emits these changes via socketio
    """

    _config_export = ["mca"]

    def setup(self, *args, **kwargs):
        """Create the configured scan sources and register the REST routes."""
        # Timestamp of the last emitted "new_data" event, used for debouncing
        self._last_new_data = 0
        self._scan_sources = []
        self._scan_status = {"scanid": None, "progress": 0}

        for source in self._config["sources"]:
            src = self._create_source_from_config(source, self._config)
            src.watch_new_scan(self._new_scan)
            src.watch_end_scan(self._end_scan)
            src.watch_new_data(self._new_data)
            self._scan_sources.append(src)

        self.register_route(ScansResource, "")
        self.register_route(ScanStatusResource, "/status")
        self.register_route(ScanResource, "/<int:scanid>")
        self.register_route(ScanDataResource, "/data/<int:scanid>")
        self.register_route(ScanDataBinaryResource, "/data/binary/<int:scanid>")
        self.register_route(ScanSpectraResource, "/spectra/<int:scanid>")
        self.register_route(ScanImageResource, "/image/<int:scanid>")

    def close(self):
        """Clean up the service at the end.

        After this call, the component should not be accessed anymore
        """
        for s in self._scan_sources:
            s.close()
        self._scan_sources = []

    def _create_source_from_config(self, source_config: dict, config: dict):
        """Instantiate the scan connector matching `source_config["type"]`.

        Args:
            source_config (dict): Config of this source, must contain `type`
            config (dict): The whole component config

        Returns:
            The created scan source

        Raises:
            ValueError: If the source type is not supported
        """
        source_type = source_config["type"]
        if source_type == "bliss":
            import bliss.release
            from packaging.version import Version

            if Version(bliss.release.version) < Version("2.0.dev"):
                # Use old bliss connector
                from daiquiri.core.hardware.bliss.scans import BlissScans

                scan_connector_class = BlissScans
            else:
                # Use blissdata anyway
                from daiquiri.core.hardware.blissdata.scans import BlissdataScans

                scan_connector_class = BlissdataScans
        elif source_type == "blissdata":
            from daiquiri.core.hardware.blissdata.scans import BlissdataScans

            scan_connector_class = BlissdataScans
        else:
            # Report the offending configured type, not the `type` builtin
            raise ValueError(f"Unsupported {source_type} scan component type")

        src = scan_connector_class(config, app=self._app, source_config=source_config)
        logger.debug("Registered %s scan source", source_type)
        return src

    def get_scans(self, scanid=None, **kwargs):
        """Get scans from a scan source

        Only the first configured scan source is queried.

        Args:
            scanid (int): A specific scanid to return

        Returns:
            scans (dict): A dict of total => number of scans, scans => paginated list of scan infos if scanid is None
            scan (dict): A scan dict if scanid is not None
        """
        if not self._scan_sources:
            return {}
        return self._scan_sources[0].get_scans(scanid=scanid, **kwargs)

    def get_scan_data(self, scanid, **kwargs):
        """Get the data for a scan

        Only the first configured scan source is queried.

        Args:
            scanid (int): The scanid

        Returns:
            data (dict): The scan data, or an empty dict if there is no scan source
        """
        if not self._scan_sources:
            return {}
        return self._scan_sources[0].get_scan_data(scanid=scanid, **kwargs)

    def get_scan_spectra(self, scanid, point=0, allpoints=False):
        """Get the spectra for a scan

        Only the first configured scan source is queried.

        Args:
            scanid (int): The scanid
            point (int): The point to return
            allpoints (bool): Return all available points

        Returns:
            data (dict): The spectra, or an empty dict if there is no scan source
        """
        if not self._scan_sources:
            return {}
        return self._scan_sources[0].get_scan_spectra(
            scanid=scanid, point=point, allpoints=allpoints
        )

    def get_scan_image(self, scanid, node_name, image_no):
        """Get an image from a scan

        Only the first configured scan source is queried.

        Args:
            scanid (int): The scanid
            node_name (str): The node to return data for
            image_no (int): The image to return

        Returns:
            The image as returned by the scan source, or None if there is no
            scan source
        """
        if not self._scan_sources:
            return None
        return self._scan_sources[0].get_scan_image(
            scanid=scanid, node_name=node_name, image_no=image_no
        )

    def get_scan_status(self):
        """Return the status dict (`scanid` and `progress`) of the tracked scan."""
        return self._scan_status

    def _new_scan(self, scanid, type, title, metadata):
        """Scan source callback: track the new scan and notify the clients."""
        self._scan_status["scanid"] = scanid
        # Do not expose the progress of the previous scan for the new one
        self._scan_status["progress"] = 0

        log.get("user").info(
            f"New scan started with id {scanid} and title {title}", type="scan"
        )
        self.emit(
            "new_scan",
            {"scanid": scanid, "type": type, "title": title},
        )

    def _end_scan(self, scanid, metadata):
        """Scan source callback: clear the tracked scan and notify the clients."""
        if self._scan_status["scanid"] == scanid:
            self._scan_status["scanid"] = None

        log.get("user").info(f"Scan {scanid} finished", type="scan")
        self.emit("end_scan", {"scanid": scanid})

    def _new_data(
        self, scanid, master, progress, channel_name, channel_size, channel_progress
    ):
        """Scan source callback: update the progress and notify the clients.

        The stored progress is always updated for the tracked scan, but the
        "new_data" event is emitted at most once per second.
        """
        if self._scan_status["scanid"] == scanid:
            self._scan_status["progress"] = progress

        now = time.time()
        if (now - self._last_new_data) < 1:
            return

        self._last_new_data = now

        self.emit(
            "new_data",
            {"scanid": scanid, "channel": master, "progress": progress},
        )