Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/utils/tileimage.py: 0%
165 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# coding: utf-8
3import logging
4from typing import Optional, Sequence, Tuple, Union
6from PIL import Image
7import h5py
8import numpy
# Module-level logger
_logger = logging.getLogger(__name__)
# Disable PIL's decompression-bomb pixel limit so very large images can be opened
Image.MAX_IMAGE_PIXELS = None
# Unit name stored in the "units" attribute of calibration datasets
UNITS = "pixels"
def blit(
    src: numpy.ndarray, dst: Union[numpy.ndarray, h5py.Dataset], offset: Sequence[int]
) -> Optional[Tuple[int, ...]]:
    """Copy `src` array into `dst` at given `offset` position.

    This function avoids errors in case `src` is partly or completely outside `dst`.

    Warning: Negative offsets are NOT understood as usual.
    Instead they are understood as coordinates in `dst` frame of reference.

    :param src: Array to copy data from
    :param dst: Array or dataset to copy data into
    :param offset: Position in `dst` of the copied region, one value per dimension
    :return: The effectively copied shape, or None if nothing was copied
    """
    assert src.ndim == dst.ndim == len(offset)

    copy_start = numpy.clip(offset, 0, None)
    # Size to copy per dimension: src extent reduced by any negative offset,
    # bounded by the room left in dst.
    # The upper bound is itself clipped to >= 0 first: otherwise, for an offset
    # beyond dst bounds, numpy.clip would receive max < min and return the
    # negative max, making this function report a bogus negative copied shape
    # instead of None.
    copy_shape = numpy.clip(
        src.shape + numpy.clip(offset, None, 0),
        0,
        numpy.clip(dst.shape - copy_start, 0, None),
    )
    dst[
        tuple(slice(start, start + size) for start, size in zip(copy_start, copy_shape))
    ] = src[tuple(slice(0, size) for size in copy_shape)]
    return tuple(copy_shape) if numpy.all(copy_shape != 0) else None
def binning(image: numpy.ndarray) -> numpy.ndarray:
    """2D 2x2 array binning: average each 2x2 block of pixels.

    A trailing odd row/column is discarded.
    The result is cast back to the input dtype (truncating for integer dtypes).

    :param image: 2D array to bin
    :return: Binned array of shape (image.shape[0] // 2, image.shape[1] // 2)
    """
    assert image.ndim == 2
    if numpy.issubdtype(image.dtype, numpy.integer):
        # Accumulate in int64: summing 4 values in a narrow integer dtype
        # (e.g. 4 x uint16) can overflow, and casting floats to an integer
        # accumulator would truncate their values.
        extended_image = image.astype(numpy.int64)
    else:
        extended_image = image
    return (
        0.25
        * (
            extended_image[:-1:2, :-1:2]
            + extended_image[:-1:2, 1::2]
            + extended_image[1::2, :-1:2]
            + extended_image[1::2, 1::2]
        )
    ).astype(image.dtype)
def create_dataset_with_units(group: h5py.Group, path: str, value, units: str) -> None:
    """Store `value` at `group[path]` and tag the new dataset with a `units` attribute."""
    group[path] = value
    dataset = group[path]
    dataset.attrs["units"] = units
def prepare_histogram_nxdata(
    ingroup: h5py.Group,
    nbins: int,
    path: str = "histogram",
) -> h5py.Group:
    """Prepare a group `ingroup[path]` for storing a histogram as a NXdata"""
    group = ingroup.create_group(path)
    # Counts are integers initialised to zero
    group.create_dataset("count", shape=(nbins,), dtype=numpy.int32, fillvalue=0)
    # Edges and centers are floats initialised to NaN until filled;
    # a histogram of nbins bins has nbins + 1 edges.
    for name, length in (("bin_edges", nbins + 1), ("centers", nbins)):
        group.create_dataset(
            name, shape=(length,), dtype=numpy.float32, fillvalue=numpy.nan
        )
    # NXdata attributes describing how to plot the histogram
    nxdata_attrs = {
        "NX_class": "NXdata",
        "signal": "count",
        "axes": "centers",
        "title": "Histogram",
    }
    for key, value in nxdata_attrs.items():
        group.attrs[key] = value
    return group
def fill_histogram_nxdata(
    group: h5py.Group,
    bin_count: numpy.ndarray,
    bin_edges: numpy.ndarray,
) -> None:
    """Fill a prepared group with given histogram"""
    assert bin_count.ndim == bin_edges.ndim == 1
    nbins = len(bin_count)
    assert len(bin_edges) == nbins + 1

    datasets = {name: group[name] for name in ("count", "bin_edges", "centers")}
    # The group must have been prepared with matching sizes
    assert len(datasets["count"]) == nbins
    assert len(datasets["bin_edges"]) == nbins + 1
    assert len(datasets["centers"]) == nbins

    datasets["count"][()] = bin_count
    datasets["bin_edges"][()] = bin_edges
    # Bin centers are the midpoints of consecutive edges
    datasets["centers"][()] = 0.5 * (bin_edges[:-1] + bin_edges[1:])

    # Notify readers of the update
    for dataset in datasets.values():
        dataset.flush()
