Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/abstract/scansource.py: 66%

86 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from abc import ABC, abstractmethod 

4from typing import Any, Dict 

5import logging 

6 

7import mmh3 

8 

9logger = logging.getLogger(__name__) 

10 

11ScanStates = ["CREATED", "PREPARING", "RUNNING", "FINISHED", "ABORTED", "FAILED"] 

12 

13 

14class ScanSource(ABC): 

15 """The scan source interface""" 

16 

17 def __init__( 

18 self, 

19 config: Dict[str, Any], 

20 app=None, 

21 source_config: Dict[str, Any] = {}, 

22 ): 

23 self._app = app 

24 self._config = config 

25 self._source_config = source_config 

26 self._session_name = source_config.get("session") 

27 

28 self._new_scan_watchers = [] 

29 self._new_data_watchers = [] 

30 self._end_scan_watchers = [] 

31 self._new_0d_data_watchers = [] 

32 

33 def close(self): 

34 """Clean up the service at the end. 

35 

36 After this call, the component should not be accessed anymore 

37 """ 

38 pass 

39 

40 @abstractmethod 

41 def get_scans(self, scanid=None): 

42 """Get a list of scans 

43 

44 Returns a list of scan dicts, can be filtered to only scanid to return 

45 details of a specific scan 

46 

47 Args: 

48 scanid (int): Scan id to return 

49 

50 Returns: 

51 scans (list): A list of valid `ScanSchema` dicts 

52 """ 

53 pass 

54 

55 # @marshal_with(ScanDataSchema()) 

56 @abstractmethod 

57 def get_scan_data(self, scanid, per_page=25, page=1): 

58 """Return scan data for the specific scan 

59 

60 Args: 

61 scanid (int): The scan id 

62 per_page (int): Number of items to return per page 

63 page (int): Page of data to return 

64 

65 Returns: 

66 data (dict): A valid `ScanDataSchema` dict 

67 

68 """ 

69 pass 

70 

71 @abstractmethod 

72 def get_scan_spectra(self, scanid, point=0, allpoints=False): 

73 """Return a scan specific for the specific scan 

74 

75 Args: 

76 scanid (int): The scan id 

77 point (int): The image number to return 

78 

79 Returns: 

80 data (dict): A valid `ScanSpectraSchema` dict 

81 

82 """ 

83 pass 

84 

85 @abstractmethod 

86 def get_scan_image(self, scanid, node_name, image_no): 

87 """Return a scan image for the specific scan 

88 

89 Args: 

90 scanid (int): The scan id 

91 node_name (str): The node for the requested image 

92 image_no (int): The image number to return 

93 

94 Returns: 

95 data (dict): A valid `ScanDataSchema` dict 

96 

97 """ 

98 pass 

99 

100 def watch_new_scan(self, fn): 

101 """Register a callback for when a new scan is started""" 

102 if not callable(fn): 

103 raise AttributeError("Callback function must be callable") 

104 

105 if not (fn in self._new_scan_watchers): 

106 self._new_scan_watchers.append(fn) 

107 else: 

108 logger.warning( 

109 "Function {f} is already subscribed to new scan".format(f=fn) 

110 ) 

111 

112 def watch_new_data(self, fn): 

113 """Register a callback for when there is new data for a scan""" 

114 if not callable(fn): 

115 raise AttributeError("Callback function must be callable") 

116 

117 if not (fn in self._new_data_watchers): 

118 self._new_data_watchers.append(fn) 

119 else: 

120 logger.warning( 

121 "Function {f} is already subscribed to new data".format(f=fn) 

122 ) 

123 

124 def watch_new_0d_data(self, fn): 

125 """Register a callback for when there is new 0d data for a scan""" 

126 if not callable(fn): 

127 raise AttributeError("Callback function must be callable") 

128 

129 if not (fn in self._new_0d_data_watchers): 

130 self._new_0d_data_watchers.append(fn) 

131 else: 

