Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/transform/hw_sets.py: 87%

108 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1"""Sets based in daiquiri hardware objects 

2""" 

3 

4import gevent 

5import logging 

6from abc import ABC, abstractproperty 

7from daiquiri.core.transform import sets 

8 

9 

10logger = logging.getLogger(__name__) 

11 

12 

13class AbstractMotorDomain(ABC): 

14 """A real interval based on hardware motors""" 

15 

16 @abstractproperty 

17 def motors(self): 

18 pass 

19 

20 @abstractproperty 

21 def closed(self): 

22 pass 

23 

24 @property 

25 def limits(self): 

26 # TODO: Cache this? 

27 # if numpy.isinf(self._limits[0][0]): 

28 self._limits = self._calc_limits 

29 return self._limits 

30 

31 @property 

32 def _calc_limits(self): 

33 limits = [] 

34 for m, (clow, chigh) in zip(self.motors, self.closed): 

35 low, high = m.get("limits") 

36 if low > high: 

37 low, high = high, low 

38 limits.append([low, high, clow, chigh]) 

39 return self._parse_limits(limits) 

40 

41 @property 

42 def current_position(self): 

43 """ 

44 :returns array: shape is (ndomain,) 

45 """ 

46 return self.as_sequence([m.get("position") for m in self.motors])[0] 

47 

48 @current_position.setter 

49 def current_position(self, values): 

50 self.raiseIfNotIn(values) 

51 for mot, pos in zip(self.motors, values): 

52 logger.info(f"Move {mot.name()} to {pos}") 

53 mot.move(pos) 

54 

55 @property 

56 def current_position_dict(self): 

57 """ 

58 :returns dict: length is ndomain 

59 """ 

60 result = {} 

61 for mot, pos in zip(self.motors, self.current_position): 

62 result[mot.id()] = pos 

63 return result 

64 

65 @current_position_dict.setter 

66 def current_position_dict(self, dic): 

67 positions = [0] * self.ndomain 

68 for motname, motpos in dic.items(): 

69 idx = self.motor_index(motname) 

70 positions[idx] = motpos 

71 self.current_position = positions 

72 

73 def wait_motion_done(self, timeout=None): 

74 """Wait until the motors do not move anymore 

75 

76 :param num or None timeout: `None` waits indefinitely 

77 """ 

78 with gevent.Timeout(timeout): 

79 logger.info("Wait motion done ...") 

80 for mot in self.motors: 

81 mot.wait() 

82 logger.info("Motion done.") 

83 

84 def move(self, values, wait=True, timeout=None): 

85 """Set the `current_position` with optional waiting (the default) 

86 

87 :param array or dict values: 

88 :param bool wait: wait until motion has finished 

89 :param num or None timeout: `None` waits indefinitely 

90 """ 

91 if isinstance(values, dict): 

92 self.current_position_dict = values 

93 else: 

94 self.current_position = values 

95 if wait: 

96 self.wait_motion_done(timeout=timeout) 

97 

98 def rmove(self, values, wait=True, timeout=None): 

99 """Set the `current_position` with optional waiting (the default) 

100 

101 :param array or dict values: 

102 :param bool wait: wait until motion has finished 

103 :param num or None timeout: `None` waits indefinitely 

104 """ 

105 if isinstance(values, dict): 

106 _values = self.current_position_dict 

107 for motname, rpos in values.items(): 

108 _values[motname] = rpos 

109 values = _values 

110 else: 

111 values = self.current_position + values 

112 self.move(values, wait=wait, timeout=timeout) 

113 

114 def motor_limits(self, name): 

115 """Get motor limits 

116 

117 :param str name: 

118 :returns 2-tuple: low, high 

119 """ 

120 motlim = self.limits[self.motor_index(name)] 

121 return motlim.low, motlim.high 

122 

123 def motor_index(self, name): 

124 """Get the index of the motor name 

125 

126 :param str name: 

127 :returns int: 

128 """ 

129 for i, mot in enumerate(self.motors): 

130 if mot.id() == name: 

131 return i 

132 raise IndexError(name) 

133 

134 def motor_name(self, i): 

135 """Get the motor name from the index 

136 

137 :param int: 

138 :returns str: 

139 """ 

140 return self.motors[i].id() 

141 

142 

143class MotorDomain(AbstractMotorDomain, sets.RealInterval): 

144 """A real interval based on hardware motors""" 

145 

146 def __init__(self, motors, closed=None): 

147 """ 

148 :param list(Motor) motors: 

149 :param list(2-tuple(bool)) closed: min/max included or not for each motor 

150 """ 

151 self._motors = motors 

152 ndim = len(self.motors) 

153 if closed: 

154 if ndim != len(closed): 

155 raise ValueError("Number of motors and number of closed does not match") 

156 if not all(len(c) == 2 for c in closed): 

157 raise ValueError("Closed must a a list of 2-tuples") 

158 self._closed = closed 

159 else: 

160 self._closed = [[True, True]] * ndim 

161 super().__init__(ndim=ndim) 

162 

163 @property 

164 def motors(self): 

165 return self._motors 

166 

167 @property 

168 def closed(self): 

169 return self._closed 

170 

171 

172class CompositeMotorDomain(AbstractMotorDomain, sets.RealIntervalComposite): 

173 """A real interval based on hardware motors""" 

174 

175 @property 

176 def motors(self): 

177 lst = [] 

178 for interval in self._intervals: 

179 lst.extend(interval.motors) 

180 return lst 

181 

182 @property 

183 def closed(self): 

184 lst = [] 

185 for interval in self._intervals: 

186 lst.extend(interval.closed) 

187 return lst