1
# -*- test-case-name: twisted.test.test_paths -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
5
from __future__ import generators
13
from os.path import isabs, exists, normpath, abspath, splitext
14
from os.path import basename, dirname
15
from os.path import join as joinpath
16
from os import sep as slash
17
from os import listdir, utime, stat
20
from stat import ST_MODE, ST_MTIME, ST_ATIME, ST_CTIME, ST_SIZE
22
from stat import S_ISREG, S_ISDIR, S_ISLNK
25
from os.path import islink
31
from os import urandom as randomBytes
34
randomData = [random.randrange(256) for n in xrange(n)]
35
return ''.join(map(chr, randomData))
38
from base64 import urlsafe_b64encode as armor
41
return s.encode('hex')
43
class InsecurePath(Exception):
46
def _secureEnoughString():
    """
    Build a pseudorandom, 16-character string for use in secure filenames.

    64 random bytes are hashed with the C{sha} module, the digest is passed
    through C{armor}, and the first 16 characters of the result are returned.
    """
    entropy = randomBytes(64)
    digest = sha.new(entropy).digest()
    armored = armor(digest)
    return armored[:16]
53
"""I am a path on the filesystem that only permits 'downwards' access.
55
Instantiate me with a pathname (for example,
56
FilePath('/home/myuser/public_html')) and I will attempt to only provide
57
access to files which reside inside that path. I may be a path to a file,
58
a directory, or a file which does not exist.
60
The correct way to use me is to instantiate me, and then do ALL filesystem
61
access through me. In other words, do not import the 'os' module; if you
62
need to open a file, call my 'open' method. If you need to list a
63
directory, call my 'listdir' method.
65
Even if you pass me a relative path, I will convert that to an absolute
68
@type alwaysCreate: C{bool}
69
@ivar alwaysCreate: When opening this file, only succeed if the file does not
73
# __slots__ = 'path abs'.split()
77
def __init__(self, path, alwaysCreate=False):
    """
    Record the absolute form of C{path} together with the creation flag.

    @param path: a filesystem path; it is converted with C{abspath} before
        being stored on C{self.path}.
    @param alwaysCreate: stored unchanged on C{self.alwaysCreate}.
    """
    self.alwaysCreate = alwaysCreate
    self.path = abspath(path)
81
def __getstate__(self):
82
d = self.__dict__.copy()
83
if d.has_key('statinfo'):
87
def child(self, path):
90
raise InsecurePath("%r contains one or more directory separators" % (path,))
91
newpath = abspath(joinpath(self.path, norm))
92
if not newpath.startswith(self.path):
93
raise InsecurePath("%r is not a child of %s" % (newpath, self.path))
94
return self.clonePath(newpath)
96
def preauthChild(self, path):
    """
    Return the child named by C{path}, which may contain directory
    separators.

    Use me if `path' might have slashes in it, but you know they're safe.

    (NOT slashes at the beginning. It still needs to be a _child_).

    @param path: a path fragment, interpreted relative to C{self.path}.
    @return: the result of C{self.clonePath} on the resolved absolute path.
    @raise InsecurePath: if the resolved path lies outside of C{self.path}.
    """
    newpath = abspath(joinpath(self.path, normpath(path)))
    # A bare prefix test would accept siblings that merely share a name
    # prefix ('/a/bc' "starts with" '/a/b'), so compare against my path
    # terminated by a separator.  My own path remains acceptable, as it
    # always has been (e.g. for path == '.').
    prefix = self.path
    if not prefix.endswith(slash):
        prefix = prefix + slash
    if newpath != self.path and not newpath.startswith(prefix):
        raise InsecurePath("%s is not a child of %s" % (newpath, self.path))
    return self.clonePath(newpath)
107
def childSearchPreauth(self, *paths):
108
"""Return my first existing child with a name in 'paths'.
110
paths is expected to be a list of *pre-secured* path fragments; in most
111
cases this will be specified by a system administrator and not an
114
If no appropriately-named children exist, this will return None.
118
jp = joinpath(p, child)
120
return self.clonePath(jp)
122
def siblingExtensionSearch(self, *exts):
123
"""Attempt to return a path with my name, given multiple possible
126
Each extension in exts will be tested and the first path which exists
127
will be returned. If no path exists, None will be returned. If '' is
128
in exts, then if the file referred to by this path exists, 'self' will
131
The extension '*' has a magic meaning, which means "any path that
132
begins with self.path+'.' is acceptable".
136
if not ext and self.exists():
139
basedot = basename(p)+'.'
140
for fn in listdir(dirname(p)):
141
if fn.startswith(basedot):
142
return self.clonePath(joinpath(dirname(p), fn))
145
return self.clonePath(p2)
147
def siblingExtension(self, ext):
    """
    Return a path whose name is my path with C{ext} appended.

    @param ext: text appended verbatim to C{self.path}.
    @return: whatever C{self.clonePath} builds for the extended path.
    """
    extended = self.path + ext
    return self.clonePath(extended)
150
def open(self, mode='r'):
151
if self.alwaysCreate:
152
assert 'a' not in mode, "Appending not supported when alwaysCreate == True"
154
return open(self.path, mode+'b')
158
def restat(self, reraise=True):
160
self.statinfo = stat(self.path)
197
elif self.statinfo is None:
210
return S_ISDIR(st[ST_MODE])
219
return S_ISREG(st[ST_MODE])
228
return S_ISLNK(st[ST_MODE])
231
return isabs(self.path)
234
return listdir(self.path)
237
return splitext(self.path)
240
return 'FilePath(%r)' % self.path
244
self.open('a').close()
247
utime(self.path, None)
251
for child in self.children():
259
return os.makedirs(self.path)
261
def globChildren(self, pattern):
    """
    Assuming I am representing a directory, return a list of
    FilePaths representing my children that match the given
    pattern.

    @param pattern: a C{glob}-style pattern, matched inside my directory.
    @return: a list holding C{self.clonePath(match)} for every match.
    """
    import glob
    # Use the platform separator rather than a literal '/', and avoid
    # doubling it when my path already ends with one.
    if self.path.endswith(slash):
        fullPattern = self.path + pattern
    else:
        fullPattern = slash.join([self.path, pattern])
    return [self.clonePath(match) for match in glob.glob(fullPattern)]
272
return basename(self.path)
275
return dirname(self.path)
278
return self.clonePath(self.dirname())
280
def setContent(self, content, ext='.new'):
    """
    Replace my contents with C{content} by writing a sibling file and then
    renaming it over my path.

    @param content: the data to write.
    @param ext: extension appended to my path to name the scratch sibling.
    """
    sib = self.siblingExtension(ext)
    f = sib.open('w')
    try:
        f.write(content)
    finally:
        # Close explicitly: the data has to be flushed before the rename,
        # and relying on garbage collection leaks the descriptor on
        # interpreters without reference counting.
        f.close()
    os.rename(sib.path, self.path)
285
def getContent(self):
    """
    Read and return my entire contents.

    @return: everything read from the file object returned by
        C{self.open()}.
    """
    f = self.open()
    try:
        return f.read()
    finally:
        # Do not leave the handle for the garbage collector to close.
        f.close()
290
def __cmp__(self, other):
    """
    Compare myself to another FilePath by C{path}.

    @return: C{cmp} of the two C{path} attributes, or C{NotImplemented}
        when C{other} is not a L{FilePath}.
    """
    if isinstance(other, FilePath):
        return cmp(self.path, other.path)
    return NotImplemented
295
def createDirectory(self):
298
def requireCreate(self, val=1):
    """
    Set my C{alwaysCreate} flag.

    @param val: the new value for C{self.alwaysCreate}; defaults to 1.
    """
    self.alwaysCreate = val
302
"""Exclusively create a file, only if this file previously did not exist.
304
fdint = os.open(self.path, (os.O_EXCL |
308
# XXX TODO: 'name' attribute of returned files is not mutable or
309
# settable via fdopen, so this file is slightly less functional than the
310
# one returned from 'open' by default. send a patch to Python...
312
return os.fdopen(fdint, 'w+b')
314
def temporarySibling(self):
316
Create a path naming a temporary sibling of this path in a secure fashion.
318
sib = self.parent().child(_secureEnoughString() + self.basename())
323
return map(self.child, self.listdir())
328
for c in self.children():
329
for subc in c.walk():
332
_chunkSize = 2 ** 2 ** 2 ** 2
334
def copyTo(self, destination):
335
# XXX TODO: *thorough* audit and documentation of the exact desired
336
# semantics of this code. Right now the behavior of existent
337
# destination symlinks is convenient, and quite possibly correct, but
338
# its security properties need to be explained.
340
if not destination.exists():
341
destination.createDirectory()
342
for child in self.children():
343
destChild = destination.child(child.basename())
344
child.copyTo(destChild)
346
writefile = destination.open('w')
347
readfile = self.open()
349
# XXX TODO: optionally use os.open, os.read and O_DIRECT and
350
# use os.fstatvfs to determine chunk sizes and make
351
# *****sure**** copy is page-atomic; the following is good
352
# enough for 99.9% of everybody and won't take a week to audit
354
chunk = readfile.read(self._chunkSize)
355
writefile.write(chunk)
356
if len(chunk) < self._chunkSize:
361
# If you see the following message because you want to copy
362
# symlinks, fifos, block devices, character devices, or unix
363
# sockets, please feel free to add support to do sensible things in
364
# reaction to those types!
365
raise NotImplementedError(
366
"Only copying of files and directories supported")
368
def moveTo(self, destination):
370
os.rename(self.path, destination.path)
373
if ose.errno == errno.EXDEV:
374
# man 2 rename, ubuntu linux 5.10 "breezy":
376
# oldpath and newpath are not on the same mounted filesystem.
377
# (Linux permits a filesystem to be mounted at multiple
378
# points, but rename(2) does not work across different mount
379
# points, even if the same filesystem is mounted on both.)
381
# that means it's time to copy trees of directories!
382
secsib = destination.secureSibling()
383
self.copyTo(secsib) # slow
384
secsib.moveTo(destination) # visible
386
# done creating new stuff. let's clean me up.
387
mysecsib = self.secureSibling()
388
self.moveTo(mysecsib) # visible
389
mysecsib.remove() # slow
394
FilePath.clonePath = FilePath
400
from twisted.python import filepath
401
filepath.FilePath.__dict__ = FilePath.__dict__
402
filepath.FilePath.__dict__['__module__'] = 'twisted.python.filepath'
403
FilePath = filepath.FilePath