class HistogramAccumulator:
    """Histogram accumulator.

    Histogram is re-gridded if needed
    """

    def __init__(self, bins: int):
        # Bin edges array of length bins + 1, or None until the first update
        self.__bin_edges = None
        self.__count = numpy.zeros((bins,), dtype=int)

    def update(self, data: numpy.ndarray):
        """Accumulate `data` into histogram, extending range if needed"""
        min_, max_ = numpy.nanmin(data), numpy.nanmax(data)
        if self.__bin_edges is None:
            _logger.debug("Initialize bin_edges")
            self.__count, self.__bin_edges = numpy.histogram(
                data, bins=len(self.__count), range=(min_, max_)
            )
            return

        previous_min = self.__bin_edges[0]
        previous_max = self.__bin_edges[-1]
        if min_ < previous_min or max_ > previous_max:
            _logger.debug("Extend bin_edges")
            # Re-grid the existing counts onto the extended range by
            # histogramming the previous bin centers weighted by their counts
            self.__count, self.__bin_edges = numpy.histogram(
                self.centers,
                weights=self.__count,
                bins=len(self.__count),
                range=(min(min_, previous_min), max(max_, previous_max)),
            )

        self.__count += numpy.histogram(data, self.__bin_edges)[0]

    @property
    def bin_edges(self) -> numpy.ndarray:
        if self.__bin_edges is None:
            # A histogram of N bins has N + 1 edges: return N + 1 zeros
            # (previously N, which broke consumers expecting
            # len(bin_edges) == nbins + 1 and gave N - 1 centers)
            return numpy.zeros((len(self.__count) + 1,), dtype=numpy.float64)
        else:
            return numpy.array(self.__bin_edges)

    @property
    def centers(self) -> numpy.ndarray:
        edges = self.bin_edges
        return 0.5 * (edges[:-1] + edges[1:])

    @property
    def bin_count(self) -> numpy.ndarray:
        return numpy.array(self.__count)
