2
import os, time, pickle, errno, zipfile
4
from twisted.python import filepath
5
from twisted.python.runtime import platform
6
from twisted.trial import unittest
8
class AbstractFilePathTestCase(unittest.TestCase):
13
def _mkpath(self, *p):
14
x = os.path.abspath(os.path.join(self.cmn, *p))
18
def subdir(self, *dirname):
19
os.mkdir(self._mkpath(*dirname))
21
def subfile(self, *dirname):
22
return open(self._mkpath(*dirname), "wb")
25
self.now = time.time()
26
cmn = self.cmn = os.path.abspath(self.mktemp())
30
f = self.subfile("file1")
31
f.write(self.f1content)
32
f = self.subfile("sub1", "file2")
33
f.write(self.f2content)
35
f = self.subfile("sub3", "file3.ext1")
36
f = self.subfile("sub3", "file3.ext2")
37
f = self.subfile("sub3", "file3.ext3")
40
self.path = filepath.FilePath(cmn)
42
def test_segmentsFromPositive(self):
44
Verify that the segments between two paths are correctly identified.
47
self.path.child("a").child("b").child("c").segmentsFrom(self.path),
50
def test_segmentsFromNegative(self):
51
"""Verify that segmentsFrom notices when the ancestor isn't an ancestor.
55
self.path.child("a").child("b").child("c").segmentsFrom,
56
self.path.child("d").child("c").child("e"))
59
"""Verify that walking the path gives the same result as the known file
62
x = [foo.path for foo in self.path.walk()]
64
self.assertEquals(x, self.all)
66
def test_validSubdir(self):
67
"""Verify that a valid subdirectory will show up as a directory, but not as a
68
file, not as a symlink, and be listable.
70
sub1 = self.path.child('sub1')
71
self.failUnless(sub1.exists(),
72
"This directory does exist.")
73
self.failUnless(sub1.isdir(),
75
self.failUnless(not sub1.isfile(),
77
self.failUnless(not sub1.islink(),
79
self.failUnlessEqual(sub1.listdir(),
83
def test_invalidSubdir(self):
85
Verify that a subdirectory that doesn't exist is reported as such.
87
sub2 = self.path.child('sub2')
88
self.failIf(sub2.exists(),
89
"This directory does not exist.")
91
def test_validFiles(self):
93
Make sure that we can read existent non-empty files.
95
f1 = self.path.child('file1')
96
self.failUnlessEqual(f1.open().read(), self.f1content)
97
f2 = self.path.child('sub1').child('file2')
98
self.failUnlessEqual(f2.open().read(), self.f2content)
101
def zipit(dirname, zfname):
103
create a zipfile on zfname, containing the contents of dirname'
105
zf = zipfile.ZipFile(zfname, "w")
106
basedir = os.path.basename(dirname)
107
for root, dirs, files, in os.walk(dirname):
109
fspath = os.path.join(root, fname)
110
arcpath = os.path.join(root, fname)[len(dirname)+1:]
111
# print fspath, '=>', arcpath
112
zf.write(fspath, arcpath)
115
from twisted.python.zippath import ZipArchive
117
class ZipFilePathTestCase(AbstractFilePathTestCase):
120
AbstractFilePathTestCase.setUp(self)
121
zipit(self.cmn, self.cmn+'.zip')
122
self.path = ZipArchive(self.cmn+'.zip')
123
self.all = [x.replace(self.cmn, self.cmn+'.zip') for x in self.all]
126
class FilePathTestCase(AbstractFilePathTestCase):
128
def test_getAndSet(self):
129
content = 'newcontent'
130
self.path.child('new').setContent(content)
131
newcontent = self.path.child('new').getContent()
132
self.failUnlessEqual(content, newcontent)
134
self.path.child('new').setContent(content, '.tmp')
135
newcontent = self.path.child('new').getContent()
136
self.failUnlessEqual(content, newcontent)
138
def testSymbolicLink(self):
139
s4 = self.path.child("sub4")
140
s3 = self.path.child("sub3")
141
os.symlink(s3.path, s4.path)
142
self.failUnless(s4.islink())
143
self.failIf(s3.islink())
144
self.failUnless(s4.isdir())
145
self.failUnless(s3.isdir())
147
if not hasattr(os, "symlink"):
148
testSymbolicLink.skip = "Your platform does not support symbolic links."
150
def testMultiExt(self):
151
f3 = self.path.child('sub3').child('file3')
152
exts = '.foo','.bar', 'ext1','ext2','ext3'
153
self.failIf(f3.siblingExtensionSearch(*exts))
154
f3e = f3.siblingExtension(".foo")
156
self.failIf(not f3.siblingExtensionSearch(*exts).exists())
157
self.failIf(not f3.siblingExtensionSearch('*').exists())
159
self.failIf(f3.siblingExtensionSearch(*exts))
161
def testPreauthChild(self):
162
fp = filepath.FilePath('.')
163
fp.preauthChild('foo/bar')
164
self.assertRaises(filepath.InsecurePath, fp.child, '/foo')
166
def testStatCache(self):
167
p = self.path.child('stattest')
169
self.failUnlessEqual(p.getsize(), 0)
170
self.failUnlessEqual(abs(p.getmtime() - time.time()) // 20, 0)
171
self.failUnlessEqual(abs(p.getctime() - time.time()) // 20, 0)
172
self.failUnlessEqual(abs(p.getatime() - time.time()) // 20, 0)
173
self.failUnlessEqual(p.exists(), True)
174
self.failUnlessEqual(p.exists(), True)
175
# OOB removal: FilePath.remove() will automatically restat
178
self.failUnlessEqual(p.exists(), True)
179
p.restat(reraise=False)
180
self.failUnlessEqual(p.exists(), False)
181
self.failUnlessEqual(p.islink(), False)
182
self.failUnlessEqual(p.isdir(), False)
183
self.failUnlessEqual(p.isfile(), False)
185
def testPersist(self):
186
newpath = pickle.loads(pickle.dumps(self.path))
187
self.failUnlessEqual(self.path.__class__, newpath.__class__)
188
self.failUnlessEqual(self.path.path, newpath.path)
190
def testInsecureUNIX(self):
191
self.assertRaises(filepath.InsecurePath, self.path.child, "..")
192
self.assertRaises(filepath.InsecurePath, self.path.child, "/etc")
193
self.assertRaises(filepath.InsecurePath, self.path.child, "../..")
195
def testInsecureWin32(self):
196
self.assertRaises(filepath.InsecurePath, self.path.child, r"..\..")
197
self.assertRaises(filepath.InsecurePath, self.path.child, r"C:randomfile")
199
if platform.getType() != 'win32':
200
testInsecureWin32.skip = "Consider yourself lucky."
202
def testInsecureWin32Whacky(self):
203
"""Windows has 'special' filenames like NUL and CON and COM1 and LPR
204
and PRN and ... god knows what else. They can be located anywhere in
205
the filesystem. For obvious reasons, we do not wish to normally permit
208
self.assertRaises(filepath.InsecurePath, self.path.child, "CON")
209
self.assertRaises(filepath.InsecurePath, self.path.child, "C:CON")
210
self.assertRaises(filepath.InsecurePath, self.path.child, r"C:\CON")
212
if platform.getType() != 'win32':
213
testInsecureWin32Whacky.skip = "Consider yourself lucky."
215
def testComparison(self):
216
self.assertEquals(filepath.FilePath('a'),
217
filepath.FilePath('a'))
218
self.failUnless(filepath.FilePath('z') >
219
filepath.FilePath('a'))
220
self.failUnless(filepath.FilePath('z') >=
221
filepath.FilePath('a'))
222
self.failUnless(filepath.FilePath('a') >=
223
filepath.FilePath('a'))
224
self.failUnless(filepath.FilePath('a') <=
225
filepath.FilePath('a'))
226
self.failUnless(filepath.FilePath('a') <
227
filepath.FilePath('z'))
228
self.failUnless(filepath.FilePath('a') <=
229
filepath.FilePath('z'))
230
self.failUnless(filepath.FilePath('a') !=
231
filepath.FilePath('z'))
232
self.failUnless(filepath.FilePath('z') !=
233
filepath.FilePath('a'))
235
self.failIf(filepath.FilePath('z') !=
236
filepath.FilePath('z'))
238
def testSibling(self):
239
p = self.path.child('sibling_start')
240
ts = p.sibling('sibling_test')
241
self.assertEquals(ts.dirname(), p.dirname())
242
self.assertEquals(ts.basename(), 'sibling_test')
244
self.assertIn(ts, self.path.children())
246
def testTemporarySibling(self):
247
ts = self.path.temporarySibling()
248
self.assertEquals(ts.dirname(), self.path.dirname())
249
self.assertNotIn(ts.basename(), self.path.listdir())
251
self.assertIn(ts, self.path.parent().children())
253
def testRemove(self):
255
self.failIf(self.path.exists())
257
def testCopyTo(self):
258
self.assertRaises((OSError, IOError), self.path.copyTo, self.path.child('file1'))
259
oldPaths = list(self.path.walk()) # Record initial state
260
fp = filepath.FilePath(self.mktemp())
264
newPaths = list(self.path.walk()) # Record double-copy state
267
self.assertEquals(newPaths, oldPaths)
269
def testMoveTo(self):
270
self.assertRaises((OSError, IOError), self.path.moveTo, self.path.child('file1'))
271
oldPaths = list(self.path.walk()) # Record initial state
272
fp = filepath.FilePath(self.mktemp())
275
newPaths = list(self.path.walk()) # Record double-move state
278
self.assertEquals(newPaths, oldPaths)
280
def testCrossMountMoveTo(self):
283
# Bit of a whitebox test - force os.rename, which moveTo tries
284
# before falling back to a slower method, to fail, forcing moveTo to
285
# use the slower behavior.
287
def faultyRename(src, dest):
288
invokedWith.append((src, dest))
289
if len(invokedWith) == 2:
290
raise OSError(errno.EXDEV, 'Test-induced failure simulating cross-device rename failure')
291
return originalRename(src, dest)
293
originalRename = os.rename
294
os.rename = faultyRename
297
# A bit of a sanity check for this whitebox test - if our rename
298
# was never invoked, the test has probably fallen into
300
self.failUnless(len(invokedWith) >= 2)
302
os.rename = originalRename
305
# Opening a file for reading when it does not already exist is an error
306
nonexistent = self.path.child('nonexistent')
307
e = self.assertRaises(IOError, nonexistent.open)
308
self.assertEquals(e.errno, errno.ENOENT)
310
# Opening a file for writing when it does not exist is okay
311
writer = self.path.child('writer')
316
# Make sure those bytes ended up there - and test opening a file for
317
# reading when it does exist at the same time
319
self.assertEquals(f.read(), 'abc\ndef')
322
# Re-opening that file in write mode should erase whatever was there.
326
self.assertEquals(f.read(), '')
329
# Put some bytes in a file so we can test that appending does not
331
appender = self.path.child('appender')
332
f = appender.open('w')
336
f = appender.open('a')
340
f = appender.open('r')
341
self.assertEquals(f.read(), 'abcdef')
344
# read/write should let us do both without erasing those bytes
345
f = appender.open('r+')
346
self.assertEquals(f.read(), 'abcdef')
347
# ANSI C *requires* an fseek or an fgetpos between an fread and an
348
# fwrite or an fwrite and a fread. We can't reliable get Python to
349
# invoke fgetpos, so we seek to a 0 byte offset from the current
350
# position instead. Also, Python sucks for making this seek
351
# relative to 1 instead of a symbolic constant representing the
352
# current file position.
354
# Put in some new bytes for us to test for later.
358
# Make sure those new bytes really showed up
359
f = appender.open('r')
360
self.assertEquals(f.read(), 'abcdefghi')
363
# write/read should let us do both, but erase anything that's there
365
f = appender.open('w+')
366
self.assertEquals(f.read(), '')
367
f.seek(0, 1) # Don't forget this!
371
# super append mode should let us read and write and also position the
372
# cursor at the end of the file, without erasing everything.
373
f = appender.open('a+')
375
# The order of these lines may seem surprising, but it is necessary.
376
# The cursor is not at the end of the file until after the first write.
378
f.seek(0, 1) # Asinine.
379
self.assertEquals(f.read(), '')
382
self.assertEquals(f.read(), '123456')
385
# Opening a file exclusively must fail if that file exists already.
386
nonexistent.requireCreate(True)
387
nonexistent.open('w').close()
388
existent = nonexistent
390
self.assertRaises((OSError, IOError), existent.open)
393
from twisted.python import urlpath
395
class URLPathTestCase(unittest.TestCase):
397
self.path = urlpath.URLPath.fromString("http://example.com/foo/bar?yes=no&no=yes#footer")
399
def testStringConversion(self):
400
self.assertEquals(str(self.path), "http://example.com/foo/bar?yes=no&no=yes#footer")
402
def testChildString(self):
403
self.assertEquals(str(self.path.child('hello')), "http://example.com/foo/bar/hello")
404
self.assertEquals(str(self.path.child('hello').child('')), "http://example.com/foo/bar/hello/")
406
def testSiblingString(self):
407
self.assertEquals(str(self.path.sibling('baz')), 'http://example.com/foo/baz')
409
# The sibling of http://example.com/foo/bar/
410
# is http://example.comf/foo/bar/baz
411
# because really we are constructing a sibling of
412
# http://example.com/foo/bar/index.html
413
self.assertEquals(str(self.path.child('').sibling('baz')), 'http://example.com/foo/bar/baz')
415
def testParentString(self):
416
# parent should be equivalent to '..'
417
# 'foo' is the current directory, '/' is the parent directory
418
self.assertEquals(str(self.path.parent()), 'http://example.com/')
419
self.assertEquals(str(self.path.child('').parent()), 'http://example.com/foo/')
420
self.assertEquals(str(self.path.child('baz').parent()), 'http://example.com/foo/')
421
self.assertEquals(str(self.path.parent().parent().parent().parent().parent()), 'http://example.com/')
423
def testHereString(self):
424
# here should be equivalent to '.'
425
self.assertEquals(str(self.path.here()), 'http://example.com/foo/')
426
self.assertEquals(str(self.path.child('').here()), 'http://example.com/foo/bar/')