~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/vfs/backends/osfs.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""Filesystem backend for VFS."""
 
5
 
 
6
import os
 
7
import os.path
 
8
import errno
 
9
 
 
10
from twisted.vfs import ivfs
 
11
 
 
12
from zope.interface import implements
 
13
 
 
14
__all__ = ['OSDirectory', 'OSFile', 'RunWithPrivSep', 'SetUIDProxy',
 
15
           'ForceCreateModeProxy']
 
16
 
 
17
class OSNode:
 
18
 
 
19
    implements(ivfs.IFileSystemNode)
 
20
 
 
21
    def __init__(self, realPath, name=None, parent=None):
 
22
 
 
23
        self.name = name
 
24
        self.realPath = realPath
 
25
 
 
26
        if not parent: self.parent = self
 
27
        else: self.parent = parent
 
28
 
 
29
    def getMetadata(self):
 
30
        s = os.stat(self.realPath)
 
31
        return {
 
32
            "size"         : s.st_size,
 
33
            "uid"          : s.st_uid,
 
34
            "gid"          : s.st_gid,
 
35
            "permissions"  : s.st_mode,
 
36
            "atime"        : s.st_atime,
 
37
            "mtime"        : s.st_mtime,
 
38
            "nlink"        : s.st_nlink
 
39
        }
 
40
 
 
41
    def setMetadata(self, attrs):
 
42
        if 'uid' in attrs and 'gid' in attrs:
 
43
            os.chown(self.realPath, attrs["uid"], attrs["gid"])
 
44
        if 'permissions' in attrs:
 
45
            os.chmod(self.realPath, attrs["permissions"])
 
46
        if 'atime' in attrs or 'mtime' in attrs:
 
47
            if None in (attrs.get("atime"), attrs.get("mtime")):
 
48
                st = os.stat(self.realPath)
 
49
                atime = attrs.get("atime", st.st_atime)
 
50
                mtime = attrs.get("mtime", st.st_mtime)
 
51
            else:
 
52
                atime = attrs['atime']
 
53
                mtime = attrs['mtime']
 
54
            os.utime(self.realPath, (atime, mtime))
 
55
 
 
56
    def rename(self, newName):
 
57
        from twisted.vfs import pathutils
 
58
        newParent = pathutils.fetch(pathutils.getRoot(self),
 
59
                                    pathutils.dirname(newName))
 
60
        # XXX spiv 2005-12-15
 
61
        # assumes newParent is also an OSDirectory.  Probably should politely
 
62
        # decline (rather than break with an undefined error) if it's not.
 
63
        newPath = os.path.join(newParent.realPath, pathutils.basename(newName))
 
64
        os.rename(self.realPath, newPath)
 
65
        self.realPath = newPath
 
66
        self.name = newName
 
67
        self.parent = newParent
 
68
 
 
69
    def remove(self):
 
70
        raise NotImplementedError("Override me.")
 
71
 
 
72
 
 
73
class OSFile(OSNode):
 
74
 
 
75
    implements(ivfs.IFileSystemLeaf)
 
76
 
 
77
    def create(self, exclusive=True):
 
78
        flags = os.O_WRONLY | os.O_CREAT
 
79
        if exclusive:
 
80
            flags |= os.O_EXCL
 
81
        try:
 
82
            fd = os.open(self.realPath, flags)
 
83
        except OSError, e:
 
84
            if e.errno == errno.EEXIST:
 
85
                raise ivfs.AlreadyExistsError(self.name)
 
86
 
 
87
            # Something unexpected happened.  Let it propagate.
 
88
            raise
 
89
        f = os.fdopen(fd, "w")
 
90
        f.close()
 
91
 
 
92
    def open(self, flags):
 
93
        self.fd = os.open(self.realPath, flags)
 
94
        return self
 
95
 
 
96
    def readChunk(self, offset, length):
 
97
        os.lseek(self.fd, offset, 0)
 
98
        return os.read(self.fd, length)
 
99
 
 
100
    def writeChunk(self, offset, data):
 
101
        os.lseek(self.fd, offset, 0)
 
102
        return os.write(self.fd, data)
 
103
 
 
104
    def close(self):
 
105
        os.close(self.fd)
 
106
 
 
107
    def remove(self):
 
108
        os.remove(self.realPath)
 
109
 
 
110
 
 
111
 
 
112
class OSDirectory(OSNode):
 
113
 
 
114
    implements(ivfs.IFileSystemContainer)
 
115
 
 
116
    def children(self):
 
117
        """See IFileSystemContainer."""
 
118
        return ([('.', self), ('..', self.parent)] +
 
119
                [(childName, self.child(childName))
 
120
                 for childName in os.listdir(self.realPath)])
 
121
 
 
122
    def child(self, childName):
 
123
        """See IFileSystemContainer."""
 
124
        fullPath = os.path.join(self.realPath, childName)
 
125
 
 
126
        if not os.path.exists(fullPath):
 
127
            raise ivfs.NotFoundError(childName)
 
128
 
 
129
        if os.path.isdir(fullPath):
 
130
            nodeFactory = self.childDirFactory()
 
131
        else:
 
132
            nodeFactory = self.childFileFactory()
 
133
 
 
134
        return nodeFactory(fullPath, childName, self)
 
135
 
 
136
    def childDirFactory(cls):
 
137
        """Returns a callable that will be used to construct instances for
 
138
        subdirectories of this OSDirectory.  The callable should accept the same
 
