Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/filebrowser.py: 0%
327 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4import collections
5import math
7from abc import abstractmethod
8from typing import Dict, List, Optional, OrderedDict, Tuple, Type, Union
9import gevent
10import time
12import numpy as np
13import h5py
15import h5grove
16from h5grove.content import DatasetContent
17from h5grove.utils import get_array_stats
19from flask import Response
20from marshmallow import Schema, fields
22from fabio.cbfimage import CbfImage
24from daiquiri.core import marshal
25from daiquiri.core.components import Component, ComponentResource
26from daiquiri.core.responses import gzipped
27from daiquiri.core.schema import ErrorSchema
28from daiquiri.core.utils import worker
29from daiquiri.resources.utils import YamlDict
32import logging
# A cached file entry: (image data array, parsed header dict)
CacheEntry = Tuple[np.ndarray, dict]

logger = logging.getLogger(__name__)
class NodeSchema(Schema):
    """Serialized representation of a filesystem (or HDF5 container) entry."""

    path = fields.Str()  # path relative to the configured root
    name = fields.Str()  # base name of the entry
    type = fields.Str()  # "file" or "dir"
    ext = fields.Str()  # file extension without the leading dot
    hdr = fields.Dict()  # parsed image header (populated when the file is loaded)
    last_modified = fields.Float()  # mtime as a UNIX timestamp
class FileBrowserDirectorySchema(Schema):
    """Directory listing response: the resolved path plus its child nodes."""

    abs_path = fields.Str()  # absolute path of the listed directory
    rows = fields.Nested(NodeSchema, many=True)  # entries in the directory
    total = fields.Int()  # number of entries in `rows`
class FileBrowserDirectoryResource(ComponentResource):
    """GET endpoint returning a directory listing below the component root."""

    @marshal(
        inp={"path": fields.Str()},
        out=[
            [
                200,
                FileBrowserDirectorySchema,
                "Directory listing for current resources",
            ]
        ],
    )
    def get(self, path: str, **kwargs):
        """Get the directory listing for `path` (relative to the root)."""
        return self._parent.get_directory(path, **kwargs)
class LoadFileResource(ComponentResource):
    """GET endpoint preloading a file into the server cache."""

    @marshal(
        inp={"path": fields.Str()},
        out=[
            [200, NodeSchema(), "Read file contents"],
            [400, ErrorSchema(), "Could not read file contents"],
        ],
    )
    def get(self, path: str, **kwargs):
        """Preload a file and return its node description, or 400 on failure."""
        file = self._parent.fb_load_file(path, **kwargs)

        if file is None:
            return {"message": "Could not read file"}, 400

        return file
class FileResource(ComponentResource):
    """GET endpoint serving the raw content of a (cached) file, gzipped."""

    @marshal(
        inp={"path": fields.Str()},
        out=[[400, ErrorSchema(), "Could not read file contents"]],
    )
    def get(self, path: str, **kwargs) -> Union[Response, Tuple[dict, int]]:
        """Return the gzipped file content, or 400 when it cannot be read."""
        file = self._parent.get_file_content(path, **kwargs)
        if file is None:
            return {"message": "Could not read file"}, 400

        def gzip():
            return gzipped(file)

        # Compression of large images can be slow; run it on a worker
        return worker(gzip)
class HistResource(ComponentResource):
    """GET endpoint serving the gzipped histogram counts of a file."""

    @marshal(
        inp={"path": fields.Str()},
        out=[[400, ErrorSchema(), "Could not read hist data"]],
    )
    def get(self, path: str, **kwargs) -> Union[Response, Tuple[dict, int]]:
        """Return gzipped histogram data, or 400 when it cannot be computed."""
        data = self._parent.get_histogram_data(path, **kwargs)
        if data is None:
            return {"message": "Could not read hist data"}, 400

        def gzip():
            return gzipped(data)

        # Compression can be slow; run it on a worker
        return worker(gzip)
class HistMetaResource(ComponentResource):
    """GET endpoint serving histogram metadata (bins, shape, max)."""

    @marshal(
        inp={"path": fields.Str()},
        out=[[400, ErrorSchema(), "Could not read file contents"]],
    )
    def get(self, path: str, **kwargs) -> Union[Response, Tuple[dict, int]]:
        """Return the histogram metadata dict, or 400 when unavailable."""
        metadata = self._parent.get_histogram_meta(path, **kwargs)
        if metadata is None:
            return {"message": "Could not read hist metadata"}, 400

        return metadata
