Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/fileeditor.py: 95%

94 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 02:13 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3import os 

4import importlib 

5from marshmallow import Schema, fields 

6 

7from daiquiri.core import marshal, require_staff 

8from daiquiri.resources.utils import get_resource_provider 

9from daiquiri.core.resources.file_resource_provider import FileResourceProvider 

10from daiquiri.core.components import Component, ComponentResource 

11from daiquiri.core.schema import ErrorSchema 

12from daiquiri.core.schema.metadata import paginated 

13 

14import logging 

15 

16logger = logging.getLogger(__name__) 

17 

18 

class FileSchema(Schema):
    """Schema for a single file handled by the editor."""

    # Path of the file as exchanged with the client
    path = fields.Str()
    # Text contents of the file; the only required field
    contents = fields.Str(required=True)
    # File extension
    ext = fields.Str()

23 

24 

class DirectorySchema(Schema):
    """Schema for one node of the directory tree (a file or a directory)."""

    # Entry name and its path
    name = fields.Str()
    path = fields.Str()
    # Kind of entry, e.g. "directory" or "file"
    type = fields.Str()
    # File extension
    ext = fields.Str()
    # Child entries; the lambda defers the self-reference until the class
    # has been fully defined
    children = fields.Nested(lambda: DirectorySchema(many=True))

33 

34 

class DirectoryResource(ComponentResource):
    """Resource returning the directory listing provided by the parent component."""

    @require_staff
    @marshal(
        out=[
            [200, paginated(DirectorySchema), "Directory listing for current resources"]
        ]
    )
    def get(self, **kwargs):
        """Get the directory listing"""
        # All the work is delegated to the owning component
        listing = self._parent.get_directory(**kwargs)
        return listing

45 

46 

class FileResource(ComponentResource):
    """Resource for reading and updating a single file through the parent component."""

    @require_staff
    @marshal(
        inp={"path": fields.Str()},
        out=[
            [200, FileSchema(), "Read file contents"],
            [400, ErrorSchema(), "Could not read file contents"],
        ],
    )
    def get(self, path, **kwargs):
        """Get a file"""
        result = self._parent.read_file(path, **kwargs)
        if not result:
            # NOTE(review): this payload uses the key "message" while `patch`
            # below uses "error", yet both are declared as ErrorSchema --
            # confirm which key ErrorSchema actually defines.
            return {"message": "Could not read file"}, 400

        return result

    @require_staff
    @marshal(
        inp=FileSchema,
        out=[
            [200, FileSchema(), "File successfully updated"],
            [400, ErrorSchema(), "Could not update file"],
        ],
    )
    def patch(self, path, **kwargs):
        """Update a file"""
        updated = self._parent.update_file(path, **kwargs)
        if not updated:
            # A falsy result from the component means the update failed
            return {"error": "Could not update file"}, 400

        return updated

79 

80 

def path_to_dict(path, root=None):
    """Recursively describe ``path`` as a nested dict for the directory listing.

    Args:
        path: Filesystem path of the file or directory to describe.
        root: Passed through unchanged to the recursive calls; it is not
            otherwise used by this function.

    Returns:
        ``None`` when the entry is named ``__pycache__``. Otherwise a dict
        with ``name`` (basename of ``path``) and ``path`` (``path`` without
        its first character), plus either ``type == "directory"`` with a
        ``children`` list, or ``type == "file"`` with ``ext`` (the extension
        without the leading dot).
    """
    name = os.path.basename(path)

    if name == "__pycache__":
        return None

    d = {"name": name, "path": path[1:]}
    if os.path.isdir(path):
        d["type"] = "directory"
        # Describe every child exactly once. Calling path_to_dict both in the
        # filter and in the result (as before) doubles the work at each
        # directory level, which is exponential in the tree depth.
        described = (
            path_to_dict(os.path.join(path, entry), root)
            for entry in os.listdir(path)
        )
        d["children"] = [child for child in described if child is not None]
    else:
        d["type"] = "file"
        d["ext"] = os.path.splitext(name)[1][1:]
    return d

99 

100 