132 logger.warning( 

133 "Function {f} is already subscribed to new 0d data".format(f=fn) 

134 ) 

135 

136 def watch_end_scan(self, fn): 

137 """Register a callback for a scan ends""" 

138 if not callable(fn): 

139 raise AttributeError("Callback function must be callable") 

140 

141 if not (fn in self._end_scan_watchers): 

142 self._end_scan_watchers.append(fn) 

143 else: 

144 logger.warning( 

145 "Function {f} is already subscribed to end scan".format(f=fn) 

146 ) 

147 

148 def _emit_new_scan_event( 

149 self, scanid: int, type: str, title: str, metadata: Dict[str, Any] 

150 ) -> None: 

151 """Emit an event when a new scan starts 

152 

153 Kwargs: 

154 scanid: The id of the new scan 

155 type: The scan type, ascan, amesh, etc 

156 title: A short description of the scan 

157 metadata: Any related metadata (not currently used) 

158 """ 

159 for cb in self._new_scan_watchers: 

160 try: 

161 cb(scanid, type, type, metadata=metadata) 

162 except Exception: 

163 logger.error("Error during scan start callback", exc_info=True) 

164 

165 def _emit_end_scan_event(self, scanid: int, metadata: Dict[str, Any]) -> None: 

166 """Emit an event when a scan ends 

167 

168 Kwargs: 

169 scanid: The id of the scan that has ended 

170 metadata: Any related metadata (not currently used) 

171 """ 

172 for cb in self._end_scan_watchers: 

173 try: 

174 cb(scanid, metadata=metadata) 

175 except Exception: 

176 logger.error("Error during scan end callback", exc_info=True) 

177 

178 def _emit_new_scan_data_0d_event( 

179 self, scanid: int, channel_name: str, channel_size: int, continue_: bool = False 

180 ) -> None: 

181 """Emit an event when there is new 0d scan data 

182 

183 Kwargs: 

184 scanid: The id of the scan in progress 

185 channel_name: The channel that has new data 

186 channel_size: The length of the new channel 

187 """ 

188 for cb in self._new_0d_data_watchers: 

189 try: 

190 cb(scanid, channel_name=channel_name, channel_size=channel_size) 

191 except Exception: 

192 logger.error("Error during 0d data callback", exc_info=True) 

193 if continue_: 

194 continue 

195 

196 def _emit_new_scan_data_event( 

197 self, 

198 scanid: int, 

199 master_channel: str, 

200 progress: float, 

201 channel_name: str, 

202 channel_size: int, 

203 channel_progress: float, 

204 ) -> None: 

205 """Emit an event when there is new scan data 

206 

207 Kwargs: 

208 scanid: The id of the new scan 

209 master_channel: The master channel of the scan 

210 progress: A value between 0 and 100 for the scan progress 

211 channel_name: The channel that has new data 

212 channel_size: The length of the new channel 

213 channel_progress: The progres between 0 and 100 for the triggering channel 

214 """ 

215 for cb in self._new_data_watchers: 

216 try: 

217 cb( 

218 scanid, 

219 master_channel, 

220 progress, 

221 channel_name, 

222 channel_size, 

223 channel_progress, 

224 ) 

225 except Exception: 

226 logger.error("Error during data callback", exc_info=True) 

227 

228 def _create_scanid(self, scan_key: str) -> int: 

229 """Construct a valid DB key from the scan key 

230 

231 Converts any string based scan id into a hashed number for database compatibility 

232 

233 Kwargs: 

234 scan_key: The scan engine scan key 

235 """ 

236 return mmh3.hash(scan_key) & 0xFFFFFFFF 

237 

238 def get_conversion(self) -> dict: 

239 """Get the mca conversion factors to convert bins to energy 

240 

241 energy = zero + bin * scale 

242 """ 

243 if "mca" not in self._config: 

244 return {} 

245 return { 

246 "zero": self._config["mca"]["conversion"]["zero"], 

247 "scale": self._config["mca"]["conversion"]["scale"], 

248 }