class Filebrowser(Component):
    """Component exposing a sandboxed file browser below a configured root path.

    Serves directory listings, preloaded file content and intensity
    histograms.  Loaded files are kept in a bounded FIFO cache.
    """

    _base_url = "filebrowser"
    _require_blsession = False
    # Maximum number of entries kept in the preloaded-file cache
    _CACHE_LIMIT = 100

    def setup(self):
        assert isinstance(self._config, YamlDict)  # nosec
        self.register_route(FileBrowserDirectoryResource, "/directory")
        self.register_route(LoadFileResource, "/loadfile")
        self.register_route(FileResource, "/file")
        self.register_route(HistMetaResource, "/loadhist")
        self.register_route(HistResource, "/hist")

        self._root_path: str = os.path.normpath(self._config.get("root", "/"))
        self._file_types: List[str] = self._config.get("file_types", [])
        self._show_hidden: bool = self._config.get("show_hidden", False)
        self._container_file_types: List[str] = self._config.get(
            "container_file_types", []
        )
        self._file_cache: OrderedDict[str, CacheEntry] = collections.OrderedDict()

        # Extension -> handler producing a directory-style listing
        self._container_handlers: Dict[str, ContainerHandler] = {
            **{
                ext: FSContainerHandler(
                    self._root_path, self._file_types, self._show_hidden
                )
                for ext in FSContainerHandler.EXT
            },
            **{
                ext: HDF5ContainerHandler(
                    self._root_path, self._file_types, self._show_hidden
                )
                for ext in HDF5ContainerHandler.EXT
            },
        }
        # Extension -> handler able to preload the file content into memory
        self._format_handlers: Dict[str, Type[FormatHandler]] = {
            **{ext: HDF5FormatHandler for ext in HDF5FormatHandler.EXT},
            **{ext: CBFFormatHandler for ext in CBFFormatHandler.EXT},
        }

        self.current_folder_watcher = gevent.spawn(
            self.watch_file_creation, self._root_path
        )

    def watch_file_creation(self, folder):
        """Poll `folder` every 2 s and emit `new_file_in_dir` for new entries.

        NOTE(review): relies on gevent monkey-patching so that time.sleep
        yields to other greenlets — confirm patching happens at startup.
        """
        ext = get_file_ext(folder)

        # Disable watching for HDF5 "folders"
        if ext in HDF5ContainerHandler.EXT:
            return

        old_listing = {f: None for f in os.listdir(folder)}
        while True:
            time.sleep(2)
            new_listing = {f: None for f in os.listdir(folder)}
            new_files = [f for f in new_listing if f not in old_listing]
            for f in new_files:
                self.emit(
                    "new_file_in_dir",
                    {"dir_path": os.path.relpath(folder, self._root_path), "file": f},
                )
            old_listing = new_listing

    def _add_to_cache(self, path: str, data: CacheEntry):
        """Insert `data` under `path`, evicting the oldest entry when full."""
        # `>=` keeps the cache at _CACHE_LIMIT entries; the previous `>`
        # let it grow to _CACHE_LIMIT + 1
        if len(self._file_cache) >= self._CACHE_LIMIT:
            self._file_cache.popitem(last=False)

        self._file_cache[path] = data

    def _get_root_path(self, path: str) -> str:
        """Resolve `path` against the root, falling back to the root itself
        when the result would escape (or equal) the configured root."""
        abs_path = os.path.normpath(os.path.join(self._root_path, path))

        # Restrict listing to root folder
        if (
            os.path.commonpath([self._root_path, abs_path]) != self._root_path
            or self._root_path == abs_path
        ):
            abs_path = self._root_path

        return abs_path

    def get_directory(
        self, path: str
    ) -> Optional[Dict[str, Union[int, list, str, float]]]:
        """Return a listing of `path` as {total, rows, abs_path}, or None
        when the resolved path does not exist."""
        abs_path = self._get_root_path(path)

        if not os.path.exists(abs_path):
            logger.warning(f"Path {abs_path} does not exist")
            return None

        raw_ext = get_file_ext(abs_path)
        # Default to file system listing if no container handler is found
        ext = raw_ext if raw_ext != "" else "FS"

        nodes = (
            self._container_handlers[ext].list_entry(abs_path)
            if ext in self._container_handlers
            else []
        )

        # Re-point the background watcher at the newly listed folder
        self.current_folder_watcher.kill()
        self.current_folder_watcher = gevent.spawn(self.watch_file_creation, abs_path)

        return {"total": len(nodes), "rows": nodes, "abs_path": abs_path}

    def _make_path_absolute(self, path: str) -> str:
        """Resolve `path` against the root; raise IOError when it escapes
        (or equals) the configured root."""
        path = os.path.normpath(path)
        abs_path = os.path.normpath(os.path.join(self._root_path, path))

        if (
            os.path.commonpath([self._root_path, abs_path]) != self._root_path
            or self._root_path == abs_path
        ):
            raise IOError(f"Can't access {abs_path}")

        return abs_path

    def fb_load_file(self, path: str) -> Optional[dict]:
        """Preload `path` into the cache and return its node description.

        Returns None when the path is invalid, the format is not handled,
        or the file cannot be read.
        """
        try:
            abs_path = self._make_path_absolute(path)
        except IOError as e:
            # Was `logging.warning` (root logger); use the module logger
            # like the rest of the file
            logger.warning(e)
            return None

        ext = get_file_ext(abs_path)

        if abs_path not in self._file_cache:
            try:
                format_handler = self._format_handlers[ext]
            except KeyError:
                logger.warning(f"File empty or format not handled {abs_path}")
                return None

            try:
                hdr, data, _ = format_handler.preload(abs_path)
            except RuntimeError:
                logger.warning(f"File {abs_path} is not a file")
                return None

            self._add_to_cache(abs_path, (data, hdr))
        else:
            data, hdr = self._file_cache[abs_path]

        return {
            "path": abs_path,
            "name": os.path.basename(abs_path),
            "type": "file",
            "ext": ext,
            "hdr": hdr,
            # Path can be a h5py dataset; in that case take the last
            # modified date of the parent file
            "last_modified": os.path.getmtime(
                abs_path if os.path.isfile(abs_path) else os.path.dirname(abs_path)
            ),
        }

    def get_file_content(self, path: str) -> Optional[np.ndarray]:
        """Return the cached data for `path`, loading it on a cache miss.

        Returns None when the file cannot be loaded.  (The previous
        implementation recursed unboundedly on a permanently unloadable
        file, ending in a RecursionError.)
        """
        abs_path = self._make_path_absolute(path)

        try:
            data, _ = self._file_cache[abs_path]
            return data
        except KeyError:
            logger.warning(f"File {abs_path} not in cache (not loaded)")

        # Load file in cache and retry once; give up if loading failed
        if self.fb_load_file(path) is None:
            return None

        cached = self._file_cache.get(abs_path)
        return cached[0] if cached is not None else None

    def retrieve_histogram(self, path: str) -> CacheEntry:
        """Return (histogram counts, metadata) for `path`, computing and
        caching the result on first access."""
        abs_path = self._make_path_absolute(path)
        ext = get_file_ext(abs_path)

        cache_key = abs_path + "/hist"
        if cache_key not in self._file_cache:
            format_handler = self._format_handlers[ext]
            _, data, _ = format_handler.preload(abs_path)
            hist, bins = _compute_histogram(data)

            hist_meta = {
                "bins": bins,
                "shape": hist.shape,
                "max": np.max(hist).item() if hist.size > 0 else 0,
            }
            self._add_to_cache(cache_key, (hist.astype(np.float32), hist_meta))

        return self._file_cache[cache_key]

    def get_histogram_data(self, path: str) -> np.ndarray:
        """Return the histogram counts for `path` as a float32 array."""
        hist_data, _ = self.retrieve_histogram(path)

        return hist_data

    def get_histogram_meta(self, path: str) -> dict:
        """Return the histogram metadata (bins, shape, max) for `path`."""
        _, hist_meta = self.retrieve_histogram(path)

        return hist_meta
