Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/implementors/imageviewer/loiscan.py: 35%

109 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import logging 

4import math 

5 

6import mmh3 

7import numpy 

8from marshmallow import fields, validate, validates_schema, ValidationError 

9 

10from bliss.config.static import get_config 

11from bliss.common.scans import ascan 

12from bliss.scanning.group import Sequence 

13from bliss.scanning.chain import AcquisitionChannel 

14 

15from daiquiri.core.utils import format_eng, to_wavelength, to_unit 

16from daiquiri.core.components import ComponentActor, ComponentActorSchema 

17from daiquiri.core.schema.components import RegionSchema 

18from daiquiri.core.hardware.bliss.session import * 

19from daiquiri.core.hardware.blissdata.helpers import get_scan_key 

20 

21from .createmap import CreatemapActor 

22from .beamlineparams import BeamlineParamsSchema 

23 

24cfg = get_config() 

25logger = logging.getLogger(__name__) 

26 

27 

28class LoiscanSchema(ComponentActorSchema): 

29 subsampleid = fields.Int(required=True) 

30 steps = fields.Int( 

31 required=True, 

32 metadata={"title": "Steps"}, 

33 validate=validate.Range(min=2), 

34 ) 

35 step_size = fields.Str( 

36 metadata={"title": "Size", "readOnly": True}, 

37 ) 

38 regions = fields.List( 

39 fields.Nested(RegionSchema), 

40 metadata={ 

41 "title": "Energy Regions", 

42 "minItems": 1, 

43 "maxItems": 1, 

44 }, 

45 required=True, 

46 ) 

47 reference = fields.Float( 

48 metadata={"title": "Energy Reference", "unit": "keV"}, 

49 validate=validate.Range(min=0.1), 

50 required=True, 

51 ) 

52 # repeats = fields.Int( 

53 # required=True, 

54 # metadata={"title": "No. Repeats"}, 

55 # validate=validate.Range(min=1), 

56 # dump_default=1, 

57 # ) 

58 beamlineparams = fields.Nested( 

59 BeamlineParamsSchema, metadata={"title": "Beamline Parameters"} 

60 ) 

61 enqueue = fields.Bool(metadata={"title": "Queue Scan"}, dump_default=True) 

62 

63 @validates_schema 

64 def schema_validate(self, data, **kwargs): 

65 objs = data.get("objects") 

66 if objs: 

67 if len(objs) > 1 and data.get("enqueue") is False: 

68 raise ValidationError( 

69 f"Can only queue scan when more than one object is selected. {len(objs)} objects selected" 

70 ) 

71 

72 def calculated(self, data): 

73 calculated = {} 

74 if data.get("objects"): 

75 steps = [] 

76 for obj in data["objects"]: 

77 length = math.sqrt( 

78 math.pow(obj["x2"] - obj["x"], 2) 

79 + math.pow(obj["y2"] - obj["y"], 2) 

80 ) 

81 eng = format_eng(length * 1e-9 / (data["steps"] - 1)) 

82 steps.append(f"{eng['scalar']:.2f} {eng['prefix']}m") 

83 calculated["step_size"] = ", ".join(steps) 

84 

85 return calculated 

86 

87 class Meta: 

88 uiorder = [ 

89 "subsampleid", 

90 "reference", 

91 "regions", 

92 "steps", 

93 "step_size", 

94 # "repeats", 

95 "beamlineparams", 

96 "enqueue", 

97 ] 

98 uischema = { 

99 "subsampleid": {"classNames": "hidden-row", "ui:widget": "hidden"}, 

100 "enqueue": {"classNames": "hidden-row", "ui:widget": "hidden"}, 

101 "regions": {"ui:field": "arrayTable"}, 

102 "beamlineparams": {"ui:field": "optionalParams"}, 

103 } 

104 

105 uigroups = [ 

106 "subsampleid", 

107 {"Steps": ["steps", "step_size"]}, 

108 "reference", 

109 "regions", 

110 # "repeats", 

111 "beamlineparams", 

112 "enqueue", 

113 ] 

114 

115 

116class LoiscanActor(ComponentActor): 

117 schema = LoiscanSchema 

118 metatype = "Energy scan" 

119 name = "loiscan" 

120 

121 def method(self, **kwargs): 

122 print("Add loiscan", kwargs) 

123 

124 print("moving to loi") 

125 kwargs["absol"]["move_to"](kwargs["absol"]) 

126 

127 print("moving to addiional positions") 

128 kwargs["absol"]["move_to_additional"](kwargs["absol"]["positions"]) 

129 

130 print("capture params and image") 

131 kwargs["before_scan_starts"](self) 

132 

133 axes = kwargs["absol"]["axes"] 

