Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/utils/tileimage.py: 0%

165 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# coding: utf-8 

3import logging 

4from typing import Optional, Sequence, Tuple, Union 

5 

6from PIL import Image 

7import h5py 

8import numpy 

9 

10 

11_logger = logging.getLogger(__name__) 

12Image.MAX_IMAGE_PIXELS = None 

13UNITS = "pixels" 

14 

15 

def blit(
    src: numpy.ndarray, dst: Union[numpy.ndarray, h5py.Dataset], offset: Sequence[int]
) -> Optional[Tuple[int, ...]]:
    """Copy `src` array into `dst` at given `offset` position.

    This function avoids errors in case `src` is partly or completely outside `dst`.

    Warning: Negative offsets are NOT understood as usual.
    Instead they are understood as coordinates in `dst` frame of reference:
    the copy is anchored at `dst`'s origin and the overhanging part of `src`
    is dropped.

    :param src: Source array.
    :param dst: Destination array or dataset with same number of dimensions.
    :param offset: Position of `src` inside `dst`, one value per dimension.
    :return: The effectively copied shape, or None if nothing was copied.
    """
    assert src.ndim == dst.ndim == len(offset)

    # Start of the copied region, in `dst` frame of reference
    copy_start = numpy.clip(offset, 0, None)
    # Shape of the overlap of `src` (placed at `offset`) with `dst`.
    # The upper bound is clamped to 0: when `offset` is completely past the
    # end of `dst`, `dst.shape - copy_start` is negative and numpy.clip with
    # a_min > a_max would yield a NEGATIVE shape (and a bogus non-None return).
    copy_shape = numpy.clip(
        src.shape + numpy.clip(offset, None, 0),
        0,
        numpy.maximum(dst.shape - copy_start, 0),
    )
    dst[
        tuple(slice(start, start + size) for start, size in zip(copy_start, copy_shape))
    ] = src[tuple(slice(0, size) for size in copy_shape)]
    return tuple(copy_shape) if numpy.all(copy_shape != 0) else None

38 

39 

def binning(image: numpy.ndarray) -> numpy.ndarray:
    """2D 2x2 array binning.

    Each output pixel is the mean of a 2x2 block of input pixels; trailing
    rows/columns of odd-shaped inputs are dropped.

    :param image: 2D input array.
    :return: Binned array with the same dtype as `image`.
    """
    assert image.ndim == 2
    # Accumulate in float64 so the 4-pixel sum cannot overflow narrow integer
    # dtypes (uint16 accumulation only worked for uint8 input) and so float
    # inputs are not truncated to integers before averaging.
    # For uint8 input the result is bit-identical to uint16 accumulation.
    accumulator = image.astype(numpy.float64)
    return (
        0.25
        * (
            accumulator[:-1:2, :-1:2]
            + accumulator[:-1:2, 1::2]
            + accumulator[1::2, :-1:2]
            + accumulator[1::2, 1::2]
        )
    ).astype(image.dtype)

53 

54 

def create_dataset_with_units(group: h5py.Group, path: str, value, units: str) -> None:
    """Store `value` at `group[path]` and tag the new dataset with a `units` attribute."""
    group[path] = value
    dataset = group[path]
    dataset.attrs["units"] = units

58 

59 

def prepare_histogram_nxdata(
    ingroup: h5py.Group,
    nbins: int,
    path: str = "histogram",
) -> h5py.Group:
    """Prepare a group `ingroup[path]` for storing a histogram as a NXdata"""
    nxdata = ingroup.create_group(path)
    # One dataset per NXdata field: (name, length, dtype, fill value)
    for name, length, dtype, fill in (
        ("count", nbins, numpy.int32, 0),
        ("bin_edges", nbins + 1, numpy.float32, numpy.nan),
        ("centers", nbins, numpy.float32, numpy.nan),
    ):
        nxdata.create_dataset(name, shape=(length,), dtype=dtype, fillvalue=fill)
    nxdata.attrs["NX_class"] = "NXdata"
    nxdata.attrs["signal"] = "count"
    nxdata.attrs["axes"] = "centers"
    nxdata.attrs["title"] = "Histogram"
    return nxdata

