Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/metadata/xrf.py: 34%

96 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1# -*- coding: utf-8 -*- 

2from dataclasses import dataclass 

3import io 

4import gzip 

5import json 

6 

7import matplotlib as mpl 

8import matplotlib.cm as cm 

9import numpy as np 

10from PIL import Image 

11 

12from daiquiri.core.utils import worker 

13 

14 

def shape_map(map_):
    """Shape a flat 1d XRF map array into its 2d image form.

    Vertically collected maps are reshaped transposed and rotated/flipped
    upright; for snaked collection every other row is reversed back into
    scan order.

    Args:
        map_ (dict): An XRF map from the metadata handler

    Returns:
        data (ndarray): The 2d XRF map data
    """
    height, width = int(map_["h"]), int(map_["w"])
    flat = np.array(map_["data"])

    # TODO: Catch raise (reshape fails when len(data) != h * w)
    if map_.get("orientation") == "vertical":
        # Vertical scans arrive column-first: reshape as (w, h), then
        # rotate and flip into the conventional (h, w) orientation.
        grid = np.flipud(np.rot90(flat.reshape(width, height)))
    else:
        grid = flat.reshape(height, width)

    # Snaked scans reverse direction on alternate rows; undo that here
    if map_.get("snaked"):
        grid[1::2, :] = grid[1::2, ::-1]

    return grid

43 

44 

# Maps the scale name stored in map_["scale"] to the matplotlib
# normalisation class used to scale pixel values into [0, 1].
MAP_SCALINGS = {"linear": mpl.colors.Normalize, "logarithmic": mpl.colors.LogNorm}

46 

47 

def generate_map_image(map_):
    """Generates a PIL Image from an XRF map

    -1 placeholder values are converted to a transparent pixel

    Args:
        map_ (dict): An XRF map from the metadata handler

    Returns:
        image (Image): A PIL image
    """

    def generate():
        data = shape_map(map_)

        # Unknown scale names fall back to linear normalisation
        norm_alg = MAP_SCALINGS.get(map_.get("scale", "linear"), MAP_SCALINGS["linear"])

        # Remove -1 values so map is correctly scaled during scan and
        # without manual scaling (flatten once, filter once)
        flat = data.flatten()
        filtered_data = flat[flat != -1]
        min_value = map_.get("min")
        if len(filtered_data):
            if min_value is None:
                min_value = np.min(filtered_data)
        norm = norm_alg(vmin=min_value, vmax=map_.get("max"))

        # Unknown colourmap names fall back to viridis
        colourmap = map_.get("colourmap") or "viridis"
        if not hasattr(cm, colourmap):
            colourmap = "viridis"
        cmap = getattr(cm, colourmap)

        m = cm.ScalarMappable(norm=norm, cmap=cmap)
        img_data = m.to_rgba(data, bytes=True, alpha=map_["opacity"])

        # Render -1 placeholder pixels fully transparent
        mask = data == -1
        img_data[mask, :] = [255, 255, 255, 0]

        return Image.fromarray(img_data, "RGBA")

    return worker(generate)

87 

88 

def generate_composite_image(comp, maps):
    """Generates a PIL Image from an XRF composite map

    Each of the red, green, and blue channels is taken from a separate XRF
    map, normalised to 0-255, and scaled by its per-channel opacity; a
    constant alpha layer is added from the composite opacity.

    Args:
        comp (dict): An XRF composite map from the metadata handler
        maps (dict): The XRF map for each channel, keyed by "r", "g", "b"

    Returns:
        image (Image): A PIL image
    """

    def generate():
        layers = []
        for col in ["r", "g", "b"]:
            map_ = maps[col]

            data = shape_map(map_)

            # Normalise to [0, 1] (autoscaled from the data when min/max are
            # None), then scale to 0-255 and apply the channel opacity
            norm = mpl.colors.Normalize(vmin=map_.get("min"), vmax=map_.get("max"))
            map_["norm"] = norm(data) * 255 * comp[f"{col}opacity"]

            layers.append(map_["norm"])

        # Constant alpha layer; all channel maps share the same dimensions,
        # so take h/w explicitly rather than from the leaked loop variable
        blue = maps["b"]
        layers.append(
            np.full((int(blue["h"]), int(blue["w"])), round(comp["opacity"] * 255))
        )

        img_data = np.dstack(layers).astype(np.uint8)
        return Image.fromarray(img_data, "RGBA")

    return worker(generate)

120 

121 

@dataclass
class AutoScaleMinMax:
    """Autoscaled display limits for an XRF map."""

    # Lower display limit (mean - 3 sigma)
    min: float
    # Upper display limit (mean + 3 sigma)
    max: float


def autoscale_min_max(data):
    """Compute autoscaled display limits for XRF map data

    -1 placeholder (transparent) values are excluded, then the limits are
    taken as the mean +/- 3 standard deviations.

    Args:
        data (ndarray): The XRF map data

    Returns:
        AutoScaleMinMax: The computed min/max limits (NaN when all pixels
            are placeholders)
    """
    flat = data.ravel()  # view where possible; flatten() always copies
    filtered_data = flat[flat != -1]
    if not len(filtered_data):
        # No real pixels yet: preserve the NaN result np.mean/np.std would
        # give for an empty array, without triggering RuntimeWarnings
        return AutoScaleMinMax(min=float("nan"), max=float("nan"))

    std_dev = np.std(filtered_data)
    mean = np.mean(filtered_data)

    return AutoScaleMinMax(min=float(mean - 3 * std_dev), max=float(mean + 3 * std_dev))

135 

136 

def generate_histogram(data, autoscale: bool = False):
    """Generates a histogram of map data

    Args:
        data (list): The XRF map data
        autoscale (bool): Whether to autoscale the histogram (bins = mean +/- 3sig)

    Returns:
        data (dict(list)): The histogram, bins, and widths
    """

    def generate():
        # NOTE: the original `nonlocal autoscale` was dead code; the flag is
        # only read here, never rebound, so plain closure capture suffices
        ndata = np.array(data)
        # Remove -1 (transparent) pixels as they skew the histogram
        rdata = ndata[ndata != -1]

        if autoscale:
            values = autoscale_min_max(rdata)
            # Keep only values within mean +/- 3 sigma (single mask pass)
            rdata = rdata[(rdata >= values.min) & (rdata <= values.max)]

        try:
            hist, bins = np.histogram(rdata, bins=50)
            center = (bins[:-1] + bins[1:]) / 2
            width = np.diff(bins)

        # TODO: This should not happen
        except (OverflowError, ValueError):
            hist = []
            center = []
            width = []

        return {"hist": hist, "bins": center, "width": width}

    return worker(generate)

174 

175 

def gunzip_json(bytes_obj):
    """Un gzips a bytes object and load into json

    Args:
        bytes_obj (bytes): The gzip-compressed data

    Returns:
        data (dict): The decoded json as a python object; an empty list
            when the input is empty or None
    """
    if not bytes_obj:
        return []

    # gzip.decompress replaces the BytesIO write/seek/GzipFile boilerplate
    return json.loads(gzip.decompress(bytes_obj).decode())

195 

196 

def gzip_json(obj):
    """Gzips a json dump of a python object

    Args:
        obj (dict): An object

    Returns:
        data (bytes): The gzip-compressed json encoding of the object
    """
    encoded = json.dumps(obj).encode()

    buffer = io.BytesIO()
    # GzipFile writes the compressed stream into the in-memory buffer
    with gzip.GzipFile(fileobj=buffer, mode="w") as gz:
        gz.write(encoded)

    return buffer.getvalue()