Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/filebrowser.py: 0%

327 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import os 

4import collections 

5import math 

6 

7from abc import abstractmethod 

8from typing import Dict, List, Optional, OrderedDict, Tuple, Type, Union 

9import gevent 

10import time 

11 

12import numpy as np 

13import h5py 

14 

15import h5grove 

16from h5grove.content import DatasetContent 

17from h5grove.utils import get_array_stats 

18 

19from flask import Response 

20from marshmallow import Schema, fields 

21 

22from fabio.cbfimage import CbfImage 

23 

24from daiquiri.core import marshal 

25from daiquiri.core.components import Component, ComponentResource 

26from daiquiri.core.responses import gzipped 

27from daiquiri.core.schema import ErrorSchema 

28from daiquiri.core.utils import worker 

29from daiquiri.resources.utils import YamlDict 

30 

31 

32import logging 

33 

# A cached file entry: (image data as a numpy array, header/metadata dict)
CacheEntry = Tuple[np.ndarray, dict]


logger = logging.getLogger(__name__)

38 

39 

class NodeSchema(Schema):
    """Node Schema

    Serialises a single file-browser entry (a file, a directory, or an
    HDF5-dataset pseudo-file).
    """

    # Path relative to the configured file-browser root
    path = fields.Str()
    # Base name of the entry
    name = fields.Str()
    # Either "file" or "dir" (HDF5 containers are reported as "dir")
    type = fields.Str()
    # File extension without the leading dot ("" for directories)
    ext = fields.Str()
    # Parsed image header (present for loaded files)
    hdr = fields.Dict()
    # Modification time as a UNIX timestamp
    last_modified = fields.Float()

49 

50 

class FileBrowserDirectorySchema(Schema):
    """File Schema

    Serialises a directory listing as returned by
    ``Filebrowser.get_directory``.
    """

    # Absolute path of the listed directory
    abs_path = fields.Str()
    # The entries contained in the directory
    rows = fields.Nested(NodeSchema, many=True)
    # Number of entries in ``rows``
    total = fields.Int()

57 

58 

class FileBrowserDirectoryResource(ComponentResource):
    # Thin HTTP wrapper: delegates to the parent Filebrowser component
    @marshal(
        inp={"path": fields.Str()},
        out=[
            [
                200,
                FileBrowserDirectorySchema,
                "Directory listing for current resources",
            ]
        ],
    )
    def get(self, path: str, **kwargs):
        """Get the directory listing"""
        return self._parent.get_directory(path, **kwargs)

73 

74 

class LoadFileResource(ComponentResource):
    """HTTP endpoint that loads a file's header via the parent component."""

    @marshal(
        inp={"path": fields.Str()},
        out=[
            [200, NodeSchema(), "Read file contents"],
            [400, ErrorSchema(), "Could not read file contents"],
        ],
    )
    def get(self, path: str, **kwargs):
        """Get a file"""
        loaded = self._parent.fb_load_file(path, **kwargs)

        if loaded is not None:
            return loaded
        return {"message": "Could not read file"}, 400

90 

91 

class FileResource(ComponentResource):
    """HTTP endpoint serving raw (gzip-compressed) file content."""

    @marshal(
        inp={"path": fields.Str()},
        out=[[400, ErrorSchema(), "Could not read file contents"]],
    )
    def get(self, path: str, **kwargs) -> Union[Response, Tuple[dict, int]]:
        """Get a file"""
        content = self._parent.get_file_content(path, **kwargs)
        if content is None:
            return {"message": "Could not read file"}, 400

        # Defer gzip compression to the shared worker helper
        return worker(lambda: gzipped(content))

107 

108 

class HistResource(ComponentResource):
    """HTTP endpoint serving gzip-compressed histogram counts."""

    @marshal(
        inp={"path": fields.Str()},
        out=[[400, ErrorSchema(), "Could not read hist data"]],
    )
    def get(self, path: str, **kwargs) -> Union[Response, Tuple[dict, int]]:
        """Return the histogram data for *path*."""
        hist = self._parent.get_histogram_data(path, **kwargs)
        if hist is None:
            return {"message": "Could not read hist data"}, 400

        # Defer gzip compression to the shared worker helper
        return worker(lambda: gzipped(hist))

123 

124 

