Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/implementors/imageviewer/createmap.py: 10%
91 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
1import math
3import numpy as np
5from daiquiri.core.components import ComponentActor
6from daiquiri.core.utils import worker
class CreatemapActor(ComponentActor):
    """Actor that integrates MCA spectra over regions of interest (ROIs).

    Each ROI is a mapping with ``start``/``end`` bounds (in the units of the
    spectrum calibration) and a ``maproiid``.  For every detector and every
    ROI the channels inside the ROI are summed per point, producing one map
    value per point.
    """

    name = "createmap"

    @staticmethod
    def _roi_bin_range(roi, offset, evperbin, n_bins):
        """Return the ``(start, end)`` channel slice covered by ``roi``.

        The bounds are converted with ``(value - offset) / evperbin`` and
        clamped so that ``0 <= start <= end`` and ``end`` never exceeds
        ``n_bins`` unless ``start`` already does (which yields an empty
        slice).
        """
        start = max(0, math.floor((roi["start"] - offset) / evperbin))
        end = min(math.ceil((roi["end"] - offset) / evperbin), n_bins)
        # A negative ``end`` would be interpreted by slicing as "count from
        # the back" and select almost every channel; an ROI that lies below
        # the first channel must select nothing instead.
        return start, max(start, end)

    @classmethod
    def _integrate_roi(cls, mca, roi, offset, evperbin):
        """Sum the channels of ``mca`` that fall inside ``roi`` for each point.

        ``mca`` is indexed as ``mca[point][channel]``.  Returns a 1-D array
        with one value per point; an empty ``mca`` gives an empty array.
        """
        mca = np.asarray(mca)
        if mca.ndim < 2 and mca.size == 0:
            return np.zeros(0, dtype=int)

        start, end = cls._roi_bin_range(roi, offset, evperbin, mca.shape[1])
        return mca[:, start:end].sum(axis=1)

    def reconstruct_patch(self, datacollection, scalars, spectra, rois):
        """Stitch the ROI maps of several patch scans into full-size maps.

        Args:
            datacollection: mapping providing ``steps_x``, ``steps_y``,
                ``patchesx`` and ``patchesy``.
            scalars: accepted for interface compatibility; not used here.
            spectra: sequence with one entry per patch scan, in row-major
                patch order.  Each entry provides ``conversion`` (``scale``
                and ``zero``) and ``data[detector]["data"]``; a falsy entry
                marks a missing scan.
            rois: iterable of ROI mappings (``start``, ``end``,
                ``maproiid``).

        Returns:
            A list with one ``{"det": ..., "maps": [...]}`` entry per
            detector, each map being ``{"maproiid": ..., "data": [...]}``
            where ``data`` is the flattened ``steps_y`` x ``steps_x`` map.
            Points without data are ``-1``.  The list is empty when no scan
            carries any data.
        """
        steps_x = datacollection["steps_x"]
        steps_y = datacollection["steps_y"]
        patches_x = datacollection["patchesx"]
        patches_y = datacollection["patchesy"]
        steps_per_patch_x = int(steps_x / patches_x)
        steps_per_patch_y = int(steps_y / patches_y)

        # The calibration and the detector list are taken from the first
        # scan that actually holds data, so a missing leading scan does not
        # abort the whole reconstruction.
        first = next((scan for scan in spectra if scan), None)
        if first is None:
            return []

        evperbin = first["conversion"]["scale"]
        offset = first["conversion"]["zero"]

        maps = []
        for detector in first["data"]:
            roi_maps = []
            for roi in rois:
                # NOTE: np.full with an integer fill value creates an integer
                # array, so the integrated values are stored as integers.
                full_roi_map = np.full((steps_y, steps_x), -1)

                for scan_number, scan in enumerate(spectra):
                    if not scan:
                        # Missing scan: its patch keeps the -1 placeholder.
                        continue

                    patch_y, patch_x = np.unravel_index(
                        scan_number, (patches_y, patches_x)
                    )
                    points = self._integrate_roi(
                        scan["data"][detector]["data"], roi, offset, evperbin
                    )

                    # A scan shorter than a full patch is padded with -1.
                    scan_map = np.full(steps_per_patch_y * steps_per_patch_x, -1)
                    scan_map[: len(points)] = points
                    scan_map.shape = (steps_per_patch_y, steps_per_patch_x)

                    row = patch_y * steps_per_patch_y
                    col = patch_x * steps_per_patch_x
                    full_roi_map[
                        row : row + steps_per_patch_y,
                        col : col + steps_per_patch_x,
                    ] = scan_map

                roi_maps.append(
                    {
                        "maproiid": roi["maproiid"],
                        "data": full_roi_map.flatten().tolist(),
                    }
                )

            maps.append({"det": detector, "maps": roi_maps})

        return maps

    def method(
        self,
        group=False,
        datacollection=None,
        datacollectionid=None,
        datacollectionnumber=None,
        subsampleid=None,
        sum_detectors=False,
        **kwargs
    ):
        """Generate ROI maps for a single scan or for a group of patches.

        Args:
            group: when truthy, delegate to :meth:`reconstruct_patch` with
                ``datacollection`` and the remaining keyword arguments.
            datacollection: forwarded to :meth:`reconstruct_patch`.
            datacollectionid, datacollectionnumber, subsampleid: accepted
                but not used by this method.
            sum_detectors: when truthy (or when the actor config has a
                truthy ``sum_detectors`` entry) return a single ``"summed"``
                entry holding the per-ROI sum over all detectors.
            **kwargs: must contain ``spectra`` (with ``conversion`` and
                ``data``) and ``rois`` when ``group`` is falsy.

        Returns:
            For ``group``, the result of :meth:`reconstruct_patch`.
            Otherwise whatever ``worker`` returns for the map generator,
            which itself produces a list of ``{"det": ..., "maps": [...]}``
            entries.
        """
        if group:
            return self.reconstruct_patch(datacollection, **kwargs)

        conversion = kwargs["spectra"]["conversion"]
        evperbin = conversion["scale"]
        offset = conversion["zero"]
        spectra = kwargs["spectra"]["data"]
        rois = kwargs["rois"]

        def generate():
            config = self.get_config()
            want_sum = bool(sum_detectors or config.get("sum_detectors"))

            maps = []
            summed_maps = {}
            for detector in spectra:
                mca = spectra[detector]["data"]
                roi_maps = []

                for roi in rois:
                    points = self._integrate_roi(mca, roi, offset, evperbin)
                    # Integer map, one value per point (see np.full dtype).
                    roi_map = np.full(len(mca), -1)
                    roi_map[: len(points)] = points

                    roi_maps.append(
                        {"maproiid": roi["maproiid"], "data": roi_map.tolist()}
                    )

                    if not want_sum:
                        continue

                    maproiid = roi["maproiid"]
                    if maproiid not in summed_maps:
                        summed_maps[maproiid] = np.zeros(roi_map.shape)
                    total = summed_maps[maproiid]
                    # Data from different detectors could be of different
                    # lengths: only the overlapping part is accumulated,
                    # which avoids resizing arrays in place.
                    overlap = min(len(roi_map), len(total))
                    total[:overlap] += roi_map[:overlap]

                maps.append({"det": detector, "maps": roi_maps})

            if want_sum:
                return [
                    {
                        "det": "summed",
                        "maps": [
                            {"maproiid": maproiid, "data": total.tolist()}
                            for maproiid, total in summed_maps.items()
                        ],
                    }
                ]

            return maps

        return worker(generate)
