Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/resources/utils.py: 97%

135 statements  

« prev     ^ index     » next       coverage.py v7.6.5, created at 2024-11-15 02:12 +0000

1import os 

2import logging 

3import re 

4 

5import ruamel.yaml 

6 

7from daiquiri.core.exceptions import SyntaxErrorYAML 

8from daiquiri.core.resources.multi_resource_provider import MultiResourceProvider 

9from daiquiri.core.resources.file_resource_provider import FileResourceProvider 

10from daiquiri.core.resources.resource_provider import ResourceNotAvailable 

11from daiquiri.core.utils.dictutils import prudent_update 

12 

13 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Process-wide resource provider; stays None until get_resource_provider()
# creates it on first use.
RESOURCE_PROVIDER = None

17 

# Matches ``${name}`` placeholders: group 1 is the whole placeholder,
# group 2 is the variable name.
VAR_MATCHER = re.compile(r"(\${([\w_]+)})")


def _interpolate(value, args):
    """Interpolate ``${name}`` placeholders in a single value.

    Dicts and lists are processed recursively. A string consisting of exactly
    one placeholder is replaced by the variable's value itself, so the result
    may be a list, dict, number, etc. A string with placeholders embedded in
    other text has each one substituted by ``str()`` of the variable's value.
    Values of any other type are returned unchanged.

    Variables missing from ``args`` resolve to ``None`` (rendered as the text
    ``"None"`` when embedded in a longer string).

    Arguments:
        value: the value to interpolate
        args: mapping of variable name to replacement value

    Returns:
        The interpolated value.
    """
    if isinstance(value, dict):
        return interpolate(value, args)

    if isinstance(value, list):
        return [_interpolate(item, args) for item in value]

    if isinstance(value, str):
        match = VAR_MATCHER.search(value)
        if match is None:
            return value

        # Full variable replacement (allowing complex types)
        if match[1] == value:
            return args.get(match[2])

        # String replace
        return VAR_MATCHER.sub(lambda found: str(args.get(found[2])), value)

    return value


def interpolate(layout, variables=None):
    """Recursively interpolate variables in a nested dict or list.

    Arguments:
        layout: dict or list whose values may contain ``${name}`` placeholders
        variables: mapping of variable name to replacement value; defaults to
            an empty mapping

    Returns:
        A new dict or list with placeholders replaced; the input container is
        not modified. ``None`` is returned when `layout` is neither a dict
        nor a list.
    """
    # A fresh dict per call instead of a shared mutable default argument
    if variables is None:
        variables = {}

    if isinstance(layout, dict):
        return {key: _interpolate(val, variables) for key, val in layout.items()}

    if isinstance(layout, list):
        return [_interpolate(item, variables) for item in layout]

    # Only containers are interpolated at this level
    return None

61 

62 

class CustomYamlLoader(ruamel.yaml.Loader):
    """ruamel.yaml loader on which the `!include` tag is registered."""

    pass

65 

66 

def _construct_include(loader: CustomYamlLoader, node: ruamel.yaml.Node):
    """Include file referenced at node.

    Two forms are handled:

    * ``node.value`` is a string: ``file.yml`` or ``file.yml#key``
    * ``node.value`` is a list: the node is read as a mapping with a required
      ``file`` key (same syntax as above) and an optional ``variables`` key

    The included resource is looked up under the same resource type as the
    including file, taken from the name of the directory that contains it.
    If the included document is a dict or a list it is interpolated with the
    document's own ``default_variables`` (dict documents only, removed from
    the result) overridden by the ``variables`` of the include.

    Arguments:
        loader: the loader currently parsing the including file
        node: the ``!include`` node

    Returns:
        The included document, or only its ``key`` entry when the file
        reference ends in ``#key``.

    Raises:
        SyntaxErrorYAML: if the include definition is malformed, the file
            extension is not ``yml``/``yaml``, the resource cannot be found
            or parsed, or the requested ``#key`` is not available.
    """
    if isinstance(node.value, list):
        args = loader.construct_mapping(node, deep=True)
        if not args.get("file"):
            raise SyntaxErrorYAML(
                {"error": "Include definition must include a `file` key"}
            )
        parts = args.get("file").split("#")
        # A bare `variables:` key loads as None; treat it like a missing key
        variables = args.get("variables") or {}

    elif isinstance(node.value, str):
        parts = node.value.split("#")
        variables = {}

    else:
        raise SyntaxErrorYAML({"error": "Unknown include configuration"})

    resourcename = parts[0]
    ref = parts[1] if len(parts) > 1 else None

    # assume the name is in form resourcetype/resourcename.yml
    # FIXME: This have to be handled properly
    root = loader.stream.name
    roottype = os.path.basename(os.path.dirname(root))

    # NOTE(review): start_mark.line looks 0-based in ruamel's Mark, so the
    # reported line may be one less than what an editor shows - confirm.
    reference = f"in !include '{root}', line {node.start_mark.line}, column {node.start_mark.column}"
    extension = os.path.splitext(resourcename)[1][1:]
    if extension not in ("yaml", "yml"):
        raise SyntaxErrorYAML(
            {
                "error": f"!include file must have extension 'yml' or 'yaml'\n extension was {extension}\n{reference}"
            }
        )

    try:
        provider = get_resource_provider()
        resource = provider.resource(roottype, resourcename)
        with provider.open_resource(resource, mode="t") as stream:
            # Separate name so the `loader` argument is not shadowed
            include_loader = CustomYamlLoader(stream)
            contents = include_loader.get_single_data()
            if isinstance(contents, (dict, list)):
                if isinstance(contents, dict):
                    # `default_variables:` without a value loads as None
                    defaults = contents.pop("default_variables", None) or {}
                else:
                    defaults = {}
                # Variables given at the include site override the defaults
                defaults.update(variables)
                contents = interpolate(contents, defaults)

    except ResourceNotAvailable as ex:
        raise SyntaxErrorYAML(
            {
                "error": f"can't find included resource {roottype}/{resourcename}\n{reference}"
            }
        ) from ex

    except ruamel.yaml.YAMLError as ex:
        raise SyntaxErrorYAML({"error": ex}) from None

    if ref is None:
        return contents

    if not isinstance(contents, dict):
        raise SyntaxErrorYAML(
            {
                "error": f"!include has #{ref}, but contents of {roottype}/{resourcename} is not an object\n{reference}"
            }
        )

    try:
        return contents[ref]
    except KeyError:
        # The message already names the missing key
        raise SyntaxErrorYAML(
            {
                "error": f"no such key '{ref}' in '{roottype}/{resourcename}'\n{reference}"
            }
        ) from None

