Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/tomo/scans/tomo_scan.py: 48%

44 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4from __future__ import annotations 

5import logging 

6import typing 

7 

8 

9logger = logging.getLogger(__name__) 

10 

11 

12class IncompatibleScanData(Exception): 

13 pass 

14 

15 

16class TomoScan: 

17 """Base class to dispatch, receive events from scans, and expose specific 

18 scan state 

19 

20 Raises: 

21 IncompatibleScanData: If the metadata is not compatible with this 

22 constructor 

23 """ 

24 

25 def __init__(self, component, source, scanid, metadata): 

26 self.__scanid = scanid 

27 self.__source = source 

28 self.__component = component 

29 self._init(metadata) 

30 

31 @property 

32 def scanid(self): 

33 return self.__scanid 

34 

35 @property 

36 def source(self): 

37 return self.__source 

38 

39 def to_rest(self) -> dict: 

40 return None 

41 

42 @property 

43 def component(self): 

44 return self.__component 

45 

46 def on_scan_started(self, metadata): 

47 pass 

48 

49 def on_data_received( 

50 self, master, progress, channel_name, channel_size, channel_progress 

51 ): 

52 pass 

53 

54 def on_0d_data_received(self, channel_name, channel_size): 

55 pass 

56 

57 def on_subscan_started(self, scanid, scan: typing.Optional[TomoScan]): 

58 pass 

59 

60 def on_scan_terminated(self, metadata): 

61 pass 

62 

63 def _get_daiquiri_scan_state(self, scan_info: dict) -> str: 

64 """Convert blissdata scans states to abstract scan state""" 

65 # BLISS 1.11 

66 if "state" in scan_info: 

67 return scan_info["state"] 

68 

69 # BLISS 2.0 

70 from daiquiri.core.hardware.blissdata import scans 

71 

72 termination = scan_info.get("end_reason", None) 

73 if termination is not None: 

74 iresult = scans.TERMINATION_MAPPING.get(termination) 

75 if iresult is None: 

76 logger.error( 

77 f"No state termination mapping for scan state {termination}" 

78 ) 

79 return "UNKNOWN" 

80 return scans.ScanStates[iresult]