Coverage for /opt/conda/envs/apienv/lib/python3.10/site-packages/daiquiri/core/components/fileeditor.py: 95%
94 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-14 02:13 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4import importlib
5from marshmallow import Schema, fields
7from daiquiri.core import marshal, require_staff
8from daiquiri.resources.utils import get_resource_provider
9from daiquiri.core.resources.file_resource_provider import FileResourceProvider
10from daiquiri.core.components import Component, ComponentResource
11from daiquiri.core.schema import ErrorSchema
12from daiquiri.core.schema.metadata import paginated
14import logging
16logger = logging.getLogger(__name__)
class FileSchema(Schema):
    """Schema describing a single file: its path, text contents and extension.

    Only ``contents`` is declared as required.
    """

    path = fields.String()
    contents = fields.String(required=True)
    ext = fields.String()
class DirectorySchema(Schema):
    """Directory Schema

    Describes one node of a directory tree. ``children`` nests further
    ``DirectorySchema`` nodes, so the schema is recursive.
    """

    name = fields.String()
    path = fields.String()
    type = fields.String()
    ext = fields.String()
    # The lambda defers the lookup of DirectorySchema until the class exists,
    # which is what allows the schema to reference itself.
    children = fields.Nested(lambda: DirectorySchema(many=True))
class DirectoryResource(ComponentResource):
    """Resource that serves the directory listing of the parent component."""

    @require_staff
    @marshal(
        out=[[200, paginated(DirectorySchema), "Directory listing for current resources"]]
    )
    def get(self, **kwargs):
        """Get the directory listing"""
        # All work is delegated to the owning component.
        listing = self._parent.get_directory(**kwargs)
        return listing
class FileResource(ComponentResource):
    """Resource for reading (GET) and updating (PATCH) one file by its path.

    Both handlers delegate to the parent component and answer with a 400
    response when the parent returns a falsy result.
    """

    # NOTE(review): the two failure payloads below use different keys
    # ("message" for GET, "error" for PATCH) although both are declared with
    # ErrorSchema - confirm which key ErrorSchema actually defines.

    @require_staff
    @marshal(
        inp={"path": fields.String()},
        out=[
            [200, FileSchema(), "Read file contents"],
            [400, ErrorSchema(), "Could not read file contents"],
        ],
    )
    def get(self, path, **kwargs):
        """Get a file"""
        result = self._parent.read_file(path, **kwargs)
        if not result:
            return {"message": "Could not read file"}, 400
        return result

    @require_staff
    @marshal(
        inp=FileSchema,
        out=[
            [200, FileSchema(), "File successfully updated"],
            [400, ErrorSchema(), "Could not update file"],
        ],
    )
    def patch(self, path, **kwargs):
        """Update a file"""
        result = self._parent.update_file(path, **kwargs)
        if not result:
            return {"error": "Could not update file"}, 400
        return result
def path_to_dict(path, root=None):
    """Describe ``path`` and, for directories, everything below it as nested dicts.

    Args:
        path: File or directory path to describe.
        root: Accepted for backward compatibility and forwarded to recursive
            calls; it does not influence the result.

    Returns:
        ``None`` when the final path component is ``__pycache__``. Otherwise a
        dict with ``name``, ``path`` (the input with its first character
        removed) and ``type``. Directories additionally carry ``children``
        (a list of the same kind of dict, in ``os.listdir`` order); files
        carry ``ext`` (the extension without the leading dot).
    """
    name = os.path.basename(path)

    if name in ["__pycache__"]:
        return None

    # path[1:] drops the first character; callers pass absolute paths, so this
    # removes the leading separator.
    node = {"name": name, "path": path[1:]}
    if os.path.isdir(path):
        node["type"] = "directory"
        # Recurse exactly once per entry. Evaluating the recursive call both in
        # the filter and in the value doubles the work at every directory level.
        described = (
            path_to_dict(os.path.join(path, entry), root)
            for entry in os.listdir(path)
        )
        node["children"] = [child for child in described if child is not None]
    else:
        node["type"] = "file"
        node["ext"] = os.path.splitext(name)[1][1:]
    return node