146 

147 

# Make CustomYamlLoader resolve `!include` tags through _construct_include
ruamel.yaml.add_constructor("!include", _construct_include, CustomYamlLoader)

149 

150 

def get_resource_provider() -> MultiResourceProvider:
    """Get the resource provider

    The provider is created on first call, with the directory containing this
    module as its initial resource root, and the same instance is returned by
    every later call.

    Returns:
        The module-wide `MultiResourceProvider`.
    """
    global RESOURCE_PROVIDER

    if RESOURCE_PROVIDER is None:
        # Publish the global only once the provider is fully set up, so a
        # failure here cannot leave an empty provider cached.
        provider = MultiResourceProvider()
        provider.prepend_provider(FileResourceProvider(os.path.dirname(__file__)))
        RESOURCE_PROVIDER = provider

    return RESOURCE_PROVIDER

162 

163 

def add_resource_root(path: str):
    """Register `path` as an additional resource root directory.

    The new root is prepended to the shared resource provider, so it is used
    before the existing ones.

    Arguments:
        path: directory to add as a resource root
    """
    new_root = FileResourceProvider(path)
    get_resource_provider().prepend_provider(new_root)

171 

172 

class YamlDict(dict):
    """
    Create a dictionary from resource.

    Provides a dictionary API and extra `reload` and `save` methods to reread
    and rewrite the resource file.

    Arguments:
        resourcetype: subdirectory
        resourcename: filename
    """

    def __init__(self, resourcetype: str, resourcename: str):
        super().__init__()
        self._resource_type = resourcetype
        self._resource_name = resourcename
        self.__update()

    @property
    def resource(self):
        """Resource identifier in the form ``<resourcetype>/<resourcename>``"""
        return f"{self._resource_type}/{self._resource_name}"

    def __update(self):
        """Replace the dictionary contents with the parsed resource file.

        The current contents are only dropped once the file has been read and
        parsed successfully, so a failed reload keeps the previous state. An
        empty YAML document results in an empty dictionary.

        Raises:
            SyntaxErrorYAML: if the resource is not valid YAML
        """
        provider = get_resource_provider()
        resource = provider.resource(
            resourcetype=self._resource_type, resourcename=self._resource_name
        )
        with provider.open_resource(resource) as stream:
            try:
                data = CustomYamlLoader(stream).get_single_data()
            except ruamel.yaml.YAMLError as ex:
                raise SyntaxErrorYAML({"error": ex}) from None

        # Build the new contents first; only then swap them in
        fresh = {} if data is None else dict(data)
        self.clear()
        self.update(fresh)

    def save(self):
        """Save an updated yaml file

        The file is reloaded with a round-trip loader, the current dictionary
        values are merged into it with `prudent_update` and the result is
        written back to the same resource.
        """
        provider = get_resource_provider()
        resource = provider.resource(
            resourcetype=self._resource_type, resourcename=self._resource_name
        )

        yaml = ruamel.yaml.YAML()
        yaml.allow_duplicate_keys = True
        yaml.default_flow_style = False
        yaml.indent(mapping=2, sequence=4, offset=2)

        with provider.open_resource(resource) as stream:
            # The default loader in ruamel is RoundTrip which inherits from SafeLoader
            # https://sourceforge.net/p/ruamel-yaml/code/ci/default/tree/constructor.py#l1011
            yaml_contents = yaml.load(stream)  # nosec

        prudent_update(yaml_contents, self, ignore_unknown=True)

        logger.info("Saving %s", self.resource)
        with provider.open_resource(resource, "wt") as stream:
            yaml.dump(yaml_contents, stream=stream)

    def reload(self):
        """Reload the dictionary contents from the resource file"""
        logger.info("Reloading %s", self.resource)
        self.__update()

236 

237 

class ConfigDict(YamlDict):
    """
    Dictionary backed by a YAML resource of type ``config``.

    Offers the same dictionary API and `reload` method as `YamlDict`; only
    the file name has to be given.

    Arguments:
        resourcename: filename
    """

    def __init__(self, resourcename):
        super().__init__("config", resourcename)