Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/imageviewer/roiscan.py: 47%

120 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import logging 

4from decimal import Decimal, getcontext, ROUND_DOWN 

5from marshmallow import fields, validate, validates_schema, ValidationError 

6 

7from bliss.config.static import get_config 

8from bliss.common.scans import amesh 

9from bliss.scanning.scan import ScanState 

10import gevent 

11import mmh3 

12 

13 

14from daiquiri.core.components import ( 

15 ComponentActor, 

16 ComponentActorKilled, 

17 ComponentActorSchema, 

18) 

19from daiquiri.core.utils import to_wavelength 

20from daiquiri.core.hardware.bliss.session import * 

21from daiquiri.core.components.utils.monitor import monitor 

22from daiquiri.core.hardware.blissdata.helpers import get_scan_key 

23 

24from .beamlineparams import BeamlineParamsSchema 

25 

# TODO: what are the implications of doing this?
# NOTE: getcontext() returns the *current thread's* decimal context, so this
# precision applies to every Decimal operation run in that thread, not only
# to the step calculations in this module.
getcontext().prec = 8

logger = logging.getLogger(__name__)
cfg = get_config()

31 

32 

class RoiscanSchema(ComponentActorSchema):
    """Parameter schema for a 2D ROI scan.

    Besides the marshmallow fields, the schema provides helpers that derive
    the number of steps per object (``calculated``), emit per-object
    warnings (``warnings``) and estimate the scan duration
    (``time_estimate``).
    """

    subsampleid = fields.Int(required=True)
    steps_x = fields.Str(metadata={"title": "Vertical", "readOnly": True})
    step_size_x = fields.Float(
        required=True,
        metadata={"title": "Size Vert", "unit": "um"},
        validate=validate.Range(min=0.1),
    )
    steps_y = fields.Str(metadata={"title": "Horizontal", "readOnly": True})
    step_size_y = fields.Float(
        required=True,
        metadata={"title": "Size Horz", "unit": "um"},
        validate=validate.Range(min=0.1),
    )
    dwell = fields.Float(
        required=True,
        metadata={"title": "Dwell time", "unit": "s"},
        validate=validate.Range(min=0.001, max=60),
    )
    beamlineparams = fields.Nested(
        BeamlineParamsSchema, metadata={"title": "Beamline Parameters"}
    )
    with_lima = fields.Bool(
        metadata={"title": "Enable Lima Camera"}, dump_default=False
    )
    enqueue = fields.Bool(metadata={"title": "Queue Scan"}, dump_default=True)
    estimate = fields.Float(dump_default=0)

    @staticmethod
    def _quantize(value, places, rounding=None):
        """Quantize ``value`` to ``places`` decimal places without overflowing.

        ``Decimal.quantize`` signals ``InvalidOperation`` when the result
        needs more digits than the context precision. The quantization is
        therefore done in a copy of the current context whose precision is
        widened just enough to hold the integer part plus ``places`` digits.

        :param value: ``Decimal`` to quantize
        :param places: number of digits to keep after the decimal point
        :param rounding: rounding mode, ``None`` uses the context's mode
        :returns: the quantized ``Decimal``
        """
        ctx = getcontext().copy()
        # +2: one for the leading digit, one spare in case rounding carries
        # into a new leading digit (e.g. 999.999 -> 1000.00)
        ctx.prec = max(ctx.prec, value.adjusted() + 2 + places)
        quantum = Decimal((0, (1,), -places))
        return value.quantize(quantum, rounding=rounding, context=ctx)

    def _steps(self, start, end, step_size, base_unit=1e-9, places=5):
        """Return the (possibly fractional) number of steps between two points.

        The distance ``|end - start|`` is scaled by ``base_unit``, divided by
        ``step_size`` and by ``1e-6``.
        NOTE(review): presumably ``base_unit`` converts the coordinates to
        metres and ``step_size`` is in um, as the field metadata suggests.

        :param start: start coordinate
        :param end: end coordinate
        :param step_size: size of a single step
        :param base_unit: scale factor applied to the coordinates
        :param places: decimal places kept in the result (rounded down)
        :returns: ``Decimal`` number of steps, always non-negative
        """
        steps = abs(
            (Decimal(end) - Decimal(start))
            * Decimal(base_unit)
            / Decimal(step_size)
            / Decimal(1e-6)
        )

        # Quantize in a widened context: with the module precision of 8 and
        # 5 decimal places, any value >= 1000 would otherwise raise
        # decimal.InvalidOperation.
        return self._quantize(steps, places, rounding=ROUND_DOWN)

    @validates_schema
    def schema_validate(self, data, **kwargs):
        """Validate cross-field constraints.

        :raises ValidationError: if several objects are selected while
            ``enqueue`` is explicitly ``False``, or if a step size does not
            divide an object's extent into a whole number of steps
        """
        objs = data.get("objects")
        if not objs:
            return

        if len(objs) > 1 and data.get("enqueue") is False:
            raise ValidationError(
                f"Can only queue scan when more than one object is selected. {len(objs)} objects selected"
            )

        intervals = [["step_size_x", "x", "x2"], ["step_size_y", "y", "y2"]]
        for size_key, start_key, end_key in intervals:
            for obj in objs:
                steps = self._steps(obj[start_key], obj[end_key], data[size_key])
                if steps != steps.to_integral_value():
                    raise ValidationError(
                        f"{size_key} must be an integer value: {steps}"
                    )

    def warnings(self, data, **kwargs):
        """Return a ``{subsampleid: message}`` dict of per-object warnings.

        An object gets a warning when either of its extents, computed as
        ``(end - start) * 1e-9 / 1e-6``, exceeds 100.
        """
        warnings = {}
        for obj in data.get("objects") or []:
            size_x = (obj["x2"] - obj["x"]) * 1e-9 / 1e-6
            size_y = (obj["y2"] - obj["y"]) * 1e-9 / 1e-6

            if size_x > 100 or size_y > 100:
                warnings[
                    obj["subsampleid"]
                ] = f"Object {obj['subsampleid']} will use stepper rather than piezo as size is {size_x:.0f}x{size_y:.0f} um"

        return warnings

    def calculated(self, data, **kwargs):
        """Return the derived values ``steps_x``, ``steps_y`` and ``estimate``.

        ``steps_x`` / ``steps_y`` are comma separated strings with one entry
        per object and are only present when the matching step size is set.
        Whole step counts are shown as integers, others with two decimals.
        """
        calculated = {}

        intervals = {
            "steps_x": ["step_size_x", "x", "x2"],
            "steps_y": ["step_size_y", "y", "y2"],
        }

        for iv, (size_key, start_key, end_key) in intervals.items():
            if not data.get(size_key):
                continue

            steps = []
            for obj in data.get("objects") or []:
                step = self._steps(obj[start_key], obj[end_key], data[size_key])
                integral = step.to_integral_value()
                # Equivalent to round(step, 2) but safe for values that need
                # more digits than the context precision
                steps.append(integral if integral == step else self._quantize(step, 2))

            calculated[iv] = ", ".join(map(str, steps))

        calculated["estimate"] = self.time_estimate(data)

        return calculated

    def time_estimate(self, data):
        """Estimate the total scan duration over all selected objects.

        For each object the estimate is ``dwell * steps_x * steps_y * 1.5``,
        and the per-object values are summed.

        :returns: the estimate, or ``None`` when the step sizes, the dwell
            time or the objects are missing
        """
        fudge = 1.5
        step_size_x = data.get("step_size_x")
        step_size_y = data.get("step_size_y")
        dwell = data.get("dwell")
        objects = data.get("objects")

        if not (step_size_x and step_size_y and objects) or dwell is None:
            return None

        total = 0
        for obj in objects:
            # abs() keeps the estimate positive when an ROI is defined with
            # its end before its start
            steps_x = abs(obj["x2"] - obj["x"]) * 1e-9 / step_size_x / 1e-6
            steps_y = abs(obj["y2"] - obj["y"]) * 1e-9 / step_size_y / 1e-6
            total += dwell * steps_x * steps_y * fudge

        return total

    class Meta:
        uiorder = [
            "subsampleid",
            "dwell",
            "step_size_x",
            "step_size_y",
            "steps_x",
            "steps_y",
            "beamlineparams",
            "with_lima",
            "enqueue",
            "estimate",
        ]
        uischema = {
            "subsampleid": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "estimate": {"classNames": "hidden-row", "ui:widget": "hidden"},
            "steps_x": {"ui:readonly": True},
            "steps_y": {"ui:readonly": True},
            "beamlineparams": {"ui:field": "optionalParams"},
        }
        uigroups = [
            "subsampleid",
            "dwell",
            {"Steps": ["step_size_x", "step_size_y", "steps_x", "steps_y"]},
            "beamlineparams",
            "with_lima",
            "enqueue",
            "estimate",
        ]