class Fileeditor(Component):
    """Component exposing a small file editor over REST.

    Files can only be read or written when they live inside one of the
    directories collected in ``self._directories`` by :meth:`setup`.
    """

    _base_url = "editor"

    def setup(self):
        """Register the routes and collect the directories that may be edited.

        The editable directories are the root of the first
        ``FileResourceProvider`` (when there is one), the directory of the
        configured ``implementors`` module, and any extra ``directories``
        listed in the component config.
        """
        self.register_route(DirectoryResource, "/directory")
        self.register_route(FileResource, "/file/<path:path>")

        implementors = importlib.import_module(
            self._base_config["implementors"].replace("/", ".")
        )

        provider = get_resource_provider()
        roots = [
            p.root for p in provider.providers if isinstance(p, FileResourceProvider)
        ]

        self._directories = []
        if roots:
            self._directories.append(os.path.abspath(roots[0]))
        else:
            # Previously this raised IndexError on roots[0]; keep the
            # component usable for the remaining directories instead.
            logger.warning("No file resource provider found for the file editor")
        self._directories.append(os.path.dirname(implementors.__file__))

        if self._config.get("directories"):
            self._directories.extend(self._config["directories"])

    def get_directory(self, **kwargs):
        """Return the directory tree of every editable directory.

        Returns:
            A dict with ``total`` (number of top-level directories) and
            ``rows`` (one ``path_to_dict`` tree per directory).
        """
        contents = [
            path_to_dict(directory, directory + os.path.sep)
            for directory in self._directories
        ]
        return {"total": len(contents), "rows": contents}

    def _resolve_path(self, filepath):
        """Map a requested path to an absolute path inside an allowed directory.

        Args:
            filepath: Path as received from the client, interpreted relative
                to the filesystem root.

        Returns:
            The normalised absolute path when it is an existing regular file
            located inside one of ``self._directories``, otherwise ``None``.
        """
        candidate = os.path.abspath(os.path.sep + filepath)
        # Only regular files can be opened; a directory would make open() raise
        if not os.path.isfile(candidate):
            return None

        for d in self._directories:
            root = os.path.abspath(d)
            # Compare on path-component boundaries. os.path.commonprefix works
            # character by character, so "/data/root_evil/x" would wrongly be
            # accepted for the root "/data/root".
            prefix = root.rstrip(os.path.sep) + os.path.sep
            if candidate == root or candidate.startswith(prefix):
                return candidate
            logger.warning(f"File {filepath} is not in allowed root {d}")

        # NOTE: symlinks are not resolved here (abspath only normalises the path)
        return None

    def read_file(self, filepath, **kwargs):
        """Read a file located inside one of the allowed directories.

        Args:
            filepath: Path as received from the client.

        Returns:
            A dict with ``path`` (absolute path without its first character),
            ``fullpath``, ``contents`` and ``ext``, or ``None`` when the path
            is not an allowed regular file or cannot be read as UTF-8 text.
        """
        fullpath = self._resolve_path(filepath)
        if fullpath is None:
            return None

        try:
            with open(fullpath, encoding="utf-8") as fh:
                contents = fh.read()
        except (OSError, UnicodeDecodeError):
            # Report unreadable files to the caller as "no file" rather than
            # letting the exception escape
            logger.exception(f"Could not read file {filepath}")
            return None

        return {
            "path": fullpath[1:],
            "fullpath": fullpath,
            "contents": contents,
            "ext": os.path.splitext(filepath)[1][1:],
        }

    def update_file(self, filepath, **kwargs):
        """Overwrite an existing file inside one of the allowed directories.

        Args:
            filepath: Path as received from the client.
            **kwargs: Must contain ``contents``, the new text of the file.

        Returns:
            A dict with the written ``contents`` and the requested ``path``,
            or ``None`` when the path is not an allowed regular file or the
            write failed (the failure is logged).
        """
        # Validate the target without reading it: the old contents are not needed
        fullpath = self._resolve_path(filepath)
        if fullpath is None:
            return None

        try:
            logger.info(f"tring to write {fullpath}")
            with open(fullpath, "w", encoding="utf-8") as fh:
                fh.write(kwargs["contents"])
        except Exception:
            logger.exception(f"Could not update file {filepath}")
            return None

        return {"contents": kwargs["contents"], "path": filepath}