Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/tango/__init__.py: 89%
44 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from marshmallow import ValidationError, fields, validates_schema, post_load
5from daiquiri.core.hardware.abstract import ProtocolHandler
6from daiquiri.core.schema.hardware import HOConfigSchema
7from daiquiri.core.exceptions import InvalidYAML
8from daiquiri.core.utils import loader
10import logging
12logger = logging.getLogger(__name__)
15class TangoHOConfig(HOConfigSchema):
16 """The Tango Hardware Object Config Schema"""
18 type = fields.Str(required=True, metadata={"description": "The object type"})
19 tango_url = fields.Str(metadata={"description": "URL to tango device/attribute"})
20 attrname = fields.Str(
21 required=False, metadata={"description": "Used by TangoAttr object"}
22 )
24 @validates_schema
25 def schema_validate(self, data, **kwargs):
26 if not (data.get("tango_url") or data.get("url")):
27 raise ValidationError(
28 "Object must have either a `tango_url` or `url` defined"
29 )
31 @post_load
32 def populate(self, data, **kwargs):
33 """Maintain backwards compatibility with `tango_url`
35 TODO: Remove me
36 """
37 url = data.get("url")
38 if url is not None:
39 # Normalize URL to a valid tango URL
40 if url.startswith("tango:///"):
41 # short schema without host and port
42 url = url[len("tango:///") :]
43 data["tango_url"] = url
45 return data
48class TangoHandler(ProtocolHandler):
49 library = "tango"
51 def get(self, **kwargs):
52 # Discover tangoattr using from url
53 if "url" in kwargs:
54 url = kwargs["url"]
56 if "type" not in kwargs or kwargs["type"] == "tangoattr":
57 if url.count("/") == 6:
58 url, attrname = url.rsplit("/", 1)
59 kwargs["url"] = url
60 kwargs["attrname"] = attrname
61 if "type" not in kwargs:
62 kwargs["type"] = "tangoattr"
64 try:
65 kwargs = TangoHOConfig().load(kwargs)
66 except ValidationError as err:
67 raise InvalidYAML(
68 {
69 "message": "Tango hardware object definition is invalid",
70 "file": "hardware.yml",
71 "obj": kwargs,
72 "errors": err.messages,
73 }
74 ) from err
76 try:
77 return loader(
78 "daiquiri.core.hardware.tango",
79 "",
80 kwargs.get("type"),
81 app=self._app,
82 **kwargs
83 )
84 except ModuleNotFoundError:
85 logger.error(
86 "Could not find class for tango object {type}".format(
87 type=kwargs.get("type")
88 )
89 )
90 raise