Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/abstract/scansource.py: 66%
86 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
import logging

import mmh3
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Names of the states a scan can be in.
ScanStates = ["CREATED", "PREPARING", "RUNNING", "FINISHED", "ABORTED", "FAILED"]
class ScanSource(ABC):
    """The scan source interface.

    Concrete scan sources implement the abstract ``get_*`` accessors. This
    base class stores the configuration, keeps the lists of registered
    callbacks ("watchers") and provides the ``_emit_*`` helpers that dispatch
    scan events to those callbacks.
    """

    def __init__(
        self,
        config: Dict[str, Any],
        app=None,
        source_config: Optional[Dict[str, Any]] = None,
    ):
        """Store the configuration and initialise the watcher lists.

        Args:
            config: Configuration mapping; `get_conversion` reads
                ``config["mca"]["conversion"]`` from it.
            app: Stored as ``self._app``; not used by this base class.
            source_config: Source specific configuration. Its optional
                ``"session"`` entry is stored as ``self._session_name``.
                Defaults to a new empty dict.
        """
        # A fresh dict per instance: a shared `{}` default would leak any
        # mutation of `_source_config` into every other default instance.
        if source_config is None:
            source_config = {}

        self._app = app
        self._config = config
        self._source_config = source_config
        self._session_name = source_config.get("session")

        self._new_scan_watchers = []
        self._new_data_watchers = []
        self._end_scan_watchers = []
        self._new_0d_data_watchers = []

    def close(self):
        """Clean up the service at the end.

        After this call, the component should not be accessed anymore
        """

    @abstractmethod
    def get_scans(self, scanid=None):
        """Get a list of scans

        Returns a list of scan dicts, can be filtered to only scanid to return
        details of a specific scan

        Args:
            scanid (int): Scan id to return

        Returns:
            scans (list): A list of valid `ScanSchema` dicts
        """

    # @marshal_with(ScanDataSchema())
    @abstractmethod
    def get_scan_data(self, scanid, per_page=25, page=1):
        """Return scan data for the specific scan

        Args:
            scanid (int): The scan id
            per_page (int): Number of items to return per page
            page (int): Page of data to return

        Returns:
            data (dict): A valid `ScanDataSchema` dict
        """

    @abstractmethod
    def get_scan_spectra(self, scanid, point=0, allpoints=False):
        """Return the scan spectra for the specific scan

        Args:
            scanid (int): The scan id
            point (int): The image number to return
            allpoints (bool): Presumably requests the spectra of every point
                instead of only `point` - TODO confirm against implementations

        Returns:
            data (dict): A valid `ScanSpectraSchema` dict
        """

    @abstractmethod
    def get_scan_image(self, scanid, node_name, image_no):
        """Return a scan image for the specific scan

        Args:
            scanid (int): The scan id
            node_name (str): The node for the requested image
            image_no (int): The image number to return

        Returns:
            data (dict): A valid `ScanDataSchema` dict
        """

    @staticmethod
    def _register_watcher(watchers, fn, event):
        """Append `fn` to `watchers` unless it is already registered.

        Args:
            watchers (list): The watcher list to update in place
            fn (callable): The callback to register
            event (str): Event name used in the duplicate warning

        Raises:
            AttributeError: If `fn` is not callable
        """
        if not callable(fn):
            raise AttributeError("Callback function must be callable")

        if fn in watchers:
            # A duplicate subscription is reported but otherwise ignored.
            logger.warning("Function %s is already subscribed to %s", fn, event)
            return

        watchers.append(fn)

    def watch_new_scan(self, fn):
        """Register a callback for when a new scan is started"""
        self._register_watcher(self._new_scan_watchers, fn, "new scan")

    def watch_new_data(self, fn):
        """Register a callback for when there is new data for a scan"""
        self._register_watcher(self._new_data_watchers, fn, "new data")

    def watch_new_0d_data(self, fn):
        """Register a callback for when there is new 0d data for a scan"""
        self._register_watcher(self._new_0d_data_watchers, fn, "new 0d data")

    def watch_end_scan(self, fn):
        """Register a callback for when a scan ends"""
        self._register_watcher(self._end_scan_watchers, fn, "end scan")

    def _emit_new_scan_event(
        self, scanid: int, type: str, title: str, metadata: Dict[str, Any]
    ) -> None:
        """Emit an event when a new scan starts

        Each watcher is called as ``cb(scanid, type, title, metadata=metadata)``.
        An exception raised by a watcher is logged and does not prevent the
        remaining watchers from being called.

        Args:
            scanid: The id of the new scan
            type: The scan type, ascan, amesh, etc
            title: A short description of the scan
            metadata: Any related metadata (not currently used)
        """
        for cb in self._new_scan_watchers:
            try:
                # Third positional argument is the title, not the type again.
                cb(scanid, type, title, metadata=metadata)
            except Exception:
                logger.exception("Error during scan start callback")

    def _emit_end_scan_event(self, scanid: int, metadata: Dict[str, Any]) -> None:
        """Emit an event when a scan ends

        Each watcher is called as ``cb(scanid, metadata=metadata)``; watcher
        exceptions are logged and swallowed.

        Args:
            scanid: The id of the scan that has ended
            metadata: Any related metadata (not currently used)
        """
        for cb in self._end_scan_watchers:
            try:
                cb(scanid, metadata=metadata)
            except Exception:
                logger.exception("Error during scan end callback")

    def _emit_new_scan_data_0d_event(
        self, scanid: int, channel_name: str, channel_size: int, continue_: bool = False
    ) -> None:
        """Emit an event when there is new 0d scan data

        Each watcher is called as
        ``cb(scanid, channel_name=channel_name, channel_size=channel_size)``;
        watcher exceptions are logged and the remaining watchers still run.

        Args:
            scanid: The id of the scan in progress
            channel_name: The channel that has new data
            channel_size: The length of the new channel
            continue_: Has no effect; kept so existing callers that pass it
                keep working
        """
        for cb in self._new_0d_data_watchers:
            try:
                cb(scanid, channel_name=channel_name, channel_size=channel_size)
            except Exception:
                logger.exception("Error during 0d data callback")

    def _emit_new_scan_data_event(
        self,
        scanid: int,
        master_channel: str,
        progress: float,
        channel_name: str,
        channel_size: int,
        channel_progress: float,
    ) -> None:
        """Emit an event when there is new scan data

        Each watcher receives all six values positionally, in signature
        order; watcher exceptions are logged and swallowed.

        Args:
            scanid: The id of the new scan
            master_channel: The master channel of the scan
            progress: A value between 0 and 100 for the scan progress
            channel_name: The channel that has new data
            channel_size: The length of the new channel
            channel_progress: The progress between 0 and 100 for the triggering channel
        """
        for cb in self._new_data_watchers:
            try:
                cb(
                    scanid,
                    master_channel,
                    progress,
                    channel_name,
                    channel_size,
                    channel_progress,
                )
            except Exception:
                logger.exception("Error during data callback")

    def _create_scanid(self, scan_key: str) -> int:
        """Construct a valid DB key from the scan key

        Converts any string based scan id into a hashed number for database compatibility

        Args:
            scan_key: The scan engine scan key

        Returns:
            The hash of `scan_key` masked to its low 32 bits, so the result
            is never negative
        """
        return mmh3.hash(scan_key) & 0xFFFFFFFF

    def get_conversion(self) -> dict:
        """Get the mca conversion factors to convert bins to energy

        energy = zero + bin * scale

        Returns:
            A dict with the ``zero`` and ``scale`` values read from
            ``config["mca"]["conversion"]``, or an empty dict when the config
            has no ``mca`` entry
        """
        if "mca" not in self._config:
            return {}

        conversion = self._config["mca"]["conversion"]
        return {
            "zero": conversion["zero"],
            "scale": conversion["scale"],
        }