79 

80 

def fill_histogram_nxdata(
    group: h5py.Group,
    bin_count: numpy.ndarray,
    bin_edges: numpy.ndarray,
) -> None:
    """Write the given histogram into a group prepared by `prepare_histogram_nxdata`.

    :param group: Group containing the `count`, `bin_edges` and `centers` datasets.
    :param bin_count: 1D array of counts, one value per bin.
    :param bin_edges: 1D array of edges, one more element than `bin_count`.
    """
    assert bin_count.ndim == 1 and bin_edges.ndim == 1
    nbins = len(bin_count)
    assert len(bin_edges) == nbins + 1

    counts = group["count"]
    edges = group["bin_edges"]
    centers = group["centers"]

    assert len(counts) == nbins
    assert len(edges) == nbins + 1
    assert len(centers) == nbins

    counts[()] = bin_count
    edges[()] = bin_edges
    # Bin centers are the midpoints of consecutive edges
    centers[()] = 0.5 * (bin_edges[1:] + bin_edges[:-1])

    # Flush only once everything is written
    for dataset in (counts, edges, centers):
        dataset.flush()

106 

107 

class HistogramAccumulator:
    """Histogram accumulator.

    Accumulates data into a fixed number of bins.
    Histogram is re-gridded if needed: when new data falls outside the current
    range, existing counts are redistributed (through their bin centers) over
    the extended range.
    """

    def __init__(self, bins: int):
        # Edges of the bins (length bins + 1), or None until first update()
        self.__bin_edges: Optional[numpy.ndarray] = None
        self.__count = numpy.zeros((bins,), dtype=int)

    def update(self, data: numpy.ndarray):
        """Accumulate `data` into histogram, extending range if needed"""
        min_, max_ = numpy.nanmin(data), numpy.nanmax(data)
        if self.__bin_edges is None:
            _logger.debug("Initialize bin_edges")
            # First data defines the initial range
            self.__count, self.__bin_edges = numpy.histogram(
                data, bins=len(self.__count), range=(min_, max_)
            )
            return

        previous_min = self.__bin_edges[0]
        previous_max = self.__bin_edges[-1]
        if min_ < previous_min or max_ > previous_max:
            _logger.debug("Extend bin_edges")
            # Re-grid: current counts are histogrammed again, each bin
            # represented by its center, over the union of both ranges.
            self.__count, self.__bin_edges = numpy.histogram(
                self.centers,
                weights=self.__count,
                bins=len(self.__count),
                range=(min(min_, previous_min), max(max_, previous_max)),
            )

        self.__count += numpy.histogram(data, self.__bin_edges)[0]

    @property
    def bin_edges(self) -> numpy.ndarray:
        """Edges of the bins: one more element than `bin_count`."""
        if self.__bin_edges is None:
            # Fixed: must have nbins + 1 elements to be consistent with
            # bin_count/centers and with prepare/fill_histogram_nxdata
            # (previously returned only nbins zeros).
            return numpy.zeros((len(self.__count) + 1,), dtype=numpy.float64)
        else:
            return numpy.array(self.__bin_edges)

    @property
    def centers(self) -> numpy.ndarray:
        """Centers of the bins: midpoints of consecutive edges."""
        edges = self.bin_edges
        return 0.5 * (edges[:-1] + edges[1:])

    @property
    def bin_count(self) -> numpy.ndarray:
        """Number of accumulated samples in each bin."""
        return numpy.array(self.__count)

156 

157 

