~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/vfs/test/test_sftp.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Tests for the SFTP server VFS adapter."""
 
2
 
 
3
import os
 
4
import time
 
5
 
 
6
from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_CREAT
 
7
from twisted.conch.ssh.filetransfer import FXF_APPEND, FXF_EXCL, FXF_TRUNC
 
8
from twisted.conch.ssh.filetransfer import SFTPError
 
9
from twisted.conch.ssh.filetransfer import FX_NO_SUCH_FILE, FX_FAILURE
 
10
from twisted.conch.ssh.filetransfer import FX_PERMISSION_DENIED
 
11
from twisted.conch.ssh.filetransfer import FX_OP_UNSUPPORTED, FX_NOT_A_DIRECTORY
 
12
from twisted.conch.ssh.filetransfer import FX_FILE_IS_A_DIRECTORY
 
13
from twisted.conch.ssh.filetransfer import FX_FILE_ALREADY_EXISTS
 
14
from twisted.conch.interfaces import ISFTPFile
 
15
from twisted.trial import unittest
 
16
from twisted.internet import defer
 
17
 
 
18
from twisted.vfs import ivfs, pathutils
 
19
from twisted.vfs.adapters import sftp
 
20
from twisted.vfs.backends import inmem, osfs
 
21
 
 
22
sftpAttrs = ['size', 'uid', 'gid', 'nlink', 'mtime', 'atime', 'permissions']
 
23
sftpAttrs.sort()
 
24
 
 
25
 
 
26
class SFTPErrorTranslationTests(unittest.TestCase):
 
27
 
 
28
    def assertTranslation(self, error, code):
 
29
        """Asserts that the translate_error decorator translates 'error' to an
 
30
        SFTPError with a code of 'code'."""
 
31
        message = 'test error message'
 
32
        def f():
 
33
            raise error(message)
 
34
        f = sftp.translateErrors(f)
 
35
        e = self.assertRaises(SFTPError, f)
 
36
        self.assertEqual(code, e.code)
 
37
        self.assertEqual(message, e.message)
 
38
    
 
39
    def testPermissionError(self):
 
40
        """PermissionError is translated to FX_PERMISSION_DENIED."""
 
41
        self.assertTranslation(ivfs.PermissionError, FX_PERMISSION_DENIED)
 
42
 
 
43
    def testNotFoundError(self):
 
44
        """NotFoundError is translated to FX_NO_SUCH_FILE."""
 
45
        self.assertTranslation(ivfs.NotFoundError, FX_NO_SUCH_FILE)
 
46
 
 
47
    def testVFSError(self):
 
48
        """VFSErrors that aren't otherwise caught are translated to
 
49
        FX_FAILURE."""
 
50
        self.assertTranslation(ivfs.VFSError, FX_FAILURE)
 
51
 
 
52
    def testNotImplementedError(self):
 
53
        """NotImplementedError is translated to FX_OP_UNSUPPORTED."""
 
54
        self.assertTranslation(NotImplementedError, FX_OP_UNSUPPORTED)
 
55
 
 
56
    def testTranslateDeferredError(self):
 
57
        """If the decorated function returns a Deferred, the error should still
 
58
        be translated."""
 
59
        def f():
 
60
            return defer.fail(ivfs.VFSError('error message'))
 
61
        f = sftp.translateErrors(f)
 
62
        d = f()
 
63
        return self.assertFailure(d, SFTPError)
 
64
 
 
65
    def testTranslateDeferredError2(self):
 
66
        """If the decorated function returns a Deferred that hasn't fired
 
