Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/hardware/multivisor/__init__.py: 85%

40 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3from marshmallow import ValidationError, fields, validates_schema, post_load 

4 

5from daiquiri.core.hardware.abstract import ProtocolHandler 

6from daiquiri.core.schema.hardware import HOConfigSchema 

7from daiquiri.core.exceptions import InvalidYAML 

8from daiquiri.core.utils import loader 

9from ._httpclient import Multivisor 

10 

11import logging 

12 

13logger = logging.getLogger(__name__) 

14 

15 

class MultivisorHOConfigSchema(HOConfigSchema):
    """Define the way Multivisor hardware object config is parsed.

    An object is identified either directly by its ``address`` or by a
    ``url`` of the form ``<scheme>://<address>``, from which the address is
    derived once the config has been loaded.
    """

    address = fields.Str(metadata={"description": "UID from Multivisor"})

    @validates_schema
    def schema_validate(self, data, **kwargs):
        """Require at least one of ``address`` or ``url``.

        Raises:
            ValidationError: If both ``address`` and ``url`` are missing or
                empty.
        """
        if not (data.get("address") or data.get("url")):
            raise ValidationError(
                "Object must have either an `address` or `url` defined"
            )

    @post_load
    def populate(self, data, **kwargs):
        """Generate the device address from its url and default the type.

        Returns:
            The loaded config, with ``address`` set from ``url`` when a url
            was given and ``type`` set to ``"service"`` when absent.

        Raises:
            ValidationError: If ``url`` is set but contains no ``://``
                separator.
        """
        url = data.get("url")
        if url:
            # `partition` never raises, unlike unpacking the result of
            # `split`, so a malformed url is reported as a validation error
            # rather than escaping as a bare ValueError.
            _, separator, address = url.partition("://")
            if not separator:
                raise ValidationError(
                    "Url must be of the form `<scheme>://<address>`",
                    field_name="url",
                )
            data["address"] = address

        # Fall back to the generic service object when no type is configured
        if "type" not in data:
            data["type"] = "service"

        return data

38 

39 

class MultivisorHandler(ProtocolHandler):
    """Daiquiri implementation of the Multivisor protocol.

    This class defines the protocol and holds the client giving access to
    the Multivisor API.

    To use this protocol, the address of the Multivisor service has to be
    configured the following way.

    .. code-block:: yaml

        protocols:
          - id: multivisor
            type: multivisor
            address: http://localhost:22000
    """

    library = "multivisor"

    def __init__(self, *args, id, address, **kwargs):
        """Create the Multivisor client, then initialise the base handler.

        Args:
            id: Protocol name; only used in the log message here.
            address: Address of the Multivisor service, passed to the client.
            *args: Forwarded to ``ProtocolHandler.__init__``.
            **kwargs: Forwarded to ``ProtocolHandler.__init__``.
        """
        connect_message = "Connect Multivisor service (url: %s) as protocol name: '%s'"
        logger.info(connect_message, address, id)

        self._multivisor = Multivisor(address)
        ProtocolHandler.__init__(self, *args, **kwargs)

    def disconnect(self):
        """Disconnect the underlying Multivisor client."""
        self._multivisor.disconnect()

    def get(self, **kwargs):
        """Validate a hardware object config and load the matching object.

        Args:
            **kwargs: Raw hardware object config, validated against
                ``MultivisorHOConfigSchema``.

        Returns:
            Whatever ``loader`` returns for the configured ``type``.

        Raises:
            InvalidYAML: If the config does not pass schema validation.
            ModuleNotFoundError: Re-raised after logging when ``loader``
                cannot find a module for the configured ``type``.
        """
        try:
            config = MultivisorHOConfigSchema().load(kwargs)
        except ValidationError as err:
            # Report the raw, unvalidated config alongside the schema errors
            report = {
                "message": "Multivisor hardware object definition is invalid",
                "file": "hardware.yml",
                "obj": kwargs,
                "errors": err.messages,
            }
            raise InvalidYAML(report) from err

        object_type = config.get("type")
        try:
            # NOTE(review): `self._app` is not set in this class; presumably
            # it comes from `ProtocolHandler.__init__` - confirm.
            return loader(
                "daiquiri.core.hardware.multivisor",
                "",
                object_type,
                app=self._app,
                multivisor=self._multivisor,
                **config,
            )
        except ModuleNotFoundError:
            logger.error("Could not find class for multivisor object %s", object_type)
            raise