Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/responses.py: 81%

132 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from typing import Union 

4from io import BytesIO 

5from datetime import datetime 

6from functools import update_wrapper, wraps 

7from flask import Response, make_response 

8import gzip 

9import json 

10import numpy 

11import struct 

12from silx.math.combo import min_max 

13 

14 

15def ndarray_response(data: numpy.ndarray, extra_headers: dict = None): 

16 """Create a flask response containing a nd-array.""" 

17 msg = data.tobytes() 

18 resp = Response( 

19 response=msg, 

20 mimetype="application/octet-stream", 

21 status=200, 

22 ) 

23 resp.headers["Content-Length"] = len(msg) 

24 resp.headers["Access-Control-Expose-Headers"] = "*" 

25 if extra_headers: 

26 for k, v in extra_headers.items(): 

27 resp.headers[k] = v 

28 resp.headers["DQR-dtype"] = data.dtype.descr[0][1] 

29 resp.headers["DQR-shape"] = " ".join([str(s) for s in data.shape]) 

30 # FIXME: This could be done with less calls using silx 

31 resp.headers["DQR-min"] = numpy.nanmin(data) 

32 resp.headers["DQR-max"] = numpy.nanmax(data) 

33 resp.headers["DQR-mean"] = numpy.nanmean(data) 

34 resp.headers["DQR-std"] = numpy.nanstd(data) 

35 return resp 

36 

37 

38SUPPORTED_IFF_PROFILES = {"raw", "f16", "u8"} 

39 

40UINT8_LUT = numpy.arange(256, dtype=numpy.uint8).reshape(-1, 1) 

41 

42 

43def apply_normalization( 

44 data, 

45 norm: str = "linear", 

46 autoscale: str = "minmax", 

47 vmin=None, 

48 vmax=None, 

49 gamma=1.0, 

50): 

51 """Apply normalization to data. 

52 

53 Arguments: 

54 data: Data on which to apply the colormap 

55 norm: Normalization to use 

56 autoscale: Autoscale mode: "minmax" (default) or "stddev3" 

57 vmin: Lower bound, None (default) to autoscale 

58 vmax: Upper bound, None (default) to autoscale 

59 gamma: Gamma correction parameter (used only for "gamma" normalization) 

60 

61 Returns: 

62 Array of colors, vmin, vmax 

63 """ 

64 from silx.math.colormap import GammaNormalization, _BASIC_NORMALIZATIONS 

65 from silx.math._colormap import cmap 

66 

67 if norm == "gamma": 

68 normalizer = GammaNormalization(gamma) 

69 else: 

70 normalizer = _BASIC_NORMALIZATIONS[norm] 

71 

72 if vmin is None or vmax is None: 

73 auto_vmin, auto_vmax = normalizer.autoscale(data, autoscale) 

74 if vmin is None: # Set vmin respecting provided vmax 

75 vmin = auto_vmin if vmax is None else min(auto_vmin, vmax) 

76 if vmax is None: 

77 vmax = max(auto_vmax, vmin) # Handle max_ <= 0 for log scale 

78 

79 norm_data = cmap(data, UINT8_LUT, vmin, vmax, normalization=norm, nan_color=[0]) 

80 norm_data.shape = data.shape 

81 return norm_data, vmin, vmax 

82 

83 

84def iff_response( 

85 data: numpy.ndarray, 

86 extra_header_type: bytes = b"EXTR", 

87 extra_headers: dict = None, 

88 profiles: str = "raw", 

89 norm: str = None, 

90 autoscale: str = None, 

91 vmin: float = None, 

92 vmax: float = None, 

93 histogram: bool = False, 

94): 

95 """Create a flask response containing a nd-array with extra stuffs encoded 

96 as IFF blocks. 

97 

98 It provides a set of profiles for encoding which can be extended. 

99 

100 - `raw`: Send the raw data if possible 

101 - `f2`: Send the data as float 16-bits 

102 

103 Attributes: 

104 data: The data to encode 

105 extra_header_type: The type of the extra block if any 

106 extra_headers: The content of the extra block if any 

107 profiles: A list of supported profiles with `;` separator. The first one 

108 supported will be used. Default is 'raw' 

109 histogram: If true, join an histogram with the response 

110 """ 

111 ALIGN_BLOCK = 4 

112 

113 # For consistency with other image response 

114 if autoscale == "none": 

115 autoscale = None 

116 

117 def fourcc(chunk_id: bytes): 

118 """Returns an 4 chars represented by string""" 

119 assert len(chunk_id) == 4 # nosec 

120 return chunk_id 

121 

122 def chunk_size(chunk_data: bytes): 

123 """Returns an 4 chars represented by string""" 

124 size = len(chunk_data) 

125 return struct.pack(">I", size) 

126 

127 def chunk(chunk_id: bytes, chunk_data: bytes): 

128 return (fourcc(chunk_id), chunk_size(chunk_data), chunk_data, pad(chunk_data)) 

129 

130 def json_chunk(chunk_id: bytes, record: dict): 

131 raw = json.dumps(record).encode("utf-8") 

132 return (fourcc(chunk_id), chunk_size(raw), raw, pad(raw)) 

133 

134 def pad(chunk_data: bytes): 

135 """Pad the data if needed""" 

136 nb_bytes = len(chunk_data) % ALIGN_BLOCK 

137 if nb_bytes == 0: 

138 return b"" 

139 return b"\x00" * (ALIGN_BLOCK - nb_bytes) 