class HistMetaResource(ComponentResource):
    """HTTP endpoint serving histogram metadata (bins, shape, max)."""

    @marshal(
        inp={"path": fields.Str()},
        out=[[400, ErrorSchema(), "Could not read file contents"]],
    )
    def get(self, path: str, **kwargs) -> Union[Response, Tuple[dict, int]]:
        """Return the histogram metadata for *path*."""
        meta = self._parent.get_histogram_meta(path, **kwargs)
        if meta is not None:
            return meta
        return {"message": "Could not read hist metadata"}, 400

136 

137 

class Filebrowser(Component):
    """Expose a file browser over a configured root directory.

    Serves directory listings (including "virtual" listings of the image
    datasets inside HDF5 container files), raw image data and image
    histograms. Loaded files are kept in a bounded FIFO cache.
    """

    _base_url = "filebrowser"
    _require_blsession = False
    # Maximum number of (data, header) entries kept in the file cache
    _CACHE_LIMIT = 100

    def setup(self):
        """Register HTTP routes, read configuration and start the watcher."""
        assert isinstance(self._config, YamlDict)  # nosec
        self.register_route(FileBrowserDirectoryResource, "/directory")
        self.register_route(LoadFileResource, "/loadfile")
        self.register_route(FileResource, "/file")
        self.register_route(HistMetaResource, "/loadhist")
        self.register_route(HistResource, "/hist")

        self._root_path: str = os.path.normpath(self._config.get("root", "/"))
        self._file_types: List[str] = self._config.get("file_types", [])
        self._show_hidden: bool = self._config.get("show_hidden", False)
        self._container_file_types: List[str] = self._config.get(
            "container_file_types", []
        )
        # Insertion-ordered so the oldest entry can be evicted first (FIFO)
        self._file_cache: OrderedDict[str, CacheEntry] = collections.OrderedDict()

        # Map extension -> handler producing a directory-style listing
        self._container_handlers: Dict[str, ContainerHandler] = {
            **{
                ext: FSContainerHandler(
                    self._root_path, self._file_types, self._show_hidden
                )
                for ext in FSContainerHandler.EXT
            },
            **{
                ext: HDF5ContainerHandler(
                    self._root_path, self._file_types, self._show_hidden
                )
                for ext in HDF5ContainerHandler.EXT
            },
        }
        # Map extension -> handler able to load the image data
        self._format_handlers: Dict[str, Type[FormatHandler]] = {
            **{ext: HDF5FormatHandler for ext in HDF5FormatHandler.EXT},
            **{ext: CBFFormatHandler for ext in CBFFormatHandler.EXT},
        }

        self.current_folder_watcher = gevent.spawn(
            self.watch_file_creation, self._root_path
        )

    def watch_file_creation(self, folder):
        """Poll *folder* every 2s and emit `new_file_in_dir` for new entries."""
        ext = get_file_ext(folder)

        # Disable watching for HDF5 "folders" (they are regular files)
        if ext in HDF5ContainerHandler.EXT:
            return

        old_listing = {f: None for f in os.listdir(folder)}
        while True:
            # gevent.sleep always yields to the hub; time.sleep would block
            # the whole event loop when the stdlib is not monkey-patched
            gevent.sleep(2)
            new_listing = {f: None for f in os.listdir(folder)}
            new_files = [f for f in new_listing if f not in old_listing]
            for f in new_files:
                self.emit(
                    "new_file_in_dir",
                    {"dir_path": os.path.relpath(folder, self._root_path), "file": f},
                )
            old_listing = new_listing

    def _add_to_cache(self, path: str, data: CacheEntry):
        """Store *data* under *path*, evicting the oldest entry when full.

        Uses ``>=`` so the cache never exceeds ``_CACHE_LIMIT`` entries
        (the original ``>`` allowed one extra entry).
        """
        if len(self._file_cache) >= self._CACHE_LIMIT:
            self._file_cache.popitem(last=False)

        self._file_cache[path] = data

    def _get_root_path(self, path: str) -> str:
        """Resolve *path* against the root, clamping escapes back to the root."""
        abs_path = os.path.normpath(os.path.join(self._root_path, path))

        # Restrict listing to root folder
        if (
            os.path.commonpath([self._root_path, abs_path]) != self._root_path
            or self._root_path == abs_path
        ):
            abs_path = self._root_path

        return abs_path

    def get_directory(
        self, path: str
    ) -> Optional[Dict[str, Union[int, list, str, float]]]:
        """List the directory (or HDF5 container) at *path*.

        Returns ``{"total", "rows", "abs_path"}`` or None when the path
        does not exist. Also re-points the folder watcher at *path*.
        """
        abs_path = self._get_root_path(path)

        if not os.path.exists(abs_path):
            logger.warning(f"Path {abs_path} does not exist")
            return None

        raw_ext = get_file_ext(abs_path)

        # Default to file system listing if no container handler is found
        ext = raw_ext if raw_ext != "" else "FS"

        nodes = (
            self._container_handlers[ext].list_entry(abs_path)
            if ext in self._container_handlers
            else []
        )

        # Watch the newly listed folder instead of the previous one
        self.current_folder_watcher.kill()
        self.current_folder_watcher = gevent.spawn(self.watch_file_creation, abs_path)

        return {"total": len(nodes), "rows": nodes, "abs_path": abs_path}

    def _make_path_absolute(self, path: str) -> str:
        """Resolve *path* against the root, raising when it escapes the root.

        Raises:
            IOError: if *path* resolves outside (or exactly onto) the root.
        """
        path = os.path.normpath(path)
        abs_path = os.path.normpath(os.path.join(self._root_path, path))

        if (
            os.path.commonpath([self._root_path, abs_path]) != self._root_path
            or self._root_path == abs_path
        ):
            raise IOError(f"Can't access {abs_path}")

        return abs_path

    def fb_load_file(self, path: str) -> Optional[dict]:
        """Load the file at *path* (caching its data) and return its node dict.

        Returns None when the path is invalid, the format is not handled,
        or the file cannot be read.
        """
        try:
            abs_path = self._make_path_absolute(path)
        except IOError as e:
            # Use the module logger (was the root logging module)
            logger.warning(e)
            return None

        ext = get_file_ext(abs_path)

        if abs_path not in self._file_cache:
            try:
                format_handler = self._format_handlers[ext]
            except KeyError:
                logger.warning(f"File empty or format not handled {abs_path}")
                return None

            try:
                hdr, data, _ = format_handler.preload(abs_path)
            except RuntimeError:
                logger.warning(f"File {abs_path} is not a file")
                return None

            self._add_to_cache(abs_path, (data, hdr))
        else:
            data, hdr = self._file_cache[abs_path]

        return {
            "path": abs_path,
            "name": os.path.basename(abs_path),
            "type": "file",
            "ext": ext,
            "hdr": hdr,
            "last_modified": os.path.getmtime(
                abs_path if os.path.isfile(abs_path) else os.path.dirname(abs_path)
            ),  # File can be a h5py dataset. In this case, take last modified data of the parent file,
        }

    def get_file_content(self, path: str) -> Optional[np.ndarray]:
        """Return the cached image data for *path*, loading it on demand.

        Returns None when the file cannot be loaded (previously this
        recursed forever on a load failure).
        """
        abs_path = self._make_path_absolute(path)

        entry = self._file_cache.get(abs_path)
        if entry is None:
            logger.warning(f"File {abs_path} not in cache (not loaded)")
            # Load file in cache and retry once; bail out on failure instead
            # of recursing indefinitely
            if self.fb_load_file(path) is None:
                return None
            entry = self._file_cache.get(abs_path)
            if entry is None:
                return None

        data, _ = entry
        return data

    def retrieve_histogram(self, path: str) -> Optional[CacheEntry]:
        """Return (hist array, meta dict) for *path*, computing and caching it.

        Returns None when the file format is not handled (previously an
        unhandled extension raised KeyError).
        """
        abs_path = self._make_path_absolute(path)
        ext = get_file_ext(abs_path)

        cache_key = abs_path + "/hist"
        if cache_key not in self._file_cache:
            format_handler = self._format_handlers.get(ext)
            if format_handler is None:
                logger.warning(f"File empty or format not handled {abs_path}")
                return None

            _, data, _ = format_handler.preload(abs_path)
            hist, bins = _compute_histogram(data)

            hist_meta = {
                "bins": bins,
                "shape": hist.shape,
                "max": np.max(hist).item() if hist.size > 0 else 0,
            }
            self._add_to_cache(cache_key, (hist.astype(np.float32), hist_meta))

        return self._file_cache[cache_key]

    def get_histogram_data(self, path: str) -> Optional[np.ndarray]:
        """Return the histogram counts for *path* (None on failure)."""
        entry = self.retrieve_histogram(path)
        if entry is None:
            return None

        hist_data, _ = entry
        return hist_data

    def get_histogram_meta(self, path: str) -> Optional[dict]:
        """Return the histogram metadata for *path* (None on failure)."""
        entry = self.retrieve_histogram(path)
        if entry is None:
            return None

        _, hist_meta = entry
        return hist_meta

