Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/h5grove.py: 67%

106 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import os 

4import logging 

5 

6os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE" 

7import h5py 

8from marshmallow import fields, validate 

9 

10from daiquiri.core import marshal 

11from daiquiri.core.components import Component, ComponentResource 

12from daiquiri.core.schema import ErrorSchema 

13from daiquiri.core.schema.components.h5grove import MetaSchema, ValueField 

14from daiquiri.core.utils import worker 

15from daiquiri.core.responses import gzipped 

16 

17from h5grove import encode, create_content 

18from h5grove.content import ResolvedEntityContent, DatasetContent 

19from h5grove.utils import PathError 

20from flask import Response 

21 

22logger = logging.getLogger(__name__) 

23 

24 

# Query parameters accepted by every h5grove endpoint. The id parameters
# select which HDF5 file to open (see H5Grove._get_file for precedence);
# "path" addresses an entity inside that file, defaulting to the root group.
query_params = {
    "autoprocprogramid": fields.Int(
        metadata={"description": "Auto processing program id"}
    ),
    "autoprocprogramattachmentid": fields.Int(
        metadata={"description": "Auto processing program attachment id"}
    ),
    "datacollectionid": fields.Int(metadata={"description": "Data collection id"}),
    "sampleactionid": fields.Int(metadata={"description": "Sample action id"}),
    "type": fields.String(metadata={"enum": ["processing"]}),
    "path": fields.String(load_default="/"),
}

37 

# Error responses shared by all h5grove endpoints. Status codes mirror what
# H5GroveResource.get actually returns: 400 when the file cannot be located,
# 404 when the path inside the file cannot be resolved (PathError).
# Fix: the entity-resolution error was previously declared as 400, but the
# handler returns 404 for PathError.
error_responses = [
    [400, ErrorSchema(), "Could not find HDF5 file"],
    [404, ErrorSchema(), "Could not resolve HDF5 entity"],
]

42 

43 

def make_encoded_response(content, format_arg="json") -> Response:
    """Encode ``content`` with h5grove and wrap it in a flask ``Response``.

    Binary/tabular formats (tiff, npy, csv) are returned with the headers
    h5grove produces; any other format is gzip-compressed via ``worker``.
    """
    encoded = encode(content, format_arg)

    if format_arg not in ("tiff", "npy", "csv"):
        # Standard payloads (e.g. json/bin) are gzipped off the request path
        return worker(lambda: gzipped(encoded.content))

    # Non-standard formats: forward h5grove's content and headers verbatim
    response = Response(encoded.content)
    response.headers.update(encoded.headers)
    return response

55 

56 

class H5GroveResource(ComponentResource):
    """Base resource: resolve an HDF5 file from request ids and serve it."""

    def get(self, **kwargs):
        """Locate the HDF5 file, extract the requested content, encode it.

        Returns a 400 error when no file can be resolved and a 404 error
        when the requested path does not exist inside the file.
        """
        h5_path = self._parent._get_file(**kwargs)
        if not (h5_path and os.path.isfile(h5_path)):
            return {"error": "Could not find hdf5 file"}, 400

        with h5py.File(h5_path, mode="r") as h5file:
            try:
                payload = self._get_content(h5file, **kwargs)
            except PathError as e:
                return {"error": f"Could not resolve hdf5 entity: {e}"}, 404

            return self._encode_response(payload, **kwargs)

    def _get_content(self, h5file, **kwargs):
        """Extract content from the open file; implemented by subclasses."""
        raise NotImplementedError()

    def _encode_response(self, content, **kwargs):
        """Default encoding (json); subclasses may honour a format argument."""
        return make_encoded_response(content)

78 

79 

class AttrResource(H5GroveResource):
    """Serve the attributes of an HDF5 entity as a dict."""

    @marshal(
        inp={
            **query_params,
            "attr_keys": fields.List(fields.String(), load_default=None),
        },
        out=[
            [200, fields.Dict(), "Get dict of attributes of HDF5 entity"],
            *error_responses,
        ],
    )
    def get(self, **kwargs):
        return super().get(**kwargs)

    def _get_content(self, h5file, **kwargs):
        # Attributes only exist on resolvable entities (groups/datasets)
        entity = create_content(h5file, kwargs["path"])
        assert isinstance(entity, ResolvedEntityContent)  # nosec
        return entity.attributes(kwargs["attr_keys"])

98 

99 

