#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import logging
import h5py
from marshmallow import fields
import numpy

from daiquiri.core import marshal
from daiquiri.core.components import Component, ComponentResource
from daiquiri.core.schema import ErrorSchema
from daiquiri.core.schema.components.hdf5 import RootHdf5Schema, Hdf5Schema
from daiquiri.core.utils import make_json_safe, worker
from daiquiri.core.responses import gzipped

logger = logging.getLogger(__name__)


ids = {
    "autoprocprogramid": fields.Int(
        metadata={"description": "Auto processing program id"}
    ),
    "autoprocprogramattachmentid": fields.Int(
        metadata={"description": "Auto processing program attachment id"}
    ),
    "datacollectionid": fields.Int(metadata={"description": "Data collection id"}),
    "type": fields.Str(metadata={"enum": ["processing"]}),
}

class ContentsResource(ComponentResource):
    @marshal(
        inp=ids,
        out=[
            [200, RootHdf5Schema(), "Get hdf5 file contents"],
            [400, ErrorSchema(), "Could not find hdf5 file"],
        ],
    )
    def get(self, **kwargs):
        """Get the contents of an hdf5 file"""
        contents = self._parent.get_hdf5(**kwargs)
        if contents:

            def gzip():
                return gzipped(contents)

            return worker(gzip)

        return {"error": "Could not find hdf5 file"}, 400

class GroupResource(ComponentResource):
    @marshal(
        inp=ids,
        out=[
            [200, Hdf5Schema(), "Get hdf5 group contents, including data"],
            [400, ErrorSchema(), "Could not find hdf5 group"],
        ],
    )
    def get(self, path, **kwargs):
        """Get a group and its data from an hdf5 file"""
        group = self._parent.get_group(path, **kwargs)
        if group:

            def gzip():
                return gzipped(group)

            return worker(gzip)

        return {"error": "Could not find hdf5 group"}, 400

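# Note: ContentsResource and GroupResource both wrap the gzipped(...) call in a
# closure handed to `worker`, deferring the potentially expensive compression of
# the JSON payload to daiquiri's worker helper instead of running it inline; the
# exact execution model is defined by `daiquiri.core.utils.worker`.
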
def hdf5_to_dict(filename: str, path: str = None, load_data: bool = False) -> dict:
    """
    Dump an HDF5 file into an arborescent python dict structure.

    Arguments:
        filename: Location of the HDF5 file
        path: Path of the HDF5 object to dump from the file
        load_data: If true, datasets are dumped with their data, else the data
            is set to None.

    Returns:
        A dict describing the requested object, or None if the file or the
        path was not reachable.
    """

    def _get_data(dataset: h5py.Dataset, load_data: bool):
        dtype = dataset.dtype
        if dtype == "object":
            type_class = dataset.id.get_type().get_class()
            if type_class == h5py.h5t.STRING:
                dtype = "string"
            else:
                raise RuntimeError(
                    "HDF5 dataset type %s unsupported for now" % type_class
                )

        if not load_data:
            return None, dtype

        # TODO: Optimise for big datasets
        if dtype == "string":
            data = dataset.asstr()[()]
        else:
            data = dataset[()]
        return data, dtype

    def _get_dataset(h5file, path, load_data=False):
        """Retrieve a dataset

        TODO: This will need sensible slicing options in the future

        Args:
            h5file (h5py.File): The h5 file instance
            path (str): uri to the dataset
            load_data (bool): Whether to load the data

        Returns:
            dataset (dict): Dict of the dataset
        """
        dataset = h5file[path]

        data, dtype = _get_data(dataset, load_data)
        return {
            "type": "dataset",
            "data": numpy.nan_to_num(data, posinf=1e200, neginf=-1e200),
            "attrs": {attr: dataset.attrs[attr] for attr in dataset.attrs},
            "shape": dataset.shape,
            "size": dataset.size,
            "ndim": dataset.ndim,
            "dtype": dtype,
            "name": os.path.basename(dataset.name),
        }

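    # Illustrative shape of the dict returned by _get_dataset for a hypothetical
    # 1D float dataset "/entry/data" with 3 values (data is None when
    # load_data=False):
    #   {"type": "dataset", "data": array([0., 1., 2.]), "attrs": {...},
    #    "shape": (3,), "size": 3, "ndim": 1, "dtype": float64, "name": "data"}
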
    def _get_groups_and_attrs(
        h5file: h5py.File, path: str = "/", load_data: bool = False
    ):
        """Retrieve a group

        Args:
            h5file: The h5 file instance
            path: uri to the group
            load_data: Whether to load the data

        Returns:
            group (dict): Dict of the group
        """
        if path not in h5file:
            logger.error("Hdf5 path '%s' not found", path)
            return None
        group = h5file[path]
        if not isinstance(group, h5py.Group):
            logger.error("Hdf5 path '%s' is not a group", path)
            return None

        children = {}
        for node in group:
            node_path = f"{path}/{node}"
            if isinstance(h5file[node_path], h5py.Dataset):
                child = _get_dataset(h5file, node_path, load_data=load_data)
            elif isinstance(h5file[node_path], h5py.Group):
                child = _get_groups_and_attrs(h5file, node_path, load_data=load_data)
            else:
                raise TypeError(f"Unsupported entity at {node_path}")

            children[child["name"]] = child

        return {
            "type": "group",
            "children": children,
            "name": os.path.basename(group.name),
            "uri": group.name,
            "attrs": {**group.attrs},
        }

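    # Illustrative shape of the dict returned by _get_groups_and_attrs for a
    # hypothetical group "/entry" containing a single dataset "data":
    #   {"type": "group", "children": {"data": {...dataset dict...}},
    #    "name": "entry", "uri": "/entry", "attrs": {...}}
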
    try:
        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
        with h5py.File(filename, mode="r") as h5file:
            content = _get_groups_and_attrs(h5file, path=path, load_data=load_data)
            if path is None and content:
                content["file"] = os.path.basename(filename)
            return make_json_safe(content)
    except OSError:
        logger.error(f"No such file {filename}")
        return None

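# Usage sketch for hdf5_to_dict (illustrative; "scan.h5" and "/entry" are
# hypothetical):
#
#   tree = hdf5_to_dict("scan.h5")                                   # structure only
#   group = hdf5_to_dict("scan.h5", path="/entry", load_data=True)   # with data
#   if tree is not None:
#       print(tree["file"], list(tree["children"]))
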
class Hdf5(Component):
    """Generic HDF5 Component

    A component that can read hdf5 files and return json slices
    of data.

    Currently can get files from an
        autoprocprogramid (first rank file)
        autoprocprogramattachmentid
        datacollectionid

    May have other sources in future
    """

    def setup(self, *args, **kwargs):
        self.register_route(ContentsResource, "")
        self.register_route(GroupResource, "/groups/<path:path>")

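    # Illustrative requests handled by the routes registered above (the mount
    # point of the component is configuration dependent, shown here as /hdf5;
    # the ids and group path are hypothetical):
    #   GET /hdf5?autoprocprogramid=1234
    #   GET /hdf5/groups/entry/results?datacollectionid=56&type=processing
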
    def _file_from_app(self, autoprocprogramid):
        appas = self._metadata.get_autoprocprogram_attachments(
            autoprocprogramid=autoprocprogramid
        )

        rank = 9999
        minr = None
        for app in appas["rows"]:
            app_rank = app["rank"]
            if app_rank is None or app_rank < rank:
                ext = os.path.splitext(app["filename"])[1][1:].strip().lower()
                if app["filetype"] == "Result" and ext in ["h5", "hdf5", "nxs"]:
                    if app_rank is not None:
                        rank = app_rank
                    minr = app

        if minr:
            return os.path.join(minr["filepath"], minr["filename"])

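    # Selection in _file_from_app, illustrated: among "Result" attachments with
    # an h5/hdf5/nxs extension, the lowest-ranked one is preferred (e.g. ranks
    # [2, 1] -> the rank-1 file); attachments without a rank are also accepted.
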
    def _get_file(
        self,
        datacollectionid=None,
        autoprocprogramattachmentid=None,
        autoprocprogramid=None,
        type=None,
        **kwargs,
    ):
        """Find the file relevant for the request"""

        # From autoprocprogramid => lowest rank
        if autoprocprogramid is not None:
            return self._file_from_app(autoprocprogramid)

        # Directly from autoprocprogramattachmentid
        elif autoprocprogramattachmentid is not None:
            appa = self._metadata.get_autoprocprogram_attachments(
                autoprocprogramattachmentid=autoprocprogramattachmentid
            )
            if appa:
                ext = os.path.splitext(appa["filename"])[1][1:].strip().lower()
                if appa["filetype"] == "Result" and ext in ["h5", "hdf5"]:
                    return appa["filefullpath"]

        # From datacollectionid, taking latest related autoprocprogram and lowest
        # rank attachment
        elif datacollectionid is not None and type == "processing":
            apps = self._metadata.get_autoprocprograms(
                datacollectionid=datacollectionid
            )
            logger.debug("Result: %s", apps)
            if apps["total"]:
                autoprocprogramid = apps["rows"][-1]["autoprocprogramid"]
                return self._file_from_app(autoprocprogramid)

        # Direct datacollection hdf5
        elif datacollectionid is not None:
            dc = self._metadata.get_datacollections(datacollectionid=datacollectionid)
            if dc:
                return os.path.join(dc["imagedirectory"], dc["filetemplate"])

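    # _get_file dispatch, illustrated with hypothetical ids:
    #   _get_file(autoprocprogramid=12)                    -> lowest-rank Result attachment
    #   _get_file(autoprocprogramattachmentid=34)          -> that attachment's file
    #   _get_file(datacollectionid=56, type="processing")  -> latest autoprocprogram's file
    #   _get_file(datacollectionid=56)                     -> the collection's own hdf5 file
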
    def get_hdf5(self, **kwargs):
        file = self._get_file(**kwargs)
        if not file:
            return None
        return hdf5_to_dict(file, load_data=False)

    def get_group(self, path, **kwargs):
        file = self._get_file(**kwargs)
        if not file:
            return None
        return hdf5_to_dict(file, path, load_data=True)
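
# Usage sketch for the Hdf5 component's get_hdf5 / get_group methods from
# another component or a test (hypothetical ids and group path; `hdf5_component`
# is assumed to be an instance registered by the daiquiri application):
#
#   tree = hdf5_component.get_hdf5(autoprocprogramid=12)
#   group = hdf5_component.get_group(
#       "entry/results", datacollectionid=56, type="processing"
#   )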