Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/imageviewer/roiscan.py: 47%
120 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import logging
4from decimal import Decimal, getcontext, ROUND_DOWN
5from marshmallow import fields, validate, validates_schema, ValidationError
7from bliss.config.static import get_config
8from bliss.common.scans import amesh
9from bliss.scanning.scan import ScanState
10import gevent
11import mmh3
14from daiquiri.core.components import (
15 ComponentActor,
16 ComponentActorKilled,
17 ComponentActorSchema,
18)
19from daiquiri.core.utils import to_wavelength
20from daiquiri.core.hardware.bliss.session import *
21from daiquiri.core.components.utils.monitor import monitor
22from daiquiri.core.hardware.blissdata.helpers import get_scan_key
24from .beamlineparams import BeamlineParamsSchema
26# TODO: what are the implications of doing this?
27getcontext().prec = 8
29cfg = get_config()
30logger = logging.getLogger(__name__)
33class RoiscanSchema(ComponentActorSchema):
34 subsampleid = fields.Int(required=True)
35 steps_x = fields.Str(metadata={"title": "Vertical", "readOnly": True})
36 step_size_x = fields.Float(
37 required=True,
38 metadata={"title": "Size Vert", "unit": "um"},
39 validate=validate.Range(min=0.1),
40 )
41 steps_y = fields.Str(metadata={"title": "Horizontal", "readOnly": True})
42 step_size_y = fields.Float(
43 required=True,
44 metadata={"title": "Size Horz", "unit": "um"},
45 validate=validate.Range(min=0.1),
46 )
47 dwell = fields.Float(
48 required=True,
49 metadata={"title": "Dwell time", "unit": "s"},
50 validate=validate.Range(min=0.001, max=60),
51 )
52 beamlineparams = fields.Nested(
53 BeamlineParamsSchema, metadata={"title": "Beamline Parameters"}
54 )
55 with_lima = fields.Bool(
56 metadata={"title": "Enable Lima Camera"}, dump_default=False
57 )
58 enqueue = fields.Bool(metadata={"title": "Queue Scan"}, dump_default=True)
59 estimate = fields.Float(dump_default=0)
61 def _steps(self, start, end, step_size, base_unit=1e-9, places=5):
62 steps = abs(
63 (Decimal(end) - Decimal(start))
64 * Decimal(base_unit)
65 / Decimal(step_size)
66 / Decimal(1e-6)
67 )
69 return steps.quantize(Decimal(10) ** -places, rounding=ROUND_DOWN)
71 @validates_schema
72 def schema_validate(self, data, **kwargs):
73 intervals = [["step_size_x", "x", "x2"], ["step_size_y", "y", "y2"]]
75 objs = data.get("objects")
76 if objs:
77 if len(objs) > 1 and data.get("enqueue") is False:
78 raise ValidationError(
79 f"Can only queue scan when more than one object is selected. {len(objs)} objects selected"
80 )
82 for keys in intervals:
83 if objs:
84 for obj in data.get("objects"):
85 steps = self._steps(obj[keys[1]], obj[keys[2]], data[keys[0]])
86 if not steps == steps.to_integral_value():
87 raise ValidationError(
88 f"{keys[0]} must be an integer value: {steps}"
89 )
91 def warnings(self, data, **kwargs):
92 warnings = {}
93 if data.get("objects"):
94 for obj in data.get("objects"):
95 size_x = (obj["x2"] - obj["x"]) * 1e-9 / 1e-6
96 size_y = (obj["y2"] - obj["y"]) * 1e-9 / 1e-6
98 if size_x > 100 or size_y > 100:
99 warnings[
100 obj["subsampleid"]
101 ] = f"Object {obj['subsampleid']} will use stepper rather than piezo as size is {size_x:.0f}x{size_y:.0f} um"
103 return warnings
105 def calculated(self, data, **kwargs):
106 calculated = {}
108 intervals = {
109 "steps_x": ["step_size_x", "x", "x2"],
110 "steps_y": ["step_size_y", "y", "y2"],
111 }
113 for iv, keys in intervals.items():
114 if data.get(keys[0]):
115 steps = []
116 if data.get("objects"):
117 for obj in data["objects"]:
118 step = self._steps(obj[keys[1]], obj[keys[2]], data[keys[0]])
119 # print('calculated', key, step, step.to_integral_value() == step, obj[keys[2]] - obj[keys[1]])
120 step = (
121 step.to_integral_value()
122 if step.to_integral_value() == step
123 else round(step, 2)
124 )
126 steps.append(step)
128 calculated[iv] = ", ".join(map(str, steps))
130 calculated["estimate"] = self.time_estimate(data)
132 return calculated
134 def time_estimate(self, data):
135 fudge = 1.5
136 if data.get("step_size_x") and data.get("step_size_y"):
137 if data.get("objects"):
138 for obj in data["objects"]:
139 steps_x = (obj["x2"] - obj["x"]) * 1e-9 / data["step_size_x"] / 1e-6
140 steps_y = (obj["y2"] - obj["y"]) * 1e-9 / data["step_size_y"] / 1e-6
141 return data["dwell"] * steps_x * steps_y * fudge
143 class Meta:
144 uiorder = [
145 "subsampleid",
146 "dwell",
147 "step_size_x",
148 "step_size_y",
149 "steps_x",
150 "steps_y",
151 "beamlineparams",
152 "with_lima",
153 "enqueue",
154 "estimate",
155 ]
156 uischema = {
157 "subsampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
158 "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
159 "estimate": {"classNames": "hidden-row", "ui:widget": "hidden"},
160 "steps_x": {"ui:readonly": True},
161 "steps_y": {"ui:readonly": True},
162 "beamlineparams": {"ui:field": "optionalParams"},
163 }
164 uigroups = [
165 "subsampleid",
166 "dwell",
167 {"Steps": ["step_size_x", "step_size_y", "steps_x", "steps_y"]},
168 "beamlineparams",
169 "with_lima",
170 "enqueue",
171 "estimate",
172 ]
175def get_metatype(self, **kwargs):
176 return "XRF xrd map" if kwargs.get("with_lima") else "XRF map"
179class RoiscanActor(ComponentActor):
180 schema = RoiscanSchema
181 name = "roiscan"
182 metatype = get_metatype
183 additional_metadata = {"definition": "XRF_2Dmap"}
185 def method(self, **kwargs):
186 print("Add roiscan", kwargs)
188 print("moving to roi")
189 kwargs["absol"]["move_to"](kwargs["absol"])
191 print("moving to additional positions")
192 kwargs["absol"]["move_to_additional"](kwargs["absol"]["positions"])
194 mca = cfg.get("simu1")
195 lima_simulator = cfg.get("lima_simulator")
197 axes = kwargs["absol"]["axes"]
198 steps_x = int(
199 round(
200 self.schema()._steps(
201 axes["x"]["destination"][0],
202 axes["x"]["destination"][1],
203 kwargs["step_size_x"],
204 base_unit=axes["x"]["unit_exponent"],
205 )
206 )
207 )
208 steps_y = int(
209 round(
210 self.schema()._steps(
211 axes["y"]["destination"][0],
212 axes["y"]["destination"][1],
213 kwargs["step_size_y"],
214 base_unit=axes["y"]["unit_exponent"],
215 )
216 )
217 )
219 step_size_x = (
220 axes["x"]["destination"][1] - axes["x"]["destination"][0]
221 ) / steps_x
222 step_size_y = (
223 axes["y"]["destination"][1] - axes["y"]["destination"][0]
224 ) / steps_y
226 print("calculated steps to be", steps_x, steps_y)
228 print("capture params and image")
229 kwargs["before_scan_starts"](self)
231 detectors = [
232 mca,
233 diode,
234 ]
235 if kwargs["with_lima"]:
236 detectors.append(lima_simulator)
238 mesh = amesh(
239 axes["x"]["motor"].object(),
240 axes["x"]["destination"][0] + step_size_x / 2,
241 axes["x"]["destination"][1] - step_size_x / 2,
242 steps_x - 1,
243 axes["y"]["motor"].object(),
244 axes["y"]["destination"][0] + step_size_y / 2,
245 axes["y"]["destination"][1] - step_size_y / 2,
246 steps_y - 1,
247 kwargs["dwell"],
248 *detectors,
249 run=False,
250 )
252 greenlet = gevent.spawn(mesh.run)
253 mesh.wait_state(ScanState.STARTING)
254 kwargs["update_datacollection"](
255 self,
256 emit_start=True,
257 datacollectionnumber=mmh3.hash(get_scan_key(mesh)) & 0xFFFFFFFF,
258 imagecontainersubpath="1.1/measurement",
259 dx_mm=kwargs["step_size_x"] * 1e-3,
260 dy_mm=kwargs["step_size_y"] * 1e-3,
261 numberofimages=steps_x * steps_y,
262 exposuretime=kwargs["dwell"],
263 wavelength=to_wavelength(10000),
264 steps_x=steps_x,
265 steps_y=steps_y,
266 orientation="horizontal",
267 )
269 print("starting monitor")
270 monitor_greenlet, kill_monitor = monitor(
271 axes["x"]["motor"], "position", greenlet
272 )
274 try:
275 greenlet.join()
276 except ComponentActorKilled:
277 greenlet.kill()
278 raise
279 finally:
280 print("stopping monitor")
281 kill_monitor()
283 monitor_greenlet.get()
285 return greenlet.get()