Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/multivisor/__init__.py: 85%
40 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from marshmallow import ValidationError, fields, validates_schema, post_load
5from daiquiri.core.hardware.abstract import ProtocolHandler
6from daiquiri.core.schema.hardware import HOConfigSchema
7from daiquiri.core.exceptions import InvalidYAML
8from daiquiri.core.utils import loader
9from ._httpclient import Multivisor
11import logging
13logger = logging.getLogger(__name__)
class MultivisorHOConfigSchema(HOConfigSchema):
    """Define the way Multivisor hardware object config is parsed.

    An object is located either by an explicit ``address`` or by a ``url``
    of the form ``<scheme>://<address>``; in the latter case the address is
    derived from the url once the config has been loaded.
    """

    address = fields.Str(metadata={"description": "UID from Multivisor"})

    @validates_schema
    def schema_validate(self, data, **kwargs):
        """Check that the object can be located.

        Raises:
            ValidationError: if neither ``address`` nor ``url`` is set, or if
                ``url`` does not contain a ``://`` separator.
        """
        url = data.get("url")
        if not (data.get("address") or url):
            raise ValidationError(
                "Object must have either an `address` or `url` defined"
            )

        # Reject a malformed url here so that it is reported as a regular
        # validation error rather than as an unpacking ValueError raised
        # later on while the address is extracted.
        if url and "://" not in url:
            raise ValidationError(
                "Object `url` must be of the form `<scheme>://<address>`"
            )

    @post_load
    def populate(self, data, **kwargs):
        """Generate the device address from its url.

        The ``type`` of the object defaults to ``service`` when the config
        does not provide one.

        Returns:
            The loaded config, updated in place.
        """
        url = data.get("url")
        if url:
            # ``partition`` splits on the first separator only and never
            # raises, whatever the number of "://" found in the url.
            _, _, address = url.partition("://")
            data["address"] = address

        if "type" not in data:
            data["type"] = "service"

        return data
class MultivisorHandler(ProtocolHandler):
    """Daiquiri implementation of the Multivisor protocol.

    This class defines the protocol and holds the access to the Multivisor
    API.

    To use this protocol, the address of the Multivisor service has to be
    configured the following way.

    .. code-block:: yaml

        protocols:
          - id: multivisor
            type: multivisor
            address: http://localhost:22000
    """

    library = "multivisor"

    def __init__(self, *args, id, address, **kwargs):
        """Create the Multivisor client, then initialise the base handler.

        Args:
            id: Name of the protocol, only used for logging here.
            address: Address handed over to the Multivisor client.
            *args: Forwarded to ``ProtocolHandler``.
            **kwargs: Forwarded to ``ProtocolHandler``.
        """
        logger.info(
            "Connect Multivisor service (url: %s) as protocol name: '%s'", address, id
        )
        # The client is created before the base class is initialised.
        self._multivisor = Multivisor(address)
        ProtocolHandler.__init__(self, *args, **kwargs)

    def disconnect(self):
        """Disconnect the underlying Multivisor client."""
        self._multivisor.disconnect()

    def get(self, **kwargs):
        """Load the hardware object described by the given config.

        Args:
            **kwargs: Raw object config, validated against
                ``MultivisorHOConfigSchema``.

        Returns:
            Whatever ``loader`` returns for the requested object ``type``.

        Raises:
            InvalidYAML: if the config does not validate.
            ModuleNotFoundError: re-raised, after being logged, when it is
                raised while loading the object.
        """
        schema = MultivisorHOConfigSchema()
        try:
            config = schema.load(kwargs)
        except ValidationError as err:
            details = {
                "message": "Multivisor hardware object definition is invalid",
                "file": "hardware.yml",
                "obj": kwargs,
                "errors": err.messages,
            }
            raise InvalidYAML(details) from err

        object_type = config.get("type")
        try:
            return loader(
                "daiquiri.core.hardware.multivisor",
                "",
                object_type,
                app=self._app,
                multivisor=self._multivisor,
                **config,
            )
        except ModuleNotFoundError:
            logger.error("Could not find class for multivisor object %s", object_type)
            raise