Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/imageviewer/loiscan.py: 35%
109 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import logging
4import math
6import mmh3
7import numpy
8from marshmallow import fields, validate, validates_schema, ValidationError
10from bliss.config.static import get_config
11from bliss.common.scans import ascan
12from bliss.scanning.group import Sequence
13from bliss.scanning.chain import AcquisitionChannel
15from daiquiri.core.utils import format_eng, to_wavelength, to_unit
16from daiquiri.core.components import ComponentActor, ComponentActorSchema
17from daiquiri.core.schema.components import RegionSchema
18from daiquiri.core.hardware.bliss.session import *
19from daiquiri.core.hardware.blissdata.helpers import get_scan_key
21from .createmap import CreatemapActor
22from .beamlineparams import BeamlineParamsSchema
24cfg = get_config()
25logger = logging.getLogger(__name__)
class LoiscanSchema(ComponentActorSchema):
    """Parameter schema for a line-of-interest (LOI) energy scan.

    Declares the form fields (sub-sample, number of steps along the line,
    a single energy region, the energy reference and optional beamline
    parameters) together with the UI layout hints in ``Meta``.
    """

    subsampleid = fields.Int(required=True)
    # Number of positions along the line; at least 2 so a step size exists.
    steps = fields.Int(
        required=True,
        metadata={"title": "Steps"},
        validate=validate.Range(min=2),
    )
    # Read-only display value, filled in by ``calculated``.
    step_size = fields.Str(
        metadata={"title": "Size", "readOnly": True},
    )
    # Exactly one energy region is accepted (minItems == maxItems == 1).
    regions = fields.List(
        fields.Nested(RegionSchema),
        metadata={
            "title": "Energy Regions",
            "minItems": 1,
            "maxItems": 1,
        },
        required=True,
    )
    reference = fields.Float(
        metadata={"title": "Energy Reference", "unit": "keV"},
        validate=validate.Range(min=0.1),
        required=True,
    )
    # The "repeats" field is currently disabled:
    # repeats = fields.Int(
    #     required=True,
    #     metadata={"title": "No. Repeats"},
    #     validate=validate.Range(min=1),
    #     dump_default=1,
    # )
    beamlineparams = fields.Nested(
        BeamlineParamsSchema, metadata={"title": "Beamline Parameters"}
    )
    enqueue = fields.Bool(metadata={"title": "Queue Scan"}, dump_default=True)

    @validates_schema
    def schema_validate(self, data, **kwargs):
        """Reject an un-queued scan when more than one object is selected.

        Raises:
            ValidationError: if ``data["objects"]`` holds more than one
                entry and ``data["enqueue"]`` is explicitly ``False``.
        """
        objs = data.get("objects")
        if objs and len(objs) > 1 and data.get("enqueue") is False:
            raise ValidationError(
                f"Can only queue scan when more than one object is selected. {len(objs)} objects selected"
            )

    def calculated(self, data):
        """Compute the read-only ``step_size`` text for the selected objects.

        Args:
            data: form data; ``objects`` (each with ``x``, ``y``, ``x2``,
                ``y2``) and ``steps`` are read.

        Returns:
            A dict with a ``step_size`` entry (comma separated, one value
            per object), or an empty dict when there are no objects or
            ``steps`` is missing or smaller than 2.
        """
        calculated = {}
        objects = data.get("objects")
        steps = data.get("steps")

        # Without at least two steps there is no interval to size; return
        # nothing rather than failing on a missing key or dividing by zero.
        if not objects or steps is None or steps < 2:
            return calculated

        sizes = []
        for obj in objects:
            length = math.hypot(obj["x2"] - obj["x"], obj["y2"] - obj["y"])
            # NOTE(review): the 1e-9 factor presumably converts nm to m to
            # match the "m" suffix below - confirm the object coordinate unit.
            eng = format_eng(length * 1e-9 / (steps - 1))
            sizes.append(f"{eng['scalar']:.2f} {eng['prefix']}m")

        calculated["step_size"] = ", ".join(sizes)
        return calculated

    class Meta:
        # Display order of the fields in the generated form.
        uiorder = [
            "subsampleid",
            "reference",
            "regions",
            "steps",
            "step_size",
            # "repeats",
            "beamlineparams",
            "enqueue",
        ]
        # Per-field widget overrides.
        uischema = {
            "subsampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "regions": {"ui:field": "arrayTable"},
            "beamlineparams": {"ui:field": "optionalParams"},
        }

        # Field grouping; "steps" and "step_size" share the "Steps" group.
        uigroups = [
            "subsampleid",
            {"Steps": ["steps", "step_size"]},
            "reference",
            "regions",
            # "repeats",
            "beamlineparams",
            "enqueue",
        ]