139
        interface as OSDirectory.__init__; i.e. take three args (path, name,
 
140
        parent), and return an IFileSystemContainer.
 
141
 
 
142
        By default, this will be the class of the child's parent.  Override this
 
143
        method if you want a different behaviour.
 
144
        """
 
145
        # If you subclass OSDirectory, this will ensure children of OSDirectory
 
146
        # are also your subclass.
 
147
        return cls
 
148
    childDirFactory = classmethod(childDirFactory)
 
149
 
 
150
    def childFileFactory(self):
 
151
        """Returns a callable that will be used to construct instances for files
 
152
        in this OSDirectory.  The callable should accept the same interface as
 
153
        OSFile.__init__; i.e. take three args (path, name, parent), and return
 
154
        an IFileSystemLeaf.
 
155
 
 
156
        By default, this will be OSFile.  Override this method if you want a
 
157
        different behaviour.
 
158
        """
 
159
        return OSFile
 
160
 
 
161
    def createDirectory(self, childName):
 
162
        """See IFileSystemContainer."""
 
163
        child = self.childDirFactory()(os.path.join(self.realPath, childName),
 
164
                                       childName, self)
 
165
        child.create()
 
166
        return child
 
167
 
 
168
    def createFile(self, childName, exclusive=True):
 
169
        """See IFileSystemContainer."""
 
170
        child = self.childFileFactory()(os.path.join(self.realPath, childName),
 
171
                                        childName, self)
 
172
        child.create(exclusive=exclusive)
 
173
        return child
 
174
 
 
175
    def create(self):
 
176
        os.mkdir(self.realPath)
 
177
 
 
178
    def remove(self):
 
179
        os.rmdir(self.realPath)
 
180
 
 
181
    def exists(self, childName):
 
182
        """See IFileSystemContainer."""
 
183
        return os.path.exists(os.path.join(self.realPath, childName))
 
184
 
 
185
 
 
186
class WrapFunc:
 
187
    def __init__(self, func, wrapper):
 
188
        self.func = func
 
189
        self.wrapper = wrapper
 
190
    def __call__(self, *args, **kwargs):
 
191
        return self.wrapper(self.func(*args, **kwargs))
 
192
 
 
193
class _OSNodeProxy:
 
194
    def __init__(self, target):
 
195
        self.target = target
 
196
    def __getattr__(self, name):
 
197
        attr =  getattr(self.target, name)
 
198
        if name in ['child', 'createDirectory', 'createFile']:
 
199
            attr = WrapFunc(attr, self._wrapChild)
 
200
        return attr
 
201
    def _wrapChild(self, child):
 
202
        return _OSNodeProxy(child)
 
203
 
 
204
 
 
205
class RunWithPrivSep:
 
206
    def __init__(self, func, euid, egid):
 
207
        self.func = func
 
208
        self.euid = euid
 
209
        self.egid = egid
 
210
 
 
211
    def __call__(self, *args, **kwargs):
 
212
        cureuid = os.geteuid()
 
213
        curegid = os.getegid()
 
214
 
 
215
        os.setegid(0)
 
216
        os.seteuid(0)
 
217
        os.setegid(self.egid)
 
218
        os.seteuid(self.euid)
 
219
 
 
220
        try:
 
221
            result = self.func(*args, **kwargs)
 
222
        finally:
 
223
            os.setegid(0)
 
224
            os.seteuid(0)
 
225
            os.setegid(cureuid)
 
226
            os.seteuid(curegid)
 
227
        return result
 
228
 
 
229
 
 
230
class SetUIDProxy(_OSNodeProxy):
 
231
    def __init__(self, target, euid, egid):
 
232
        self.target = target
 
233
        self.euid   = euid
 
234
        self.egid   = egid
 
235
 
 
236
    def __getattr__(self, attrName):
 
237
        attr = _OSNodeProxy.__getattr__(self, attrName)
 
238
        if callable(attr):
 
239
            return RunWithPrivSep(attr, self.euid, self.egid)
 
240
        return attr
 
241
 
 
242
    def _wrapChild(self, child):
 
243
        return SetUIDProxy(child, self.euid, self.egid)
 
244
 
 
245
 
 
246
def getMode(mode):
 
247
    if type(mode) is str:
 
248
        mode = int(mode, 8)
 
249
    assert type(mode) is int, 'invalid mode: %s' % mode
 
250
    return mode
 
251
 
 
252
 
 
253
class ForceCreateModeProxy(_OSNodeProxy):
 
254
    def __init__(self, target, dirmode=None, filemode=None):
 
255
        self.target = target
 
256
        self.dirmode = None
 
257
        self.filemode = None
 
258
        if dirmode != None:
 
259
            self.dirmode = getMode(dirmode)
 
260
        if filemode != None:
 
261
            self.filemode = getMode(filemode)
 
262
 
 
263
    def createDirectory(self, *args, **kwargs):
 
264
        child = self.target.createDirectory(*args, **kwargs)
 
265
        if self.dirmode != None:
 
266
            os.chmod(child.realPath, self.dirmode)
 
267
        return self._wrapChild(child)
 
268
 
 
269
    def createFile(self, *args, **kwargs):
 
270
        child = self.target.createFile(*args, **kwargs)
 
271
        if self.filemode != None:
 
272
            os.chmod(child.realPath, self.filemode)
 
273
        return self._wrapChild(child)
 
274
 
 
275
    def _wrapChild(self, child):
 
276
        return ForceCreateModeProxy(child, self.dirmode, self.filemode)
 
277
 
 
278