Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/implementors/imageviewer/roipatch.py: 40%

83 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-29 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import logging 

4import gevent 

5from marshmallow import fields 

6 

7from bliss.scanning.group import Sequence 

8from bliss.config.static import get_config 

9from bliss.common.scans import amesh 

10from bliss.scanning.scan import ScanState 

11 

12import mmh3 

13 

14from daiquiri.core.components import ( 

15 ComponentActor, 

16 ComponentActorKilled, 

17) 

18from daiquiri.core.utils import to_wavelength 

19from daiquiri.core.hardware.bliss.session import * 

20from daiquiri.core.hardware.blissdata.helpers import get_scan_key 

21from daiquiri.core.components.utils.monitor import monitor 

22from daiquiri.core.schema.scans.mesh import ( 

23 MeshPatchSchema, 

24 set_decimal_precision, 

25) 

26 

27from .beamlineparams import BeamlineParamsSchema 

28 

# Module-level setup, executed once at import time.
# set_decimal_precision comes from the mesh schema module; its exact effect
# is defined there.
set_decimal_precision()

# Module logger, named after this module.
logger = logging.getLogger(__name__)

# Shared BLISS static configuration, used below to look up devices by name.
cfg = get_config()

33 

34 

class RoipatchSchema(MeshPatchSchema):
    """Parameter schema for the ROI patch scan.

    Extends ``MeshPatchSchema`` with an optional nested block of beamline
    parameters and a flag that enables the Lima camera, and customises the
    UI ordering, widgets and grouping of the inherited form.
    """

    # NOTE(review): presumably consumed by MeshPatchSchema when sizing the
    # patches; neither the units nor the consumer are visible here - confirm.
    _FINE_RANGE_X = 20
    _FINE_RANGE_Y = 20

    beamlineparams = fields.Nested(
        BeamlineParamsSchema,
        metadata=dict(title="Beamline Parameters"),
    )
    with_lima = fields.Bool(
        dump_default=False,
        metadata=dict(title="Enable Lima Camera"),
    )

    class Meta:
        # Inherited field order, followed by the two fields added above.
        uiorder = list(MeshPatchSchema.Meta.uiorder) + [
            "with_lima",
            "beamlineparams",
        ]

        # Inherited UI schema with "continuous" and "fast_axis" hidden and
        # the beamline parameters rendered through the optional-params field.
        uischema = dict(
            MeshPatchSchema.Meta.uischema,
            continuous={"classNames": "hidden-row", "ui:widget": "hidden"},
            fast_axis={"classNames": "hidden-row", "ui:widget": "hidden"},
            beamlineparams={"ui:field": "optionalParams"},
        )

        # Form layout: plain strings are single fields, one-key dicts are
        # titled groups of fields.
        uigroups = [
            "subsampleid",
            "dwell",
            dict(Patches=["patches_x", "patches_y", "patch_size_x", "patch_size_y"]),
            dict(Steps=["step_size_x", "step_size_y", "steps_x", "steps_y"]),
            "with_lima",
            "enqueue",
            "beamlineparams",
            "estimate",
            "continuous",
            "fast_axis",
        ]

71 

72 

def get_metatype(self, **kwargs):
    """Return the metatype label for a scan.

    Yields "XRF xrd map" when the ``with_lima`` keyword is present and
    truthy, otherwise "XRF map".
    """
    if kwargs.get("with_lima"):
        return "XRF xrd map"
    return "XRF map"

75 

76 