331 

332 

class ContainerHandler:
    """Base class for handlers that list the children of a container path."""

    # Extensions (without leading dot) this handler is registered for
    EXT: List[str]

    def __init__(self, root_path="", file_types="", show_hidden=False):
        """Remember the browsing configuration for later listings."""
        self._show_hidden = show_hidden
        self._file_types = file_types
        self._root_path = root_path

    @abstractmethod
    def list_entry(self, path: str) -> List[dict]:
        """Return a list of node dicts describing the entries under *path*."""
        raise NotImplementedError

344 

345 

class HDF5ContainerHandler(ContainerHandler):
    """Lists the image datasets inside an HDF5 file as pseudo-files."""

    EXT = ["hdf5", "h5"]

    def list_entry(self, path) -> List[dict]:
        """Return one node per image in the container's /entry/data/data."""
        ext = get_file_ext(path)
        rel_path = os.path.relpath(path, self._root_path)

        with h5py.File(path, "r") as h5file:
            if "/entry/data/data" not in h5file:
                return []

            attrs = _get_dataset_attr(h5file, "/entry/data/data")
            first = attrs["image_nr_low"]
            assert isinstance(first, np.generic)  # nosec

            last = attrs["image_nr_high"]
            assert isinstance(last, np.generic)  # nosec

        entries = []
        for n in range(first.item(), last.item() + 1):
            name = f"image_{n}.{ext}.dataset"
            entries.append(
                {
                    "path": os.path.normpath(os.path.join(rel_path, name)),
                    "name": name,
                    "type": "file",
                    "ext": ext,
                    # Add n to the date of the parent file to simulate the last modification date
                    "last_modified": os.path.getmtime(path) + n,
                }
            )
        return entries

