Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/dcutilsmixin.py: 19%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import os 

4import pprint 

5import traceback 

6from datetime import datetime 

7from contextlib import contextmanager 

8 

9import logging 

10 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Pretty-printer instance created with default settings.
pp = pprint.PrettyPrinter()

13 

14 

class DCUtilsMixin:
    """DC Utils Mixin

    Mixin class to add reusable functionality for saving data collections:

    * Sets up file saving
    * Saves data collection params
    * Saves data collection exceptions
    * Adds scan quality indicators

    The host class must provide ``self._metadata``, ``self._saving``,
    ``self._stomp`` and ``self.get_component()``; this mixin only uses them
    and does not define them.
    """

    def start_datacollection(self, actor):
        """Sets the data collection number and the filename

        Args:
            actor (ComponentActor): The actor
        """
        self.create_datacollection(actor)
        self.initialize_saving(actor)

    def initialize_saving(self, actor):
        """Set the data collection file name if not already done

        Nothing happens when the actor already has both a datacollection
        and an ``imagedirectory``.

        Args:
            actor (ComponentActor): The actor
        """
        if actor.get("datacollectionid"):
            if actor.get("imagedirectory") is not None:
                return

        filename = self._saving.set_filename(
            extra_saving_args=actor.saving_args, **actor.all_data
        )
        self._saving.create_root_path(wait_exists=True)

        # Only persist the file location when there is a datacollection
        # to attach it to
        if actor.get("datacollectionid"):
            self.update_datacollection(
                actor,
                imagedirectory=os.path.dirname(filename),
                filetemplate=os.path.basename(filename),
            )
        logger.info(
            f"Scan saving initialized ({actor.name}, {actor.uid}, file={filename})"
        )

    def create_datacollection(self, actor):
        """Create data collection when it does not exist yet

        Args:
            actor (ComponentActor): The actor
        """
        if actor.get("datacollectionid") is not None:
            return

        dc = self.next_datacollection(actor)
        actor.update(datacollectiongroupid=dc["datacollectiongroupid"])
        logger.info(
            f"Create datacollection ({actor.name}, {actor.uid}, id={actor['datacollectionid']})"
        )

    def next_datacollection(self, actor, **opts):
        """Start a new datacollection in the actor

        This is useful for actors which want to create multiple
        datacollections within a datacollectiongroup

        Args:
            actor (ComponentActor): The actor

        Kwargs:
            grid (bool): Duplicate gridinfo parameters
            data (bool): Duplicate data location parameters

            emit_start(bool): Emit a start stomp event
            emit_end(bool): Emit an end stomp event

        Returns:
            dc (dict): The new datacollection
        """
        # Close the current datacollection (if any) before opening a new one
        self.update_datacollection(
            actor, endtime=datetime.now(), runstatus="Successful"
        )

        # `beamsize` may be missing or explicitly None
        beamsize = actor.get("beamsize") or {}
        bsx = beamsize.get("x")
        bsy = beamsize.get("y")

        if callable(actor.metatype):
            try:
                metatype = actor.metatype(**actor.all_data)
            except Exception:
                logger.exception(
                    "Could not determine metatype, defaulting to `experiment`"
                )
                metatype = "experiment"
        else:
            metatype = actor.metatype

        kwargs = {
            "sessionid": actor["sessionid"],
            "datacollectionplanid": actor["datacollectionplanid"],
            "sampleid": actor["sampleid"],
            "subsampleid": actor.get("subsampleid"),
            "starttime": datetime.now(),
            "experimenttype": metatype,
            "datacollectiongroupid": actor.get("datacollectiongroupid"),
            # database stores beamsize in mm (!) not nm
            "beamsizeatsamplex": bsx / 1e6 if bsx else None,
            "beamsizeatsampley": bsy / 1e6 if bsy else None,
        }

        # Data
        if opts.get("data"):
            for k in ("imagedirectory", "filetemplate"):
                kwargs[k] = actor.get(k)

        # GridInfo
        if opts.get("grid"):
            for k in (
                "steps_x",
                "steps_y",
                "dx_mm",
                "dy_mm",
                "xtalsnapshotfullpath1",
                "pixelspermicronx",
                "pixelspermicrony",
                "snapshot_offsetxpixel",
                "snapshot_offsetypixel",
            ):
                kwargs[k] = actor.get(k)

        dc = self._metadata.add_datacollection(**kwargs)

        # Point the actor at the new datacollection and reset per-collection state
        actor.update(
            datacollectionid=dc["datacollectionid"],
            datacollectionnumber=None,
            endtime=None,
            runstatus=None,
        )

        self._emit_dc_events(
            dc["datacollectionid"],
            emit_start=opts.get("emit_start"),
            emit_end=opts.get("emit_end"),
        )

        return dc

    def update_datacollection(self, actor, emit_start=False, emit_end=False, **params):
        """Update a datacollection

        The actor is always updated with ``params``. The metadata handler is
        only called, and events only emitted, when the actor already has a
        ``datacollectionid``.

        Also emits stomp start / end events when requested

        Args:
            actor(ComponentActor): The actor
        Kwargs:
            emit_start(bool): Emit a start stomp event
            emit_end(bool): Emit an end stomp event

        Returns:
            The response of the metadata handler, or None when the actor
            has no datacollection yet
        """
        actor.update(**params)
        datacollectionid = actor.get("datacollectionid")
        if datacollectionid is None:
            # Nothing to update or announce without a datacollection
            return None

        resp = self._metadata.update_datacollection(
            datacollectionid=datacollectionid, no_context=True, **params
        )
        self._emit_dc_events(
            datacollectionid, emit_start=emit_start, emit_end=emit_end
        )
        return resp

    def _emit_dc_events(self, datacollectionid, emit_start=False, emit_end=False):
        """Send start / end events for a datacollection to stomp and celery

        Each channel is skipped when it is None. Within a channel the
        `start` event is sent before the `end` event.

        Args:
            datacollectionid(int): The datacollection the events refer to
        Kwargs:
            emit_start(bool): Emit a start event
            emit_end(bool): Emit an end event
        """
        if self._stomp is not None:
            if emit_start:
                self._stomp.send_event(datacollectionid, "start")
            if emit_end:
                self._stomp.send_event(datacollectionid, "end")

        celery = self.get_component("celery")
        if celery is not None:
            if emit_start:
                celery.send_event(datacollectionid, "start")
            if emit_end:
                celery.send_event(datacollectionid, "end")

    def add_scanqualityindicators(self, actor, point, datacollectionid=None, **columns):
        """Adds a scan quality indicator for a point

        Args:
            actor(obj): The actor
            point(int): The point to associate this indicator with

        Kwargs:
            datacollectionid(int): A specific datacollectionid if required
            total(int): Total integrated signal or similar
            spots(int): No. of spots
        """
        self._metadata.add_scanqualityindicators(
            no_context=True,
            # Fall back to the actor's current datacollection
            datacollectionid=datacollectionid or actor["datacollectionid"],
            point=point,
            **columns,
        )

    @contextmanager
    def _open_dc_attachment(self, actor, filetype, suffix="", ext="log"):
        """Open a file in the actor's image directory and register it as attachment

        The file is named ``<datacollectionid><suffix>.<ext>``. Once the file
        could be opened it is always registered as an attachment, even if the
        ``with`` body raises. Registration happens after the file has been
        closed, so the complete content is on disk at that point.

        Args:
            actor(ComponentActor): The actor
            filetype(str): The attachment file type passed to the metadata handler

        Kwargs:
            suffix(str): Appended to the datacollectionid in the file name
            ext(str): The file extension

        Yields:
            The file object opened for writing text
        """
        dcid = actor["datacollectionid"]
        directory = actor["imagedirectory"]
        filename = os.extsep.join([f"{dcid}{suffix}", ext])
        filepath = os.path.join(directory, filename)

        # Opened outside the `try` so a failing open() registers nothing
        lf = open(filepath, "w", encoding="utf-8")
        try:
            with lf:
                yield lf
        finally:
            self._metadata.add_datacollection_attachment(
                no_context=True,
                datacollectionid=dcid,
                filetype=filetype,
                filepath=filepath,
            )

    def _save_dc_params(self, actor):
        """Save the initial parameters of an actor's datacollection"""
        with self._open_dc_attachment(
            actor, filetype="params", suffix="_args", ext="json"
        ) as lf:
            lf.write(actor.all_data_json_serialized)

    def _save_dc_log(self, actor):
        """Save stdout to an actor's datacollection

        Nothing is written when the actor's stdout is empty.
        """
        if actor.stdout:
            with self._open_dc_attachment(
                actor, filetype="log", suffix="_stdout", ext="log"
            ) as lf:
                lf.write(actor.stdout)

    def _save_dc_exception(self, actor, exception):
        """Save an exception traceback of an actor's datacollection

        The log contains the actor arguments, the actor data and the
        traceback followed by the exception class name and message.
        """
        with self._open_dc_attachment(
            actor, filetype="log", suffix="_error", ext="log"
        ) as lf:
            s = pprint.pformat(actor.initkwargs_json_serializable)
            lf.write(f"Actor Arguments:\n{s}\n\n")
            s = pprint.pformat(actor.data)
            lf.write(f"Actor Data:\n{s}\n\n")
            lf.write("Exception:\n")
            traceback.print_tb(exception.__traceback__, None, lf)
            lf.write(f"\n{exception.__class__.__name__}: {str(exception)}\n")