class RoipatchActor(ComponentActor):
    """Actor that maps a region of interest as a grid of mesh-scan patches.

    For every patch the coarse motors are moved to the patch centre and a
    mesh scan is run with the fine motors around ``fine_centre``. All patch
    scans are added to a single BLISS ``Sequence``.
    """

    schema = RoipatchSchema
    name = "Roipatch"
    metatype = get_metatype

    def method(self, **kwargs):
        """Run the patch scan.

        Keyword arguments read by this method:
            absol: dict providing the ``move_to`` and ``move_to_additional``
                callables, the ``positions`` passed to the latter, and the
                ``axes`` description.
            before_scan_starts: callable invoked with the actor before the
                first scan is created.
            update_datacollection: callable invoked with the actor and the
                data collection metadata.
            with_lima: when truthy the Lima simulator is added to the
                detectors.
            patches_x, patches_y: number of patches along each direction.
            step_size_x, step_size_y: step sizes, reported as
                ``dx_mm``/``dy_mm`` after scaling by 1e-3.
            dwell: count time handed to each mesh scan.

        All keyword arguments are also forwarded to
        ``RoipatchSchema.calc_patch_steps_size``.

        Raises:
            ComponentActorKilled: re-raised after the running scan greenlet
                has been killed.
        """
        absol = kwargs["absol"]

        logger.info("moving to roi")
        absol["move_to"](absol)

        logger.info("moving to additional positions")
        absol["move_to_additional"](absol["positions"])

        mca = cfg.get("simu1")
        lima_simulator = cfg.get("lima_simulator")

        axes = absol["axes"]
        (
            patch_size_x,
            patch_size_y,
            steps_x,
            steps_y,
            offset_x,
            offset_y,
            orientation,
            fast,
            slow,
        ) = self.schema().calc_patch_steps_size(axes, **kwargs)

        logger.info("calculated steps to be %s %s", steps_x, steps_y)

        logger.info("capture params and image")
        kwargs["before_scan_starts"](self)

        # NOTE(review): `diode` is not defined in this module; presumably it
        # is provided by the wildcard import of the bliss session - confirm.
        detectors = [mca, diode]
        if kwargs["with_lima"]:
            detectors.append(lima_simulator)

        patches_x = kwargs["patches_x"]
        patches_y = kwargs["patches_y"]
        dwell = kwargs["dwell"]

        seq = Sequence()
        with seq.sequence_context() as scan_seq:
            kwargs["update_datacollection"](
                self,
                # Unsigned 32-bit hash of the sequence's scan key.
                datacollectionnumber=mmh3.hash(get_scan_key(seq)) & 0xFFFFFFFF,
                imagecontainersubpath="1.1/measurement",
                dx_mm=kwargs["step_size_x"] * 1e-3,
                dy_mm=kwargs["step_size_y"] * 1e-3,
                numberofimages=steps_x * steps_y * patches_x * patches_y,
                exposuretime=dwell,
                # NOTE(review): hard-coded input; presumably an energy in eV
                # for the simulated beamline - confirm.
                wavelength=to_wavelength(10000),
                steps_x=steps_x * patches_x,
                steps_y=steps_y * patches_y,
                patchesx=patches_x,
                patchesy=patches_y,
                orientation=orientation,
            )

            x_motor = cfg.get("m1")
            y_motor = cfg.get("m2")
            x_motor_fine = cfg.get("s1b")
            y_motor_fine = cfg.get("s1d")
            fine_centre = 0

            # Motor whose position is watched while each scan is running.
            monitored_motor = axes["x"]["motor"]

            for py in range(patches_y):
                for px in range(patches_x):
                    logger.info(
                        "roipatch: patch index = %s", py * patches_x + px
                    )

                    # Centre the coarse motors on the current patch.
                    x_motor.move(
                        axes[fast]["destination"][0]
                        + (px * patch_size_x)
                        + patch_size_x / 2
                    )
                    y_motor.move(
                        axes[slow]["destination"][0]
                        + (py * patch_size_y)
                        + patch_size_y / 2
                    )
                    logger.info(
                        "moved to %s %s", x_motor.position, y_motor.position
                    )

                    # Patch sizes and offsets are in the context of m1/m2
                    # (mm); the factor 1e3 converts them to um for the fine
                    # motors.
                    start_x = fine_centre + ((-patch_size_x / 2) + offset_x) * 1e3
                    end_x = fine_centre + ((patch_size_x / 2) - offset_x) * 1e3
                    start_y = fine_centre + ((-patch_size_y / 2) + offset_y) * 1e3
                    end_y = fine_centre + ((patch_size_y / 2) - offset_y) * 1e3
                    logger.info(
                        "scan %s %s y %s %s", start_x, end_x, start_y, end_y
                    )

                    # The mesh is created without running it so that it can
                    # be registered in the sequence first.
                    # NOTE(review): steps - 1 is passed, presumably because
                    # amesh expects a number of intervals - confirm.
                    scan = amesh(
                        x_motor_fine,
                        start_x,
                        end_x,
                        steps_x - 1,
                        y_motor_fine,
                        start_y,
                        end_y,
                        steps_y - 1,
                        dwell,
                        *detectors,
                        run=False,
                    )

                    scan_seq.add(scan)
                    self._run_monitored_scan(scan, monitored_motor)

    def _run_monitored_scan(self, scan, monitored_motor):
        """Run ``scan`` in its own greenlet while monitoring a motor position.

        The scan greenlet is killed if the actor is killed at any point
        after the greenlet has been spawned, including while waiting for the
        scan to start. Errors raised by the monitor or by the scan itself
        are re-raised once the scan has finished.
        """
        greenlet = gevent.spawn(scan.run)
        monitor_greenlet = None
        kill_monitor = None

        try:
            scan.wait_state(ScanState.STARTING)
            monitor_greenlet, kill_monitor = monitor(
                monitored_motor, "position", greenlet
            )
            greenlet.join()
        except ComponentActorKilled:
            greenlet.kill()
            raise
        finally:
            # The monitor only exists once the scan has reached STARTING.
            if kill_monitor is not None:
                logger.info("stopping monitor")
                kill_monitor()

        # Surface any exception raised inside the monitor or the scan.
        monitor_greenlet.get()
        greenlet.get()