377 

378 

class FSContainerHandler(ContainerHandler):
    """Lists a plain file-system directory."""

    EXT = ["FS"]

    def list_entry(self, path: str) -> List[dict]:
        """Return node dicts for the visible entries of directory *path*.

        Hidden entries (dot-prefixed) are skipped unless configured
        otherwise; regular files are filtered by the configured file
        types. HDF5 files are reported as "dir" so they can be browsed
        into as containers.
        """
        nodes = []
        path_relative_to_root = os.path.relpath(path, self._root_path)

        with os.scandir(path) as dir_entries:
            for entry in dir_entries:
                if not self._show_hidden and entry.name.startswith("."):
                    continue

                # Cache is_file(): it was previously called up to 3 times
                is_file = entry.is_file()
                ext = get_file_ext(entry.name) if is_file else ""
                if is_file and ext not in self._file_types:
                    continue

                nodes.append(
                    {
                        "path": os.path.normpath(
                            os.path.join(path_relative_to_root, entry.name)
                        ),
                        "name": entry.name,
                        # HDF5 files act as containers, present them as dirs
                        "type": "file"
                        if is_file and ext not in ["h5", "hdf5"]
                        else "dir",
                        "ext": ext,
                        # DirEntry.stat() reuses cached data where the OS
                        # provides it, unlike os.path.getmtime's extra stat
                        "last_modified": entry.stat().st_mtime,
                    }
                )

        return nodes

410 

411 

class FormatHandler:
    """Base class for format-specific image loaders."""

    # Extensions (without leading dot) this handler can load
    EXT: List[str]

    def __init__(self):
        pass

    @staticmethod
    @abstractmethod
    def preload(path: str) -> Tuple[dict, np.ndarray, bytes]:
        """Return (header dict, image data array, raw preview bytes)."""
        raise NotImplementedError()

422 

423 