class ContainerHandler:
    """Base class for handlers producing a directory-style listing."""

    # Extensions this handler is registered for
    EXT: List[str]

    def __init__(self, root_path="", file_types="", show_hidden=False):
        # Configuration shared by every concrete container handler
        self._root_path = root_path
        self._file_types = file_types
        self._show_hidden = show_hidden

    @abstractmethod
    def list_entry(self, path: str) -> List[dict]:
        """Return the list of node dicts contained in `path`."""
        raise NotImplementedError
class HDF5ContainerHandler(ContainerHandler):
    """Expose the images of an HDF5 data file as if it were a directory."""

    EXT = ["hdf5", "h5"]

    def list_entry(self, path) -> List[dict]:
        """Return one node per image stored in the file's /entry/data/data."""
        ext = get_file_ext(path)
        rel_path = os.path.relpath(path, self._root_path)

        with h5py.File(path, "r") as h5file:
            if "/entry/data/data" not in h5file:
                return []

            attrs = _get_dataset_attr(h5file, "/entry/data/data")
            low = attrs["image_nr_low"]
            assert isinstance(low, np.generic)  # nosec

            high = attrs["image_nr_high"]
            assert isinstance(high, np.generic)  # nosec

            nodes = []
            for n in range(low.item(), high.item() + 1):
                name = f"image_{n}.{ext}.dataset"
                nodes.append(
                    {
                        "path": os.path.normpath(os.path.join(rel_path, name)),
                        "name": name,
                        "type": "file",
                        "ext": ext,
                        # Add n to the date of the parent file to simulate
                        # the last modification date
                        "last_modified": os.path.getmtime(path) + n,
                    }
                )
            return nodes