class DataResource(H5GroveResource):
    """Serve (a selection of) the data of an HDF5 dataset."""

    @marshal(
        inp={
            **query_params,
            "selection": fields.String(load_default=None),
            "format": fields.String(
                validate=validate.OneOf(("json", "bin", "csv", "npy", "tiff")),
                load_default="json",
            ),
            "flatten": fields.Boolean(load_default=False),
            "dtype": fields.String(
                validate=validate.OneOf(("origin", "safe")), load_default="origin"
            ),
        },
        out=[[200, ValueField(), "Get data of a HDF5 dataset"], *error_responses],
    )
    def get(self, **kwargs):
        return super().get(**kwargs)

    def _get_content(self, h5file, **kwargs):
        # Only datasets carry data; groups/links would be a usage error
        entity = create_content(h5file, kwargs["path"])
        assert isinstance(entity, DatasetContent)  # nosec
        return entity.data(kwargs["selection"], kwargs["flatten"], kwargs["dtype"])

    def _encode_response(self, content, **kwargs):
        # Honour the requested output format (json/bin/csv/npy/tiff)
        return make_encoded_response(content, kwargs["format"])

126 

127 

class MetaResource(H5GroveResource):
    """Serve the metadata of an HDF5 entity."""

    @marshal(
        inp=query_params,
        out=[
            [200, MetaSchema(), "Get metadata of HDF5 entity"],
            *error_responses,
        ],
    )
    def get(self, **kwargs):
        return super().get(**kwargs)

    def _get_content(self, h5file, **kwargs):
        entity = create_content(h5file, kwargs["path"])
        return entity.metadata()

141 

142 

class H5Grove(Component):
    """Generic HDF5 Component

    A component that can read hdf5 files and return json slices
    of data.

    Currently can get files from an
    autoprocprogramid (first rank file)
    autoprocprogramattachmentid
    datacollectionid

    May have other sources in future
    """

    def setup(self, *args, **kwargs):
        # One endpoint per h5grove content type: attributes, data, metadata
        self.register_route(AttrResource, "/attr/")
        self.register_route(DataResource, "/data/")
        self.register_route(MetaResource, "/meta/")

    def _file_from_app(self, autoprocprogramid):
        """Pick an HDF5 result file among a program's attachments.

        Scans the attachments of `autoprocprogramid` and returns the full
        path of the lowest-ranked "Result" attachment with an h5/hdf5/nxs
        extension. Returns None when no suitable attachment exists.
        """
        appas = self._metadata.get_autoprocprogram_attachments(
            autoprocprogramid=autoprocprogramid
        )

        rank = 9999  # sentinel: assumed larger than any real rank
        minr = None
        for app in appas["rows"]:
            app_rank = app["rank"]
            # Candidate: unranked attachment, or better (lower) rank than
            # the best ranked one seen so far
            if app_rank is None or app_rank < rank:
                ext = os.path.splitext(app["filename"])[1][1:].strip().lower()
                if app["filetype"] == "Result" and ext in ["h5", "hdf5", "nxs"]:
                    if app_rank is not None:
                        rank = app_rank
                    # NOTE(review): an unranked attachment appearing after a
                    # ranked one still overwrites `minr` — confirm intended
                    minr = app

        if minr:
            return os.path.join(minr["filepath"], minr["filename"])

    def _get_file(
        self,
        datacollectionid=None,
        autoprocprogramattachmentid=None,
        autoprocprogramid=None,
        sampleactionid=None,
        type=None,
        **kwargs,
    ):
        """Find the file relevant for the request

        The first id that matches in the elif chain below wins; falls
        through to None when nothing matches.
        """

        # From autoprocprogramid => lowest rank
        if autoprocprogramid is not None:
            return self._file_from_app(autoprocprogramid)

        # Directly from autoprocprogramattachmentid
        elif autoprocprogramattachmentid is not None:
            appa = self._metadata.get_autoprocprogram_attachments(
                autoprocprogramattachmentid=autoprocprogramattachmentid
            )
            # presumably a single attachment dict when queried by id — verify
            if appa:
                ext = os.path.splitext(appa["filename"])[1][1:].strip().lower()
                # NOTE(review): "nxs" is accepted in _file_from_app but not
                # here — confirm whether this difference is deliberate
                if appa["filetype"] == "Result" and ext in ["h5", "hdf5"]:
                    return appa["filefullpath"]

        # From datacollectionid, taking latest related autoprocprogram and lowest
        # rank attachment
        elif datacollectionid is not None and type == "processing":
            apps = self._metadata.get_autoprocprograms(
                datacollectionid=datacollectionid
            )
            logger.debug("Result: %s", apps)
            if apps["total"]:
                # assumes the last row is the most recent program — TODO confirm
                autoprocprogramid = apps["rows"][-1]["autoprocprogramid"]
                return self._file_from_app(autoprocprogramid)

        # Direct datacollection hdf5
        elif datacollectionid is not None:
            dc = self._metadata.get_datacollections(datacollectionid=datacollectionid)
            if dc:
                return os.path.join(dc["imagedirectory"], dc["filetemplate"])

        # From a sample action
        elif sampleactionid is not None:
            sampleaction = self._metadata.get_sampleactions(
                sampleactionid=sampleactionid
            )
            if sampleaction:
                return sampleaction["resultfilepath"]