Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/imageviewer/fluoxas.py: 36%
113 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from decimal import Decimal, getcontext
4from marshmallow import fields, validate, validates_schema, ValidationError
6from bliss.config.static import get_config
7from bliss.common.scans import amesh
8from bliss.scanning.group import Sequence
9from bliss.scanning.chain import AcquisitionChannel
10from bliss.scanning.scan import ScanState
12import numpy
13import gevent
14import mmh3
16from daiquiri.core.components import (
17 ComponentActor,
18 ComponentActorSchema,
19 ComponentActorKilled,
20)
21from daiquiri.core.schema.components import RegionSchema
22from daiquiri.core.utils import to_wavelength
23from daiquiri.core.hardware.bliss.session import *
24from daiquiri.core.hardware.blissdata.helpers import get_scan_key
26from .createmap import CreatemapActor
27from .beamlineparams import BeamlineParamsSchema
# Arithmetic on Decimal values in this module (see FluoxasSchema._steps) is
# carried out with 8 significant digits, so that float noise in positions and
# step sizes does not turn a whole number of steps into a fractional one.
# NOTE(review): this mutates the decimal context of the importing thread, not
# just this module - confirm nothing else in the process relies on the default.
getcontext().prec = 8

# Static BLISS configuration, used to look up hardware objects by name.
cfg = get_config()
class FluoxasSchema(ComponentActorSchema):
    """Parameters accepted by ``FluoxasActor``.

    ``steps_x`` / ``steps_y`` are read-only display fields: they are derived
    from the step sizes and the extents of the selected objects by
    ``calculated`` and are not meant to be entered by the user.
    """

    subsampleid = fields.Int(required=True)
    steps_x = fields.Str(metadata={"title": "Steps Horizontally", "readOnly": True})
    step_size_x = fields.Float(
        required=True,
        metadata={"title": "Step Size Horizontally", "unit": "um"},
        validate=validate.Range(min=0.1),
    )
    steps_y = fields.Str(metadata={"title": "Steps Vertically", "readOnly": True})
    step_size_y = fields.Float(
        required=True,
        metadata={"title": "Step Size Vertically", "unit": "um"},
        validate=validate.Range(min=0.1),
    )
    reference = fields.Float(
        metadata={
            "title": "Energy Reference",
            "unit": "keV",
        },
        validate=validate.Range(min=0.1),
        required=True,
    )
    regions = fields.List(
        fields.Nested(RegionSchema),
        metadata={"title": "Energy Regions", "minItems": 1},
        required=True,
    )
    beamlineparams = fields.Nested(
        BeamlineParamsSchema, metadata={"title": "Beamline Parameters"}
    )
    enqueue = fields.Bool(metadata={"title": "Queue Scan"}, dump_default=True)

    def _steps(self, start, end, step_size, base_unit=1e-9):
        """Return the number of ``step_size`` steps between ``start`` and ``end``.

        The result is ``|end - start| * base_unit / step_size / 1e-6`` as a
        ``Decimal`` and may be fractional; callers decide whether a fractional
        count is acceptable.

        Args:
            start: First position of the interval.
            end: Last position of the interval.
            step_size: Step size; the schema declares it in micrometres, which
                is what the final division by ``1e-6`` accounts for.
            base_unit: Multiplier applied to ``end - start``.
                NOTE(review): presumably converts positions to metres (the
                default would then mean nanometres) - confirm with callers.

        Raises:
            decimal.DivisionByZero / decimal.InvalidOperation: if
                ``step_size`` is zero (depending on the active context traps).
        """
        return abs(
            (Decimal(end) - Decimal(start))
            * Decimal(base_unit)
            / Decimal(step_size)
            / Decimal(1e-6)
        )

    @validates_schema
    def schema_validate(self, data, **kwargs):
        """Reject step sizes that do not divide every object's extent evenly.

        Nothing is checked when ``data`` carries no ``objects``.

        Raises:
            ValidationError: for the first object/axis whose extent is not a
                whole number of steps.
        """
        objects = data.get("objects")
        if not objects:
            return

        axes = (
            ("step_size_x", "x", "x2"),
            ("step_size_y", "y", "y2"),
        )
        for size_key, start_key, end_key in axes:
            for obj in objects:
                steps = self._steps(obj[start_key], obj[end_key], data[size_key])
                if steps != steps.to_integral_value():
                    raise ValidationError(
                        f"{size_key} must be an integer value: {steps}"
                    )

    def calculated(self, data, **kwargs):
        """Compute the read-only ``steps_x`` / ``steps_y`` display strings.

        For each axis whose step size is present (and non-zero) in ``data``,
        the step count of every object is computed and the counts are joined
        with ``", "``. Whole counts are shown as integers, fractional ones are
        rounded to two decimals. An axis with a step size but no objects maps
        to an empty string; an axis without a step size is omitted.

        Returns:
            dict: mapping of ``"steps_x"`` / ``"steps_y"`` to display strings.
        """
        intervals = {
            "steps_x": ("step_size_x", "x", "x2"),
            "steps_y": ("step_size_y", "y", "y2"),
        }

        calculated = {}
        for field_name, (size_key, start_key, end_key) in intervals.items():
            if not data.get(size_key):
                continue

            steps = []
            for obj in data.get("objects") or []:
                step = self._steps(obj[start_key], obj[end_key], data[size_key])
                whole = step.to_integral_value()
                steps.append(whole if whole == step else round(step, 2))

            calculated[field_name] = ", ".join(map(str, steps))

        return calculated

    class Meta:
        uiorder = [
            "subsampleid",
            "step_size_x",
            "step_size_y",
            "steps_x",
            "steps_y",
            "reference",
            "regions",
            "beamlineparams",
            "enqueue",
        ]
        uischema = {
            "subsampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
            # The computed fields of this schema are steps_x / steps_y; the
            # former "interval_x" / "interval_y" keys matched no field here.
            "steps_x": {"ui:readonly": True},
            "steps_y": {"ui:readonly": True},
            "regions": {"ui:field": "arrayTable"},
            "beamlineparams": {"ui:field": "optionalParams"},
        }
