Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/bliss/__init__.py: 70%
106 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-06 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import importlib
4import logging
5import gevent
7from marshmallow import ValidationError, fields, validates_schema, post_load
8from bliss.config.static import get_config
9from bliss.config.conductor import client
11from daiquiri.core.hardware.abstract import ProtocolHandler
12from daiquiri.core.hardware.bliss.object import BlissDummyObject
13from daiquiri.core.schema.hardware import HOConfigSchema
14from daiquiri.core.exceptions import InvalidYAML
15from daiquiri.core.utils import loader
18logger = logging.getLogger(__name__)
20bl = logging.getLogger("bliss")
21bl.setLevel(logging.WARNING)
22bl.disabled = True
24bl = logging.getLogger("bliss.common.mapping")
25bl.disabled = True
28class BlissHOConfigSchema(HOConfigSchema):
29 """The Bliss Hardware Object Config Schema"""
31 address = fields.Str(metadata={"description": "Beacon object id"})
32 type = fields.Str(metadata={"description": "Object type for objects without id"})
34 @validates_schema
35 def schema_validate(self, data, **kwargs):
36 if not (data.get("address") or data.get("url")):
37 raise ValidationError(
38 "Object must have either an `address` or `url` defined"
39 )
41 @post_load
42 def populate(self, data, **kwargs):
43 """Generate the device address from its url"""
44 if data.get("url"):
45 _, address = data["url"].split("://")
46 data["address"] = address
48 return data
51class BlissHandler(ProtocolHandler):
52 """The bliss protocol handler
54 Returns an instance of an abstracted bliss object
56 The bliss protocol handler first checks the kwargs conform to the BlissHOConfigSchema
57 defined above. This address is used to retrieve the bliss object. Its class is then mapped
58 to an abstract class and a bliss specific instance is created (see hardware/bliss/motor.py)
59 """
61 library = "bliss"
63 _class_map = {
64 "bliss.common.axis.Axis": "motor",
65 "bliss.controllers.actuator.Actuator": "actuator",
66 "bliss.controllers.multiplepositions.MultiplePositions": "multiposition",
67 "bliss.common.shutter.BaseShutter": "shutter",
68 "bliss.controllers.test.objectref.ObjectRef": "objectref",
69 "bliss.controllers.intraled.Intraled": "intraled",
70 "bliss.controllers.lima.lima_base.Lima": "lima",
71 "tomo.controllers.pusher.Pusher": "pusher",
72 "tomo.controllers.air_bearing.AirBearing": "airbearing",
73 "tomo.procedures.base_procedure.SessionProcedure": "procedure",
74 "tomo.procedures.user_script.UserScriptProcedure": "procedure",
75 "tomo.controllers.tomo_config.TomoConfig": "tomoconfig",
76 "tomo.controllers.tomo_detector.TomoDetector": "tomodetector",
77 "tomo.controllers.tomo_detectors.TomoDetectors": "tomodetectors",
78 "tomo.controllers.tomo_imaging.TomoImaging": "tomoimaging",
79 "tomo.controllers.tomo_sample_stage.TomoSampleStage": "tomosamplestage",
80 "tomo.controllers.flat_motion.FlatMotion": "tomoflatmotion",
81 "tomo.controllers.holotomo.Holotomo": "tomoholo",
82 "tomo.controllers.flat_motion.FlatMotion": "tomoflatmotion",
83 "tomo.controllers.parking_position.ParkingPosition": "tomoreferenceposition",
84 "tomo.optic.base_optic.BaseOptic": "tomooptic",
85 }
87 _class_name_map = {
88 "EBV": "beamviewer",
89 "ChannelFromConfig": "channelfromconfig",
90 "volpi": "volpi",
91 "TestObject": "test",
92 "Fshutter": "shutter",
93 "transmission": "transmission",
94 "tango_attr_as_counter": "tango_attr_as_counter",
95 "ShimadzuCBM20": "shimadzucbm20",
96 "ShimadzuPDA": "shimadzupda",
97 "ID26Attenuator": "attenuator_wago",
98 "PresetManager": "presetmanager",
99 "LaserController": "laser",
100 "LaserHeating": "laserheating",
101 "IcePapTrack": "remotemotor",
102 "DaiquiriProcessor": "processor",
103 }
105 def get(self, **kwargs):
106 try:
107 kwargs = BlissHOConfigSchema().load(kwargs)
108 except ValidationError as err:
109 raise InvalidYAML(
110 {
111 "message": "Bliss hardware object definition is invalid",
112 "file": "hardware.yml",
113 "obj": kwargs,
114 "errors": err.messages,
115 }
116 ) from err
118 obj_type = kwargs.get("type")
120 if obj_type == "activetomoconfig":
121 # It's a global proxy without real instance
122 class GlobalBlissObject:
123 name = None
125 obj = GlobalBlissObject()
126 else:
127 config = get_config()
128 try:
129 obj = config.get(kwargs.get("address"))
130 except Exception:
131 logger.exception(f"Couldn't get bliss object {kwargs.get('address')}")
132 return BlissDummyObject(**kwargs)
133 kwargs["obj"] = obj
135 obj_type = kwargs.get("type")
136 if obj_type is not None:
137 return loader(
138 "daiquiri.core.hardware.bliss",
139 "",
140 obj_type,
141 **kwargs,
142 )
144 for bliss_mapping, mapped_class in self._class_map.items():
145 bliss_file, bliss_class_name = bliss_mapping.rsplit(".", 1)
146 # Some classes may not be available depending on the bliss version
147 try:
148 bliss_module = importlib.import_module(bliss_file)
149 bliss_class = getattr(bliss_module, bliss_class_name)
150 except ModuleNotFoundError:
151 logger.warning(f"Could not find bliss module {bliss_mapping}")
152 continue
154 if isinstance(kwargs["obj"], bliss_class):
155 return loader(
156 "daiquiri.core.hardware.bliss", "", mapped_class, **kwargs
157 )
159 cls = kwargs["obj"].__class__.__name__
160 if cls in self._class_name_map:
161 return loader(
162 "daiquiri.core.hardware.bliss",
163 "",
164 self._class_name_map[cls],
165 **kwargs,
166 )
168 logger.error("No class found for {cls}".format(cls=cls))
169 return BlissDummyObject(**kwargs)
171 def _get_hardware(self):
172 """Return the hardware service.
174 This should be part of the daiquiri API.
175 """
176 app = self._app
178 logger.debug("Wait for hardware component")
179 while True:
180 gevent.sleep(5.0)
181 try:
182 hardware = app.hardware
183 break
184 except Exception:
185 logger.error("Error while getting hardware service", exc_info=True)
187 logger.debug("Hardware initialized")
188 return hardware
190 def _monitor_locked_device_loop(self):
191 try:
192 hardware = self._get_hardware()
193 connection = client.get_default_connection()
195 all_locks = {}
196 while True:
197 gevent.sleep(1.0)
199 new_locks = connection.who_locked()
200 new_keys = set(new_locks.keys())
201 previous_keys = set(all_locks.keys())
202 for k in new_keys - previous_keys:
203 obj = hardware.get_object(k)
204 if obj is None:
205 continue
206 obj.set_locked(new_locks[k])
207 for k in previous_keys - new_keys:
208 obj = hardware.get_object(k)
209 if obj is None:
210 continue
211 obj.set_locked(None)
212 all_locks = new_locks
213 except Exception:
214 logger.error("Error while monitoring locks", exc_info=True)
216 def create_monitor(self):
217 return gevent.spawn(self._monitor_locked_device_loop)