Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/imageviewer/fluoxas.py: 36%

113 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from decimal import Decimal, getcontext 

4from marshmallow import fields, validate, validates_schema, ValidationError 

5 

6from bliss.config.static import get_config 

7from bliss.common.scans import amesh 

8from bliss.scanning.group import Sequence 

9from bliss.scanning.chain import AcquisitionChannel 

10from bliss.scanning.scan import ScanState 

11 

12import numpy 

13import gevent 

14import mmh3 

15 

16from daiquiri.core.components import ( 

17 ComponentActor, 

18 ComponentActorSchema, 

19 ComponentActorKilled, 

20) 

21from daiquiri.core.schema.components import RegionSchema 

22from daiquiri.core.utils import to_wavelength 

23from daiquiri.core.hardware.bliss.session import * 

24from daiquiri.core.hardware.blissdata.helpers import get_scan_key 

25 

26from .createmap import CreatemapActor 

27from .beamlineparams import BeamlineParamsSchema 

28 

29 

30getcontext().prec = 8 

31 

32cfg = get_config() 

33 

34 

35class FluoxasSchema(ComponentActorSchema): 

36 subsampleid = fields.Int(required=True) 

37 steps_x = fields.Str(metadata={"title": "Steps Horizontally", "readOnly": True}) 

38 step_size_x = fields.Float( 

39 required=True, 

40 metadata={"title": "Step Size Horizontally", "unit": "um"}, 

41 validate=validate.Range(min=0.1), 

42 ) 

43 steps_y = fields.Str(metadata={"title": "Steps Vertically", "readOnly": True}) 

44 step_size_y = fields.Float( 

45 required=True, 

46 metadata={"title": "Step Size Vertically", "unit": "um"}, 

47 validate=validate.Range(min=0.1), 

48 ) 

49 reference = fields.Float( 

50 metadata={ 

51 "title": "Energy Reference", 

52 "unit": "keV", 

53 }, 

54 validate=validate.Range(min=0.1), 

55 required=True, 

56 ) 

57 regions = fields.List( 

58 fields.Nested(RegionSchema), 

59 metadata={"title": "Energy Regions", "minItems": 1}, 

60 required=True, 

61 ) 

62 beamlineparams = fields.Nested( 

63 BeamlineParamsSchema, metadata={"title": "Beamline Parameters"} 

64 ) 

65 enqueue = fields.Bool(metadata={"title": "Queue Scan"}, dump_default=True) 

66 

67 def _steps(self, start, end, step_size, base_unit=1e-9): 

68 return abs( 

69 (Decimal(end) - Decimal(start)) 

70 * Decimal(base_unit) 

71 / Decimal(step_size) 

72 / Decimal(1e-6) 

73 ) 

74 

75 @validates_schema 

76 def schema_validate(self, data, **kwargs): 

77 intervals = [["step_size_x", "x", "x2"], ["step_size_y", "y", "y2"]] 

78 

79 for keys in intervals: 

80 if data.get("objects"): 

81 for obj in data.get("objects"): 

82 steps = self._steps(obj[keys[1]], obj[keys[2]], data[keys[0]]) 

83 if not steps == steps.to_integral_value(): 

84 raise ValidationError( 

85 f"{keys[0]} must be an integer value: {steps}" 

86 ) 

87 

88 def calculated(self, data, **kwargs): 

89 calculated = {} 

90 

91 intervals = { 

92 "steps_x": ["step_size_x", "x", "x2"], 

93 "steps_y": ["step_size_y", "y", "y2"], 

94 } 

95 

96 for iv, keys in intervals.items(): 

97 if data.get(keys[0]): 

98 steps = [] 

99 if data.get("objects"): 

100 for obj in data["objects"]: 

101 step = self._steps(obj[keys[1]], obj[keys[2]], data[keys[0]]) 

102 step = ( 

103 step.to_integral_value() 

104 if step.to_integral_value() == step 

105 else round(step, 2) 

106 ) 

107 

108 steps.append(step) 

109 

110 calculated[iv] = ", ".join(map(str, steps)) 

111 

112 return calculated 

113 

114 class Meta: 

115 uiorder = [ 

116 "subsampleid", 

117 "step_size_x", 

118 "step_size_y", 

119 "steps_x", 

120 "steps_y", 

121 "reference", 

122 "regions", 

123 "beamlineparams", 

124 "enqueue", 

125 ] 

126 uischema = { 

127 "subsampleid": {"classNames": "hidden-row", "ui:widget": "hidden"}, 

128 "interval_x": {"ui:readonly": True}, 

129 "interval_y": {"ui:readonly": True}, 

130 "regions": {"ui:field": "arrayTable"}, 

131 "beamlineparams": {"ui:field": "optionalParams"}, 

132 } 

133 

134 

135class FluoxasActor(ComponentActor): 

136 schema = FluoxasSchema 

137 metatype = "XRF map xas" 

138 name = "fluoxas" 

139 

140 def method(self, **kwargs): 

141 print("moving to roi") 

142 kwargs["absol"]["move_to"](kwargs["absol"]) 

