~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/test/test_paths.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
 
2
 
import os, time, pickle, errno, zipfile
3
 
 
4
 
from twisted.python import filepath
5
 
from twisted.python.runtime import platform
6
 
from twisted.trial import unittest
7
 
 
8
 
class AbstractFilePathTestCase(unittest.TestCase):
9
 
 
10
 
    f1content = "file 1"
11
 
    f2content = "file 2"
12
 
 
13
 
    def _mkpath(self, *p):
14
 
        x = os.path.abspath(os.path.join(self.cmn, *p))
15
 
        self.all.append(x)
16
 
        return x
17
 
 
18
 
    def subdir(self, *dirname):
19
 
        os.mkdir(self._mkpath(*dirname))
20
 
 
21
 
    def subfile(self, *dirname):
22
 
        return open(self._mkpath(*dirname), "wb")
23
 
 
24
 
    def setUp(self):
25
 
        self.now = time.time()
26
 
        cmn = self.cmn = os.path.abspath(self.mktemp())
27
 
        self.all = [cmn]
28
 
        os.mkdir(cmn)
29
 
        self.subdir("sub1")
30
 
        f = self.subfile("file1")
31
 
        f.write(self.f1content)
32
 
        f = self.subfile("sub1", "file2")
33
 
        f.write(self.f2content)
34
 
        self.subdir('sub3')
35
 
        f = self.subfile("sub3", "file3.ext1")
36
 
        f = self.subfile("sub3", "file3.ext2")
37
 
        f = self.subfile("sub3", "file3.ext3")
38
 
        self.all.sort()
39
 
 
40
 
        self.path = filepath.FilePath(cmn)
41
 
 
42
 
    def test_segmentsFromPositive(self):
43
 
        """
44
 
        Verify that the segments between two paths are correctly identified.
45
 
        """
46
 
        self.assertEquals(
47
 
            self.path.child("a").child("b").child("c").segmentsFrom(self.path),
48
 
            ["a", "b", "c"])
49
 
 
50
 
    def test_segmentsFromNegative(self):
51
 
        """Verify that segmentsFrom notices when the ancestor isn't an ancestor.
52
 
        """
53
 
        self.assertRaises(
54
 
            ValueError,
55
 
            self.path.child("a").child("b").child("c").segmentsFrom,
56
 
                self.path.child("d").child("c").child("e"))
57
 
 
58
 
    def test_walk(self):
59
 
        """Verify that walking the path gives the same result as the known file
60
 
        hierarchy.
61
 
        """
62
 
        x = [foo.path for foo in self.path.walk()]
63
 
        x.sort()
64
 
        self.assertEquals(x, self.all)
65
 
 
66
 
    def test_validSubdir(self):
67
 
        """Verify that a valid subdirectory will show up as a directory, but not as a
68
 
        file, not as a symlink, and be listable.
69
 
        """
70
 
        sub1 = self.path.child('sub1')
71
 
        self.failUnless(sub1.exists(),
72
 
                        "This directory does exist.")
73
 
        self.failUnless(sub1.isdir(),
74
 
                        "It's a directory.")
75
 
        self.failUnless(not sub1.isfile(),
76
 
                        "It's a directory.")
77
 
        self.failUnless(not sub1.islink(),
78
 
                        "It's a directory.")
79
 
        self.failUnlessEqual(sub1.listdir(),
80
 
                             ['file2'])
81
 
 
82
 
 
83
 
    def test_invalidSubdir(self):
84
 
        """
85
 
        Verify that a subdirectory that doesn't exist is reported as such.
86
 
        """
87
 
        sub2 = self.path.child('sub2')
88
 
        self.failIf(sub2.exists(),
89
 
                    "This directory does not exist.")
90
 
 
91
 
    def test_validFiles(self):
92
 
        """
93
 
        Make sure that we can read existent non-empty files.
94
 
        """
95
 
        f1 = self.path.child('file1')
96
 
        self.failUnlessEqual(f1.open().read(), self.f1content)
97
 
        f2 = self.path.child('sub1').child('file2')
