Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/hdf5.py: 50%

124 statements

coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import logging
import h5py
from marshmallow import fields
import numpy

from daiquiri.core import marshal
from daiquiri.core.components import Component, ComponentResource
from daiquiri.core.schema import ErrorSchema
from daiquiri.core.schema.components.hdf5 import RootHdf5Schema, Hdf5Schema
from daiquiri.core.utils import make_json_safe, worker
from daiquiri.core.responses import gzipped

logger = logging.getLogger(__name__)


ids = {
    "autoprocprogramid": fields.Int(
        metadata={"description": "Auto processing program id"}
    ),
    "autoprocprogramattachmentid": fields.Int(
        metadata={"description": "Auto processing program attachment id"}
    ),
    "datacollectionid": fields.Int(metadata={"description": "Data collection id"}),
    "type": fields.Str(metadata={"enum": ["processing"]}),
}
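
# Usage sketch (illustrative, not part of the original module): the values in
# `ids` are plain marshmallow fields, so they could be promoted to a
# standalone Schema to validate the same query parameters, e.g.
#
#     from marshmallow import Schema
#     ExampleIdsSchema = Schema.from_dict(ids)
#     ExampleIdsSchema().load({"datacollectionid": 123, "type": "processing"})
#
# Below, the same mapping is handed to @marshal via `inp=ids`.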


class ContentsResource(ComponentResource):
    @marshal(
        inp=ids,
        out=[
            [200, RootHdf5Schema(), "Get hdf5 file contents"],
            [400, ErrorSchema(), "Could not find hdf5 file"],
        ],
    )
    def get(self, **kwargs):
        """Get the contents of an hdf5 file"""
        contents = self._parent.get_hdf5(**kwargs)
        if contents:

            def gzip():
                return gzipped(contents)

            return worker(gzip)

        return {"error": "Could not find hdf5 file"}, 400


class GroupResource(ComponentResource):
    @marshal(
        inp=ids,
        out=[
            [200, Hdf5Schema(), "Get hdf5 group contents, including data"],
            [400, ErrorSchema(), "Could not find hdf5 group"],
        ],
    )
    def get(self, path, **kwargs):
        """Get a group and its data from an hdf5 file"""
        group = self._parent.get_group(path, **kwargs)
        if group:

            def gzip():
                return gzipped(group)

            return worker(gzip)

        return {"error": "Could not find hdf5 group"}, 400
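
# Sketch of what the gzip-on-a-worker pattern above amounts to (assumption:
# `worker` runs the callable off the request thread and `gzipped` compresses
# the JSON payload; neither is defined in this file). A stdlib-only
# equivalent of the compression step:
#
#     import gzip, json
#
#     def gzip_payload(payload: dict) -> bytes:
#         return gzip.compress(json.dumps(payload).encode("utf-8"))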


def hdf5_to_dict(
    filename: str, path: str | None = None, load_data: bool = False
) -> dict | None:
    """
    Dump an HDF5 file into an arborescent Python dict structure

    Arguments:
        filename: Location of the HDF5 file
        path: Path of the HDF5 object to dump from the file
        load_data: If true, datasets are dumped with their data, else the data
            is set to None.

    Returns:
        The contents as a nested dict, or None if the file or the path was
        not reachable.
    """

    def _get_data(dataset: h5py.Dataset, load_data: bool):
        dtype = dataset.dtype
        if dtype == "object":
            type_class = dataset.id.get_type().get_class()
            if type_class == h5py.h5t.STRING:
                dtype = "string"
            else:
                raise RuntimeError(
                    "HDF5 dataset type %s unsupported for now" % type_class
                )

        if not load_data:
            return None, dtype

        # TODO: Optimise for big datasets
        if dtype == "string":
            data = dataset.asstr()[()]
        else:
            data = dataset[()]
        return data, dtype
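
    # Illustrative note: h5py reports variable-length strings as dtype
    # "object", so _get_data inspects the low-level type class. For a
    # hypothetical string dataset `ds` holding "sample":
    #
    #     ds.dtype                       # dtype('O')
    #     _get_data(ds, load_data=True)  # -> ("sample", "string")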

    def _get_dataset(h5file, path, load_data=False):
        """Retrieve a dataset

        TODO: This will need sensible slicing options in the future

        Args:
            h5file (h5py.File): The h5 file instance
            path (str): uri to the dataset
            load_data (bool): Whether to load the data

        Returns:
            dataset (dict): Dict of the dataset
        """
        dataset = h5file[path]

        data, dtype = _get_data(dataset, load_data)
        return {
            "type": "dataset",
            # Clamp non-finite values; NaN and +/-inf are not valid JSON
            "data": numpy.nan_to_num(data, posinf=1e200, neginf=-1e200),
            "attrs": {attr: dataset.attrs[attr] for attr in dataset.attrs},
            "shape": dataset.shape,
            "size": dataset.size,
            "ndim": dataset.ndim,
            "dtype": dtype,
            "name": os.path.basename(dataset.name),
        }

    def _get_groups_and_attrs(
        h5file: h5py.File, path: str = "/", load_data: bool = False
    ):
        """Retrieve a group

        Args:
            h5file: The h5 file instance
            path: uri to the group
            load_data: Whether to load the data

        Returns:
            group (dict): Dict of the group
        """
        if path not in h5file:
            logger.error("Hdf5 path '%s' not found", path)
            return None
        group = h5file[path]
        if not isinstance(group, h5py.Group):
            logger.error("Hdf5 path '%s' is not a group", path)
            return None

        children = {}
        for node in group:
            node_path = f"{path}/{node}"
            if isinstance(h5file[node_path], h5py.Dataset):
                child = _get_dataset(h5file, node_path, load_data=load_data)
            elif isinstance(h5file[node_path], h5py.Group):
                child = _get_groups_and_attrs(h5file, node_path, load_data=load_data)
            else:
                raise TypeError(f"Unsupported entity at {node_path}")

            children[child["name"]] = child

        return {
            "type": "group",
            "children": children,
            "name": os.path.basename(group.name),
            "uri": group.name,
            "attrs": {**group.attrs},
        }

    # Disable HDF5 file locking so files still being written elsewhere can be
    # read; the variable must be set before the file is opened
    try:
        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
        with h5py.File(filename, mode="r") as h5file:
            # Default to the root group when no path was requested, so the
            # membership check in _get_groups_and_attrs gets a real path
            content = _get_groups_and_attrs(
                h5file, path=path if path is not None else "/", load_data=load_data
            )
            if path is None and content:
                content["file"] = os.path.basename(filename)
            return make_json_safe(content)
    except OSError:
        logger.error("Could not open hdf5 file %s", filename)
        return None
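
# Usage sketch (illustrative; the file layout and names are made up for the
# demo): round-trip a scratch file through hdf5_to_dict.
#
#     import tempfile
#     with tempfile.TemporaryDirectory() as tmpdir:
#         demo = os.path.join(tmpdir, "demo.h5")
#         with h5py.File(demo, "w") as f:
#             f.create_group("entry").create_dataset("data", data=numpy.arange(4))
#         tree = hdf5_to_dict(demo, load_data=True)
#         print(tree["children"]["entry"]["children"]["data"]["shape"])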


class Hdf5(Component):
    """Generic HDF5 Component

    A component that can read hdf5 files and return json slices
    of data.

    Currently it can resolve files from an:
        autoprocprogramid (lowest-ranked file)
        autoprocprogramattachmentid
        datacollectionid

    May have other sources in the future
    """

    def setup(self, *args, **kwargs):
        self.register_route(ContentsResource, "")
        self.register_route(GroupResource, "/groups/<path:path>")
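
    # Route sketch (illustrative; the mount prefix comes from the component
    # framework and is an assumption here):
    #
    #     GET <prefix>?datacollectionid=123               -> ContentsResource
    #     GET <prefix>/groups/entry?datacollectionid=123  -> GroupResource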

    def _file_from_app(self, autoprocprogramid):
        appas = self._metadata.get_autoprocprogram_attachments(
            autoprocprogramid=autoprocprogramid
        )

        # Keep the eligible "Result" hdf5/nexus attachment with the lowest
        # rank; attachments without a rank always qualify as a fallback
        rank = 9999
        minr = None
        for app in appas["rows"]:
            app_rank = app["rank"]
            if app_rank is None or app_rank < rank:
                ext = os.path.splitext(app["filename"])[1][1:].strip().lower()
                if app["filetype"] == "Result" and ext in ["h5", "hdf5", "nxs"]:
                    if app_rank is not None:
                        rank = app_rank
                    minr = app

        if minr:
            return os.path.join(minr["filepath"], minr["filename"])
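
    # Illustrative fixture (assumption: this mirrors the row shape returned
    # by get_autoprocprogram_attachments). Given these rows, _file_from_app
    # picks "b.h5", the eligible "Result" attachment with the lowest rank:
    #
    #     {"rows": [
    #         {"rank": 2, "filetype": "Result", "filename": "a.h5", "filepath": "/data"},
    #         {"rank": 1, "filetype": "Result", "filename": "b.h5", "filepath": "/data"},
    #         {"rank": 0, "filetype": "Log", "filename": "c.log", "filepath": "/data"},
    #     ]}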

    def _get_file(
        self,
        datacollectionid=None,
        autoprocprogramattachmentid=None,
        autoprocprogramid=None,
        type=None,
        **kwargs,
    ):
        """Find the file relevant for the request"""

        # From autoprocprogramid => lowest rank
        if autoprocprogramid is not None:
            return self._file_from_app(autoprocprogramid)

        # Directly from autoprocprogramattachmentid
        elif autoprocprogramattachmentid is not None:
            appa = self._metadata.get_autoprocprogram_attachments(
                autoprocprogramattachmentid=autoprocprogramattachmentid
            )
            if appa:
                ext = os.path.splitext(appa["filename"])[1][1:].strip().lower()
                if appa["filetype"] == "Result" and ext in ["h5", "hdf5"]:
                    return appa["filefullpath"]

        # From datacollectionid, taking the latest related autoprocprogram
        # and its lowest rank attachment
        elif datacollectionid is not None and type == "processing":
            apps = self._metadata.get_autoprocprograms(
                datacollectionid=datacollectionid
            )
            logger.debug("Result: %s", apps)
            if apps["total"]:
                autoprocprogramid = apps["rows"][-1]["autoprocprogramid"]
                return self._file_from_app(autoprocprogramid)

        # Direct datacollection hdf5
        elif datacollectionid is not None:
            dc = self._metadata.get_datacollections(datacollectionid=datacollectionid)
            if dc:
                return os.path.join(dc["imagedirectory"], dc["filetemplate"])

    def get_hdf5(self, **kwargs):
        file = self._get_file(**kwargs)
        if not file:
            return None
        return hdf5_to_dict(file, load_data=False)

    def get_group(self, path, **kwargs):
        file = self._get_file(**kwargs)
        if not file:
            return None
        return hdf5_to_dict(file, path, load_data=True)
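
# Usage sketch (illustrative; the id and path are made up): once the
# component is mounted, GroupResource ends up doing the equivalent of
#
#     component.get_group("entry", datacollectionid=123)
#
# which resolves the hdf5 file via _get_file and loads the group with data.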