173 

174 

def get_metatype(self, **kwargs):
    """Return the metatype label for a scan.

    :returns: ``"XRF xrd map"`` when the ``with_lima`` keyword is truthy,
        ``"XRF map"`` otherwise
    """
    if kwargs.get("with_lima"):
        return "XRF xrd map"
    return "XRF map"

177 

178 

class RoiscanActor(ComponentActor):
    """Actor that runs a 2D mesh scan over a region of interest."""

    schema = RoiscanSchema
    name = "roiscan"
    metatype = get_metatype
    additional_metadata = {"definition": "XRF_2Dmap"}

    def method(self, **kwargs):
        """Move to the ROI, run the mesh scan and return its result.

        The scan runs in its own greenlet while a monitor follows the x
        motor position. If the actor is killed while waiting, the scan
        greenlet is killed as well and the exception is re-raised.

        :returns: the value of the scan greenlet
        """
        print("Add roiscan", kwargs)

        print("moving to roi")
        absol = kwargs["absol"]
        absol["move_to"](absol)

        print("moving to additional positions")
        absol["move_to_additional"](absol["positions"])

        mca = cfg.get("simu1")
        lima_simulator = cfg.get("lima_simulator")

        axes = absol["axes"]

        def count_steps(axis_name):
            # Whole number of steps along one axis for the requested size
            axis = axes[axis_name]
            return int(
                round(
                    self.schema()._steps(
                        axis["destination"][0],
                        axis["destination"][1],
                        kwargs[f"step_size_{axis_name}"],
                        base_unit=axis["unit_exponent"],
                    )
                )
            )

        steps_x = count_steps("x")
        steps_y = count_steps("y")

        # Actual step sizes in motor units, derived from the rounded counts
        x_axis = axes["x"]
        x_start, x_end = x_axis["destination"][0], x_axis["destination"][1]
        step_size_x = (x_end - x_start) / steps_x

        y_axis = axes["y"]
        y_start, y_end = y_axis["destination"][0], y_axis["destination"][1]
        step_size_y = (y_end - y_start) / steps_y

        print("calculated steps to be", steps_x, steps_y)

        print("capture params and image")
        kwargs["before_scan_starts"](self)

        detectors = [mca, diode]
        if kwargs["with_lima"]:
            detectors.append(lima_simulator)

        # Scan points are inset by half a step from each ROI edge, hence
        # one interval fewer than the number of steps
        mesh = amesh(
            x_axis["motor"].object(),
            x_start + step_size_x / 2,
            x_end - step_size_x / 2,
            steps_x - 1,
            y_axis["motor"].object(),
            y_start + step_size_y / 2,
            y_end - step_size_y / 2,
            steps_y - 1,
            kwargs["dwell"],
            *detectors,
            run=False,
        )

        greenlet = gevent.spawn(mesh.run)
        mesh.wait_state(ScanState.STARTING)

        # Hash of the scan key masked to an unsigned 32 bit value
        collection_number = mmh3.hash(get_scan_key(mesh)) & 0xFFFFFFFF
        kwargs["update_datacollection"](
            self,
            emit_start=True,
            datacollectionnumber=collection_number,
            imagecontainersubpath="1.1/measurement",
            dx_mm=kwargs["step_size_x"] * 1e-3,
            dy_mm=kwargs["step_size_y"] * 1e-3,
            numberofimages=steps_x * steps_y,
            exposuretime=kwargs["dwell"],
            wavelength=to_wavelength(10000),
            steps_x=steps_x,
            steps_y=steps_y,
            orientation="horizontal",
        )

        print("starting monitor")
        monitor_greenlet, kill_monitor = monitor(
            x_axis["motor"], "position", greenlet
        )

        try:
            greenlet.join()
        except ComponentActorKilled:
            greenlet.kill()
            raise
        finally:
            print("stopping monitor")
            kill_monitor()

        monitor_greenlet.get()

        return greenlet.get()