Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/core/hardware/bliss/motor.py: 75%

72 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-29 02:12 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from __future__ import annotations 

4import math 

5import gevent 

6 

7from daiquiri.core.hardware.abstract import HardwareProperty 

8from daiquiri.core.hardware.abstract.motor import Motor as AbstractMotor, MotorStates 

9from daiquiri.core.hardware.bliss.object import BlissObject 

10from daiquiri.core.hardware.bliss.object import CouldBeNotImplementedProperty 

11from daiquiri.core.logging import log 

12from bliss.common.axis import AxisState 

13 

14 

15import logging 

16 

17logger = logging.getLogger(__name__) 

18 

19 

class LimitsProperty(HardwareProperty):
    """Property translating axis limits into a finite, ordered pair."""

    def translate_from(self, value):
        """Convert the limits read from the hardware.

        Args:
            value: Iterable of numeric limits, or ``None``.

        Returns:
            ``[None, None]`` when ``value`` is ``None``. Otherwise a list in
            which every infinite entry is replaced by ``-999999999`` or
            ``999999999`` (according to its sign), reversed if the first
            entry is greater than the second.
        """
        if value is None:
            return [None, None]

        # Infinite bounds are swapped for large finite sentinels.
        bounds = [
            (-999999999 if bound < 0 else 999999999) if math.isinf(bound) else bound
            for bound in value
        ]

        # BLISS returns the biggest value first, when the sign is negative
        if bounds[0] > bounds[1]:
            bounds.reverse()
        return bounds

36 

37 

# Translation table from BLISS state names to Daiquiri state names.
# State names that are absent from this table already match on both sides.
STATE_MAPPING = dict(
    LIMPOS="HIGHLIMIT",
    LIMNEG="LOWLIMIT",
)

44 

45 

class StateProperty(HardwareProperty):
    """Property translating a BLISS ``AxisState`` into a list of state names."""

    def translate_from(self, value: AxisState | None):
        """Convert the axis state read from the hardware.

        Args:
            value: The BLISS axis state, or ``None``.

        Returns:
            ``["UNKNOWN"]`` when ``value`` is ``None``. Otherwise one entry
            per name in ``value.current_states_names``: the name (after
            ``STATE_MAPPING``) when it is a member of ``MotorStates``, else
            the string ``"_<name>:<description>"`` built from the state's
            description.
        """
        if value is None:
            return ["UNKNOWN"]

        def describe(name):
            mapped = STATE_MAPPING.get(name, name)
            if mapped in MotorStates:
                return mapped
            # Not a known state: expose it with its description instead.
            return f"_{name}:{value._state_desc[name]}"

        # An empty result means it's "not READY"
        return [describe(name) for name in value.current_states_names]

65 

66 

class PositionProperty(HardwareProperty):
    """Property that reports a NaN position as ``None``."""

    def translate_from(self, value):
        """Return ``value`` unchanged, or ``None`` if it is ``None`` or NaN."""
        if value is None or math.isnan(value):
            return None
        return value

76 

77 

class NoneIfNotImplementedProperty(CouldBeNotImplementedProperty):
    """Property whose value is ``None`` when the hardware lacks it.

    Acceleration and velocity can not be exposed in case of a
    CalcController.
    """

    def notImplementedValue(self):
        """Return the value used when the hardware does not implement the property."""
        return None

85 

86 

class Motor(BlissObject, AbstractMotor):
    """Daiquiri motor backed by a BLISS axis (``self._object``)."""

    # Property name exposed by this class -> property wrapper constructed with
    # the name it reads on the BLISS side (e.g. "target" uses "_set_position").
    # NOTE(review): how this map is consumed is defined in BlissObject, which
    # is not visible here.
    PROPERTY_MAP = {
        "position": PositionProperty("position"),
        "target": PositionProperty("_set_position"),
        "tolerance": HardwareProperty("tolerance"),
        "acceleration": NoneIfNotImplementedProperty("acceleration"),
        "velocity": NoneIfNotImplementedProperty("velocity"),
        "limits": LimitsProperty("limits"),
        "state": StateProperty("state"),
        "unit": HardwareProperty("unit"),
        "offset": HardwareProperty("offset"),
        "sign": HardwareProperty("sign"),
        "display_digits": HardwareProperty("display_digits"),
    }

    # Callable name exposed by this class -> method name on the BLISS side
    # (presumably resolved by BlissObject; verify against the base class).
    CALLABLE_MAP = {"stop": "stop", "wait": "wait_move"}

    def _spawn_motion_watcher(self, verb, value, kwargs):
        """Spawn a greenlet that waits for the motion and logs a failure.

        The greenlet calls ``wait_move()`` on the axis. Any exception raised
        there is logged on the "user" logger and is not re-raised.

        Args:
            verb: Command name used in the log message ("move" or "rmove").
            value: The requested value, used only in the log message.
            kwargs: Extra keyword arguments of the request, used only in the
                log message.
        """
        bliss_axis = self._object

        def wait_and_log_failure():
            try:
                bliss_axis.wait_move()
            except Exception:
                log.get("user").error(
                    f"{verb} {self.name()} {value} {kwargs} failed",
                    type="hardware",
                    exc_info=True,
                )

        gevent.spawn(wait_and_log_failure)

    def _call_move(self, value, **kwargs):
        """Start a non-blocking move to ``value``.

        ``kwargs`` are only logged; they are not forwarded to the axis.
        Errors raised while starting the move propagate to the caller;
        errors raised while waiting for it are logged by the watcher.
        """
        logger.debug(f"_call_move {self.name()} {value} {kwargs}")
        self._object.move(value, wait=False)
        self._spawn_motion_watcher("move", value, kwargs)

    def _call_rmove(self, value, **kwargs):
        """Start a non-blocking relative move by ``value``.

        ``kwargs`` are only logged; they are not forwarded to the axis.
        Errors raised while starting the move propagate to the caller;
        errors raised while waiting for it are logged by the watcher.
        """
        logger.debug(f"_call_rmove {self.name()} {value} {kwargs}")
        self._object.move(value, wait=False, relative=True)
        self._spawn_motion_watcher("rmove", value, kwargs)