1
# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2
# See LICENSE for details.
4
"""Filesystem backend for VFS."""
10
from twisted.vfs import ivfs
12
from zope.interface import implements
14
__all__ = ['OSDirectory', 'OSFile', 'RunWithPrivSep', 'SetUIDProxy',
15
'ForceCreateModeProxy']
19
implements(ivfs.IFileSystemNode)
21
def __init__(self, realPath, name=None, parent=None):
24
self.realPath = realPath
26
if not parent: self.parent = self
27
else: self.parent = parent
29
def getMetadata(self):
30
s = os.stat(self.realPath)
35
"permissions" : s.st_mode,
41
def setMetadata(self, attrs):
    """Apply ownership, permission and timestamp changes to this node.

    `attrs` is a mapping that may contain any of the keys 'uid', 'gid',
    'permissions', 'atime' and 'mtime'; keys that are absent are left
    untouched on disk.

    Ownership is only changed when both 'uid' and 'gid' are supplied.
    When only one of 'atime'/'mtime' is supplied, or one of them is
    None, the missing value is taken from the file's current stat so
    that os.utime always receives two real timestamps.
    """
    if 'uid' in attrs and 'gid' in attrs:
        os.chown(self.realPath, attrs["uid"], attrs["gid"])
    if 'permissions' in attrs:
        os.chmod(self.realPath, attrs["permissions"])
    if 'atime' in attrs or 'mtime' in attrs:
        atime = attrs.get("atime")
        mtime = attrs.get("mtime")
        if atime is None or mtime is None:
            # A key that is present but set to None must fall back to the
            # on-disk value too; dict.get's default only covers absent keys.
            st = os.stat(self.realPath)
            if atime is None:
                atime = st.st_atime
            if mtime is None:
                mtime = st.st_mtime
        os.utime(self.realPath, (atime, mtime))
56
def rename(self, newName):
57
from twisted.vfs import pathutils
58
newParent = pathutils.fetch(pathutils.getRoot(self),
59
pathutils.dirname(newName))
61
# assumes newParent is also an OSDirectory. Probably should politely
62
# decline (rather than break with an undefined error) if it's not.
63
newPath = os.path.join(newParent.realPath, pathutils.basename(newName))
64
os.rename(self.realPath, newPath)
65
self.realPath = newPath
67
self.parent = newParent
70
raise NotImplementedError("Override me.")
75
implements(ivfs.IFileSystemLeaf)
77
def create(self, exclusive=True):
78
flags = os.O_WRONLY | os.O_CREAT
82
fd = os.open(self.realPath, flags)
84
if e.errno == errno.EEXIST:
85
raise ivfs.AlreadyExistsError(self.name)
87
# Something unexpected happened. Let it propagate.
89
f = os.fdopen(fd, "w")
92
def open(self, flags):
93
self.fd = os.open(self.realPath, flags)
96
def readChunk(self, offset, length):
    """Read up to `length` bytes starting `offset` bytes into the file.

    Operates on the descriptor stored in self.fd and returns whatever
    os.read yields, which may be shorter than `length`.
    """
    descriptor = self.fd
    # Position absolutely from the start of the file before reading.
    os.lseek(descriptor, offset, os.SEEK_SET)
    return os.read(descriptor, length)
100
def writeChunk(self, offset, data):
    """Write `data` into the file starting `offset` bytes from its start.

    Operates on the descriptor stored in self.fd and returns the result
    of os.write, i.e. the number of bytes actually written.
    """
    descriptor = self.fd
    # Position absolutely from the start of the file before writing.
    os.lseek(descriptor, offset, os.SEEK_SET)
    return os.write(descriptor, data)
108
os.remove(self.realPath)
112
class OSDirectory(OSNode):
114
implements(ivfs.IFileSystemContainer)
117
"""See IFileSystemContainer."""
118
return ([('.', self), ('..', self.parent)] +
119
[(childName, self.child(childName))
120
for childName in os.listdir(self.realPath)])
122
def child(self, childName):
    """See IFileSystemContainer."""
    childPath = os.path.join(self.realPath, childName)

    # Guard: nothing on disk under that name.
    if not os.path.exists(childPath):
        raise ivfs.NotFoundError(childName)

    # Directories and files are built by separately overridable factories;
    # each factory is called with (path, name, parent).
    if os.path.isdir(childPath):
        return self.childDirFactory()(childPath, childName, self)
    return self.childFileFactory()(childPath, childName, self)