class FSContainerHandler(ContainerHandler):
    """List the entries of a plain filesystem directory."""

    EXT = ["FS"]

    def list_entry(self, path: str) -> List[dict]:
        """Return node dicts for the visible, configured entries of `path`."""
        rel_path = os.path.relpath(path, self._root_path)
        nodes: List[dict] = []

        with os.scandir(path) as dir_entries:
            for entry in dir_entries:
                # Honour the "show hidden files" setting
                if entry.name.startswith(".") and not self._show_hidden:
                    continue

                is_file = entry.is_file()
                ext = get_file_ext(entry.name) if is_file else ""
                # Only configured file types are listed
                if is_file and ext not in self._file_types:
                    continue

                # HDF5 files are presented as browsable "directories"
                node_type = "file" if is_file and ext not in ["h5", "hdf5"] else "dir"
                nodes.append(
                    {
                        "path": os.path.normpath(os.path.join(rel_path, entry.name)),
                        "name": entry.name,
                        "type": node_type,
                        "ext": ext,
                        "last_modified": os.path.getmtime(entry),
                    }
                )

        return nodes
class FormatHandler:
    """Base class for handlers that preload a file format into memory."""

    # Extensions this handler is registered for
    EXT: List[str]

    def __init__(self):
        pass

    @staticmethod
    @abstractmethod
    def preload(path: str) -> Tuple[dict, np.ndarray, bytes]:
        """Return (header dict, image data, preview bytes) for `path`."""
        raise NotImplementedError()