if __name__ == "__main__":
    # Stand-alone demonstration of how patch scans are tiled into one map:
    # each patch is filled with recognisable values (100.., 200.., ...) and
    # written to its row-major position in the full grid.
    patches_x, patches_y = 3, 2
    steps_x, steps_y = 12, 6

    steps_per_patch_x = steps_x // patches_x
    steps_per_patch_y = steps_y // patches_y

    print("spp", steps_per_patch_x, steps_per_patch_y)

    points_per_patch = steps_per_patch_x * steps_per_patch_y
    scans = [
        np.arange(first_value, first_value + points_per_patch).reshape(
            steps_per_patch_y, steps_per_patch_x
        )
        for first_value in (
            (index + 1) * 100 for index in range(patches_x * patches_y)
        )
    ]

    full_scan = np.full((steps_y, steps_x), -1)
    for scanid, scan in enumerate(scans):
        # Row-major layout: quotient is the patch row, remainder the column.
        scan_pos_y, scan_pos_x = divmod(scanid, patches_x)
        offset_x = scan_pos_x * steps_per_patch_x
        offset_y = scan_pos_y * steps_per_patch_y

        print(scanid, scan_pos_x, scan_pos_y, offset_x, offset_y)

        rows = slice(offset_y, offset_y + steps_per_patch_y)
        cols = slice(offset_x, offset_x + steps_per_patch_x)
        full_scan[rows, cols] = scan

    print(full_scan)

    print(full_scan.flatten())