Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/implementors/imageviewer/createmap.py: 10%

91 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-29 02:12 +0000

1import math 

2 

3import numpy as np 

4 

5from daiquiri.core.components import ComponentActor 

6from daiquiri.core.utils import worker 

7 

8 

9class CreatemapActor(ComponentActor): 

10 name = "createmap" 

11 

12 def reconstruct_patch(self, datacollection, scalars, spectra, rois): 

13 steps_per_patch_x = int(datacollection["steps_x"] / datacollection["patchesx"]) 

14 steps_per_patch_y = int(datacollection["steps_y"] / datacollection["patchesy"]) 

15 

16 first = spectra[0] 

17 evperbin = first["conversion"]["scale"] if first else 1 

18 offset = first["conversion"]["zero"] if first else 0 

19 

20 maps = [] 

21 

22 for detector in first["data"]: 

23 roi_maps = [] 

24 for roi in rois: 

25 full_roi_map = np.full( 

26 (datacollection["steps_y"], datacollection["steps_x"]), -1 

27 ) 

28 

29 for scan_number, scan in enumerate(spectra): 

30 patch_y, patch_x = np.unravel_index( 

31 scan_number, 

32 (datacollection["patchesy"], datacollection["patchesx"]), 

33 ) 

34 

35 mca = scan["data"][detector]["data"] if scan else [] 

36 roi_start = max(0, math.floor((roi["start"] - offset) / evperbin)) 

37 roi_end = min( 

38 math.ceil((roi["end"] - offset) / evperbin), len(mca[0]) 

39 ) 

40 mask = np.zeros(len(mca[0])) 

41 mask[roi_start:roi_end] = 1.0 

42 points = np.sum(mca * mask, axis=1) 

43 scan_map = np.full((steps_per_patch_y * steps_per_patch_x), -1) 

44 scan_map[0 : len(points)] = points 

45 scan_map.shape = (steps_per_patch_y, steps_per_patch_x) 

46 

47 full_roi_map[ 

48 patch_y * steps_per_patch_y : patch_y * steps_per_patch_y 

49 + steps_per_patch_y, 

50 patch_x * steps_per_patch_x : patch_x * steps_per_patch_x 

51 + steps_per_patch_x, 

52 ] = scan_map 

53 

54 roi_maps.append( 

55 { 

56 "maproiid": roi["maproiid"], 

57 "data": full_roi_map.flatten().tolist(), 

58 } 

59 ) 

60 

61 maps.append({"det": detector, "maps": roi_maps}) 

62 

63 return maps 

64 

65 def method( 

66 self, 

67 group=False, 

68 datacollection=None, 

69 datacollectionid=None, 

70 datacollectionnumber=None, 

71 subsampleid=None, 

72 sum_detectors=False, 

73 **kwargs 

74 ): 

75 if group: 

76 return self.reconstruct_patch(datacollection, **kwargs) 

77 

78 evperbin = kwargs["spectra"]["conversion"]["scale"] 

79 offset = kwargs["spectra"]["conversion"]["zero"] 

80 

81 spectra = kwargs["spectra"]["data"] 

82 

83 def generate(): 

84 maps = [] 

85 summed_maps = {} 

86 for detector in spectra: 

87 roi_maps = [] 

88 mca = kwargs["spectra"]["data"][detector]["data"] 

89 

90 for r in kwargs["rois"]: 

91 roi_map = np.full((len(mca)), -1) 

92 roi_start = max(0, math.floor((r["start"] - offset) / evperbin)) 

93 roi_end = min( 

94 math.ceil((r["end"] - offset) / evperbin), len(mca[0]) 

95 ) 

96 

97 mask = np.zeros(len(mca[0])) 

98 mask[roi_start:roi_end] = 1.0 

99 points = np.sum(mca * mask, axis=1) 

100 roi_map[0 : len(points)] = points 

101 

102 roi_maps.append( 

103 {"maproiid": r["maproiid"], "data": roi_map.tolist()} 

104 ) 

105 

106 if r["maproiid"] not in summed_maps: 

107 summed_maps[r["maproiid"]] = np.zeros(roi_map.shape) 

108 # Data from different spectra could be of different lengths 

109 if len(roi_map) < len(summed_maps[r["maproiid"]]): 

110 roi_map.resize(len(summed_maps[r["maproiid"]])) 

111 if len(roi_map) > len(summed_maps[r["maproiid"]]): 

112 roi_map = roi_map[: len(summed_maps[r["maproiid"]])] 

113 summed_maps[r["maproiid"]] += roi_map 

114 

115 maps.append({"det": detector, "maps": roi_maps}) 

116 

117 config = self.get_config() 

118 sum_detectors_config = config.get("sum_detectors") 

119 if sum_detectors or sum_detectors_config: 

120 return [ 

121 { 

122 "det": "summed", 

123 "maps": [ 

124 {"maproiid": maproiid, "data": roi_map.tolist()} 

125 for maproiid, roi_map in summed_maps.items() 

126 ], 

127 } 

128 ] 

129 

130 return maps 

131 

132 return worker(generate) 

133 

134 

135if __name__ == "__main__": 

136 patches_x = 3 

137 patches_y = 2 

138 steps_x = 12 

139 steps_y = 6 

140 

141 steps_per_patch_x = int(steps_x / patches_x) 

142 steps_per_patch_y = int(steps_y / patches_y) 

143 

144 print("spp", steps_per_patch_x, steps_per_patch_y) 

145 

146 scans = [] 

147 for i in range(patches_x * patches_y): 

148 offset = (i + 1) * 100 

149 a = np.array(range(offset, offset + steps_per_patch_x * steps_per_patch_y)) 

150 a.shape = (steps_per_patch_y, steps_per_patch_x) 

151 # print(i, a) 

152 scans.append(a) 

153 

154 full_scan = np.full((steps_y, steps_x), -1) 

155 for scanid, scan in enumerate(scans): 

156 scan_pos_x = scanid % patches_x 

157 scan_pos_y = math.floor(scanid / patches_x) 

158 offset_x = scan_pos_x * steps_per_patch_x 

159 offset_y = scan_pos_y * steps_per_patch_y 

160 

161 print(scanid, scan_pos_x, scan_pos_y, offset_x, offset_y) 

162 

163 full_scan[ 

164 offset_y : offset_y + steps_per_patch_y, 

165 offset_x : offset_x + steps_per_patch_x, 

166 ] = scan 

167 

168 print(full_scan) 

169 

170 print(full_scan.flatten())