Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/tango/__init__.py: 89%

44 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from marshmallow import ValidationError, fields, validates_schema, post_load 

4 

5from daiquiri.core.hardware.abstract import ProtocolHandler 

6from daiquiri.core.schema.hardware import HOConfigSchema 

7from daiquiri.core.exceptions import InvalidYAML 

8from daiquiri.core.utils import loader 

9 

10import logging 

11 

12logger = logging.getLogger(__name__) 

13 

14 

15class TangoHOConfig(HOConfigSchema): 

16 """The Tango Hardware Object Config Schema""" 

17 

18 type = fields.Str(required=True, metadata={"description": "The object type"}) 

19 tango_url = fields.Str(metadata={"description": "URL to tango device/attribute"}) 

20 attrname = fields.Str( 

21 required=False, metadata={"description": "Used by TangoAttr object"} 

22 ) 

23 

24 @validates_schema 

25 def schema_validate(self, data, **kwargs): 

26 if not (data.get("tango_url") or data.get("url")): 

27 raise ValidationError( 

28 "Object must have either a `tango_url` or `url` defined" 

29 ) 

30 

31 @post_load 

32 def populate(self, data, **kwargs): 

33 """Maintain backwards compatibility with `tango_url` 

34 

35 TODO: Remove me 

36 """ 

37 url = data.get("url") 

38 if url is not None: 

39 # Normalize URL to a valid tango URL 

40 if url.startswith("tango:///"): 

41 # short schema without host and port 

42 url = url[len("tango:///") :] 

43 data["tango_url"] = url 

44 

45 return data 

46 

47 

48class TangoHandler(ProtocolHandler): 

49 library = "tango" 

50 

51 def get(self, **kwargs): 

52 # Discover tangoattr using from url 

53 if "url" in kwargs: 

54 url = kwargs["url"] 

55 

56 if "type" not in kwargs or kwargs["type"] == "tangoattr": 

57 if url.count("/") == 6: 

58 url, attrname = url.rsplit("/", 1) 

59 kwargs["url"] = url 

60 kwargs["attrname"] = attrname 

61 if "type" not in kwargs: 

62 kwargs["type"] = "tangoattr" 

63 

64 try: 

65 kwargs = TangoHOConfig().load(kwargs) 

66 except ValidationError as err: 

67 raise InvalidYAML( 

68 { 

69 "message": "Tango hardware object definition is invalid", 

70 "file": "hardware.yml", 

71 "obj": kwargs, 

72 "errors": err.messages, 

73 } 

74 ) from err 

75 

76 try: 

77 return loader( 

78 "daiquiri.core.hardware.tango", 

79 "", 

80 kwargs.get("type"), 

81 app=self._app, 

82 **kwargs 

83 ) 

84 except ModuleNotFoundError: 

85 logger.error( 

86 "Could not find class for tango object {type}".format( 

87 type=kwargs.get("type") 

88 ) 

89 ) 

90 raise