Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/bliss/__init__.py: 70%

106 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-06 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import importlib 

4import logging 

5import gevent 

6 

7from marshmallow import ValidationError, fields, validates_schema, post_load 

8from bliss.config.static import get_config 

9from bliss.config.conductor import client 

10 

11from daiquiri.core.hardware.abstract import ProtocolHandler 

12from daiquiri.core.hardware.bliss.object import BlissDummyObject 

13from daiquiri.core.schema.hardware import HOConfigSchema 

14from daiquiri.core.exceptions import InvalidYAML 

15from daiquiri.core.utils import loader 

16 

17 

18logger = logging.getLogger(__name__) 

19 

20bl = logging.getLogger("bliss") 

21bl.setLevel(logging.WARNING) 

22bl.disabled = True 

23 

24bl = logging.getLogger("bliss.common.mapping") 

25bl.disabled = True 

26 

27 

28class BlissHOConfigSchema(HOConfigSchema): 

29 """The Bliss Hardware Object Config Schema""" 

30 

31 address = fields.Str(metadata={"description": "Beacon object id"}) 

32 type = fields.Str(metadata={"description": "Object type for objects without id"}) 

33 

34 @validates_schema 

35 def schema_validate(self, data, **kwargs): 

36 if not (data.get("address") or data.get("url")): 

37 raise ValidationError( 

38 "Object must have either an `address` or `url` defined" 

39 ) 

40 

41 @post_load 

42 def populate(self, data, **kwargs): 

43 """Generate the device address from its url""" 

44 if data.get("url"): 

45 _, address = data["url"].split("://") 

46 data["address"] = address 

47 

48 return data 

49 

50 

51class BlissHandler(ProtocolHandler): 

52 """The bliss protocol handler 

53 

54 Returns an instance of an abstracted bliss object 

55 

56 The bliss protocol handler first checks the kwargs conform to the BlissHOConfigSchema 

57 defined above. This address is used to retrieve the bliss object. Its class is then mapped 

58 to an abstract class and a bliss specific instance is created (see hardware/bliss/motor.py) 

59 """ 

60 

61 library = "bliss" 

62 

63 _class_map = { 

64 "bliss.common.axis.Axis": "motor", 

65 "bliss.controllers.actuator.Actuator": "actuator", 

66 "bliss.controllers.multiplepositions.MultiplePositions": "multiposition", 

67 "bliss.common.shutter.BaseShutter": "shutter", 

68 "bliss.controllers.test.objectref.ObjectRef": "objectref", 

69 "bliss.controllers.intraled.Intraled": "intraled", 

70 "bliss.controllers.lima.lima_base.Lima": "lima", 

71 "tomo.controllers.pusher.Pusher": "pusher", 

72 "tomo.controllers.air_bearing.AirBearing": "airbearing", 

73 "tomo.procedures.base_procedure.SessionProcedure": "procedure", 

74 "tomo.procedures.user_script.UserScriptProcedure": "procedure", 

75 "tomo.controllers.tomo_config.TomoConfig": "tomoconfig", 

76 "tomo.controllers.tomo_detector.TomoDetector": "tomodetector", 

77 "tomo.controllers.tomo_detectors.TomoDetectors": "tomodetectors", 

78 "tomo.controllers.tomo_imaging.TomoImaging": "tomoimaging", 

79 "tomo.controllers.tomo_sample_stage.TomoSampleStage": "tomosamplestage", 

80 "tomo.controllers.flat_motion.FlatMotion": "tomoflatmotion", 

81 "tomo.controllers.holotomo.Holotomo": "tomoholo", 

82 "tomo.controllers.flat_motion.FlatMotion": "tomoflatmotion", 

83 "tomo.controllers.parking_position.ParkingPosition": "tomoreferenceposition", 

84 "tomo.optic.base_optic.BaseOptic": "tomooptic", 

85 } 

86 