class FluoxasActor(ComponentActor):
    """Run one 2D mesh scan per energy point of every requested energy region.

    All meshes are grouped in a single BLISS ``Sequence``. After each mesh the
    ROI maps are built from the recorded spectra and their sums are emitted on
    the sequence's custom channels (``sum_energy`` plus one ``sum_<roi name>``
    per ROI).
    """

    schema = FluoxasSchema
    metatype = "XRF map xas"
    name = "fluoxas"

    def method(self, **kwargs):
        """Execute the acquisition.

        Reads from ``kwargs``: ``absol`` (``move_to``, ``move_to_additional``,
        ``positions``, ``axes``), ``step_size_x``, ``step_size_y``,
        ``reference``, ``regions``, ``scans`` and the callbacks
        ``before_scan_starts``, ``get_rois``, ``update_datacollection``,
        ``add_scanqualityindicators``, ``generate_maps`` and
        ``next_datacollection``.

        Raises:
            ComponentActorKilled: re-raised after killing the running scan
                greenlet when the actor is killed while a mesh is in progress.
        """
        print("moving to roi")
        kwargs["absol"]["move_to"](kwargs["absol"])

        print("moving to additional positions")
        kwargs["absol"]["move_to_additional"](kwargs["absol"]["positions"])

        mca = cfg.get("simu1")

        axes = kwargs["absol"]["axes"]
        schema = self.schema()
        steps_x = int(
            round(
                schema._steps(
                    axes["x"]["destination"][0],
                    axes["x"]["destination"][1],
                    kwargs["step_size_x"],
                    base_unit=axes["x"]["unit_exponent"],
                )
            )
        )
        steps_y = int(
            round(
                schema._steps(
                    axes["y"]["destination"][0],
                    axes["y"]["destination"][1],
                    kwargs["step_size_y"],
                    base_unit=axes["y"]["unit_exponent"],
                )
            )
        )

        # Step sizes re-derived from the rounded step counts, expressed in the
        # units of the axis destinations (kwargs["step_size_*"] are the
        # user-facing values declared in micrometres by the schema).
        step_size_x = (
            axes["x"]["destination"][1] - axes["x"]["destination"][0]
        ) / steps_x
        step_size_y = (
            axes["y"]["destination"][1] - axes["y"]["destination"][0]
        ) / steps_y

        print("calculated steps to be", steps_x, steps_y)

        print("capture params and image")
        kwargs["before_scan_starts"](self)

        seq = Sequence()
        seq.add_custom_channel(AcquisitionChannel("sum_energy", float, ()))

        rois = kwargs["get_rois"]()["rois"]
        roi_by_id = {}
        for roi in rois:
            seq.add_custom_channel(AcquisitionChannel(f"sum_{roi['name']}", int, ()))
            roi_by_id[roi["maproiid"]] = roi["name"]

        # Scan quality indicators of every energy point are attached to the
        # data collection that is current before the first scan.
        first_dc = self["datacollectionid"]

        regions = kwargs["regions"]
        last_region_index = len(regions) - 1

        with seq.sequence_context() as scan_seq:
            point = 0
            # Counts the meshes run so far across *all* regions. The per-region
            # index `i` restarts at 0 for each region and therefore cannot be
            # used to address a scan inside the sequence.
            scan_index = 0

            for region_index, region in enumerate(regions):
                intervals = (
                    int(
                        round(
                            (region["end_e"] - region["start_e"])
                            / (region["step"] if region["step"] > 0 else 1)
                        )
                    )
                    # Want to go to end of energy range
                    + 1
                )

                for i in range(intervals):
                    # reference is declared in keV by the schema, hence * 1000.
                    # NOTE(review): region offsets are presumably in eV - confirm.
                    energy = (
                        (kwargs["reference"] * 1000)
                        + region["start_e"]
                        + (i * region["step"])
                    )

                    # Limits are inset by half a step and one interval fewer is
                    # requested; presumably so the steps_x * steps_y points sit
                    # at the centres of the grid cells.
                    # NOTE(review): `diode` is not defined in this module; it
                    # presumably comes from the star import of
                    # daiquiri.core.hardware.bliss.session - confirm.
                    mesh = amesh(
                        axes["x"]["motor"].object(),
                        axes["x"]["destination"][0] + step_size_x / 2,
                        axes["x"]["destination"][1] - step_size_x / 2,
                        steps_x - 1,
                        axes["y"]["motor"].object(),
                        axes["y"]["destination"][0] + step_size_y / 2,
                        axes["y"]["destination"][1] - step_size_y / 2,
                        steps_y - 1,
                        region["dwell"],
                        mca,
                        diode,
                        run=False,
                    )

                    scan_seq.add(mesh)

                    greenlet = gevent.spawn(mesh.run)
                    mesh.wait_state(ScanState.STARTING)

                    scan_number = mmh3.hash(get_scan_key(mesh)) & 0xFFFFFFFF
                    kwargs["update_datacollection"](
                        self,
                        datacollectionnumber=scan_number,
                        # Same "+ 2" offset as before for the first region, but
                        # continuing to count through the following regions.
                        imagecontainersubpath=f"{scan_index + 2}.1/measurement",
                        # micrometres -> millimetres
                        dx_mm=kwargs["step_size_x"] * 1e-3,
                        dy_mm=kwargs["step_size_y"] * 1e-3,
                        numberofimages=steps_x * steps_y,
                        exposuretime=region["dwell"],
                        wavelength=to_wavelength(energy),
                        steps_x=steps_x,
                        steps_y=steps_y,
                        orientation="horizontal",
                    )
                    scan_index += 1

                    try:
                        greenlet.join()
                    except ComponentActorKilled:
                        greenlet.kill()
                        raise
                    else:
                        spectra = kwargs["scans"].get_scan_spectra(
                            scan_number, allpoints=True
                        )

                        mapsActor = CreatemapActor()
                        maps = mapsActor.method(spectra=spectra, rois=rois)

                        seq.custom_channels["sum_energy"].emit([energy / 1000])
                        for j, mroi in enumerate(maps[0]["maps"]):
                            xanes = sum(mroi["data"])
                            name = roi_by_id[mroi["maproiid"]]
                            seq.custom_channels[f"sum_{name}"].emit([xanes])

                            # Only the first ROI map feeds the quality indicator.
                            if j == 0:
                                kwargs["add_scanqualityindicators"](
                                    self,
                                    point=point + 1,
                                    datacollectionid=first_dc,
                                    total=xanes,
                                )

                        point += 1

                        kwargs["generate_maps"](
                            self["subsampleid"], self["datacollectionid"]
                        )

                        # Compare positions, not dict contents: a region equal
                        # to the last one but listed earlier must still be
                        # followed by a new data collection.
                        is_last_scan = (
                            region_index == last_region_index and i == intervals - 1
                        )
                        if not is_last_scan:
                            kwargs["next_datacollection"](self, data=True, grid=True)