Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/dcutilsmixin.py: 21%
96 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4import pprint
5import traceback
6from datetime import datetime
7from contextlib import contextmanager
9import logging
11logger = logging.getLogger(__name__)
12pp = pprint.PrettyPrinter()
15class DCUtilsMixin:
16 """DC Utils Mixin
18 Mixin class to add reusable functionality for saving data collections
19 Sets up file saving
20 Saves data collection params
21 Saves data collection exceptions
22 Add scan quality indicators
23 """
25 def start_datacollection(self, actor):
26 """Sets the data collection number and the filename"""
27 self.create_datacollection(actor)
28 self.initialize_saving(actor)
30 def initialize_saving(self, actor):
31 """Set the data collection file name if not already done
33 :param actor ComponentActor:
34 """
35 if actor.get("datacollectionid"):
36 if actor.get("imagedirectory") is not None:
37 return
38 filename = self._saving.set_filename(
39 extra_saving_args=actor.saving_args, **actor.all_data
40 )
41 self._saving.create_root_path(wait_exists=True)
42 if actor.get("datacollectionid"):
43 self.update_datacollection(
44 actor,
45 imagedirectory=os.path.dirname(filename),
46 filetemplate=os.path.basename(filename),
47 )
48 logger.info(
49 f"Scan saving initialized ({actor.name}, {actor.uid}, file={filename})"
50 )
52 def create_datacollection(self, actor):
53 """Create data collection when it does not exist yet
55 Args:
56 actor (ComponentActor): The actor
57 """
58 if actor.get("datacollectionid") is not None:
59 return
61 dc = self.next_datacollection(actor)
62 actor.update(datacollectiongroupid=dc["datacollectiongroupid"])
63 logger.info(
64 f"Create datacollection ({actor.name}, {actor.uid}, id={actor['datacollectionid']})"
65 )
67 def next_datacollection(self, actor, emit_start=None, emit_end=None, **opts):
68 """Start a new datacollection in the actor
70 This is useful for actors which want to create multiple
71 datacollections within a datacollectiongroup
73 Args:
74 actor (ComponentActor): The actor
76 Kwargs:
77 grid (bool): Duplicate gridinfo parameters
78 data (bool): Duplicate data location parameters
80 emit_start(bool): Emit a start stomp event
81 emit_end(bool): Emit an end stomp event
83 Returns
84 dc (dict): The new datacollection
85 """
86 self.update_datacollection(
87 actor, endtime=datetime.now(), runstatus="Successful"
88 )
90 bsx = actor.get("beamsize", {}).get("x")
91 bsy = actor.get("beamsize", {}).get("y")
93 if callable(actor.metatype):
94 try:
95 metatype = actor.metatype(**actor.all_data)
96 except Exception:
97 logger.exception(
98 "Could not determine metatype, defaulting to `experiment`"
99 )
100 metatype = "experiment"
101 else:
102 metatype = actor.metatype
104 kwargs = {
105 "sessionid": actor["sessionid"],
106 "datacollectionplanid": actor["datacollectionplanid"],
107 "sampleid": actor["sampleid"],
108 "subsampleid": actor.get("subsampleid"),
109 "starttime": datetime.now(),
110 "experimenttype": metatype,
111 "datacollectiongroupid": actor.get("datacollectiongroupid"),
112 # database stores beamsize in mm (!) not nm
113 "beamsizeatsamplex": bsx / 1e6 if bsx else None,
114 "beamsizeatsampley": bsy / 1e6 if bsy else None,
115 }
117 # Data
118 if opts.get("data"):
119 for k in ["imagedirectory", "filetemplate"]:
120 kwargs[k] = actor.get(k)
122 # GridInfo
123 if opts.get("grid"):
124 for k in [
125 "steps_x",
126 "steps_y",
127 "dx_mm",
128 "dy_mm",
129 "patchesx",
130 "patchesy",
131 "xtalsnapshotfullpath1",
132 "pixelspermicronx",
133 "pixelspermicrony",
134 "snapshot_offsetxpixel",
135 "snapshot_offsetypixel",
136 "snaked",
137 ]:
138 kwargs[k] = actor.get(k)
140 dc = self._metadata.add_datacollection(**kwargs)
141 actor.update(
142 datacollectionid=dc["datacollectionid"],
143 datacollectionnumber=None,
144 endtime=None,
145 runstatus=None,
146 )
148 for client_name in ["stomp", "celery"]:
149 client = self.get_component(client_name)
150 if client:
151 if emit_start:
152 client.send_event(dc["datacollectionid"], "start")
154 if emit_end:
155 client.send_event(dc["datacollectionid"], "end")
157 return dc
159 def update_datacollection(self, actor, emit_start=False, emit_end=False, **params):
160 """Update a datacollection
162 Also emits stomp start / end events when requested
164 Args:
165 actor(ComponentActor): The actor
166 Kwargs:
167 emit_start(bool): Emit a start stomp event
168 emit_end(bool): Emit an end stomp event
169 """
170 actor.update(**params)
171 datacollectionid = actor.get("datacollectionid")
172 if datacollectionid is not None:
173 resp = self._metadata.update_datacollection(
174 datacollectionid=datacollectionid, no_context=True, **params
175 )
177 for client_name in ["stomp", "celery"]:
178 client = self.get_component(client_name)
179 if client:
180 if emit_start:
181 client.send_event(actor["datacollectionid"], "start")
183 if emit_end:
184 client.send_event(actor["datacollectionid"], "end")
186 return resp
188 def add_scanqualityindicators(self, actor, point, datacollectionid=None, **columns):
189 """Adds a scan quality indicator for a point
191 Args:
192 actor(obj): The actor
193 point(int): The point to associate this indicator with
195 Kwargs:
196 datacollectionid(int): A specific datacollectionid if required
197 total(int): Total integrated signal or similar
198 spots(int): No. of spots
199 """
200 self._metadata.add_scanqualityindicators(
201 no_context=True,
202 datacollectionid=(
203 datacollectionid if datacollectionid else actor["datacollectionid"]
204 ),
205 point=point,
206 **columns,
207 )
209 @contextmanager
210 def _open_dc_attachment(self, actor, filetype, suffix="", ext="log"):
211 dcid = actor["datacollectionid"]
212 directory = actor["imagedirectory"]
213 filename = os.extsep.join([f"{dcid}{suffix}", ext])
214 filepath = os.path.join(directory, filename)
215 with open(filepath, "w") as lf:
216 try:
217 yield lf
218 finally:
219 self._metadata.add_datacollection_attachment(
220 no_context=True,
221 datacollectionid=dcid,
222 filetype=filetype,
223 filepath=filepath,
224 )
226 def _save_dc_params(self, actor):
227 """Save the initial parameters of an actor's datacollection"""
228 with self._open_dc_attachment(
229 actor, filetype="params", suffix="_args", ext="json"
230 ) as lf:
231 lf.write(actor.all_data_json_serialized)
233 def _save_dc_log(self, actor):
234 """Save stdout to an actor's datacollection"""
235 if actor.stdout:
236 with self._open_dc_attachment(
237 actor, filetype="log", suffix="_stdout", ext="log"
238 ) as lf:
239 lf.write(actor.stdout)
241 def _save_dc_exception(self, actor, exception):
242 """Save a exception traceback of an actor's datacollection"""
243 with self._open_dc_attachment(
244 actor, filetype="log", suffix="_error", ext="log"
245 ) as lf:
246 s = pprint.pformat(actor.initkwargs_json_serializable)
247 lf.write(f"Actor Arguments:\n{s}\n\n")
248 s = pprint.pformat(actor.data)
249 lf.write(f"Actor Data:\n{s}\n\n")
250 lf.write("Exception:\n")
251 traceback.print_tb(exception.__traceback__, None, lf)
252 lf.write(f"\n{exception.__class__.__name__}: {str(exception)}\n")