Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/bliss/helpers.py: 0%

55 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1import logging 

2import typing 

3import numpy 

4import math 

5 

6from bliss.flint.helper import scan_info_helper 

7 

8from daiquiri.core.utils import worker 

9 

10_logger = logging.getLogger(__name__) 

11 

12try: 

13 from silx.io.h5py_utils import retry 

14 from silx.io.h5py_utils import File 

15except ImportError: 

16 # For older silx version 

17 _logger.warning("Fallback to HDF5 reader without retry") 

18 

19 def retry(retry_timeout): 

20 def same(f): 

21 return f 

22 

23 return same 

24 

25 from h5py import File 

26 

27 

28@retry(retry_timeout=2) 

29def get_data_from_file( 

30 scan_node_name: str, scan_info, type="scalar" 

31) -> typing.Dict[str, numpy.ndarray]: 

32 """Read channel data from HDF5, and referenced by this scan_info""" 

33 # Load it locally in case there is setup 

34 from nexus_writer_service.subscribers.devices import device_info 

35 from nexus_writer_service.subscribers.dataset_proxy import normalize_nexus_name 

36 

37 types = { 

38 "scalar": 0, 

39 "spectrum": 1, 

40 } 

41 

42 if type not in types: 

43 raise KeyError( 

44 f"Unknown data type {type} requested, available types are {types.keys()}" 

45 ) 

46 

47 channels = list(scan_info_helper.iter_channels(scan_info)) 

48 channel_names = set( 

49 [c.name for c in channels if c.info.get("dim", 0) == types[type]] 

50 ) 

51 scan_nb = scan_info["scan_nb"] 

52 

53 if "nexus" not in scan_info["data_writer"]: 

54 raise EnvironmentError("nexuswriter was not enabled for this scan") 

55 

56 def read_hdf5(): 

57 points = math.inf 

58 result = {} 

59 

60 with File(scan_info["filename"]) as h5: 

61 devices = scan_info["nexuswriter"]["devices"] 

62 devices = device_info(devices, scan_info) 

63 for subscan_id, (_subscan, devices) in enumerate(devices.items(), 1): 

64 for channel_name, device in devices.items(): 

65 if channel_name not in channel_names: 

66 continue 

67 grpname = normalize_nexus_name(device["device_name"]) 

68 dsetname = normalize_nexus_name(device["data_name"]) 

69 path = f"/{scan_nb}.{subscan_id}/instrument/{grpname}/{dsetname}" 

70 try: 

71 # Create a memory copy of the data 

72 data = h5[path][()] 

73 except Exception: 

74 _logger.debug("Backtrace", exc_info=True) 

75 _logger.info( 

76 "Data from channel %s is not reachable", channel_name 

77 ) 

78 else: 

79 if len(data) < points: 

80 points = len(data) 

81 

82 result[channel_name] = data 

83 

84 if points == math.inf: 

85 points = 0 

86 return result, points 

87 

88 res = worker(read_hdf5) 

89 return res[0], res[1]