67
        immediately, the error should still be translated."""
 
68
        d = defer.Deferred()
 
69
        def f():
 
70
            return d
 
71
        f = sftp.translateErrors(f)
 
72
        d2 = f()
 
73
        d.errback(ivfs.VFSError("foo"))
 
74
        return self.assertFailure(d2, SFTPError)
 
75
 
 
76
 
 
77
class SFTPAdapterTest(unittest.TestCase):
 
78
 
 
79
    def rootDir(self):
 
80
        return inmem.FakeDirectory()
 
81
 
 
82
    def setUp(self):
 
83
        root = self.rootDir()
 
84
 
 
85
        # Create a subdirectory 'ned'
 
86
        self.ned = ned = root.createDirectory('ned')
 
87
 
 
88
        # Create a file 'file.txt'
 
89
        self.f = f = root.createFile('file.txt')
 
90
        flags = os.O_WRONLY
 
91
        flags |= getattr(os, 'O_BINARY', 0)  # for windows
 
92
        f.open(flags).writeChunk(0, 'wobble\n')
 
93
        f.close()
 
94
 
 
95
        self.root = root
 
96
        self.avatar = sftp.VFSConchUser('radix', root)
 
97
        self.sftp = sftp.AdaptFileSystemUserToISFTP(self.avatar)
 
98
 
 
99
    def _assertNodes(self, dir, mynodes):
 
100
        nodes = [x[0] for x in pathutils.fetch(self.root, dir).children()]
 
101
        nodes.sort()
 
102
        mynodes.sort()
 
103
        return self.assertEquals(nodes, mynodes)
 
104
 
 
105
    def test_openFile(self):
 
106
        child = self.sftp.openFile('file.txt', 0, None)
 
107
        self.failUnless(ISFTPFile.providedBy(child))
 
108
 
 
109
    def test_openNewFile(self):
 
110
        # Opening a new file with FXF_READ alone should fail with
 
111
        # FX_NO_SUCH_FILE.
 
112
        e = self.assertRaises(SFTPError,
 
113
           self.sftp.openFile, 'new file.txt', FXF_READ, None)
 
114
        self.assertEqual(FX_NO_SUCH_FILE, e.code)
 
115
 
 
116
    def test_openNewFileCreate(self):
 
117
        # Opening a new file should work if FXF_CREAT is passed.
 
118
        child = self.sftp.openFile('new file.txt', FXF_READ|FXF_CREAT, None)
 
119
        self.failUnless(ISFTPFile.providedBy(child))
 
120
 
 
121
    def test_openNewFileWrite(self):
 
122
        # The FXF_WRITE flag alone can create a file.
 
123
        child = self.sftp.openFile('new file.txt', FXF_WRITE, None)
 
124
        self.failUnless(ISFTPFile.providedBy(child))
 
125
 
 
126
    def test_openNewFileReadWrite(self):
 
127
        # So, of course FXF_WRITE plus FXF_READ can create a file too.
 
128
        child = self.sftp.openFile('new file.txt', FXF_WRITE|FXF_READ, None)
 
129
        self.failUnless(ISFTPFile.providedBy(child))
 
130
 
 
131
    def test_openNewFileAppend(self):
 
132
        # The FXF_APPEND flag alone can create a file.
 
133
        child = self.sftp.openFile('new file.txt', FXF_APPEND, None)
 
134
        self.failUnless(ISFTPFile.providedBy(child))
 
135
 
 
136
    def test_openNewFileExclusive(self):
 
137
        flags = FXF_WRITE|FXF_CREAT|FXF_EXCL
 
138
        # But if the file doesn't exist, then it should work.
 
139
        child = self.sftp.openFile('new file.txt', flags, None)
 
140
        self.failUnless(ISFTPFile.providedBy(child))
 
141
 
 
142
    def test_openExistingFileExclusive(self):
 
143
        # Creating a file should fail if the FXF_EXCL flag is given and the file
 
144
        # already exists.  This fails with FX_FILE_ALREADY_EXISTS (which is
 
145
        # actually just FX_FAILURE for now, see the comment in
 
146
        # twisted/conch/ssh/filetransfer.py).
 
147
        flags = FXF_WRITE|FXF_CREAT|FXF_EXCL
 
148
        e = self.assertRaises(SFTPError,
 
149
                              self.sftp.openFile, 'file.txt', flags, None)
 
150
        self.assertEqual(FX_FILE_ALREADY_EXISTS, e.code)
 
151
 
 
152
    def test_openFileTrunc(self):
 
153
        # The FXF_TRUNC flag causes an existing file to be truncated.
 
154
        child = self.sftp.openFile('file.txt', FXF_WRITE|FXF_TRUNC, None)
 
155
        self.failUnless(ISFTPFile.providedBy(child))
 
156
        
 
157
        # The file should have been truncated to 0 size.
 
158
        attrs = child.getAttrs()
 
159
        self.failUnlessEqual(0, attrs['size'])
 
160
 
 
161
    def test_removeFile(self):
 
162
        self.sftp.removeFile('/file.txt')
 
163
        self._assertNodes('/', ['.', '..', 'ned'])
 
164
 
 
165
    def test_removeFileMissing(self):
 
166
        # Trying to remove a file that doesn't exist should fail with
 
167
        # FX_NO_SUCH_FILE.
 
168
        e = self.assertRaises(SFTPError,
 
169
           self.sftp.removeFile, 'file-that-does-not-exist.txt')
 
170
        self.assertEqual(FX_NO_SUCH_FILE, e.code)
 
171
 
 
172
    def test_renameFile(self):
 
173
        self.sftp.renameFile('/file.txt', '/radixiscool.txt')
 
174
        self._assertNodes('/', ['.', '..', 'ned', 'radixiscool.txt'])
 
175
 
 
176
    def test_renameFileRelative(self):
 
177
        self.sftp.renameFile('file.txt', 'radixiscool.txt')
 
178
        self._assertNodes('/', ['.', '..', 'ned', 'radixiscool.txt'])
 
179
 
 
180
    def test_renameToDirectory(self):
 
181
        self.sftp.renameFile('/file.txt', '/ned')
 
182
        self._assertNodes('/', ['.', '..', 'ned'])
 
183
        self._assertNodes('/ned', ['.', '..', 'file.txt'])
 
184
 
 
185
    def test_renameInDirectory(self):
 
186
        self.sftp.renameFile('/file.txt', '/ned')
 
187
        self._assertNodes('/', ['.', '..', 'ned'])
 
188
        self._assertNodes('/ned', ['.', '..', 'file.txt'])
 
189
        self.sftp.renameFile('/ned/file.txt', '/ned/file2.txt')
 
190
        self._assertNodes('/ned', ['.', '..', 'file2.txt'])
 
191
 
 
192
    def test_makeDirectory(self):
 
193
        self.sftp.makeDirectory('/dir', None)
 
194
        self._assertNodes('/', ['.', '..', 'file.txt', 'ned', 'dir'])
 
195
        self._assertNodes('/dir', ['.', '..'])
 
196
 
 
197
    def test_makeSubDirectory(self):
 
198
        self.sftp.makeDirectory('/dir', None)
 
199
        self.sftp.makeDirectory('/dir/subdir', None)
 
200
        self._assertNodes('/', ['.', '..', 'file.txt', 'ned', 'dir'])
 
201
        self._assertNodes('/dir', ['.', '..', 'subdir'])
 
202
        self._assertNodes('/dir/subdir', ['.', '..'])
 
203
 
 
204
    def test_removeDirectory(self):
 
205
        self.sftp.makeDirectory('/dir', None)
 
206
        self.sftp.removeDirectory('/dir')
 
207
        self._assertNodes('/', ['.', '..', 'file.txt', 'ned'])
 
208
 
 
209
    def test_openDirectory(self):
 
210
        for name, lsline, attrs in self.sftp.openDirectory('/ned'):
 
211
            keys = attrs.keys()
 
212
            keys.sort()
 
213
            self.failUnless(sftpAttrs, keys)
 
214
 
 
215
    def test_getAttrsPath(self):
 
216
        # getAttrs by path name
 
217
        attrs = self.sftp.getAttrs('/ned', None).keys()
 
218
        attrs.sort()
 
219
        self.failUnless(sftpAttrs, attrs)
 
220
 
 
221
    def test_getAttrsFile(self):
 
222
        # getAttrs on an open file
 
223
        path = 'file.txt'
 
224
        child = self.sftp.openFile(path, 0, None)
 
225
        attrs = child.getAttrs()
 
226
        self.failUnlessEqual(7, attrs['size'])
 
227
 
 
228
    def test_setAttrsPath(self):
 
229
        # setAttrs on a path name
 
230
        for mtime in [86401, 200000, int(time.time())]:
 
231
            try:
 
232
                self.sftp.setAttrs('/file.txt', {'mtime': mtime})
 
233
            except SFTPError, e:
 
234
                if e.code == FX_OP_UNSUPPORTED:
 
235
                    raise unittest.SkipTest(
 
236
                        "The VFS backend %r doesn't support setAttrs" 
 
237
                        % (self.root,))
 
238
                else:
 
239
                    raise
 
240
            else:
 
241
                self.assertEqual(
 
242
                    mtime, self.sftp.getAttrs('/file.txt', False)['mtime'])
 
243
 
 
244
    def test_setAttrsFile(self):
 
245
        # setAttrs on an open file
 
246
        file = self.sftp.openFile('file.txt', 0, None)
 
247
        for mtime in [86401, 200000, int(time.time())]:
 
248
            try:
 
249
                file.setAttrs({'mtime': mtime})
 
250
            except NotImplementedError:
 
251
                raise unittest.SkipTest(
 
252
                    "The VFS backend %r doesn't support setAttrs" 
 
253
                    % (self.root,))
 
254
            else:
 
255
                self.assertEqual(
 
256
                    mtime, file.getAttrs()['mtime'])
 
257
 
 
258
    def test_dirlistWithoutAttrs(self):
 
259
        self.ned.getMetadata = self.f.getMetadata = lambda: {}
 
260
        for name, lsline, attrs in self.sftp.openDirectory('/'):
 
261
            keys = attrs.keys()
 
262
            keys.sort()
 
263
            self.failUnless(sftpAttrs, keys)
 
264
 
 
265
    def test_openDirectoryAsFile(self):
 
266
        # http://www.ietf.org/internet-drafts/draft-ietf-secsh-filexfer-12.txt
 
267
        # 8.1.1.1 says: "If 'filename' is a directory file, the server MUST
 
268
        # return an SSH_FX_FILE_IS_A_DIRECTORY error."
 
269
        e = self.assertRaises(SFTPError, self.sftp.openFile, 'ned', 0, None)
 
270
        self.assertEqual(FX_FILE_IS_A_DIRECTORY, e.code)
 
271
 
 
272
    def test_openFileAsDirectory(self):
 
273
        # 8.1.2: "If 'path' does not refer to a directory, the server MUST
 
274
        # return SSH_FX_NOT_A_DIRECTORY."
 
275
        e = self.assertRaises(SFTPError, self.sftp.openDirectory, 'file.txt')
 
276
        self.assertEqual(FX_NOT_A_DIRECTORY, e.code)
 
277
 
 
278
    def test_removeDirectoryAsFile(self):
 
279
        # 8.3: "This request cannot be used to remove directories.  The server
 
280
        # MUST return SSH_FX_FILE_IS_A_DIRECTORY in this case."
 
281
        e = self.assertRaises(SFTPError, self.sftp.removeFile, 'ned')
 
282
        self.assertEqual(FX_FILE_IS_A_DIRECTORY, e.code)
 
283
 
 
284
    def test_removeDirectoryMissing(self):
 
285
        # Trying to remove a directory that doesn't exist should give
 
286
        # FX_NO_SUCH_FILE.
 
287
        e = self.assertRaises(SFTPError, self.sftp.removeDirectory, 'missing')
 
288
        self.assertEqual(FX_NO_SUCH_FILE, e.code)
 
289
 
 
290
    def test_getAttrsMissing(self):
 
291
        # getAttrs on a file that doesn't exist gives FX_NO_SUCH_FILE.
 
292
        e = self.assertRaises(SFTPError, self.sftp.getAttrs, 'missing', None)
 
293
        self.assertEqual(FX_NO_SUCH_FILE, e.code)
 
294
 
 
295
    def test_setAttrsMissing(self):
 
296
        # setAttrs on a file that doesn't exist gives FX_NO_SUCH_FILE.
 
297
        e = self.assertRaises(SFTPError, self.sftp.setAttrs, 'missing', {})
 
298
        self.assertEqual(FX_NO_SUCH_FILE, e.code)
 
299
 
 
300
 
 
301
class SFTPAdapterOSFSTest(SFTPAdapterTest):
 
302
    def rootDir(self):
 
303
        path = self.mktemp()
 
304
        os.mkdir(path)
 
305
        return osfs.OSDirectory(path)
 
306
 
 
307
 
 
308
class DummyDir(inmem.FakeDirectory):
 
309
    def createDirectory(self, childName):
 
310
        d = defer.Deferred()
 
311
        d2 = defer.maybeDeferred(inmem.FakeDirectory.createDirectory, 
 
312
                                 self, childName)
 
313
        from twisted.internet import reactor
 
314
        reactor.callLater(1, d2.chainDeferred, d)
 
315
        return d
 
316
 
 
317
class SFTPAdapterDeferredTestCase(unittest.TestCase):
 
318
    def setUp(self):
 
319
        root = DummyDir()
 
320
        filesystem = pathutils.FileSystem(root)
 
321
        self.filesystem = filesystem
 
322
 
 
323
        avatar = sftp.VFSConchUser('radix', root)
 
324
        self.sftp = sftp.AdaptFileSystemUserToISFTP(avatar)
 
325
 
 
326
    def _assertNodes(self, dir, mynodes):
 
327
        nodes = [x[0] for x in self.filesystem.fetch(dir).children()]
 
328
        nodes.sort()
 
329
        mynodes.sort()
 
330
        return self.assertEquals(nodes, mynodes)
 
331
 
 
332
    def test_makeDirectoryDeferred(self):
 
333
        # Allow Deferreds to be returned from createDirectory
 
334
        d = defer.maybeDeferred(self.sftp.makeDirectory, '/dir', None)
 
335
        def cb(result):
 
336
            self._assertNodes('/', ['.', '..', 'dir'])
 
337
        return d.addCallback(cb)
 
338