98
 
        self.failUnlessEqual(f2.open().read(), self.f2content)
99
 
 
100
 
 
101
 
def zipit(dirname, zfname):
102
 
    """
103
 
    create a zipfile on zfname, containing the contents of dirname'
104
 
    """
105
 
    zf = zipfile.ZipFile(zfname, "w")
106
 
    basedir = os.path.basename(dirname)
107
 
    for root, dirs, files, in os.walk(dirname):
108
 
        for fname in files:
109
 
            fspath = os.path.join(root, fname)
110
 
            arcpath = os.path.join(root, fname)[len(dirname)+1:]
111
 
            # print fspath, '=>', arcpath
112
 
            zf.write(fspath, arcpath)
113
 
    zf.close()
114
 
 
115
 
from twisted.python.zippath import ZipArchive
116
 
 
117
 
class ZipFilePathTestCase(AbstractFilePathTestCase):
118
 
 
119
 
    def setUp(self):
120
 
        AbstractFilePathTestCase.setUp(self)
121
 
        zipit(self.cmn, self.cmn+'.zip')
122
 
        self.path = ZipArchive(self.cmn+'.zip')
123
 
        self.all = [x.replace(self.cmn, self.cmn+'.zip') for x in self.all]
124
 
 
125
 
 
126
 
class FilePathTestCase(AbstractFilePathTestCase):
127
 
 
128
 
    def test_getAndSet(self):
129
 
        content = 'newcontent'
130
 
        self.path.child('new').setContent(content)
131
 
        newcontent = self.path.child('new').getContent()
132
 
        self.failUnlessEqual(content, newcontent)
133
 
        content = 'content'
134
 
        self.path.child('new').setContent(content, '.tmp')
135
 
        newcontent = self.path.child('new').getContent()
136
 
        self.failUnlessEqual(content, newcontent)
137
 
 
138
 
    def testSymbolicLink(self):
139
 
        s4 = self.path.child("sub4")
140
 
        s3 = self.path.child("sub3")
141
 
        os.symlink(s3.path, s4.path)
142
 
        self.failUnless(s4.islink())
143
 
        self.failIf(s3.islink())
144
 
        self.failUnless(s4.isdir())
145
 
        self.failUnless(s3.isdir())
146
 
 
147
 
    if not hasattr(os, "symlink"):
148
 
        testSymbolicLink.skip = "Your platform does not support symbolic links."
149
 
 
150
 
    def testMultiExt(self):
151
 
        f3 = self.path.child('sub3').child('file3')
152
 
        exts = '.foo','.bar', 'ext1','ext2','ext3'
153
 
        self.failIf(f3.siblingExtensionSearch(*exts))
154
 
        f3e = f3.siblingExtension(".foo")
155
 
        f3e.touch()
156
 
        self.failIf(not f3.siblingExtensionSearch(*exts).exists())
157
 
        self.failIf(not f3.siblingExtensionSearch('*').exists())
158
 
        f3e.remove()
159
 
        self.failIf(f3.siblingExtensionSearch(*exts))
160
 
 
161
 
    def testPreauthChild(self):
162
 
        fp = filepath.FilePath('.')
163
 
        fp.preauthChild('foo/bar')
164
 
        self.assertRaises(filepath.InsecurePath, fp.child, '/foo')
165
 
 
166
 
    def testStatCache(self):
167
 
        p = self.path.child('stattest')
168
 
        p.touch()
169
 
        self.failUnlessEqual(p.getsize(), 0)