class CBFFormatHandler(FormatHandler):
    """Preload CBF (crystallographic binary file) images."""

    EXT = ["cbf"]

    @staticmethod
    def preload(path: str) -> Tuple[dict, np.ndarray, bytes]:
        """Load a CBF image.

        Returns (header dict, flattened float32 data, 8-bit preview bytes).
        Raises RuntimeError when `path` is not a regular file.
        """
        if not os.path.isfile(path):
            # Message typo fixed ("Files" -> "File")
            raise RuntimeError("File does not exist")

        cbf_image = CbfImage(fname=path)
        float_data = cbf_image.data.astype(np.float32)

        preview_data = CBFFormatHandler._8bit_raw_repr(cbf_image)

        parsed_ext_hdr, braggy_hdr = CBFFormatHandler._parse_header(
            cbf_image, float_data
        )

        img_hdr = {}
        img_hdr["parsed_ext_hdr"] = parsed_ext_hdr
        img_hdr["braggy_hdr"] = braggy_hdr

        return img_hdr, float_data.flatten(), preview_data

    @staticmethod
    def _8bit_raw_repr(raw_data: CbfImage) -> bytes:
        """Clip negatives and downcast to uint8 for a quick preview."""
        data = raw_data.data.clip(0)
        data = data.astype(np.uint8)

        return data.tobytes()

    @staticmethod
    def _parse_header(cbf_image: CbfImage, np_array: np.ndarray) -> Tuple[dict, dict]:
        """Parse the free-text CBF header and derive Braggy metadata.

        Returns (parsed_ext_hdr, braggy_hdr); braggy_hdr is left empty when
        required keys are missing or their values are malformed.
        """
        height, width = cbf_image.shape

        hdr = cbf_image.header
        parsed_ext_hdr = {}
        braggy_hdr = {}

        _ext_hdr = hdr.get("_array_data.header_contents", "").split("\r\n")
        for data in _ext_hdr:
            # Ignore empty lines coming from multiple line-breaks
            if data == "":
                continue

            key_value = data.strip("#").strip().split()

            key = key_value[0].strip(":").strip()
            value = " ".join(key_value[1:])
            parsed_ext_hdr[key] = value
        try:
            w = float(parsed_ext_hdr.get("Wavelength", "0").strip("A "))
            d = float(parsed_ext_hdr.get("Detector_distance", "0").strip("m "))

            bcx, bcy = parsed_ext_hdr["Beam_xy"].split(",")
            bcx, bcy = float(bcx.strip("pixels() ")), float(bcy.strip("pixels() "))

            px_size_x, px_size_y = parsed_ext_hdr.get("Pixel_size", "0").split("x")
            px_size_x, px_size_y = (
                float(px_size_x.strip("m ")),
                float(px_size_y.strip("m ")),
            )

            # Half diagonal of the detector
            dr = math.sqrt((px_size_x * width) ** 2 + (px_size_y * height) ** 2) / 2

            # Remove invalid values (-1)
            clean_np_array = np_array[np_array >= 0]

            braggy_hdr = {
                "wavelength": w,
                "detector_distance": d,
                "beam_cx": bcx,
                "beam_cy": bcy,
                "beam_ocx": (width / 2) - bcx,
                "beam_ocy": (height / 2) - bcy,
                "detector_radius": dr,
                "pixel_size_x": px_size_x,
                "pixel_size_y": px_size_y,
                "img_width": width,
                "img_height": height,
                "pxxpm": 1 / px_size_x,
                "pxypm": 1 / px_size_y,
                **get_array_stats(
                    clean_np_array if clean_np_array.size > 0 else np_array
                ),
            }
        except (KeyError, IndexError, ValueError):
            # ValueError added: malformed numeric values (and the "0" default
            # for Pixel_size, whose split("x") yields a single element)
            # previously raised an uncaught exception instead of degrading
            # to an empty braggy_hdr.  Use the module logger, not the root one.
            logger.info("Could not create Braggy header from CBF header")

        return parsed_ext_hdr, braggy_hdr
