Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/dcutilsmixin.py: 21%

96 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import os 

4import pprint 

5import traceback 

6from datetime import datetime 

7from contextlib import contextmanager 

8 

9import logging 

10 

# Module-level pretty-printer and logger shared by this module.
pp = pprint.PrettyPrinter()
logger = logging.getLogger(__name__)

13 

14 

class DCUtilsMixin:
    """DC Utils Mixin

    Mixin class to add reusable functionality for saving data collections:
      - Sets up file saving
      - Saves data collection params
      - Saves data collection exceptions
      - Adds scan quality indicators

    The consuming component is expected to provide:
      - ``self._saving``: scan-saving helper (``set_filename``, ``create_root_path``)
      - ``self._metadata``: metadata client (``add_datacollection``,
        ``update_datacollection``, ``add_scanqualityindicators``,
        ``add_datacollection_attachment``)
      - ``self.get_component(name)``: component lookup, may return ``None``
    """

    def start_datacollection(self, actor):
        """Create the data collection (if needed) and set up file saving.

        Args:
            actor (ComponentActor): The actor
        """
        self.create_datacollection(actor)
        self.initialize_saving(actor)

    def initialize_saving(self, actor):
        """Set the data collection file name if not already done

        Args:
            actor (ComponentActor): The actor
        """
        # Nothing to do when a collection already exists and its image
        # directory has been recorded.
        if actor.get("datacollectionid"):
            if actor.get("imagedirectory") is not None:
                return

        filename = self._saving.set_filename(
            extra_saving_args=actor.saving_args, **actor.all_data
        )
        # Ensure the target directory exists before any data is written.
        self._saving.create_root_path(wait_exists=True)

        if actor.get("datacollectionid"):
            self.update_datacollection(
                actor,
                imagedirectory=os.path.dirname(filename),
                filetemplate=os.path.basename(filename),
            )

        # Fix: previously this logged the literal text "(unknown)" rather
        # than the filename that was just generated above.
        logger.info(
            f"Scan saving initialized ({actor.name}, {actor.uid}, file={filename})"
        )

    def create_datacollection(self, actor):
        """Create a data collection when it does not exist yet

        Args:
            actor (ComponentActor): The actor
        """
        if actor.get("datacollectionid") is not None:
            return

        dc = self.next_datacollection(actor)
        actor.update(datacollectiongroupid=dc["datacollectiongroupid"])
        logger.info(
            f"Create datacollection ({actor.name}, {actor.uid}, id={actor['datacollectionid']})"
        )

    def next_datacollection(self, actor, emit_start=None, emit_end=None, **opts):
        """Start a new datacollection in the actor

        This is useful for actors which want to create multiple
        datacollections within a datacollectiongroup. Any current
        datacollection is first closed with runstatus "Successful".

        Args:
            actor (ComponentActor): The actor

        Kwargs:
            grid (bool): Duplicate gridinfo parameters
            data (bool): Duplicate data location parameters

            emit_start (bool): Emit a start stomp event
            emit_end (bool): Emit an end stomp event

        Returns:
            dc (dict): The new datacollection
        """
        # Close out the current collection; this is a no-op on the metadata
        # side when the actor has no datacollectionid yet.
        self.update_datacollection(
            actor, endtime=datetime.now(), runstatus="Successful"
        )

        bsx = actor.get("beamsize", {}).get("x")
        bsy = actor.get("beamsize", {}).get("y")

        # ``metatype`` may be a static string or a callable computed from the
        # actor's data; fall back to "experiment" if the callable fails.
        if callable(actor.metatype):
            try:
                metatype = actor.metatype(**actor.all_data)
            except Exception:
                logger.exception(
                    "Could not determine metatype, defaulting to `experiment`"
                )
                metatype = "experiment"
        else:
            metatype = actor.metatype

        kwargs = {
            "sessionid": actor["sessionid"],
            "datacollectionplanid": actor["datacollectionplanid"],
            "sampleid": actor["sampleid"],
            "subsampleid": actor.get("subsampleid"),
            "starttime": datetime.now(),
            "experimenttype": metatype,
            "datacollectiongroupid": actor.get("datacollectiongroupid"),
            # database stores beamsize in mm (!) not nm
            "beamsizeatsamplex": bsx / 1e6 if bsx else None,
            "beamsizeatsampley": bsy / 1e6 if bsy else None,
        }

        # Data: carry the file location over from the previous collection
        if opts.get("data"):
            for k in ["imagedirectory", "filetemplate"]:
                kwargs[k] = actor.get(k)

        # GridInfo: carry the grid parameters over from the previous collection
        if opts.get("grid"):
            for k in [
                "steps_x",
                "steps_y",
                "dx_mm",
                "dy_mm",
                "patchesx",
                "patchesy",
                "xtalsnapshotfullpath1",
                "pixelspermicronx",
                "pixelspermicrony",
                "snapshot_offsetxpixel",
                "snapshot_offsetypixel",
                "snaked",
            ]:
                kwargs[k] = actor.get(k)

        dc = self._metadata.add_datacollection(**kwargs)
        # Reset per-collection state on the actor for the new collection.
        actor.update(
            datacollectionid=dc["datacollectionid"],
            datacollectionnumber=None,
            endtime=None,
            runstatus=None,
        )

        for client_name in ["stomp", "celery"]:
            client = self.get_component(client_name)
            if client:
                if emit_start:
                    client.send_event(dc["datacollectionid"], "start")

                if emit_end:
                    client.send_event(dc["datacollectionid"], "end")

        return dc

    def update_datacollection(self, actor, emit_start=False, emit_end=False, **params):
        """Update a datacollection

        Also emits stomp start / end events when requested.

        The actor's local state is always updated; the metadata store is only
        touched when the actor already has a ``datacollectionid``.

        Args:
            actor (ComponentActor): The actor
        Kwargs:
            emit_start (bool): Emit a start stomp event
            emit_end (bool): Emit an end stomp event

        Returns:
            The metadata update response, or ``None`` when the actor has no
            datacollection yet.
        """
        actor.update(**params)
        datacollectionid = actor.get("datacollectionid")
        # Everything below (including the return of ``resp``) must stay inside
        # this guard: when there is no datacollection yet there is nothing to
        # update or emit, and returning ``resp`` unconditionally would raise
        # UnboundLocalError.
        if datacollectionid is not None:
            resp = self._metadata.update_datacollection(
                datacollectionid=datacollectionid, no_context=True, **params
            )

            for client_name in ["stomp", "celery"]:
                client = self.get_component(client_name)
                if client:
                    if emit_start:
                        client.send_event(actor["datacollectionid"], "start")

                    if emit_end:
                        client.send_event(actor["datacollectionid"], "end")

            return resp
        return None

    def add_scanqualityindicators(self, actor, point, datacollectionid=None, **columns):
        """Adds a scan quality indicator for a point

        Args:
            actor (obj): The actor
            point (int): The point to associate this indicator with

        Kwargs:
            datacollectionid (int): A specific datacollectionid if required
            total (int): Total integrated signal or similar
            spots (int): No. of spots
        """
        self._metadata.add_scanqualityindicators(
            no_context=True,
            datacollectionid=(
                datacollectionid if datacollectionid else actor["datacollectionid"]
            ),
            point=point,
            **columns,
        )

    @contextmanager
    def _open_dc_attachment(self, actor, filetype, suffix="", ext="log"):
        """Context manager yielding a writable attachment file for the actor's
        datacollection; the file is registered as an attachment on exit.

        Registration happens in ``finally`` so the attachment is recorded even
        if the caller's body raises (e.g. while serialising an exception).

        Args:
            actor (ComponentActor): The actor (provides ``datacollectionid``
                and ``imagedirectory``)
            filetype (str): Attachment type recorded in the metadata store
        Kwargs:
            suffix (str): Appended to the dcid to build the file stem
            ext (str): File extension
        """
        dcid = actor["datacollectionid"]
        directory = actor["imagedirectory"]
        filename = os.extsep.join([f"{dcid}{suffix}", ext])
        filepath = os.path.join(directory, filename)
        with open(filepath, "w") as lf:
            try:
                yield lf
            finally:
                self._metadata.add_datacollection_attachment(
                    no_context=True,
                    datacollectionid=dcid,
                    filetype=filetype,
                    filepath=filepath,
                )

    def _save_dc_params(self, actor):
        """Save the initial parameters of an actor's datacollection"""
        with self._open_dc_attachment(
            actor, filetype="params", suffix="_args", ext="json"
        ) as lf:
            lf.write(actor.all_data_json_serialized)

    def _save_dc_log(self, actor):
        """Save stdout to an actor's datacollection"""
        if actor.stdout:
            with self._open_dc_attachment(
                actor, filetype="log", suffix="_stdout", ext="log"
            ) as lf:
                lf.write(actor.stdout)

    def _save_dc_exception(self, actor, exception):
        """Save an exception traceback of an actor's datacollection"""
        with self._open_dc_attachment(
            actor, filetype="log", suffix="_error", ext="log"
        ) as lf:
            s = pprint.pformat(actor.initkwargs_json_serializable)
            lf.write(f"Actor Arguments:\n{s}\n\n")
            s = pprint.pformat(actor.data)
            lf.write(f"Actor Data:\n{s}\n\n")
            lf.write("Exception:\n")
            # traceback.print_tb writes the formatted traceback into the file.
            traceback.print_tb(exception.__traceback__, None, lf)
            lf.write(f"\n{exception.__class__.__name__}: {str(exception)}\n")