def compute_nlevels(shape: Sequence[int], atleast_powerof2: int) -> int:
    """Returns number of levels in pyramid of details for given shape and constraint.

    `atleast_powerof2` is the power of 2 of the image size where to cut the pyramid.

    :param shape: Shape of the full-resolution image at the base of the pyramid.
    :param atleast_powerof2: Power of 2 used to cut the pyramid once the
        smallest level is small enough.
    :return: Number of levels, at least 1.
    """
    shape_array = numpy.asarray(shape)

    if numpy.any(shape_array <= 0):
        return 1  # Base of the pyramid is empty

    # First, find level in the pyramid of details where the smallest dimension is 1
    max_level = numpy.min(numpy.log2(shape_array).astype(int))
    # Number of levels to cut from the tip of the pyramid, if possible,
    # based on atleast_powerof2
    ncut = numpy.clip(
        atleast_powerof2
        - numpy.ceil(numpy.log2(numpy.max(shape_array // 2**max_level))).astype(int),
        0,
        max_level,
    )
    # int(): return a builtin int as annotated, not a numpy integer scalar
    return int(max_level + 1 - ncut)

182 

183 

def prepare_tiling_dataset(
    group: h5py.Group,
    image: numpy.ndarray,
    pixel_size: float,
    nbins: int,
    dtype=numpy.float32,
):
    """Create the datasets of a tiling pyramid for `image` inside `group`.

    Writes calibration datasets, one dataset per pyramid level and an empty
    histogram NXdata group.

    :return: The list of level datasets, from full resolution (level 0) up.
    """
    _logger.debug("Read scan information")
    y_range = [0, image.shape[1]]
    z_range = [0, image.shape[0]]

    # Full-resolution shape in pixels, rounding each extent up to whole pixels
    nrows = int(numpy.ceil(numpy.abs(z_range[1] - z_range[0]) / pixel_size))
    ncolumns = int(numpy.ceil(numpy.abs(y_range[1] - y_range[0]) / pixel_size))
    full_shape = numpy.array((nrows, ncolumns))
    # Correct the coordinate ranges to match the rounded-up shape
    y_range = y_range[0], y_range[0] + full_shape[1] * pixel_size
    z_range = z_range[0], z_range[0] + full_shape[0] * pixel_size

    _logger.debug(
        f"Save image calibration: pixel size: {pixel_size}{UNITS}, y: {y_range} {UNITS}, z: {z_range} {UNITS}"
    )
    for name, value in (("pixel_size", pixel_size), ("y", y_range), ("z", z_range)):
        create_dataset_with_units(group, name, value, UNITS)

    # Get number of levels by cutting levels where images have all dim smaller than 1024
    nlevel = compute_nlevels(full_shape, 9)
    _logger.debug(f"Number of level datasets: {nlevel}")

    datasets = []
    for level in range(nlevel):
        name = f"level{level}"
        level_shape = tuple(full_shape // 2**level)
        _logger.debug(
            f"Create dataset {name}: shape: {level_shape}, dtype: {numpy.dtype(dtype).name}"
        )
        datasets.append(group.create_dataset(name, shape=level_shape, dtype=dtype))

    group["definition"] = "tiling"
    group.attrs["NX_class"] = "NXentry"
    # Layers are listed from the most binned level down to full resolution
    group["layers"] = [f"level{level}" for level in reversed(range(nlevel))]

    prepare_histogram_nxdata(group, nbins)

    return datasets

234 

235 

def fill_tiling_dataset(
    group: h5py.Group,
    image: numpy.ndarray,
    datasets: Sequence[h5py.Dataset],
    nbins: int,
) -> None:
    """Copy `image` into the pyramid `datasets` and fill the histogram group.

    Level 0 receives the image as-is; each further level is filled by 2x2
    binning the area updated at the previous level.

    :param group: Group prepared by `prepare_tiling_dataset` (contains "histogram").
    :param image: 2D image to copy at level 0.
    :param datasets: Level datasets, from full resolution (level 0) up.
    :param nbins: Number of histogram bins.
    """
    _logger.debug("Read scan information")
    histogram = HistogramAccumulator(bins=nbins)

    nlevel = len(datasets)

    _logger.debug("Process image:")
    # image = numpy.flip(image, 1)

    # Position of the image inside the level-0 dataset
    offset = numpy.array((0, 0))
    _logger.debug(
        f"- Update level 0 at offset {tuple(offset)}, shape: {image.shape}"  # nosec
    )
    # `shape` is the effectively copied shape (None when fully outside)
    shape = blit(image, datasets[0], offset)
    if shape is None:
        _logger.warning("! Image has no overlap: skipping")
        return  # No more processing needed

    datasets[0].flush()  # Notify readers of the update

    if shape != image.shape:
        _logger.warning(f"! Some pixels were not used: copied shape: {shape}")

    _logger.debug("- Update histogram")
    # Only the pixels actually copied contribute to the histogram
    histogram.update(image[: shape[0], : shape[1]])
    fill_histogram_nxdata(group["histogram"], histogram.bin_count, histogram.bin_edges)

    offset = numpy.maximum(offset, 0)  # Clip offset to 0

    # Update other levels of the pyramid of images
    for level in range(1, nlevel):
        # Get copy area with even bounds eventually enlarging it
        # (offset/end are still expressed at level - 1 resolution here)
        end = 2 * numpy.ceil((offset + shape) / 2).astype(int)
        offset = 2 * (offset // 2)
        _logger.debug(
            f"- Binning level {level - 1} from {tuple(offset)} to {tuple(end)}"
        )
        # 2x2-bin the updated area of the previous level
        binned_image = binning(
            datasets[level - 1][offset[0] : end[0], offset[1] : end[1]]
        )

        # Convert the offset to the current level resolution
        offset //= 2
        _logger.debug(
            f"- Updated level {level} at offset {tuple(offset)}, shape: {binned_image.shape}"
        )
        shape = blit(binned_image, datasets[level], offset)
        datasets[level].flush()  # Notify readers of the update

        if shape is None:
            _logger.warning(f"No overlap for level {level}, offset: {tuple(offset)}")
            break

292 

293 

def generate_pyramid(
    output_filename: str, input_filename: str, nbins: int = 1000, pixel_size: float = 1
):
    """Convert an image file into an HDF5 tiling pyramid file.

    The input image is converted to 8-bit grayscale ("L") before tiling.

    :param output_filename: HDF5 file to create (overwritten if it exists).
    :param input_filename: Image file readable by PIL.
    :param nbins: Number of histogram bins.
    :param pixel_size: Pixel size used for the calibration datasets.
    """
    _logger.info(f"Write to {output_filename}")
    with h5py.File(output_filename, "w", libver="v110") as h5file:
        _logger.info(f"Read from `{input_filename}`")

        with Image.open(input_filename) as image:
            _logger.info("Prepare HDF5 file for tiling")
            group = h5file.create_group("sample_registration")

            # Leftover print() debugging replaced by debug logging
            _logger.debug(f"Input image: {image}")
            image = image.convert("L")  # 8-bit grayscale
            _logger.debug(f"Grayscale image: {image}")
            image = numpy.array(image)
            _logger.debug(f"Image array shape: {image.shape}")

            datasets = prepare_tiling_dataset(
                group, image, pixel_size, nbins, dtype=numpy.uint8
            )
            fill_tiling_dataset(
                group=group,
                image=image,
                datasets=datasets,
                nbins=nbins,
            )
    _logger.info("Done")

322 

323 

if __name__ == "__main__":
    import argparse

    logging.basicConfig()

    # Command line interface: input image, pixel size, optional output path
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("input_filename", type=str, help="Input image file")
    parser.add_argument("pixel_size", type=float, help="Pixel Size")
    parser.add_argument(
        "output_filename",
        type=str,
        default="output.h5",
        nargs="?",
        help="HDF5 file where to save the result",
    )
    parser.add_argument(
        "--verbose", "-v", action="count", default=0, help="Increase verbosity"
    )
    args = parser.parse_args()

    # Map the number of -v occurrences to a log level, saturating at DEBUG
    verbosity_to_level = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
    _logger.setLevel(verbosity_to_level.get(args.verbose, logging.DEBUG))

    nbins = 100
    generate_pyramid(args.output_filename, args.input_filename, nbins, args.pixel_size)