Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/resources/utils.py: 97%
135 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-15 02:12 +0000
1import os
2import logging
3import re
5import ruamel.yaml
7from daiquiri.core.exceptions import SyntaxErrorYAML
8from daiquiri.core.resources.multi_resource_provider import MultiResourceProvider
9from daiquiri.core.resources.file_resource_provider import FileResourceProvider
10from daiquiri.core.resources.resource_provider import ResourceNotAvailable
11from daiquiri.core.utils.dictutils import prudent_update
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Process-wide provider singleton; stays None until the first call to
# get_resource_provider() creates it.
RESOURCE_PROVIDER = None
# Matches `${name}` placeholders: group 1 is the whole placeholder,
# group 2 is the bare variable name.
VAR_MATCHER = re.compile(r"(\${([\w_]+)})")


def _interpolate(value, args):
    """Interpolate `${name}` placeholders in a single value.

    Arguments:
        value: a dict, list, str or any other object
        args: mapping of variable name to replacement value

    Returns:
        - dict / list: a new container with every item interpolated
        - str that is exactly one placeholder: the raw variable value
          (any type; `None` when the variable is unknown)
        - str with embedded placeholders: a new string where each
          placeholder is replaced by `str()` of the variable value
          (so an unknown variable becomes the text `None`)
        - anything else: `value` unchanged
    """
    if isinstance(value, dict):
        return interpolate(value, args)
    if isinstance(value, list):
        return [_interpolate(item, args) for item in value]
    if not isinstance(value, str):
        return value

    match = VAR_MATCHER.search(value)
    if match is None:
        return value

    # Full variable replacement (allowing complex types)
    if match[1] == value:
        return args.get(match[2])

    # String replace
    return VAR_MATCHER.sub(lambda found: str(args.get(found[2])), value)


def interpolate(layout, variables=None):
    """Recursively interpolate variables in a nested dict or list.

    Arguments:
        layout: dict or list to process; it is not modified
        variables: mapping of variable name to value. Defaults to no
            variables (`None` is accepted and means the same).

    Returns:
        A new dict / list with placeholders replaced, or `None` when
        `layout` is neither a dict nor a list.
    """
    # Avoid a shared mutable default argument.
    if variables is None:
        variables = {}

    if isinstance(layout, dict):
        return {key: _interpolate(item, variables) for key, item in layout.items()}
    if isinstance(layout, list):
        return [_interpolate(item, variables) for item in layout]
    return None
class CustomYamlLoader(ruamel.yaml.Loader):
    """Loader subclass on which this module registers the `!include` tag."""

    pass
def _construct_include(loader: CustomYamlLoader, node: ruamel.yaml.Node):
    """Include file referenced at node.

    Two forms are accepted, selected by the type of `node.value`:

    * a str: `file.yml` or `file.yml#key`
    * a list (constructed as a mapping): must contain a `file` entry
      (same `name[#key]` syntax) and may contain `variables`, a mapping
      used to interpolate `${name}` placeholders in the included content.

    When the included document is a dict, its `default_variables` entry is
    removed and used as the base set of variables; `variables` from the
    include override it.

    Arguments:
        loader: loader currently parsing the including document
        node: the node carrying the `!include` tag

    Returns:
        The included (and interpolated) content, or only its `key` entry
        when a `#key` suffix was given.

    Raises:
        SyntaxErrorYAML: malformed include, wrong file extension, missing
            resource, YAML error in the included file, or unknown `#key`.
    """
    if isinstance(node.value, list):
        args = loader.construct_mapping(node, deep=True)
        if not args.get("file"):
            raise SyntaxErrorYAML(
                {"error": "Include definition must include a `file` key"}
            )
        parts = args.get("file").split("#")
        # An empty `variables:` key parses as None; treat it as "no variables"
        variables = args.get("variables") or {}

    elif isinstance(node.value, str):
        parts = node.value.split("#")
        variables = {}

    else:
        raise SyntaxErrorYAML({"error": "Unknown include configuration"})

    resourcename = parts[0]
    ref = parts[1] if len(parts) > 1 else None

    # assume the name is in form resourcetype/resourcename.yml
    # FIXME: This have to be handled properly
    root = loader.stream.name
    # The resource type is the name of the directory holding the including file
    roottype = os.path.basename(os.path.dirname(root))

    # NOTE(review): ruamel marks appear to be 0-based; confirm whether the
    # reported line/column should be shifted by one.
    reference = f"in !include '{root}', line {node.start_mark.line}, column {node.start_mark.column}"
    extension = os.path.splitext(resourcename)[1][1:]
    if extension not in ("yaml", "yml"):
        raise SyntaxErrorYAML(
            {
                "error": f"!include file must have extension 'yml' or 'yaml'\n extension was {extension}\n{reference}"
            }
        )

    try:
        provider = get_resource_provider()
        resource = provider.resource(roottype, resourcename)
        with provider.open_resource(resource, mode="t") as stream:
            # Separate name: do not shadow the `loader` of the including file
            included_loader = CustomYamlLoader(stream)
            contents = included_loader.get_single_data()

    except ResourceNotAvailable as ex:
        raise SyntaxErrorYAML(
            {
                "error": f"can't find included resource {roottype}/{resourcename}\n{reference}"
            }
        ) from ex

    except ruamel.yaml.YAMLError as ex:
        raise SyntaxErrorYAML({"error": ex}) from None

    if isinstance(contents, (dict, list)):
        defaults = {}
        if isinstance(contents, dict):
            # An empty `default_variables:` key parses as None
            defaults = contents.pop("default_variables", {}) or {}
        defaults.update(variables)
        contents = interpolate(contents, defaults)

    if ref is None:
        return contents

    if not isinstance(contents, dict):
        raise SyntaxErrorYAML(
            {
                "error": f"!include has #{ref}, but contents of {roottype}/{resourcename} is not an object\n{reference}"
            }
        )

    try:
        return contents[ref]
    except KeyError:
        raise SyntaxErrorYAML(
            {
                "error": f"no such key '{ref}' in '{roottype}/{resourcename}'\n{reference}"
            }
        ) from None
