Coverage for /opt/conda/envs/apienv/lib/python3.11/site-packages/daiquiri/core/hardware/bliss/motor.py: 75%
72 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-29 02:12 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from __future__ import annotations
4import math
5import gevent
7from daiquiri.core.hardware.abstract import HardwareProperty
8from daiquiri.core.hardware.abstract.motor import Motor as AbstractMotor, MotorStates
9from daiquiri.core.hardware.bliss.object import BlissObject
10from daiquiri.core.hardware.bliss.object import CouldBeNotImplementedProperty
11from daiquiri.core.logging import log
12from bliss.common.axis import AxisState
15import logging
17logger = logging.getLogger(__name__)
20class LimitsProperty(HardwareProperty):
21 def translate_from(self, value):
22 if value is None:
23 return [None, None]
25 value = list(value)
26 for i, v in enumerate(value):
27 if math.isinf(v):
28 if v < 0:
29 value[i] = -999999999
30 else:
31 value[i] = 999999999
32 if value[0] > value[1]:
33 # BLISS returns the biggest value first, when the sign is negative
34 return value[::-1]
35 return value
38# Convert BLISS state into Daiquiri state.
39# Other state names already matches.
40STATE_MAPPING = {
41 "LIMPOS": "HIGHLIMIT",
42 "LIMNEG": "LOWLIMIT",
43}
46class StateProperty(HardwareProperty):
47 def translate_from(self, value: AxisState | None):
48 if value is None:
49 return ["UNKNOWN"]
51 states = []
52 for name in value.current_states_names:
53 normalized = STATE_MAPPING.get(name, name)
54 if normalized in MotorStates:
55 states.append(normalized)
56 else:
57 desc = value._state_desc[name]
58 states.append(f"_{name}:{desc}")
60 if len(states):
61 return states
63 # It's "not READY"
64 return []
67class PositionProperty(HardwareProperty):
68 def translate_from(self, value):
69 if value is None:
70 return None
72 if math.isnan(value):
73 return None
75 return value
78class NoneIfNotImplementedProperty(CouldBeNotImplementedProperty):
79 """Acceleration and velocity can not be exposed in case of a
80 CalcController"""
82 def notImplementedValue(self):
83 """Returned if the property is not implemented in the hardware"""
84 return None
87class Motor(BlissObject, AbstractMotor):
88 PROPERTY_MAP = {
89 "position": PositionProperty("position"),
90 "target": PositionProperty("_set_position"),
91 "tolerance": HardwareProperty("tolerance"),
92 "acceleration": NoneIfNotImplementedProperty("acceleration"),
93 "velocity": NoneIfNotImplementedProperty("velocity"),
94 "limits": LimitsProperty("limits"),
95 "state": StateProperty("state"),
96 "unit": HardwareProperty("unit"),
97 "offset": HardwareProperty("offset"),
98 "sign": HardwareProperty("sign"),
99 "display_digits": HardwareProperty("display_digits"),
100 }
102 CALLABLE_MAP = {"stop": "stop", "wait": "wait_move"}
104 def _call_move(self, value, **kwargs):
105 logger.debug(f"_call_move {self.name()} {value} {kwargs}")
106 bliss_axis = self._object
107 bliss_axis.move(value, wait=False)
109 def propagate_motion_exception():
110 try:
111 bliss_axis.wait_move()
112 except Exception:
113 log.get("user").error(
114 f"move {self.name()} {value} {kwargs} failed",
115 type="hardware",
116 exc_info=True,
117 )
119 gevent.spawn(propagate_motion_exception)
121 def _call_rmove(self, value, **kwargs):
122 logger.debug(f"_call_rmove {self.name()} {value} {kwargs}")
123 bliss_axis = self._object
124 bliss_axis.move(value, wait=False, relative=True)
126 def propagate_motion_exception():
127 try:
128 bliss_axis.wait_move()
129 except Exception:
130 log.get("user").error(
131 f"rmove {self.name()} {value} {kwargs} failed",
132 type="hardware",
133 exc_info=True,
134 )
136 gevent.spawn(propagate_motion_exception)