Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scan_listener.py: 38%

65 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4import logging 

5import typing 

6import mmh3 

7 

8from daiquiri.core.components import Component 

9 

10try: 

11 from daiquiri.core.hardware.bliss.scans import BlissScans 

12except ImportError: 

13 pass 

14 

15from .datatype import TomoScan 

16from .scans.tomo_scan import IncompatibleScanData 

17from .scans.sequence_tomo_scan import SequenceTomoScan 

18from .scans.standard_tomo_scan import StandardTomoScan 

19from .scans.tiling_sequence_tomo_scan import TilingSequenceTomoScan 

20 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25class TomoScanListener: 

26 def __init__(self, component: Component, source): 

27 self._scans: typing.Dict[int, TomoScan] = {} 

28 """Alive scans containing tomo data 

29 

30 FIXME: Use LRU structure to avoid any memory leak 

31 """ 

32 

33 self.__component: Component = component 

34 self.__source: BlissScans = source 

35 

36 @property 

37 def source(self): 

38 return self.__source 

39 

40 @property 

41 def component(self): 

42 return self.__component 

43 

44 def _scanid(self, node): 

45 return mmh3.hash(node) & 0xFFFFFFFF 

46 

47 def new_scan(self, scanid, type, title, metadata): 

48 """Triggered when a scan start""" 

49 

50 supported_scan_class = [ 

51 SequenceTomoScan, 

52 StandardTomoScan, 

53 TilingSequenceTomoScan, 

54 ] 

55 for scan_class in supported_scan_class: 

56 try: 

57 scan = scan_class(self.component, self.source, scanid, metadata) 

58 break 

59 except IncompatibleScanData: 

60 logger.debug("Scanid %s is not a %s", scanid, scan_class) 

61 except Exception: 

62 logger.error("Error while parsing scanid %s", scanid, exc_info=True) 

63 else: 

64 scan = None 

65 

66 if scan is not None: 

67 logger.debug("Scanid %s interpreted as %s", scanid, scan_class) 

68 self._scans[scanid] = scan 

69 logger.debug("Scanid %s on_scan_started: %s", scanid, (metadata,)) 

70 scan.on_scan_started(metadata) 

71 

72 group = metadata.get("group") 

73 if group is not None: 

74 groupid = self._scanid(group) 

75 scan_group = self._scans.get(groupid) 

76 if scan_group is not None: 

77 scan_group.on_subscan_started(scanid, scan) 

78 

79 def new_0d_data(self, scanid, channel_name, channel_size): 

80 """Triggered during scan on data updated""" 

81 scan = self._scans.get(scanid) 

82 if scan is not None: 

83 logger.debug( 

84 "Scanid %s new_0d_data: %s", scanid, (channel_name, channel_size) 

85 ) 

86 scan.on_0d_data_received(channel_name, channel_size) 

87 

88 def new_data( 

89 self, scanid, master, progress, channel_name, channel_size, channel_progress 

90 ): 

91 """Triggered during scan on data updated""" 

92 scan = self._scans.get(scanid) 

93 if scan is not None: 

94 logger.debug( 

95 "Scanid %s on_data_received: %s", 

96 scanid, 

97 (master, progress, channel_name, channel_size, channel_progress), 

98 ) 

99 scan.on_data_received( 

100 master, progress, channel_name, channel_size, channel_progress 

101 ) 

102 

103 def end_scan(self, scanid, metadata): 

104 """Triggered when a scan ended""" 

105 scan = self._scans.pop(scanid, None) 

106 if scan is not None: 

107 logger.debug("Scanid %s on_scan_terminated: %s", scanid, (metadata,)) 

108 scan.on_scan_terminated(metadata)