134 step_size_x = (axes["x"]["destination"][1] - axes["x"]["destination"][0]) / ( 

135 kwargs["steps"] - 1 

136 ) 

137 step_size_y = (axes["y"]["destination"][1] - axes["y"]["destination"][0]) / ( 

138 kwargs["steps"] - 1 

139 ) 

140 

141 points = int( 

142 (kwargs["regions"][0]["end_e"] - kwargs["regions"][0]["start_e"]) 

143 / kwargs["regions"][0]["step"] 

144 ) 

145 

146 mca = cfg.get("simu1") 

147 mca.block_size = 10 

148 

149 seq = Sequence() 

150 seq.add_custom_channel(AcquisitionChannel("avg_energy", float, ())) 

151 

152 rois = kwargs["get_rois"]()["rois"] 

153 roi_data = {} 

154 for roi in rois: 

155 seq.add_custom_channel(AcquisitionChannel(f"avg_{roi['name']}", float, ())) 

156 roi_data[roi["maproiid"]] = {"name": roi["name"], "data": []} 

157 

158 diodes = [] 

159 for i in range(4): 

160 diodes.append(cfg.get(f"diode{i+2}")) 

161 

162 with seq.sequence_context() as scan_seq: 

163 kwargs["update_datacollection"]( 

164 self, 

165 datacollectionnumber=mmh3.hash(get_scan_key(seq)) & 0xFFFFFFFF, 

166 imagecontainersubpath="1.1/measurement", 

167 exposuretime=kwargs["regions"][0]["dwell"], 

168 numberofimages=points, 

169 dx_mm=to_unit( 

170 step_size_x, axes["x"]["motor"].get("unit") or "mm", "mm" 

171 ), 

172 dy_mm=to_unit( 

173 step_size_y, axes["y"]["motor"].get("unit") or "mm", "mm" 

174 ), 

175 steps_x=kwargs["steps"], 

176 steps_y=kwargs["steps"], 

177 # numberofpasses=kwargs["repeats"], 

178 wavelength=to_wavelength(kwargs["reference"] * 1e3), 

179 emit_start=True, 

180 ) 

181 

182 for step_id in range(kwargs["steps"]): 

183 axes["x"]["motor"].move( 

184 axes["x"]["destination"][0] + step_id * step_size_x 

185 ) 

186 axes["y"]["motor"].move( 

187 axes["y"]["destination"][0] + step_id * step_size_y 

188 ) 

189 

190 scan = ascan( 

191 omega, 

192 kwargs["reference"], 

193 kwargs["reference"] + kwargs["regions"][0]["end_e"] / 1000, 

194 # Intervals vs points ! 

195 points - 1, 

196 kwargs["regions"][0]["dwell"], 

197 diode, 

198 diodes[0], 

199 diodes[1], 

200 diodes[2], 

201 diodes[3], 

202 mca, 

203 ) 

204 scan_seq.add(scan) 

205 

206 scan_number = mmh3.hash(get_scan_key(scan)) & 0xFFFFFFFF 

207 spectra = kwargs["scans"].get_scan_spectra(scan_number, allpoints=True) 

208 

209 mapsActor = CreatemapActor() 

210 maps = mapsActor.method(spectra=spectra, rois=rois) 

211 

212 if maps: 

213 for j, mroi in enumerate(maps[0]["maps"]): 

214 roi_data[mroi["maproiid"]]["data"].append(mroi["data"]) 

215 

216 if j == 0: 

217 kwargs["add_scanqualityindicators"]( 

218 self, point=step_id + 1, total=sum(mroi["data"]) 

219 ) 

220 

221 avg_len = 0 

222 avgs = {} 

223 for roi in roi_data.values(): 

224 avg = numpy.average(roi["data"], axis=0) 

225 avg_len = len(avg) 

226 seq.custom_channels[f"avg_{roi['name']}"].emit(avg) 

227 avgs[roi["name"]] = avg 

228 

229 energy_counter = "axis:omega" 

230 energy = [] 

231 scalars = kwargs["scans"].get_scan_data( 

232 scan_number, per_page=1e10, scalars=["axis:omega"] 

233 ) 

234 if energy_counter in scalars["data"]: 

235 energy = scalars["data"][energy_counter]["data"] 

236 else: 

237 logger.warning("Cannot find energy counter") 

238 energy = range(avg_len) 

239 

240 seq.custom_channels["avg_energy"].emit(energy) 

241 

242 with kwargs["open_attachment"]( 

243 self, "xy", suffix="_average", ext="asc" 

244 ) as avgf: 

245 headers = "\t".join(avgs.keys()) 

246 data = numpy.array([energy] + list(avgs.values())).T 

247 numpy.savetxt( 

248 avgf, 

249 data, 

250 delimiter="\t", 

251 header=f"{energy_counter}\t{headers}", 

252 )