140 

141 # Select the first available profile 

142 profiles = [p for p in profiles.split(";") if p in SUPPORTED_IFF_PROFILES] 

143 profiles.append("raw") 

144 profile = profiles[0] 

145 

146 minmax = min_max(data, min_positive=True, finite=True) 

147 

148 def float_or_none(v): 

149 return None if v is None else float(v) 

150 

151 stat = { 

152 "dtype": data.dtype.descr[0][1], 

153 "shape": [s for s in data.shape], 

154 "profile": profile, 

155 "min": float_or_none(minmax.minimum), 

156 "min_positive": float_or_none(minmax.min_positive), 

157 "max": float_or_none(minmax.maximum), 

158 "mean": float(numpy.nanmean(data)), 

159 "std": float(numpy.nanstd(data)), 

160 } 

161 

162 if profile == "f16": 

163 pdata = data.astype(numpy.float16) 

164 

165 elif profile == "u8": 

166 pdata, vmin, vmax = apply_normalization( 

167 data, norm=norm, autoscale=autoscale, vmin=vmin, vmax=vmax 

168 ) 

169 stat["u8_norm"] = norm 

170 stat["u8_autoscale"] = autoscale 

171 stat["u8_min"] = float_or_none(vmin) 

172 stat["u8_max"] = float_or_none(vmax) 

173 

174 elif profile == "raw": 

175 pdata = data 

176 else: 

177 raise RuntimeError(f"Unsupported profile '{profile}'") 

178 

179 blocks = [ 

180 *chunk(b"DQR0", b""), 

181 *json_chunk(b"FORM", stat), 

182 *chunk(b"DATA", pdata.tobytes()), 

183 ] 

184 

185 if histogram: 

186 # FIXME: Normalize the histogram depending on the normalization 

187 normalized_array = data[numpy.isfinite(data)] 

188 count, edges = numpy.histogram(normalized_array, 256) 

189 count, edges = count.astype(numpy.float32), edges.astype(numpy.float32) 

190 blocks.extend(chunk(b"HST0", count.tobytes() + edges.tobytes())) 

191 

192 if extra_headers: 

193 blocks.extend(json_chunk(extra_header_type, extra_headers)) 

194 

195 msg = b"".join(blocks) 

196 resp = Response( 

197 response=msg, 

198 mimetype="application/octet-stream", 

199 status=200, 

200 ) 

201 resp.headers["Content-Length"] = len(msg) 

202 return resp 

203 

204 

205def image_response(img, img_format="PNG", extra_headers=None, max_age=None): 

206 """Create a flask response with an image 

207 

208 Creates the correct content length so that an XHR request 

209 can monitor progress correctly 

210 

211 Args: 

212 img (Image): A PIL image to send as a flask response 

213 

214 Kwargs: 

215 img_format (str): The image type, default png 

216 

217 Returns 

218 resp (Response): The flask response 

219 """ 

220 img_io = BytesIO() 

221 img.save(img_io, img_format, quality=100) 

222 img_io.seek(0) 

223 

224 resp = Response(response=img_io, mimetype=f"image/{img_format.lower()}", status=200) 

225 resp.headers["Content-Length"] = img_io.getbuffer().nbytes 

226 if extra_headers: 

227 resp.headers["Access-Control-Expose-Headers"] = "*" 

228 for k, v in extra_headers.items(): 

229 resp.headers[k] = v 

230 

231 if max_age: 

232 resp.cache_control.max_age = max_age 

233 

234 return resp 

235 

236 

237def nocache(view): 

238 """A response with no-cache 

239 

240 Stolen from https://arusahni.net/blog/2014/03/flask-nocache.html 

241 """ 

242 

243 @wraps(view) 

244 def no_cache(*args, **kwargs): 

245 response = make_response(view(*args, **kwargs)) 

246 response.headers["Last-Modified"] = datetime.now() 

247 response.headers[ 

248 "Cache-Control" 

249 ] = "no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0" 

250 response.headers["Pragma"] = "no-cache" 

251 response.headers["Expires"] = "-1" 

252 return response 

253 

254 return update_wrapper(no_cache, view) 

255 

256 

257def gzipped( 

258 data: Union[dict, numpy.ndarray, bytes], compress_level: int = 6 

259) -> Response: 

260 """A gzipped response 

261 

262 Args: 

263 data (dict | numpy.ndarray): An list of data or a dataarray to compress 

264 

265 Kwargs: 

266 compress_level (int): The compression level to apply 

267 

268 Returns: 

269 resp (Response): The flask response 

270 """ 

271 if isinstance(data, numpy.ndarray): 

272 mimetype = "application/octet-stream" 

273 data_to_bytes = data.tobytes() 

274 elif isinstance(data, bytes): 

275 mimetype = "application/octet-stream" 

276 data_to_bytes = data 

277 else: 

278 mimetype = "application/json" 

279 data_to_bytes = json.dumps(data).encode("utf-8") 

280 

281 gzip_buffer = BytesIO() 

282 gzip_file = gzip.GzipFile( 

283 mode="wb", compresslevel=compress_level, fileobj=gzip_buffer 

284 ) 

285 gzip_file.write(data_to_bytes) 

286 gzip_file.close() 

287 

288 resp = Response(response=gzip_buffer.getvalue(), mimetype=mimetype, status=200) 

289 resp.headers["Content-Encoding"] = "gzip" 

290 resp.headers["Content-Length"] = len(resp.get_data()) 

291 

292 return resp