Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/transform/hw_sets.py: 87%
108 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1"""Sets based on daiquiri hardware objects
2"""
4import gevent
5import logging
6from abc import ABC, abstractproperty
7from daiquiri.core.transform import sets
10logger = logging.getLogger(__name__)
class AbstractMotorDomain(ABC):
    """A real interval based on hardware motors

    Concrete classes provide `motors` and `closed`. The helpers `as_sequence`,
    `raiseIfNotIn`, `ndomain` and `_parse_limits` are used here but not defined
    in this class; presumably they come from the `sets` class this one is
    combined with (to be confirmed against `daiquiri.core.transform.sets`).
    """

    @abstractproperty
    def motors(self):
        """Sequence of motor objects, one per dimension of the domain"""
        pass

    @abstractproperty
    def closed(self):
        """Sequence of `(low_closed, high_closed)` pairs, one per motor"""
        pass

    @property
    def limits(self):
        """Limits of all motors, recomputed from the motors on every access"""
        # TODO: Cache this?
        # if numpy.isinf(self._limits[0][0]):
        self._limits = self._calc_limits
        return self._limits

    @property
    def _calc_limits(self):
        """Query each motor for its limits and hand them to `_parse_limits`

        Each entry passed on is `[low, high, low_closed, high_closed]`.
        """
        limits = []
        for mot, (clow, chigh) in zip(self.motors, self.closed):
            low, high = mot.get("limits")
            # A motor may report its limits in reversed order
            if low > high:
                low, high = high, low
            limits.append([low, high, clow, chigh])
        return self._parse_limits(limits)

    @property
    def current_position(self):
        """
        :returns array: shape is (ndomain,)
        """
        return self.as_sequence([m.get("position") for m in self.motors])[0]

    @current_position.setter
    def current_position(self, values):
        # Validate the complete target before any motor starts moving
        self.raiseIfNotIn(values)
        for mot, pos in zip(self.motors, values):
            logger.info("Move %s to %s", mot.name(), pos)
            mot.move(pos)

    @property
    def current_position_dict(self):
        """
        :returns dict: motor id -> position, length is ndomain
        """
        return {mot.id(): pos for mot, pos in zip(self.motors, self.current_position)}

    @current_position_dict.setter
    def current_position_dict(self, dic):
        # Motors that are not mentioned keep their current position. Filling
        # the gaps with 0 would drive every unspecified motor to zero.
        positions = list(self.current_position)
        for motname, motpos in dic.items():
            positions[self.motor_index(motname)] = motpos
        self.current_position = positions

    def wait_motion_done(self, timeout=None):
        """Wait until the motors do not move anymore

        :param num or None timeout: `None` waits indefinitely
        """
        with gevent.Timeout(timeout):
            logger.info("Wait motion done ...")
            for mot in self.motors:
                mot.wait()
            logger.info("Motion done.")

    def move(self, values, wait=True, timeout=None):
        """Set the `current_position` with optional waiting (the default)

        :param array or dict values: absolute positions; a dict maps motor id
                                     to position
        :param bool wait: wait until motion has finished
        :param num or None timeout: `None` waits indefinitely
        """
        if isinstance(values, dict):
            self.current_position_dict = values
        else:
            self.current_position = values
        if wait:
            self.wait_motion_done(timeout=timeout)

    def rmove(self, values, wait=True, timeout=None):
        """Move relative to the `current_position` with optional waiting (the default)

        :param array or dict values: offsets added to the current position; a
                                     dict maps motor id to offset
        :param bool wait: wait until motion has finished
        :param num or None timeout: `None` waits indefinitely
        :raises IndexError: a dict key is not the id of one of the motors
        """
        if isinstance(values, dict):
            target = self.current_position_dict
            for motname, rpos in values.items():
                # Same exception as `motor_index` for an unknown motor
                if motname not in target:
                    raise IndexError(motname)
                # Add the offset; plain assignment would turn this into an
                # absolute move
                target[motname] += rpos
            values = target
        else:
            values = self.current_position + values
        self.move(values, wait=wait, timeout=timeout)

    def motor_limits(self, name):
        """Get motor limits

        :param str name:
        :returns 2-tuple: low, high
        """
        motlim = self.limits[self.motor_index(name)]
        return motlim.low, motlim.high

    def motor_index(self, name):
        """Get the index of the motor name

        :param str name:
        :returns int:
        :raises IndexError: no motor has this id
        """
        for i, mot in enumerate(self.motors):
            if mot.id() == name:
                return i
        raise IndexError(name)

    def motor_name(self, i):
        """Get the motor name from the index

        :param int i:
        :returns str:
        """
        return self.motors[i].id()
class MotorDomain(AbstractMotorDomain, sets.RealInterval):
    """A real interval based on hardware motors"""

    def __init__(self, motors, closed=None):
        """
        :param list(Motor) motors:
        :param list(2-tuple(bool)) closed: min/max included or not for each motor;
                                           when omitted or empty, every bound
                                           is included
        :raises ValueError: `closed` does not have one 2-tuple per motor
        """
        self._motors = motors
        ndim = len(self.motors)
        if closed:
            if ndim != len(closed):
                raise ValueError("Number of motors and number of closed does not match")
            if not all(len(c) == 2 for c in closed):
                raise ValueError("Closed must be a list of 2-tuples")
            self._closed = closed
        else:
            # Build one independent pair per motor. `[[True, True]] * ndim`
            # would repeat the same list object, so changing one motor's
            # flags would change them all.
            self._closed = [[True, True] for _ in range(ndim)]
        super().__init__(ndim=ndim)

    @property
    def motors(self):
        """The motors given at construction"""
        return self._motors

    @property
    def closed(self):
        """The `(low_closed, high_closed)` pairs, one per motor"""
        return self._closed
class CompositeMotorDomain(AbstractMotorDomain, sets.RealIntervalComposite):
    """A real interval based on the hardware motors of several intervals

    Both properties walk `self._intervals` in order and concatenate what each
    interval reports into a new list.
    """

    @property
    def motors(self):
        """Motors of all intervals, flattened into one list"""
        return [motor for part in self._intervals for motor in part.motors]

    @property
    def closed(self):
        """Closed flags of all intervals, flattened into one list"""
        return [flags for part in self._intervals for flags in part.closed]