class Fileeditor(Component):
    """Component that lists, reads and updates files inside a set of allowed directories.

    The allowed directories are collected in ``setup`` and stored in
    ``self._directories``; ``read_file`` and ``update_file`` refuse any path
    that does not resolve to a regular file inside one of them.
    """

    _base_url = "editor"

    def setup(self):
        """Register the routes and build the list of allowed directories.

        The list consists of the root of the first ``FileResourceProvider``
        (when there is one), the directory containing the implementors module
        and any extra ``directories`` from the component config.
        """
        self.register_route(DirectoryResource, "/directory")
        self.register_route(FileResource, "/file/<path:path>")

        implementors = importlib.import_module(
            self._base_config["implementors"].replace("/", ".")
        )

        roots = [
            provider.root
            for provider in get_resource_provider().providers
            if isinstance(provider, FileResourceProvider)
        ]

        self._directories = []
        if roots:
            self._directories.append(os.path.abspath(roots[0]))
        else:
            # Indexing an empty list here would abort setup with an IndexError.
            logger.warning("No file resource provider found for the file editor")
        self._directories.append(os.path.dirname(implementors.__file__))

        # Normalise configured entries so they compare correctly against the
        # absolute paths built in read_file.
        self._directories.extend(
            os.path.abspath(directory)
            for directory in self._config.get("directories") or []
        )

    @staticmethod
    def _is_within(path, directory):
        """Return True when absolute ``path`` is ``directory`` or lies below it.

        ``os.path.commonpath`` compares whole path components, so a sibling
        such as ``/data/res_secret`` is not mistaken for being inside
        ``/data/res`` (which a character-wise prefix comparison would accept).
        """
        directory = os.path.abspath(directory)
        try:
            return os.path.commonpath([path, directory]) == directory
        except ValueError:
            # Raised e.g. for paths on different drives: not contained.
            return False

    def get_directory(self, **kwargs):
        """Return the tree of every allowed directory as ``{"total", "rows"}``."""
        contents = [
            path_to_dict(directory, directory + os.path.sep)
            for directory in self._directories
        ]
        return {"total": len(contents), "rows": contents}

    def read_file(self, filepath, **kwargs):
        """Read a file located inside one of the allowed directories.

        Args:
            filepath: Path of the file; it is interpreted as absolute by
                prefixing the path separator and normalising the result.

        Returns:
            A dict with ``path`` (absolute path without its first character),
            ``fullpath``, ``contents`` and ``ext``; or ``None`` when the path
            is not a regular file, lies outside every allowed directory, or
            cannot be read as text.
        """
        # The candidate does not depend on the directory, so build it once.
        fullpath = os.path.abspath(os.path.sep + filepath)

        # isfile rather than exists: a directory cannot be opened for reading.
        if not os.path.isfile(fullpath):
            return None

        if not any(self._is_within(fullpath, d) for d in self._directories):
            logger.warning(f"File {filepath} is not in any allowed root")
            return None

        try:
            with open(fullpath) as handle:
                contents = handle.read()
        except (OSError, UnicodeDecodeError):
            # Report unreadable or non-text files as "could not read" instead
            # of letting the exception escape the request handler.
            logger.exception(f"Could not read file {filepath}")
            return None

        return {
            "path": fullpath[1:],
            "fullpath": fullpath,
            "contents": contents,
            "ext": os.path.splitext(filepath)[1][1:],
        }

    def update_file(self, filepath, **kwargs):
        """Overwrite an existing, allowed file with ``kwargs["contents"]``.

        The file must already be readable through ``read_file``; new files are
        never created.

        Returns:
            ``{"contents": ..., "path": filepath}`` on success, ``None`` when
            the file is not accessible or writing fails (the failure is logged).
        """
        existing = self.read_file(filepath)
        if not existing:
            return None

        try:
            logger.info(f"tring to write {existing['fullpath']}")
            with open(existing["fullpath"], "w") as handle:
                handle.write(kwargs["contents"])
        except Exception:
            logger.exception(f"Could not update file {filepath}")
            return None

        return {"contents": kwargs["contents"], "path": filepath}