class CBFFormatHandler(FormatHandler):
    """Loads CBF images via fabio and extracts a "braggy" header."""

    EXT = ["cbf"]

    @staticmethod
    def preload(path: str) -> Tuple[dict, np.ndarray, bytes]:
        """Load the CBF file at *path*.

        Returns (header dict, flattened float32 data, 8-bit preview bytes).

        Raises:
            RuntimeError: when *path* is not an existing regular file.
        """
        if not os.path.isfile(path):
            # Fixed typo in the error message ("Files" -> "File")
            raise RuntimeError("File does not exist")

        cbf_image = CbfImage(fname=path)
        float_data = cbf_image.data.astype(np.float32)

        preview_data = CBFFormatHandler._8bit_raw_repr(cbf_image)

        parsed_ext_hdr, braggy_hdr = CBFFormatHandler._parse_header(
            cbf_image, float_data
        )

        img_hdr = {}
        img_hdr["parsed_ext_hdr"] = parsed_ext_hdr
        img_hdr["braggy_hdr"] = braggy_hdr

        return img_hdr, float_data.flatten(), preview_data

    @staticmethod
    def _8bit_raw_repr(raw_data: CbfImage) -> bytes:
        """Return the image as raw 8-bit bytes (negatives clipped to 0)."""
        data = raw_data.data.clip(0)
        data = data.astype(np.uint8)

        return data.tobytes()

    @staticmethod
    def _parse_header(cbf_image: CbfImage, np_array: np.ndarray) -> Tuple[dict, dict]:
        """Parse the CBF "mini" header text.

        Returns (parsed key/value header, braggy header). The braggy
        header is left empty when required keys are missing or malformed.
        """
        height, width = cbf_image.shape

        hdr = cbf_image.header
        parsed_ext_hdr = {}
        braggy_hdr = {}

        _ext_hdr = hdr.get("_array_data.header_contents", "").split("\r\n")
        for data in _ext_hdr:
            # Ignore empty lines coming from multiple line-breaks
            if data == "":
                continue

            key_value = data.strip("#").strip().split()

            key = key_value[0].strip(":").strip()
            value = " ".join(key_value[1:])
            parsed_ext_hdr[key] = value
        try:
            w = float(parsed_ext_hdr.get("Wavelength", "0").strip("A "))
            d = float(parsed_ext_hdr.get("Detector_distance", "0").strip("m "))

            bcx, bcy = parsed_ext_hdr["Beam_xy"].split(",")
            bcx, bcy = float(bcx.strip("pixels() ")), float(bcy.strip("pixels() "))

            px_size_x, px_size_y = parsed_ext_hdr.get("Pixel_size", "0").split("x")
            px_size_x, px_size_y = (
                float(px_size_x.strip("m ")),
                float(px_size_y.strip("m ")),
            )

            # Half of the detector diagonal
            dr = math.sqrt((px_size_x * width) ** 2 + (px_size_y * height) ** 2) / 2

            # Remove invalid values (-1)
            clean_np_array = np_array[np_array >= 0]

            braggy_hdr = {
                "wavelength": w,
                "detector_distance": d,
                "beam_cx": bcx,
                "beam_cy": bcy,
                "beam_ocx": (width / 2) - bcx,
                "beam_ocy": (height / 2) - bcy,
                "detector_radius": dr,
                "pixel_size_x": px_size_x,
                "pixel_size_y": px_size_y,
                "img_width": width,
                "img_height": height,
                "pxxpm": 1 / px_size_x,
                "pxypm": 1 / px_size_y,
                **get_array_stats(
                    clean_np_array if clean_np_array.size > 0 else np_array
                ),
            }
        except (KeyError, IndexError, ValueError):
            # ValueError added: e.g. a missing "Pixel_size" yields the "0"
            # default, which cannot be unpacked by split("x"), and malformed
            # numeric fields make float() raise
            logging.info("Could not create Braggy header from CBF header")

        return parsed_ext_hdr, braggy_hdr

513 

514 