def compute_nlevels(shape: Sequence[int], atleast_powerof2: int) -> int:
    """Returns number of levels in pyramid of details for given shape and constraint.

    `atleast_powerof2` is the power of 2 of the image size where to cut the pyramid.
    """
    shape_array = numpy.asarray(shape)

    if numpy.any(shape_array <= 0):
        return 1  # Base of the pyramid is empty

    # Level at which the smallest dimension of the image reaches 1
    max_level = numpy.min(numpy.log2(shape_array).astype(int))

    # Size (as a power of 2, rounded up) of the largest dimension at the top level
    top_size_powerof2 = numpy.ceil(
        numpy.log2(numpy.max(shape_array // 2**max_level))
    ).astype(int)

    # Number of top levels to cut so that the smallest kept level still has
    # its largest dimension of at least 2**atleast_powerof2 (when possible)
    ncut = numpy.clip(atleast_powerof2 - top_size_powerof2, 0, max_level)

    # max_level + 1 keeps at least one level
    return max_level + 1 - ncut
def prepare_tiling_dataset(
    group: h5py.Group,
    image: numpy.ndarray,
    pixel_size: float,
    nbins: int,
    dtype=numpy.float32,
):
    """Create the HDF5 layout of the tiled image pyramid.

    Writes calibration datasets, one dataset per level of detail and an empty
    histogram NXdata group, and returns the level datasets, level 0 first.
    """
    _logger.debug("Read scan information")
    y_range = [0, image.shape[1]]
    z_range = [0, image.shape[0]]

    # Full image shape in pixels, rounded up to cover the whole ranges
    full_shape = numpy.array(
        (
            int(numpy.ceil(numpy.abs(z_range[1] - z_range[0]) / pixel_size)),
            int(numpy.ceil(numpy.abs(y_range[1] - y_range[0]) / pixel_size)),
        )
    )
    # Correct the ranges to span an integer number of pixels
    y_range = y_range[0], y_range[0] + full_shape[1] * pixel_size
    z_range = z_range[0], z_range[0] + full_shape[0] * pixel_size

    _logger.debug(
        f"Save image calibration: pixel size: {pixel_size}{UNITS}, y: {y_range} {UNITS}, z: {z_range} {UNITS}"
    )
    create_dataset_with_units(group, "pixel_size", pixel_size, UNITS)
    create_dataset_with_units(group, "y", y_range, UNITS)
    create_dataset_with_units(group, "z", z_range, UNITS)

    # Get number of levels by cutting levels where images have all dim smaller than 1024
    nlevel = compute_nlevels(full_shape, 9)
    _logger.debug(f"Number of level datasets: {nlevel}")

    datasets = []
    names = []
    for level in range(nlevel):
        name = f"level{level}"
        names.append(name)
        level_shape = tuple(full_shape // (2**level))
        _logger.debug(
            f"Create dataset {name}: shape: {level_shape}, dtype: {numpy.dtype(dtype).name}"
        )
        datasets.append(group.create_dataset(name, shape=level_shape, dtype=dtype))

    group["definition"] = "tiling"
    group.attrs["NX_class"] = "NXentry"
    # Layers are listed from the most binned level down to level 0
    group["layers"] = names[::-1]

    prepare_histogram_nxdata(group, nbins)

    return datasets
def fill_tiling_dataset(
    group: h5py.Group,
    image: numpy.ndarray,
    datasets: Sequence[h5py.Dataset],
    nbins: int,
):
    """Write `image` into the pyramid `datasets` and update the histogram.

    The image is copied into the level 0 dataset, then each higher level is
    refreshed by 2x2-binning the updated region of the level below.

    :param group: Group prepared by `prepare_tiling_dataset` (holds "histogram")
    :param image: 2D array to store at full resolution
    :param datasets: Pyramid level datasets, level 0 (full resolution) first
    :param nbins: Number of bins of the value histogram
    """
    _logger.debug("Read scan information")
    histogram = HistogramAccumulator(bins=nbins)

    nlevel = len(datasets)

    _logger.debug("Process image:")
    # image = numpy.flip(image, 1)

    # Position of the image origin in the level 0 dataset
    offset = numpy.array((0, 0))
    _logger.debug(
        f"- Update level 0 at offset {tuple(offset)}, shape: {image.shape}"  # nosec
    )
    shape = blit(image, datasets[0], offset)
    if shape is None:
        _logger.warning("! Image has no overlap: skipping")
        return  # No more processing needed

    datasets[0].flush()  # Notify readers of the update

    if shape != image.shape:
        _logger.warning(f"! Some pixels were not used: copied shape: {shape}")

    _logger.debug("- Update histogram")
    # Only the pixels effectively copied into the dataset contribute
    histogram.update(image[: shape[0], : shape[1]])
    fill_histogram_nxdata(group["histogram"], histogram.bin_count, histogram.bin_edges)

    offset = numpy.maximum(offset, 0)  # Clip offset to 0

    # Update other levels of the pyramid of images
    for level in range(1, nlevel):
        # Get copy area with even bounds eventually enlarging it
        # (even bounds keep the region aligned on 2x2 binning blocks)
        end = 2 * numpy.ceil((offset + shape) / 2).astype(int)
        offset = 2 * (offset // 2)
        _logger.debug(
            f"- Binning level {level - 1} from {tuple(offset)} to {tuple(end)}"
        )
        binned_image = binning(
            datasets[level - 1][offset[0] : end[0], offset[1] : end[1]]
        )

        # Coordinates at this level are half those of the level below
        offset //= 2
        _logger.debug(
            f"- Updated level {level} at offset {tuple(offset)}, shape: {binned_image.shape}"
        )
        shape = blit(binned_image, datasets[level], offset)
        datasets[level].flush()  # Notify readers of the update

        if shape is None:
            _logger.warning(f"No overlap for level {level}, offset: {tuple(offset)}")
            break
def generate_pyramid(
    output_filename: str, input_filename: str, nbins: int = 1000, pixel_size: float = 1
):
    """Read an image file and write it as a tiled pyramid HDF5 file.

    :param output_filename: HDF5 file to write (overwritten if it exists)
    :param input_filename: Image file readable by PIL
    :param nbins: Number of bins of the value histogram
    :param pixel_size: Pixel size used for the calibration datasets (in UNITS)
    """
    _logger.info(f"Write to {output_filename}")
    with h5py.File(output_filename, "w", libver="v110") as h5file:
        _logger.info(f"Read from `{input_filename}`")

        with Image.open(input_filename) as image:
            _logger.info("Prepare HDF5 file for tiling")
            group_name = "sample_registration"
            group = h5file.create_group(group_name)

            # Use the module logger instead of stray print() debug statements,
            # consistent with the rest of this module
            _logger.debug(f"Input image: {image}")
            # Convert to 8-bit grayscale ("L" mode) before storing as uint8
            image = image.convert("L")
            _logger.debug(f"Grayscale image: {image}")
            image = numpy.array(image)
            _logger.debug(f"Image array shape: {image.shape}")

            datasets = prepare_tiling_dataset(
                group, image, pixel_size, nbins, dtype=numpy.uint8
            )
            fill_tiling_dataset(
                group=group,
                image=image,
                datasets=datasets,
                nbins=nbins,
            )
    _logger.info("Done")
if __name__ == "__main__":
    import argparse

    logging.basicConfig()

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("input_filename", type=str, help="Input image file")
    parser.add_argument("pixel_size", type=float, help="Pixel Size")
    parser.add_argument(
        "output_filename",
        type=str,
        default="output.h5",
        nargs="?",
        help="HDF5 file where to save the result",
    )
    parser.add_argument(
        "--verbose", "-v", action="count", default=0, help="Increase verbosity"
    )
    args = parser.parse_args()

    # Map the -v count to a log level, capping at DEBUG for -vv and beyond
    levels = (logging.WARNING, logging.INFO, logging.DEBUG)
    _logger.setLevel(levels[min(args.verbose, len(levels) - 1)])

    nbins = 100
    generate_pyramid(args.output_filename, args.input_filename, nbins, args.pixel_size)