# Register `_construct_include` as the constructor for the `!include` tag on
# CustomYamlLoader.
ruamel.yaml.add_constructor("!include", _construct_include, CustomYamlLoader)
def get_resource_provider() -> MultiResourceProvider:
    """Return the shared resource provider, creating it on first use.

    On creation, a file provider rooted at the directory containing this
    module is prepended to the new multi provider.
    """
    global RESOURCE_PROVIDER

    if RESOURCE_PROVIDER is not None:
        return RESOURCE_PROVIDER

    RESOURCE_PROVIDER = MultiResourceProvider()
    package_dir = os.path.dirname(__file__)
    RESOURCE_PROVIDER.prepend_provider(FileResourceProvider(package_dir))
    return RESOURCE_PROVIDER
def add_resource_root(path: str):
    """Register `path` as an additional resource root directory.

    The new root is prepended to the shared provider, so it is used
    before the existing ones.
    """
    new_root = FileResourceProvider(path)
    get_resource_provider().prepend_provider(new_root)
class YamlDict(dict):
    """
    Create a dictionary from resource.

    Provides a dictionary API and an extra `reload` method to reread the
    resource file, plus `save` to write changes back to it.

    Arguments:
        resourcetype: subdirectory
        resourcename: filename
    """

    def __init__(self, resourcetype: str, resourcename: str):
        dict.__init__(self)
        self._resource_type = resourcetype
        self._resource_name = resourcename
        self.__update()

    @property
    def resource(self):
        """Resource identifier in the form `resourcetype/resourcename`."""
        return f"{self._resource_type}/{self._resource_name}"

    def __update(self):
        """Replace the dictionary content with the parsed resource file.

        The file is parsed completely before the current content is
        touched, so a failing load leaves the previous content in place.

        Raises:
            SyntaxErrorYAML: the resource is not valid YAML
        """
        provider = get_resource_provider()
        resource = provider.resource(
            resourcetype=self._resource_type, resourcename=self._resource_name
        )
        with provider.open_resource(resource) as stream:
            try:
                loader = CustomYamlLoader(stream)
                contents = loader.get_single_data()
            except ruamel.yaml.YAMLError as ex:
                raise SyntaxErrorYAML({"error": ex}) from None

        # Build the new content first: if the document is not a mapping this
        # raises before the existing entries are discarded.
        fresh = dict(contents)
        self.clear()
        self.update(fresh)

    def save(self):
        """Save an updated yaml file

        The file is re-read with the round-trip loader, updated from this
        dictionary with `prudent_update`, and written back.
        """
        provider = get_resource_provider()
        resource = provider.resource(
            resourcetype=self._resource_type, resourcename=self._resource_name
        )

        yaml = ruamel.yaml.YAML()
        yaml.allow_duplicate_keys = True
        yaml.default_flow_style = False
        yaml.indent(mapping=2, sequence=4, offset=2)

        with provider.open_resource(resource) as stream:
            # The default loader in ruamel is RoundTrip which inherits from SafeLoader
            # https://sourceforge.net/p/ruamel-yaml/code/ci/default/tree/constructor.py#l1011
            yaml_contents = yaml.load(stream)  # nosec

        prudent_update(yaml_contents, self, ignore_unknown=True)

        logger.info("Saving %s/%s", self._resource_type, self._resource_name)
        with provider.open_resource(resource, "wt") as stream:
            yaml.dump(yaml_contents, stream=stream)

    def reload(self):
        """Reload the resource name"""
        logger.info("Reloading %s/%s", self._resource_type, self._resource_name)
        self.__update()
class ConfigDict(YamlDict):
    """
    Dictionary backed by a resource of type `config`.

    Same dictionary API as `YamlDict`, including `reload` to reread the
    resource file.

    Arguments:
        resourcename: filename
    """

    def __init__(self, resourcename):
        super().__init__("config", resourcename)