Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/h5grove.py: 67%
106 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4import logging
6os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
7import h5py
8from marshmallow import fields, validate
10from daiquiri.core import marshal
11from daiquiri.core.components import Component, ComponentResource
12from daiquiri.core.schema import ErrorSchema
13from daiquiri.core.schema.components.h5grove import MetaSchema, ValueField
14from daiquiri.core.utils import worker
15from daiquiri.core.responses import gzipped
17from h5grove import encode, create_content
18from h5grove.content import ResolvedEntityContent, DatasetContent
19from h5grove.utils import PathError
20from flask import Response
22logger = logging.getLogger(__name__)
# Query parameters shared by every h5grove endpoint.  Exactly one of the
# *id parameters is expected per request; it selects the HDF5 file to open
# (resolution logic lives in H5Grove._get_file).  "path" addresses the
# entity inside the file and defaults to the root group.
query_params = {
    "autoprocprogramid": fields.Int(
        metadata={"description": "Auto processing program id"}
    ),
    "autoprocprogramattachmentid": fields.Int(
        metadata={"description": "Auto processing program attachment id"}
    ),
    "datacollectionid": fields.Int(metadata={"description": "Data collection id"}),
    "sampleactionid": fields.Int(metadata={"description": "Sample action id"}),
    # "type" switches datacollectionid lookups to processed results
    # (see H5Grove._get_file).
    "type": fields.String(metadata={"enum": ["processing"]}),
    # HDF5 entity path inside the file; "/" is the root group.
    "path": fields.String(load_default="/"),
}
# Error responses shared by all endpoints.  Status codes mirror what
# H5GroveResource.get actually returns: 400 when no HDF5 file can be
# resolved from the query parameters, 404 when the file exists but the
# requested entity path cannot be resolved inside it (the previous
# declaration of 400 for the latter did not match the handler).
error_responses = [
    [400, ErrorSchema(), "Could not find HDF5 file"],
    [404, ErrorSchema(), "Could not resolve HDF5 entity"],
]
def make_encoded_response(content, format_arg="json") -> Response:
    """Encode h5grove *content* and wrap it in a flask ``Response``.

    Binary/tabular formats ("tiff", "npy", "csv") are returned verbatim with
    the headers h5grove computed; anything else (json, bin) is gzipped in a
    worker before being sent.
    """
    encoded = encode(content, format_arg)

    if format_arg in ("tiff", "npy", "csv"):
        # Hand h5grove's payload and content-type headers straight through.
        resp = Response(encoded.content)
        resp.headers.update(encoded.headers)
        return resp

    # Standard formats: compress off the request thread.
    return worker(lambda: gzipped(encoded.content))
class H5GroveResource(ComponentResource):
    """Shared GET flow for the h5grove endpoints.

    Resolves the HDF5 file from the query parameters, opens it read-only and
    delegates content extraction to the subclass hook ``_get_content``;
    ``_encode_response`` may be overridden to honour a format parameter.
    """

    def get(self, **kwargs):
        path = self._parent._get_file(**kwargs)

        # Guard: no file could be resolved, or it vanished from disk.
        if not path or not os.path.isfile(path):
            return {"error": "Could not find hdf5 file"}, 400

        with h5py.File(path, mode="r") as h5file:
            try:
                payload = self._get_content(h5file, **kwargs)
            except PathError as e:
                # The file is fine but the requested entity path is not.
                return {"error": f"Could not resolve hdf5 entity: {e}"}, 404

            # Encode while the file is still open.
            return self._encode_response(payload, **kwargs)

    def _get_content(self, h5file, **kwargs):
        """Subclass hook: return the h5grove content for this request."""
        raise NotImplementedError()

    def _encode_response(self, content, **kwargs):
        """Default encoding (JSON); subclasses may honour a format kwarg."""
        return make_encoded_response(content)
class AttrResource(H5GroveResource):
    """Return the attributes of an HDF5 entity as a dict."""

    @marshal(
        inp={
            **query_params,
            # Optional subset of attribute names; None means all attributes.
            "attr_keys": fields.List(fields.String(), load_default=None),
        },
        out=[
            [200, fields.Dict(), "Get dict of attributes of HDF5 entity"],
            *error_responses,
        ],
    )
    def get(self, **kwargs):
        return super().get(**kwargs)

    def _get_content(self, h5file, **kwargs):
        entity = create_content(h5file, kwargs["path"])
        # Attributes only exist on resolved entities (groups/datasets).
        assert isinstance(entity, ResolvedEntityContent)  # nosec
        return entity.attributes(kwargs["attr_keys"])