143 

144 print("moving to additional positions") 

145 kwargs["absol"]["move_to_additional"](kwargs["absol"]["positions"]) 

146 

147 mca = cfg.get("simu1") 

148 

149 axes = kwargs["absol"]["axes"] 

150 steps_x = int( 

151 round( 

152 self.schema()._steps( 

153 axes["x"]["destination"][0], 

154 axes["x"]["destination"][1], 

155 kwargs["step_size_x"], 

156 base_unit=axes["x"]["unit_exponent"], 

157 ) 

158 ) 

159 ) 

160 steps_y = int( 

161 round( 

162 self.schema()._steps( 

163 axes["y"]["destination"][0], 

164 axes["y"]["destination"][1], 

165 kwargs["step_size_y"], 

166 base_unit=axes["y"]["unit_exponent"], 

167 ) 

168 ) 

169 ) 

170 

171 step_size_x = ( 

172 axes["x"]["destination"][1] - axes["x"]["destination"][0] 

173 ) / steps_x 

174 step_size_y = ( 

175 axes["y"]["destination"][1] - axes["y"]["destination"][0] 

176 ) / steps_y 

177 

178 print("calculated steps to be", steps_x, steps_y) 

179 

180 print("capture params and image") 

181 kwargs["before_scan_starts"](self) 

182 

183 seq = Sequence() 

184 seq.add_custom_channel(AcquisitionChannel("sum_energy", float, ())) 

185 

186 rois = kwargs["get_rois"]()["rois"] 

187 roi_by_id = {} 

188 for roi in rois: 

189 seq.add_custom_channel(AcquisitionChannel(f"sum_{roi['name']}", int, ())) 

190 roi_by_id[roi["maproiid"]] = roi["name"] 

191 

192 first_dc = self["datacollectionid"] 

193 

194 with seq.sequence_context() as scan_seq: 

195 point = 0 

196 

197 for region in kwargs["regions"]: 

198 intervals = ( 

199 int( 

200 round( 

201 (region["end_e"] - region["start_e"]) 

202 / (region["step"] if region["step"] > 0 else 1) 

203 ) 

204 ) 

205 # Want to go to end of energy range 

206 + 1 

207 ) 

208 

209 for i in range(intervals): 

210 energy = ( 

211 (kwargs["reference"] * 1000) 

212 + region["start_e"] 

213 + (i * region["step"]) 

214 ) 

215 

216 mesh = amesh( 

217 axes["x"]["motor"].object(), 

218 axes["x"]["destination"][0] + step_size_x / 2, 

219 axes["x"]["destination"][1] - step_size_x / 2, 

220 steps_x - 1, 

221 axes["y"]["motor"].object(), 

222 axes["y"]["destination"][0] + step_size_y / 2, 

223 axes["y"]["destination"][1] - step_size_y / 2, 

224 steps_y - 1, 

225 region["dwell"], 

226 mca, 

227 diode, 

228 run=False, 

229 ) 

230 

231 scan_seq.add(mesh) 

232 

233 greenlet = gevent.spawn(mesh.run) 

234 mesh.wait_state(ScanState.STARTING) 

235 

236 scan_number = mmh3.hash(get_scan_key(mesh)) & 0xFFFFFFFF 

237 kwargs["update_datacollection"]( 

238 self, 

239 datacollectionnumber=scan_number, 

240 imagecontainersubpath=f"{i+2}.1/measurement", 

241 dx_mm=kwargs["step_size_x"] * 1e-3, 

242 dy_mm=kwargs["step_size_y"] * 1e-3, 

243 numberofimages=steps_x * steps_y, 

244 exposuretime=region["dwell"], 

245 wavelength=to_wavelength(energy), 

246 steps_x=steps_x, 

247 steps_y=steps_y, 

248 orientation="horizontal", 

249 ) 

250 

251 try: 

252 greenlet.join() 

253 except ComponentActorKilled: 

254 greenlet.kill() 

255 raise 

256 else: 

257 spectra = kwargs["scans"].get_scan_spectra( 

258 scan_number, allpoints=True 

259 ) 

260 

261 mapsActor = CreatemapActor() 

262 maps = mapsActor.method(spectra=spectra, rois=rois) 

263 

264 seq.custom_channels["sum_energy"].emit([energy / 1000]) 

265 for j, mroi in enumerate(maps[0]["maps"]): 

266 xanes = sum(mroi["data"]) 

267 name = roi_by_id[mroi["maproiid"]] 

268 seq.custom_channels[f"sum_{name}"].emit([xanes]) 

269 

270 if j == 0: 

271 kwargs["add_scanqualityindicators"]( 

272 self, 

273 point=point + 1, 

274 datacollectionid=first_dc, 

275 total=xanes, 

276 ) 

277 

278 point += 1 

279 

280 kwargs["generate_maps"]( 

281 self["subsampleid"], self["datacollectionid"] 

282 ) 

283 

284 if not (region == kwargs["regions"][-1] and i == intervals - 1): 

285 kwargs["next_datacollection"](self, data=True, grid=True)