87 _class_name_map = { 

88 "EBV": "beamviewer", 

89 "ChannelFromConfig": "channelfromconfig", 

90 "volpi": "volpi", 

91 "TestObject": "test", 

92 "Fshutter": "shutter", 

93 "transmission": "transmission", 

94 "tango_attr_as_counter": "tango_attr_as_counter", 

95 "ShimadzuCBM20": "shimadzucbm20", 

96 "ShimadzuPDA": "shimadzupda", 

97 "ID26Attenuator": "attenuator_wago", 

98 "PresetManager": "presetmanager", 

99 "LaserController": "laser", 

100 "LaserHeating": "laserheating", 

101 "IcePapTrack": "remotemotor", 

102 "DaiquiriProcessor": "processor", 

103 } 

104 

105 def get(self, **kwargs): 

106 try: 

107 kwargs = BlissHOConfigSchema().load(kwargs) 

108 except ValidationError as err: 

109 raise InvalidYAML( 

110 { 

111 "message": "Bliss hardware object definition is invalid", 

112 "file": "hardware.yml", 

113 "obj": kwargs, 

114 "errors": err.messages, 

115 } 

116 ) from err 

117 

118 obj_type = kwargs.get("type") 

119 

120 if obj_type == "activetomoconfig": 

121 # It's a global proxy without real instance 

122 class GlobalBlissObject: 

123 name = None 

124 

125 obj = GlobalBlissObject() 

126 else: 

127 config = get_config() 

128 try: 

129 obj = config.get(kwargs.get("address")) 

130 except Exception: 

131 logger.exception(f"Couldn't get bliss object {kwargs.get('address')}") 

132 return BlissDummyObject(**kwargs) 

133 kwargs["obj"] = obj 

134 

135 obj_type = kwargs.get("type") 

136 if obj_type is not None: 

137 return loader( 

138 "daiquiri.core.hardware.bliss", 

139 "", 

140 obj_type, 

141 **kwargs, 

142 ) 

143 

144 for bliss_mapping, mapped_class in self._class_map.items(): 

145 bliss_file, bliss_class_name = bliss_mapping.rsplit(".", 1) 

146 # Some classes may not be available depending on the bliss version 

147 try: 

148 bliss_module = importlib.import_module(bliss_file) 

149 bliss_class = getattr(bliss_module, bliss_class_name) 

150 except ModuleNotFoundError: 

151 logger.warning(f"Could not find bliss module {bliss_mapping}") 

152 continue 

153 

154 if isinstance(kwargs["obj"], bliss_class): 

155 return loader( 

156 "daiquiri.core.hardware.bliss", "", mapped_class, **kwargs 

157 ) 

158 

159 cls = kwargs["obj"].__class__.__name__ 

160 if cls in self._class_name_map: 

161 return loader( 

162 "daiquiri.core.hardware.bliss", 

163 "", 

164 self._class_name_map[cls], 

165 **kwargs, 

166 ) 

167 

168 logger.error("No class found for {cls}".format(cls=cls)) 

169 return BlissDummyObject(**kwargs) 

170 

171 def _get_hardware(self): 

172 """Return the hardware service. 

173 

174 This should be part of the daiquiri API. 

175 """ 

176 app = self._app 

177 

178 logger.debug("Wait for hardware component") 

179 while True: 

180 gevent.sleep(5.0) 

181 try: 

182 hardware = app.hardware 

183 break 

184 except Exception: 

185 logger.error("Error while getting hardware service", exc_info=True) 

186 

187 logger.debug("Hardware initialized") 

188 return hardware 

189 

190 def _monitor_locked_device_loop(self): 

191 try: 

192 hardware = self._get_hardware() 

193 connection = client.get_default_connection() 

194 

195 all_locks = {} 

196 while True: 

197 gevent.sleep(1.0) 

198 

199 new_locks = connection.who_locked() 

200 new_keys = set(new_locks.keys()) 

201 previous_keys = set(all_locks.keys()) 

202 for k in new_keys - previous_keys: 

203 obj = hardware.get_object(k) 

204 if obj is None: 

205 continue 

206 obj.set_locked(new_locks[k]) 

207 for k in previous_keys - new_keys: 

208 obj = hardware.get_object(k) 

209 if obj is None: 

210 continue 

211 obj.set_locked(None) 

212 all_locks = new_locks 

213 except Exception: 

214 logger.error("Error while monitoring locks", exc_info=True) 

215 

216 def create_monitor(self): 

217 return gevent.spawn(self._monitor_locked_device_loop)