class HDF5FormatHandler(FormatHandler):
    """Preload single images addressed as "<file>.h5/image_<n>.<ext>.dataset"."""

    EXT = ["dataset"]

    @staticmethod
    def preload(path: str) -> Tuple[dict, np.ndarray, bytes]:
        """Load one image from an HDF5 data file.

        Returns (header dict, float32 data, 8-bit preview bytes).
        """
        h5path, img_num = HDF5FormatHandler._interpret_path(path)

        with h5py.File(h5path, "r") as h5file:
            image_nr_low = _get_dataset_attr(h5file, "/entry/data/data")["image_nr_low"]
            assert isinstance(image_nr_low, np.generic)  # nosec

            # Index of the requested image inside the dataset
            idx = img_num - image_nr_low.item()
            data = _get_dataset_data(h5file, "/entry/data/data", str(idx))
            assert isinstance(data, np.ndarray)  # nosec

        np_array = data.astype(np.float32)
        preview_data = data.clip(0).astype(np.uint8).tobytes()

        img_hdr = HDF5FormatHandler._get_hdr(path, np_array)

        return img_hdr, np_array, preview_data

    @staticmethod
    def _get_hdr(path: str, np_array: np.ndarray) -> Dict[str, dict]:
        """Build the Braggy header from the companion "_master" HDF5 file."""
        # removesuffix replaces the previous rstrip(".dataset"), which strips
        # a *character set* and could eat trailing a/d/e/s/t/. characters
        # of the real file name
        _, ext = os.path.splitext(path.removesuffix(".dataset"))
        # partition is safe when "_data" occurs more than once in the path
        prefix, _, _ = path.partition("_data")
        mfpath = prefix + "_master" + ext

        with h5py.File(mfpath, "r") as h5file:
            wavelength = _get_instrument_param(h5file, "beam/incident_wavelength")
            detector = _get_instrument_param(h5file, "detector/detector_distance")

            pixel_size_x = _get_instrument_param(h5file, "detector/x_pixel_size")
            pixel_size_y = _get_instrument_param(h5file, "detector/y_pixel_size")
            width = _get_instrument_param(
                h5file, "detector/detectorSpecific/x_pixels_in_detector"
            )
            height = _get_instrument_param(
                h5file, "detector/detectorSpecific/y_pixels_in_detector"
            )

            beam_cx = _get_instrument_param(h5file, "detector/beam_center_x")
            beam_cy = _get_instrument_param(h5file, "detector/beam_center_y")

        # Remove invalid values (SATURATION VALUES)
        clean_np_array = np_array[np_array != np.max(np_array)]

        braggy_hdr = {
            "wavelength": wavelength,
            "detector_distance": detector,
            "beam_cx": beam_cx,
            "beam_cy": beam_cy,
            "beam_ocx": (width / 2) - beam_cx,
            "beam_ocy": (height / 2) - beam_cy,
            "detector_radius": (width * pixel_size_x) / 2,
            "pixel_size_x": pixel_size_x,
            "pixel_size_y": pixel_size_y,
            "img_width": width,
            "img_height": height,
            "pxxpm": 1 / pixel_size_x,
            "pxypm": 1 / pixel_size_y,
            **get_array_stats(clean_np_array if clean_np_array.size > 0 else np_array),
        }

        return {"braggy_hdr": braggy_hdr}

    @staticmethod
    def _interpret_path(path: str) -> Tuple[str, int]:
        """Split ".../<file>.h5/image_<n>.<ext>.dataset" into (file path, n)."""
        h5path, dataset_path = os.path.split(path)
        # partition/removesuffix replace split/rstrip, which raised on
        # repeated "image_" substrings and stripped suffix characters
        _, _, imgnum_suffix = dataset_path.removesuffix(".dataset").partition("image_")
        imgnum, _, _ = imgnum_suffix.partition(".")

        return h5path, int(imgnum)
def _get_dataset_attr(h5file: h5py.File, dset_path: str):
    """Return the attribute dict of the dataset at `dset_path` in `h5file`."""
    content = h5grove.create_content(h5file, dset_path)
    assert isinstance(content, DatasetContent)  # nosec
    return content.attributes()
def _get_dataset_data(
    h5file: h5py.File, dset_path: str, selection: Optional[str] = None
):
    """Return the data of the dataset at `dset_path`, sliced by `selection`."""
    content = h5grove.create_content(h5file, dset_path)
    assert isinstance(content, DatasetContent)  # nosec
    return content.data(selection)
def _get_instrument_param(h5file: h5py.File, param_path: str):
    """Read a scalar from /entry/instrument/<param_path> as a Python value."""
    value = _get_dataset_data(h5file, f"/entry/instrument/{param_path}")
    assert isinstance(value, np.generic)  # nosec
    return value.item()
610def _compute_histogram(data: np.ndarray) -> Tuple[np.ndarray, list]:
611 std = 3 * np.std(data)
612 mean = np.mean(data)
613 clean_data = data[data < mean + std]
615 if clean_data.size == 0:
616 return np.ndarray([]), []
618 hist, bins = np.histogram(
619 clean_data.flatten(),
620 bins=np.arange(np.min(clean_data), np.max(clean_data), 1)
621 if np.max(clean_data) <= 300
622 else 300,
623 )
624 return hist, bins.tolist()
def get_file_ext(file_name: str):
    """Return `file_name`'s extension without the leading dot ("" if none)."""
    return os.path.splitext(file_name)[1][1:]