class DataResource(H5GroveResource):
    """Return (a selection of) the data of an HDF5 dataset."""

    @marshal(
        inp={
            **query_params,
            # Slice expression applied server-side; None returns everything.
            "selection": fields.String(load_default=None),
            "format": fields.String(
                validate=validate.OneOf(("json", "bin", "csv", "npy", "tiff")),
                load_default="json",
            ),
            "flatten": fields.Boolean(load_default=False),
            # "safe" converts to types all clients can consume.
            "dtype": fields.String(
                validate=validate.OneOf(("origin", "safe")), load_default="origin"
            ),
        },
        out=[[200, ValueField(), "Get data of a HDF5 dataset"], *error_responses],
    )
    def get(self, **kwargs):
        return super().get(**kwargs)

    def _encode_response(self, content, **kwargs):
        # Unlike the base class, honour the requested output format.
        return make_encoded_response(content, kwargs["format"])

    def _get_content(self, h5file, **kwargs):
        dataset = create_content(h5file, kwargs["path"])
        # Data can only be read from datasets, not groups.
        assert isinstance(dataset, DatasetContent)  # nosec
        return dataset.data(kwargs["selection"], kwargs["flatten"], kwargs["dtype"])
class MetaResource(H5GroveResource):
    """Return the metadata of an HDF5 entity (shape, dtype, children, ...)."""

    @marshal(
        inp=query_params,
        out=[
            [200, MetaSchema(), "Get metadata of HDF5 entity"],
            *error_responses,
        ],
    )
    def get(self, **kwargs):
        return super().get(**kwargs)

    def _get_content(self, h5file, **kwargs):
        # Every entity type exposes metadata, so no isinstance guard needed.
        return create_content(h5file, kwargs["path"]).metadata()
class H5Grove(Component):
    """Generic HDF5 component: reads HDF5 files and serves attributes,
    data slices and metadata over the /attr/, /data/ and /meta/ routes.

    The file to open is resolved from one of:
      autoprocprogramid            -> lowest-rank "Result" attachment
      autoprocprogramattachmentid  -> that attachment directly
      datacollectionid             -> master file, or (with type=processing)
                                      the latest autoprocprogram's attachment
      sampleactionid               -> the sample action's result file

    May have other sources in future.
    """

    def setup(self, *args, **kwargs):
        # Register one route per resource type.
        self.register_route(AttrResource, "/attr/")
        self.register_route(DataResource, "/data/")
        self.register_route(MetaResource, "/meta/")

    def _file_from_app(self, autoprocprogramid):
        """Return the full path of the best HDF5 "Result" attachment for an
        autoprocprogram, preferring the lowest-ranked one, or None."""
        appas = self._metadata.get_autoprocprogram_attachments(
            autoprocprogramid=autoprocprogramid
        )

        # rank starts above any expected value so the first ranked match wins.
        rank = 9999
        minr = None
        for app in appas["rows"]:
            app_rank = app["rank"]
            # NOTE(review): a rank-less (None) attachment always passes this
            # test, so a later rank-less HDF5 Result can overwrite a
            # lower-ranked match found earlier — confirm this is intended.
            if app_rank is None or app_rank < rank:
                ext = os.path.splitext(app["filename"])[1][1:].strip().lower()
                if app["filetype"] == "Result" and ext in ["h5", "hdf5", "nxs"]:
                    if app_rank is not None:
                        rank = app_rank
                    minr = app

        if minr:
            return os.path.join(minr["filepath"], minr["filename"])

    def _get_file(
        self,
        datacollectionid=None,
        autoprocprogramattachmentid=None,
        autoprocprogramid=None,
        sampleactionid=None,
        type=None,
        **kwargs,
    ):
        """Find the file relevant for the request.

        Exactly one of the id parameters is expected; they are checked in
        priority order below.  Returns a path string or None when nothing
        matches.
        """

        # From autoprocprogramid => lowest rank
        if autoprocprogramid is not None:
            return self._file_from_app(autoprocprogramid)

        # Directly from autoprocprogramattachmentid
        elif autoprocprogramattachmentid is not None:
            appa = self._metadata.get_autoprocprogram_attachments(
                autoprocprogramattachmentid=autoprocprogramattachmentid
            )
            if appa:
                ext = os.path.splitext(appa["filename"])[1][1:].strip().lower()
                # NOTE(review): this branch accepts only h5/hdf5 while
                # _file_from_app also accepts nxs — confirm whether the
                # omission is deliberate.
                if appa["filetype"] == "Result" and ext in ["h5", "hdf5"]:
                    return appa["filefullpath"]

        # From datacollectionid, taking latest related autoprocprogram and lowest
        # rank attachment
        elif datacollectionid is not None and type == "processing":
            apps = self._metadata.get_autoprocprograms(
                datacollectionid=datacollectionid
            )
            logger.debug("Result: %s", apps)
            if apps["total"]:
                # Rows are assumed ordered so the last entry is the latest —
                # TODO confirm against the metadata backend's ordering.
                autoprocprogramid = apps["rows"][-1]["autoprocprogramid"]
                return self._file_from_app(autoprocprogramid)

        # Direct datacollection hdf5
        elif datacollectionid is not None:
            dc = self._metadata.get_datacollections(datacollectionid=datacollectionid)
            if dc:
                return os.path.join(dc["imagedirectory"], dc["filetemplate"])

        # From a sample action
        elif sampleactionid is not None:
            sampleaction = self._metadata.get_sampleactions(
                sampleactionid=sampleactionid
            )
            if sampleaction:
                return sampleaction["resultfilepath"]