Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/dcutilsmixin.py: 19%
104 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4import pprint
5import traceback
6from datetime import datetime
7from contextlib import contextmanager
9import logging
# Module-level logger named after this module's dotted import path.
logger = logging.getLogger(__name__)

# Module-wide PrettyPrinter instance shared by this module.
pp = pprint.PrettyPrinter()
class DCUtilsMixin:
    """DC Utils Mixin

    Mixin class to add reusable functionality for saving data collections
        Sets up file saving
        Saves data collection params
        Saves data collection exceptions
        Add scan quality indicators

    Expects the host component to provide `_saving`, `_metadata`, `_stomp`
    and `get_component` (assumption from usage here - confirm against the
    component base class).
    """

    def start_datacollection(self, actor):
        """Sets the data collection number and the filename

        Args:
            actor (ComponentActor): The actor
        """
        self.create_datacollection(actor)
        self.initialize_saving(actor)

    def initialize_saving(self, actor):
        """Set the data collection file name if not already done

        :param actor ComponentActor:
        """
        if actor.get("datacollectionid"):
            # Saving was already initialized for this datacollection
            if actor.get("imagedirectory") is not None:
                return

        filename = self._saving.set_filename(
            extra_saving_args=actor.saving_args, **actor.all_data
        )
        self._saving.create_root_path(wait_exists=True)

        if actor.get("datacollectionid"):
            self.update_datacollection(
                actor,
                imagedirectory=os.path.dirname(filename),
                filetemplate=os.path.basename(filename),
            )

        # NOTE(review): the extracted source showed a garbled placeholder in
        # this message; log the freshly computed filename, which is the value
        # this message is meant to report.
        logger.info(
            f"Scan saving initialized ({actor.name}, {actor.uid}, file={filename})"
        )

    def create_datacollection(self, actor):
        """Create data collection when it does not exist yet

        Args:
            actor (ComponentActor): The actor
        """
        if actor.get("datacollectionid") is not None:
            return

        # next_datacollection() stores the new datacollectionid on the actor
        dc = self.next_datacollection(actor)
        actor.update(datacollectiongroupid=dc["datacollectiongroupid"])
        logger.info(
            f"Create datacollection ({actor.name}, {actor.uid}, id={actor['datacollectionid']})"
        )

    def next_datacollection(self, actor, **opts):
        """Start a new datacollection in the actor

        This is useful for actors which want to create multiple
        datacollections within a datacollectiongroup

        Args:
            actor (ComponentActor): The actor

        Kwargs:
            grid (bool): Duplicate gridinfo parameters
            data (bool): Duplicate data location parameters
            emit_start(bool): Emit a start stomp event
            emit_end(bool): Emit an end stomp event

        Returns
            dc (dict): The new datacollection
        """
        # Close off the current datacollection (no-op when the actor has
        # no datacollectionid yet).
        self.update_datacollection(
            actor, endtime=datetime.now(), runstatus="Successful"
        )

        # Robustness: tolerate a `beamsize` key explicitly set to None,
        # not only a missing key.
        beamsize = actor.get("beamsize") or {}
        bsx = beamsize.get("x")
        bsy = beamsize.get("y")

        if callable(actor.metatype):
            try:
                metatype = actor.metatype(**actor.all_data)
            except Exception:
                logger.exception(
                    "Could not determine metatype, defaulting to `experiment`"
                )
                metatype = "experiment"
        else:
            metatype = actor.metatype

        kwargs = {
            "sessionid": actor["sessionid"],
            "datacollectionplanid": actor["datacollectionplanid"],
            "sampleid": actor["sampleid"],
            "subsampleid": actor.get("subsampleid"),
            "starttime": datetime.now(),
            "experimenttype": metatype,
            "datacollectiongroupid": actor.get("datacollectiongroupid"),
            # database stores beamsize in mm (!) not nm
            "beamsizeatsamplex": bsx / 1e6 if bsx else None,
            "beamsizeatsampley": bsy / 1e6 if bsy else None,
        }

        # Data: duplicate data location parameters from the previous collection
        if opts.get("data"):
            for k in ["imagedirectory", "filetemplate"]:
                kwargs[k] = actor.get(k)

        # GridInfo: duplicate grid geometry / snapshot parameters
        if opts.get("grid"):
            for k in [
                "steps_x",
                "steps_y",
                "dx_mm",
                "dy_mm",
                "xtalsnapshotfullpath1",
                "pixelspermicronx",
                "pixelspermicrony",
                "snapshot_offsetxpixel",
                "snapshot_offsetypixel",
            ]:
                kwargs[k] = actor.get(k)

        dc = self._metadata.add_datacollection(**kwargs)
        # Reset per-collection state on the actor for the new collection
        actor.update(
            datacollectionid=dc["datacollectionid"],
            datacollectionnumber=None,
            endtime=None,
            runstatus=None,
        )

        if self._stomp is not None:
            if opts.get("emit_start"):
                self._stomp.send_event(dc["datacollectionid"], "start")
            if opts.get("emit_end"):
                self._stomp.send_event(dc["datacollectionid"], "end")

        celery = self.get_component("celery")
        if celery is not None:
            if opts.get("emit_start"):
                celery.send_event(dc["datacollectionid"], "start")
            if opts.get("emit_end"):
                celery.send_event(dc["datacollectionid"], "end")

        return dc

    def update_datacollection(self, actor, emit_start=False, emit_end=False, **params):
        """Update a datacollection

        Also emits stomp start / end events when requested

        Args:
            actor(ComponentActor): The actor
        Kwargs:
            emit_start(bool): Emit a start stomp event
            emit_end(bool): Emit an end stomp event

        Returns:
            The metadata update response, or None when the actor has no
            datacollectionid yet.
        """
        actor.update(**params)

        # Bug fix: `resp` was previously unbound when the actor had no
        # datacollectionid, so `return resp` raised UnboundLocalError.
        resp = None
        datacollectionid = actor.get("datacollectionid")
        if datacollectionid is not None:
            resp = self._metadata.update_datacollection(
                datacollectionid=datacollectionid, no_context=True, **params
            )

            # Events reference actor["datacollectionid"], so only emit when
            # a datacollection actually exists.
            if self._stomp is not None:
                if emit_start:
                    self._stomp.send_event(actor["datacollectionid"], "start")
                if emit_end:
                    self._stomp.send_event(actor["datacollectionid"], "end")

            celery = self.get_component("celery")
            if celery is not None:
                if emit_start:
                    celery.send_event(actor["datacollectionid"], "start")
                if emit_end:
                    celery.send_event(actor["datacollectionid"], "end")

        return resp

    def add_scanqualityindicators(self, actor, point, datacollectionid=None, **columns):
        """Adds a scan quality indicator for a point

        Args:
            actor(obj): The actor
            point(int): The point to associate this indicator with

        Kwargs:
            datacollectionid(int): A specific datacollectionid if required
            total(int): Total integrated signal or similar
            spots(int): No. of spots
        """
        self._metadata.add_scanqualityindicators(
            no_context=True,
            # Fall back to the actor's current datacollection when no
            # explicit id is supplied.
            datacollectionid=datacollectionid or actor["datacollectionid"],
            point=point,
            **columns,
        )

    @contextmanager
    def _open_dc_attachment(self, actor, filetype, suffix="", ext="log"):
        """Open a writable attachment file for the actor's datacollection.

        The file is registered with the metadata service as a datacollection
        attachment when the context exits (even if the body raised).

        Args:
            actor (ComponentActor): The actor
            filetype (str): Attachment type recorded in the metadata service

        Kwargs:
            suffix (str): Appended to the datacollectionid in the filename
            ext (str): File extension
        """
        dcid = actor["datacollectionid"]
        directory = actor["imagedirectory"]
        filename = os.extsep.join([f"{dcid}{suffix}", ext])
        filepath = os.path.join(directory, filename)
        with open(filepath, "w") as lf:
            try:
                yield lf
            finally:
                # Register the attachment even when the caller's body failed,
                # so partial output is still linked to the datacollection.
                self._metadata.add_datacollection_attachment(
                    no_context=True,
                    datacollectionid=dcid,
                    filetype=filetype,
                    filepath=filepath,
                )

    def _save_dc_params(self, actor):
        """Save the initial parameters of an actor's datacollection"""
        with self._open_dc_attachment(
            actor, filetype="params", suffix="_args", ext="json"
        ) as lf:
            lf.write(actor.all_data_json_serialized)

    def _save_dc_log(self, actor):
        """Save stdout to an actor's datacollection"""
        if actor.stdout:
            with self._open_dc_attachment(
                actor, filetype="log", suffix="_stdout", ext="log"
            ) as lf:
                lf.write(actor.stdout)

    def _save_dc_exception(self, actor, exception):
        """Save a exception traceback of an actor's datacollection"""
        with self._open_dc_attachment(
            actor, filetype="log", suffix="_error", ext="log"
        ) as lf:
            s = pprint.pformat(actor.initkwargs_json_serializable)
            lf.write(f"Actor Arguments:\n{s}\n\n")
            s = pprint.pformat(actor.data)
            lf.write(f"Actor Data:\n{s}\n\n")
            lf.write("Exception:\n")
            traceback.print_tb(exception.__traceback__, None, lf)
            lf.write(f"\n{exception.__class__.__name__}: {str(exception)}\n")