Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/bliss/helpers.py: 0%
55 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1import logging
2import typing
3import numpy
4import math
6from bliss.flint.helper import scan_info_helper
8from daiquiri.core.utils import worker
10_logger = logging.getLogger(__name__)
12try:
13 from silx.io.h5py_utils import retry
14 from silx.io.h5py_utils import File
15except ImportError:
16 # For older silx version
17 _logger.warning("Fallback to HDF5 reader without retry")
19 def retry(retry_timeout):
20 def same(f):
21 return f
23 return same
25 from h5py import File
28@retry(retry_timeout=2)
29def get_data_from_file(
30 scan_node_name: str, scan_info, type="scalar"
31) -> typing.Dict[str, numpy.ndarray]:
32 """Read channel data from HDF5, and referenced by this scan_info"""
33 # Load it locally in case there is setup
34 from nexus_writer_service.subscribers.devices import device_info
35 from nexus_writer_service.subscribers.dataset_proxy import normalize_nexus_name
37 types = {
38 "scalar": 0,
39 "spectrum": 1,
40 }
42 if type not in types:
43 raise KeyError(
44 f"Unknown data type {type} requested, available types are {types.keys()}"
45 )
47 channels = list(scan_info_helper.iter_channels(scan_info))
48 channel_names = set(
49 [c.name for c in channels if c.info.get("dim", 0) == types[type]]
50 )
51 scan_nb = scan_info["scan_nb"]
53 if "nexus" not in scan_info["data_writer"]:
54 raise EnvironmentError("nexuswriter was not enabled for this scan")
56 def read_hdf5():
57 points = math.inf
58 result = {}
60 with File(scan_info["filename"]) as h5:
61 devices = scan_info["nexuswriter"]["devices"]
62 devices = device_info(devices, scan_info)
63 for subscan_id, (_subscan, devices) in enumerate(devices.items(), 1):
64 for channel_name, device in devices.items():
65 if channel_name not in channel_names:
66 continue
67 grpname = normalize_nexus_name(device["device_name"])
68 dsetname = normalize_nexus_name(device["data_name"])
69 path = f"/{scan_nb}.{subscan_id}/instrument/{grpname}/{dsetname}"
70 try:
71 # Create a memory copy of the data
72 data = h5[path][()]
73 except Exception:
74 _logger.debug("Backtrace", exc_info=True)
75 _logger.info(
76 "Data from channel %s is not reachable", channel_name
77 )
78 else:
79 if len(data) < points:
80 points = len(data)
82 result[channel_name] = data
84 if points == math.inf:
85 points = 0
86 return result, points
88 res = worker(read_hdf5)
89 return res[0], res[1]