class LoiscanActor(ComponentActor):
    """Actor that runs one energy scan per position along a line of interest."""

    schema = LoiscanSchema
    metatype = "Energy scan"
    name = "loiscan"

    def method(self, **kwargs):
        """Step along the line, scan at each position and average the ROI maps.

        Keyword arguments read by this method:
            absol: dict providing ``move_to``, ``move_to_additional``,
                ``positions`` and ``axes`` (``x``/``y`` entries with a
                ``motor`` and a two-element ``destination``).
            before_scan_starts, update_datacollection,
            add_scanqualityindicators, open_attachment, get_rois:
                callbacks supplied by the caller.
            scans: object exposing ``get_scan_spectra`` and ``get_scan_data``.
            steps: number of positions along the line.
            regions: list whose first entry provides ``start_e``, ``end_e``,
                ``step`` and ``dwell``.
            reference: energy reference.

        Per-ROI averages over all positions are emitted on custom channels
        of the scan sequence and written, with the energy axis, to an
        ``.asc`` attachment.
        """
        logger.info("Add loiscan %s", kwargs)

        absol = kwargs["absol"]
        steps = kwargs["steps"]
        region = kwargs["regions"][0]
        reference = kwargs["reference"]
        scans = kwargs["scans"]

        logger.info("Moving to loi")
        absol["move_to"](absol)

        logger.info("Moving to additional positions")
        absol["move_to_additional"](absol["positions"])

        logger.info("Capture params and image")
        kwargs["before_scan_starts"](self)

        axes = absol["axes"]
        x_axis = axes["x"]
        y_axis = axes["y"]
        x_start = x_axis["destination"][0]
        y_start = y_axis["destination"][0]
        step_size_x = (x_axis["destination"][1] - x_start) / (steps - 1)
        step_size_y = (y_axis["destination"][1] - y_start) / (steps - 1)

        points = int((region["end_e"] - region["start_e"]) / region["step"])

        mca = cfg.get("simu1")
        mca.block_size = 10

        seq = Sequence()
        seq.add_custom_channel(AcquisitionChannel("avg_energy", float, ()))

        # One averaged channel and one accumulator per configured ROI.
        rois = kwargs["get_rois"]()["rois"]
        roi_data = {}
        for roi in rois:
            seq.add_custom_channel(AcquisitionChannel(f"avg_{roi['name']}", float, ()))
            roi_data[roi["maproiid"]] = {"name": roi["name"], "data": []}

        # diode2 .. diode5 from the static configuration.
        diodes = [cfg.get(f"diode{i + 2}") for i in range(4)]

        with seq.sequence_context() as scan_seq:
            kwargs["update_datacollection"](
                self,
                datacollectionnumber=mmh3.hash(get_scan_key(seq)) & 0xFFFFFFFF,
                imagecontainersubpath="1.1/measurement",
                exposuretime=region["dwell"],
                numberofimages=points,
                dx_mm=to_unit(step_size_x, x_axis["motor"].get("unit") or "mm", "mm"),
                dy_mm=to_unit(step_size_y, y_axis["motor"].get("unit") or "mm", "mm"),
                steps_x=steps,
                steps_y=steps,
                # numberofpasses=kwargs["repeats"],
                wavelength=to_wavelength(reference * 1e3),
                emit_start=True,
            )

            for step_id in range(steps):
                x_axis["motor"].move(x_start + step_id * step_size_x)
                y_axis["motor"].move(y_start + step_id * step_size_y)

                # NOTE(review): the scan starts at ``reference`` and ends at
                # ``reference + end_e / 1000`` while ``points`` is derived
                # from ``end_e - start_e``; ``start_e`` is not applied as an
                # offset here - confirm this is intended.
                scan = ascan(
                    omega,
                    reference,
                    reference + region["end_e"] / 1000,
                    # Intervals vs points !
                    points - 1,
                    region["dwell"],
                    diode,
                    *diodes,
                    mca,
                )
                scan_seq.add(scan)

                scan_number = mmh3.hash(get_scan_key(scan)) & 0xFFFFFFFF
                spectra = scans.get_scan_spectra(scan_number, allpoints=True)

                maps_actor = CreatemapActor()
                maps = maps_actor.method(spectra=spectra, rois=rois)
                if not maps:
                    continue

                for j, mroi in enumerate(maps[0]["maps"]):
                    roi_data[mroi["maproiid"]]["data"].append(mroi["data"])

                    # Only the first ROI feeds the scan quality indicator.
                    if j == 0:
                        kwargs["add_scanqualityindicators"](
                            self, point=step_id + 1, total=sum(mroi["data"])
                        )

            avg_len = 0
            avgs = {}
            for roi in roi_data.values():
                # Averaging an empty list yields a NaN scalar, on which
                # ``len`` fails; skip ROIs that never received map data.
                if not roi["data"]:
                    logger.warning(
                        "No map data collected for roi %s, skipping average",
                        roi["name"],
                    )
                    continue

                avg = numpy.average(roi["data"], axis=0)
                avg_len = len(avg)
                seq.custom_channels[f"avg_{roi['name']}"].emit(avg)
                avgs[roi["name"]] = avg

            # The energy axis is read back from the last scan of the loop.
            energy_counter = "axis:omega"
            scalars = scans.get_scan_data(
                scan_number, per_page=1e10, scalars=["axis:omega"]
            )
            if energy_counter in scalars["data"]:
                energy = scalars["data"][energy_counter]["data"]
            else:
                logger.warning("Cannot find energy counter")
                # Fall back to a plain point index of matching length.
                energy = range(avg_len)

            seq.custom_channels["avg_energy"].emit(energy)

            with kwargs["open_attachment"](
                self, "xy", suffix="_average", ext="asc"
            ) as avgf:
                headers = "\t".join(avgs.keys())
                # One row per energy point: energy first, then each ROI average.
                data = numpy.array([energy] + list(avgs.values())).T
                numpy.savetxt(
                    avgf,
                    data,
                    delimiter="\t",
                    header=f"{energy_counter}\t{headers}",
                )