136
def childDirFactory(cls):
137
"""Returns a callable that will be used to construct instances for
138
subdirectories of this OSDirectory. The callable should accept the same
139
interface as OSDirectory.__init__; i.e. take three args (path, name,
140
parent), and return an IFileSystemContainer.
142
By default, this will be the class of the child's parent. Override this
143
method if you want a different behaviour.
145
# If you subclass OSDirectory, this will ensure children of OSDirectory
146
# are also your subclass.
148
childDirFactory = classmethod(childDirFactory)
150
def childFileFactory(self):
151
"""Returns a callable that will be used to construct instances for files
152
in this OSDirectory. The callable should accept the same interface as
153
OSFile.__init__; i.e. take three args (path, name, parent), and return
156
By default, this will be OSFile. Override this method if you want a
161
def createDirectory(self, childName):
162
"""See IFileSystemContainer."""
163
child = self.childDirFactory()(os.path.join(self.realPath, childName),
168
def createFile(self, childName, exclusive=True):
169
"""See IFileSystemContainer."""
170
child = self.childFileFactory()(os.path.join(self.realPath, childName),
172
child.create(exclusive=exclusive)
176
os.mkdir(self.realPath)
179
os.rmdir(self.realPath)
181
def exists(self, childName):
    """See IFileSystemContainer."""
    # Resolve the name relative to this directory's real path on disk.
    candidate = os.path.join(self.realPath, childName)
    return os.path.exists(candidate)
187
def __init__(self, func, wrapper):
189
self.wrapper = wrapper
190
def __call__(self, *args, **kwargs):
    """Call self.func with the given arguments and return its result
    after passing it through self.wrapper."""
    result = self.func(*args, **kwargs)
    return self.wrapper(result)
194
def __init__(self, target):
196
def __getattr__(self, name):
197
attr = getattr(self.target, name)
198
if name in ['child', 'createDirectory', 'createFile']:
199
attr = WrapFunc(attr, self._wrapChild)
201
def _wrapChild(self, child):
    """Return `child` wrapped in a new _OSNodeProxy."""
    proxied = _OSNodeProxy(child)
    return proxied
205
class RunWithPrivSep:
206
def __init__(self, func, euid, egid):
211
def __call__(self, *args, **kwargs):
212
cureuid = os.geteuid()
213
curegid = os.getegid()
217
os.setegid(self.egid)
218
os.seteuid(self.euid)
221
result = self.func(*args, **kwargs)
230
class SetUIDProxy(_OSNodeProxy):
231
def __init__(self, target, euid, egid):
236
def __getattr__(self, attrName):
237
attr = _OSNodeProxy.__getattr__(self, attrName)
239
return RunWithPrivSep(attr, self.euid, self.egid)
242
def _wrapChild(self, child):
    """Return `child` wrapped in a SetUIDProxy that carries over this
    proxy's euid and egid."""
    euid, egid = self.euid, self.egid
    return SetUIDProxy(child, euid, egid)
247
if type(mode) is str:
249
assert type(mode) is int, 'invalid mode: %s' % mode
253
class ForceCreateModeProxy(_OSNodeProxy):
254
def __init__(self, target, dirmode=None, filemode=None):
259
self.dirmode = getMode(dirmode)
261
self.filemode = getMode(filemode)
263
def createDirectory(self, *args, **kwargs):
    """Create a directory through the target, then force its mode.

    All arguments are passed straight to self.target.createDirectory.
    When self.dirmode is set, the new directory's permissions are changed
    to it with os.chmod. Returns the new child wrapped by self._wrapChild.
    """
    child = self.target.createDirectory(*args, **kwargs)
    # Identity test: None means "no forced mode"; a mode of 0 is still applied.
    if self.dirmode is not None:
        os.chmod(child.realPath, self.dirmode)
    return self._wrapChild(child)
269
def createFile(self, *args, **kwargs):
    """Create a file through the target, then force its mode.

    All arguments are passed straight to self.target.createFile. When
    self.filemode is set, the new file's permissions are changed to it
    with os.chmod. Returns the new child wrapped by self._wrapChild.
    """
    child = self.target.createFile(*args, **kwargs)
    # Identity test: None means "no forced mode"; a mode of 0 is still applied.
    if self.filemode is not None:
        os.chmod(child.realPath, self.filemode)
    return self._wrapChild(child)
275
def _wrapChild(self, child):
    """Return `child` wrapped in a ForceCreateModeProxy configured with
    this proxy's dirmode and filemode."""
    dirmode, filemode = self.dirmode, self.filemode
    return ForceCreateModeProxy(child, dirmode, filemode)