Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scan_listener.py: 38%
65 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4import logging
5import typing
6import mmh3
8from daiquiri.core.components import Component
10try:
11 from daiquiri.core.hardware.bliss.scans import BlissScans
12except ImportError:
13 pass
15from .datatype import TomoScan
16from .scans.tomo_scan import IncompatibleScanData
17from .scans.sequence_tomo_scan import SequenceTomoScan
18from .scans.standard_tomo_scan import StandardTomoScan
19from .scans.tiling_sequence_tomo_scan import TilingSequenceTomoScan
22logger = logging.getLogger(__name__)
25class TomoScanListener:
26 def __init__(self, component: Component, source):
27 self._scans: typing.Dict[int, TomoScan] = {}
28 """Alive scans containing tomo data
30 FIXME: Use LRU structure to avoid any memory leak
31 """
33 self.__component: Component = component
34 self.__source: BlissScans = source
36 @property
37 def source(self):
38 return self.__source
40 @property
41 def component(self):
42 return self.__component
44 def _scanid(self, node):
45 return mmh3.hash(node) & 0xFFFFFFFF
47 def new_scan(self, scanid, type, title, metadata):
48 """Triggered when a scan start"""
50 supported_scan_class = [
51 SequenceTomoScan,
52 StandardTomoScan,
53 TilingSequenceTomoScan,
54 ]
55 for scan_class in supported_scan_class:
56 try:
57 scan = scan_class(self.component, self.source, scanid, metadata)
58 break
59 except IncompatibleScanData:
60 logger.debug("Scanid %s is not a %s", scanid, scan_class)
61 except Exception:
62 logger.error("Error while parsing scanid %s", scanid, exc_info=True)
63 else:
64 scan = None
66 if scan is not None:
67 logger.debug("Scanid %s interpreted as %s", scanid, scan_class)
68 self._scans[scanid] = scan
69 logger.debug("Scanid %s on_scan_started: %s", scanid, (metadata,))
70 scan.on_scan_started(metadata)
72 group = metadata.get("group")
73 if group is not None:
74 groupid = self._scanid(group)
75 scan_group = self._scans.get(groupid)
76 if scan_group is not None:
77 scan_group.on_subscan_started(scanid, scan)
79 def new_0d_data(self, scanid, channel_name, channel_size):
80 """Triggered during scan on data updated"""
81 scan = self._scans.get(scanid)
82 if scan is not None:
83 logger.debug(
84 "Scanid %s new_0d_data: %s", scanid, (channel_name, channel_size)
85 )
86 scan.on_0d_data_received(channel_name, channel_size)
88 def new_data(
89 self, scanid, master, progress, channel_name, channel_size, channel_progress
90 ):
91 """Triggered during scan on data updated"""
92 scan = self._scans.get(scanid)
93 if scan is not None:
94 logger.debug(
95 "Scanid %s on_data_received: %s",
96 scanid,
97 (master, progress, channel_name, channel_size, channel_progress),
98 )
99 scan.on_data_received(
100 master, progress, channel_name, channel_size, channel_progress
101 )
103 def end_scan(self, scanid, metadata):
104 """Triggered when a scan ended"""
105 scan = self._scans.pop(scanid, None)
106 if scan is not None:
107 logger.debug("Scanid %s on_scan_terminated: %s", scanid, (metadata,))
108 scan.on_scan_terminated(metadata)