class HDF5FormatHandler(FormatHandler):
    """Loads single images from HDF5 ".dataset" pseudo-files.

    A pseudo-path looks like "<container>_data_xxx.h5/image_<n>.<ext>.dataset"
    as generated by HDF5ContainerHandler.
    """

    EXT = ["dataset"]

    @staticmethod
    def preload(path: str) -> Tuple[dict, np.ndarray, bytes]:
        """Load image number *n* encoded in *path* from its HDF5 container.

        Returns (header dict, float32 image, 8-bit preview bytes).
        """
        h5path, img_num = HDF5FormatHandler._interpret_path(path)

        with h5py.File(h5path, "r") as h5file:
            image_nr_low = _get_dataset_attr(h5file, "/entry/data/data")["image_nr_low"]
            assert isinstance(image_nr_low, np.generic)  # nosec

            # Convert the image number into a 0-based dataset index
            idx = img_num - image_nr_low.item()
            data = _get_dataset_data(h5file, "/entry/data/data", str(idx))
            assert isinstance(data, np.ndarray)  # nosec

        np_array = data.astype(np.float32)
        preview_data = data.clip(0).astype(np.uint8).tobytes()

        img_hdr = HDF5FormatHandler._get_hdr(path, np_array)

        return img_hdr, np_array, preview_data

    @staticmethod
    def _get_hdr(path: str, np_array: np.ndarray) -> Dict[str, dict]:
        """Build the braggy header from the companion "_master" file.

        e.g. ".../name_data_000001.h5/image_5.h5.dataset" reads
        ".../name_master.h5".
        """
        # removesuffix: rstrip(".dataset") strips a *character set* and could
        # eat legitimate trailing characters of the extension
        _, ext = os.path.splitext(path.removesuffix(".dataset"))

        # Derive the master-file name from the container file name only, so a
        # "_data" occurring in a parent directory cannot break the split
        container_dir, container_name = os.path.split(os.path.dirname(path))
        prefix, _, _ = container_name.partition("_data")
        mfpath = os.path.join(container_dir, prefix + "_master" + ext)

        with h5py.File(mfpath, "r") as h5file:
            wavelength = _get_instrument_param(h5file, "beam/incident_wavelength")
            detector = _get_instrument_param(h5file, "detector/detector_distance")

            pixel_size_x = _get_instrument_param(h5file, "detector/x_pixel_size")
            pixel_size_y = _get_instrument_param(h5file, "detector/y_pixel_size")
            width = _get_instrument_param(
                h5file, "detector/detectorSpecific/x_pixels_in_detector"
            )
            height = _get_instrument_param(
                h5file, "detector/detectorSpecific/y_pixels_in_detector"
            )

            beam_cx = _get_instrument_param(h5file, "detector/beam_center_x")
            beam_cy = _get_instrument_param(h5file, "detector/beam_center_y")

        # Remove invalid values (SATURATION VALUES)
        clean_np_array = np_array[np_array != np.max(np_array)]

        braggy_hdr = {
            "wavelength": wavelength,
            "detector_distance": detector,
            "beam_cx": beam_cx,
            "beam_cy": beam_cy,
            "beam_ocx": (width / 2) - beam_cx,
            "beam_ocy": (height / 2) - beam_cy,
            "detector_radius": (width * pixel_size_x) / 2,
            "pixel_size_x": pixel_size_x,
            "pixel_size_y": pixel_size_y,
            "img_width": width,
            "img_height": height,
            "pxxpm": 1 / pixel_size_x,
            "pxypm": 1 / pixel_size_y,
            **get_array_stats(clean_np_array if clean_np_array.size > 0 else np_array),
        }

        return {"braggy_hdr": braggy_hdr}

    @staticmethod
    def _interpret_path(path: str) -> Tuple[str, int]:
        """Split ".../container.h5/image_<n>.<ext>.dataset" into (container, n)."""
        h5path, dataset_path = os.path.split(path)
        # removesuffix (not rstrip) so only the literal ".dataset" suffix goes
        stem = dataset_path.removesuffix(".dataset")
        _, _, imgnum_suffix = stem.partition("image_")
        imgnum, _, _ = imgnum_suffix.partition(".")

        return h5path, int(imgnum)

588 

589 

def _get_dataset_attr(h5file: h5py.File, dset_path: str):
    """Return the attribute dict of the dataset at *dset_path*."""
    content = h5grove.create_content(h5file, dset_path)
    assert isinstance(content, DatasetContent)  # nosec
    return content.attributes()

594 

595 

def _get_dataset_data(
    h5file: h5py.File, dset_path: str, selection: Optional[str] = None
):
    """Return the data of the dataset at *dset_path*, optionally sliced."""
    content = h5grove.create_content(h5file, dset_path)
    assert isinstance(content, DatasetContent)  # nosec
    return content.data(selection)

602 

603 

def _get_instrument_param(h5file: h5py.File, param_path: str):
    """Read a scalar under /entry/instrument/ and return it as a Python value."""
    value = _get_dataset_data(h5file, f"/entry/instrument/{param_path}")
    assert isinstance(value, np.generic)  # nosec
    return value.item()

608 

609 

610def _compute_histogram(data: np.ndarray) -> Tuple[np.ndarray, list]: 

611 std = 3 * np.std(data) 

612 mean = np.mean(data) 

613 clean_data = data[data < mean + std] 

614 

615 if clean_data.size == 0: 

616 return np.ndarray([]), [] 

617 

618 hist, bins = np.histogram( 

619 clean_data.flatten(), 

620 bins=np.arange(np.min(clean_data), np.max(clean_data), 1) 

621 if np.max(clean_data) <= 300 

622 else 300, 

623 ) 

624 return hist, bins.tolist() 

625 

626 

def get_file_ext(file_name: str):
    """Return the extension of *file_name* without the leading dot ("" if none)."""
    suffix = os.path.splitext(file_name)[1]
    return suffix[1:] if suffix else ""