170
 
        self.failUnlessEqual(abs(p.getmtime() - time.time()) // 20, 0)
171
 
        self.failUnlessEqual(abs(p.getctime() - time.time()) // 20, 0)
172
 
        self.failUnlessEqual(abs(p.getatime() - time.time()) // 20, 0)
173
 
        self.failUnlessEqual(p.exists(), True)
174
 
        self.failUnlessEqual(p.exists(), True)
175
 
        # OOB removal: FilePath.remove() will automatically restat
176
 
        os.remove(p.path)
177
 
        # test caching
178
 
        self.failUnlessEqual(p.exists(), True)
179
 
        p.restat(reraise=False)
180
 
        self.failUnlessEqual(p.exists(), False)
181
 
        self.failUnlessEqual(p.islink(), False)
182
 
        self.failUnlessEqual(p.isdir(), False)
183
 
        self.failUnlessEqual(p.isfile(), False)
184
 
 
185
 
    def testPersist(self):
186
 
        newpath = pickle.loads(pickle.dumps(self.path))
187
 
        self.failUnlessEqual(self.path.__class__, newpath.__class__)
188
 
        self.failUnlessEqual(self.path.path, newpath.path)
189
 
 
190
 
    def testInsecureUNIX(self):
191
 
        self.assertRaises(filepath.InsecurePath, self.path.child, "..")
192
 
        self.assertRaises(filepath.InsecurePath, self.path.child, "/etc")
193
 
        self.assertRaises(filepath.InsecurePath, self.path.child, "../..")
194
 
 
195
 
    def testInsecureWin32(self):
196
 
        self.assertRaises(filepath.InsecurePath, self.path.child, r"..\..")
197
 
        self.assertRaises(filepath.InsecurePath, self.path.child, r"C:randomfile")
198
 
 
199
 
    if platform.getType() != 'win32':
200
 
        testInsecureWin32.skip = "Consider yourself lucky."
201
 
 
202
 
    def testInsecureWin32Whacky(self):
203
 
        """Windows has 'special' filenames like NUL and CON and COM1 and LPR
204
 
        and PRN and ... god knows what else.  They can be located anywhere in
205
 
        the filesystem.  For obvious reasons, we do not wish to normally permit
206
 
        access to these.
207
 
        """
208
 
        self.assertRaises(filepath.InsecurePath, self.path.child, "CON")
209
 
        self.assertRaises(filepath.InsecurePath, self.path.child, "C:CON")
210
 
        self.assertRaises(filepath.InsecurePath, self.path.child, r"C:\CON")
211
 
 
212
 
    if platform.getType() != 'win32':
213
 
        testInsecureWin32Whacky.skip = "Consider yourself lucky."
214
 
 
215
 
    def testComparison(self):
216
 
        self.assertEquals(filepath.FilePath('a'),
217
 
                          filepath.FilePath('a'))
218
 
        self.failUnless(filepath.FilePath('z') >
219
 
                        filepath.FilePath('a'))
220
 
        self.failUnless(filepath.FilePath('z') >=
221
 
                        filepath.FilePath('a'))
222
 
        self.failUnless(filepath.FilePath('a') >=
223
 
                        filepath.FilePath('a'))
224
 
        self.failUnless(filepath.FilePath('a') <=
225
 
                        filepath.FilePath('a'))
226
 
        self.failUnless(filepath.FilePath('a') <
227
 
                        filepath.FilePath('z'))
228
 
        self.failUnless(filepath.FilePath('a') <=
229
 
                        filepath.FilePath('z'))
230
 
        self.failUnless(filepath.FilePath('a') !=
231
 
                        filepath.FilePath('z'))
232
 
        self.failUnless(filepath.FilePath('z') !=
233
 
                        filepath.FilePath('a'))
234
 
 
235
 
        self.failIf(filepath.FilePath('z') !=
236
 
                    filepath.FilePath('z'))
237
 
 
238
 
    def testSibling(self):
239
 
        p = self.path.child('sibling_start')
240
 
        ts = p.sibling('sibling_test')
241
 
        self.assertEquals(ts.dirname(), p.dirname())
242
 
        self.assertEquals(ts.basename(), 'sibling_test')
243
 
        ts.createDirectory()
244
 
        self.assertIn(ts, self.path.children())
245
 
 
246
 
    def testTemporarySibling(self):
247
 
        ts = self.path.temporarySibling()
248
 
        self.assertEquals(ts.dirname(), self.path.dirname())
249
 
        self.assertNotIn(ts.basename(), self.path.listdir())
250
 
        ts.createDirectory()
251
 
        self.assertIn(ts, self.path.parent().children())
252
 
 
253
 
    def testRemove(self):
254
 
        self.path.remove()
255
 
        self.failIf(self.path.exists())
256
 
 
257
 
    def testCopyTo(self):
258
 
        self.assertRaises((OSError, IOError), self.path.copyTo, self.path.child('file1'))
259
 
        oldPaths = list(self.path.walk()) # Record initial state
260
 
        fp = filepath.FilePath(self.mktemp())
261
 
        self.path.copyTo(fp)
262
 
        self.path.remove()
263
 
        fp.copyTo(self.path)
264
 
        newPaths = list(self.path.walk()) # Record double-copy state
265
 
        newPaths.sort()
266
 
        oldPaths.sort()
267
 
        self.assertEquals(newPaths, oldPaths)
268
 
 
269
 
    def testMoveTo(self):
270
 
        self.assertRaises((OSError, IOError), self.path.moveTo, self.path.child('file1'))
271
 
        oldPaths = list(self.path.walk()) # Record initial state
272
 
        fp = filepath.FilePath(self.mktemp())
273
 
        self.path.moveTo(fp)
274
 
        fp.moveTo(self.path)
275
 
        newPaths = list(self.path.walk()) # Record double-move state
276
 
        newPaths.sort()
277
 
        oldPaths.sort()
278
 
        self.assertEquals(newPaths, oldPaths)
279
 
 
280
 
    def testCrossMountMoveTo(self):
281
 
        """
282
 
        """
283
 
        # Bit of a whitebox test - force os.rename, which moveTo tries
284
 
        # before falling back to a slower method, to fail, forcing moveTo to
285
 
        # use the slower behavior.
286
 
        invokedWith = []
287
 
        def faultyRename(src, dest):
288
 
            invokedWith.append((src, dest))
289
 
            if len(invokedWith) == 2:
290
 
                raise OSError(errno.EXDEV, 'Test-induced failure simulating cross-device rename failure')
291
 
            return originalRename(src, dest)
292
 
 
293
 
        originalRename = os.rename
294
 
        os.rename = faultyRename
295
 
        try:
296
 
            self.testMoveTo()
297
 
            # A bit of a sanity check for this whitebox test - if our rename
298
 
            # was never invoked, the test has probably fallen into
299
 
            # disrepair!
300
 
            self.failUnless(len(invokedWith) >= 2)
301
 
        finally:
302
 
            os.rename = originalRename
303
 
 
304
 
    def testOpen(self):
305
 
        # Opening a file for reading when it does not already exist is an error
306
 
        nonexistent = self.path.child('nonexistent')
307
 
        e = self.assertRaises(IOError, nonexistent.open)
308
 
        self.assertEquals(e.errno, errno.ENOENT)
309
 
 
310
 
        # Opening a file for writing when it does not exist is okay
311
 
        writer = self.path.child('writer')
312
 
        f = writer.open('w')
313
 
        f.write('abc\ndef')
314
 
        f.close()
315
 
 
316
 
        # Make sure those bytes ended up there - and test opening a file for
317
 
        # reading when it does exist at the same time
318
 
        f = writer.open()
319
 
        self.assertEquals(f.read(), 'abc\ndef')
320
 
        f.close()
321
 
 
322
 
        # Re-opening that file in write mode should erase whatever was there.
323
 
        f = writer.open('w')
324
 
        f.close()
325
 
        f = writer.open()
326
 
        self.assertEquals(f.read(), '')
327
 
        f.close()
328
 
 
329
 
        # Put some bytes in a file so we can test that appending does not
330
 
        # destroy them.
331
 
        appender = self.path.child('appender')
332
 
        f = appender.open('w')
333
 
        f.write('abc')
334
 
        f.close()
335
 
 
336
 
        f = appender.open('a')
337
 
        f.write('def')
338
 
        f.close()
339
 
 
340
 
        f = appender.open('r')
341
 
        self.assertEquals(f.read(), 'abcdef')
342
 
        f.close()
343
 
 
344
 
        # read/write should let us do both without erasing those bytes
345
 
        f = appender.open('r+')
346
 
        self.assertEquals(f.read(), 'abcdef')
347
 
        # ANSI C *requires* an fseek or an fgetpos between an fread and an
348
 
        # fwrite or an fwrite and a fread.  We can't reliable get Python to
349
 
        # invoke fgetpos, so we seek to a 0 byte offset from the current
350
 
        # position instead.  Also, Python sucks for making this seek
351
 
        # relative to 1 instead of a symbolic constant representing the
352
 
        # current file position.
353
 
        f.seek(0, 1)
354
 
        # Put in some new bytes for us to test for later.
355
 
        f.write('ghi')
356
 
        f.close()
357
 
 
358
 
        # Make sure those new bytes really showed up
359
 
        f = appender.open('r')
360
 
        self.assertEquals(f.read(), 'abcdefghi')
361
 
        f.close()
362
 
 
363
 
        # write/read should let us do both, but erase anything that's there
364
 
        # already.
365
 
        f = appender.open('w+')
366
 
        self.assertEquals(f.read(), '')
367
 
        f.seek(0, 1) # Don't forget this!
368
 
        f.write('123')
369
 
        f.close()
370
 
 
371
 
        # super append mode should let us read and write and also position the
372
 
        # cursor at the end of the file, without erasing everything.
373
 
        f = appender.open('a+')
374
 
 
375
 
        # The order of these lines may seem surprising, but it is necessary.
376
 
        # The cursor is not at the end of the file until after the first write.
377
 
        f.write('456')
378
 
        f.seek(0, 1) # Asinine.
379
 
        self.assertEquals(f.read(), '')
380
 
 
381
 
        f.seek(0, 0)
382
 
        self.assertEquals(f.read(), '123456')
383
 
        f.close()
384
 
 
385
 
        # Opening a file exclusively must fail if that file exists already.
386
 
        nonexistent.requireCreate(True)
387
 
        nonexistent.open('w').close()
388
 
        existent = nonexistent
389
 
        del nonexistent
390
 
        self.assertRaises((OSError, IOError), existent.open)
391
 
 
392
 
 
393
 
from twisted.python import urlpath
394
 
 
395
 
class URLPathTestCase(unittest.TestCase):
396
 
    def setUp(self):
397
 
        self.path = urlpath.URLPath.fromString("http://example.com/foo/bar?yes=no&no=yes#footer")
398
 
 
399
 
    def testStringConversion(self):
400
 
        self.assertEquals(str(self.path), "http://example.com/foo/bar?yes=no&no=yes#footer")
401
 
 
402
 
    def testChildString(self):
403
 
        self.assertEquals(str(self.path.child('hello')), "http://example.com/foo/bar/hello")
404
 
        self.assertEquals(str(self.path.child('hello').child('')), "http://example.com/foo/bar/hello/")
405
 
 
406
 
    def testSiblingString(self):
407
 
        self.assertEquals(str(self.path.sibling('baz')), 'http://example.com/foo/baz')
408
 
        
409
 
        # The sibling of http://example.com/foo/bar/
410
 
        #     is http://example.comf/foo/bar/baz
411
 
        # because really we are constructing a sibling of
412
 
        # http://example.com/foo/bar/index.html
413
 
        self.assertEquals(str(self.path.child('').sibling('baz')), 'http://example.com/foo/bar/baz')
414
 
 
415
 
    def testParentString(self):
416
 
        # parent should be equivalent to '..'
417
 
        # 'foo' is the current directory, '/' is the parent directory
418
 
        self.assertEquals(str(self.path.parent()), 'http://example.com/')
419
 
        self.assertEquals(str(self.path.child('').parent()), 'http://example.com/foo/')
420
 
        self.assertEquals(str(self.path.child('baz').parent()), 'http://example.com/foo/')
421
 
        self.assertEquals(str(self.path.parent().parent().parent().parent().parent()), 'http://example.com/')
422
 
 
423
 
    def testHereString(self):
424
 
        # here should be equivalent to '.'
425
 
        self.assertEquals(str(self.path.here()), 'http://example.com/foo/')
426
 
        self.assertEquals(str(self.path.child('').here()), 'http://example.com/foo/bar/')
427