1
"""Tests for the SFTP server VFS adapter."""
6
from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_CREAT
7
from twisted.conch.ssh.filetransfer import FXF_APPEND, FXF_EXCL, FXF_TRUNC
8
from twisted.conch.ssh.filetransfer import SFTPError
9
from twisted.conch.ssh.filetransfer import FX_NO_SUCH_FILE, FX_FAILURE
10
from twisted.conch.ssh.filetransfer import FX_PERMISSION_DENIED
11
from twisted.conch.ssh.filetransfer import FX_OP_UNSUPPORTED, FX_NOT_A_DIRECTORY
12
from twisted.conch.ssh.filetransfer import FX_FILE_IS_A_DIRECTORY
13
from twisted.conch.ssh.filetransfer import FX_FILE_ALREADY_EXISTS
14
from twisted.conch.interfaces import ISFTPFile
15
from twisted.trial import unittest
16
from twisted.internet import defer
18
from twisted.vfs import ivfs, pathutils
19
from twisted.vfs.adapters import sftp
20
from twisted.vfs.backends import inmem, osfs
22
sftpAttrs = ['size', 'uid', 'gid', 'nlink', 'mtime', 'atime', 'permissions']
26
class SFTPErrorTranslationTests(unittest.TestCase):
28
def assertTranslation(self, error, code):
29
"""Asserts that the translate_error decorator translates 'error' to an
30
SFTPError with a code of 'code'."""
31
message = 'test error message'
34
f = sftp.translateErrors(f)
35
e = self.assertRaises(SFTPError, f)
36
self.assertEqual(code, e.code)
37
self.assertEqual(message, e.message)
39
def testPermissionError(self):
40
"""PermissionError is translated to FX_PERMISSION_DENIED."""
41
self.assertTranslation(ivfs.PermissionError, FX_PERMISSION_DENIED)
43
def testNotFoundError(self):
44
"""NotFoundError is translated to FX_NO_SUCH_FILE."""
45
self.assertTranslation(ivfs.NotFoundError, FX_NO_SUCH_FILE)
47
def testVFSError(self):
48
"""VFSErrors that aren't otherwise caught are translated to
50
self.assertTranslation(ivfs.VFSError, FX_FAILURE)
52
def testNotImplementedError(self):
53
"""NotImplementedError is translated to FX_OP_UNSUPPORTED."""
54
self.assertTranslation(NotImplementedError, FX_OP_UNSUPPORTED)
56
def testTranslateDeferredError(self):
57
"""If the decorated function returns a Deferred, the error should still
60
return defer.fail(ivfs.VFSError('error message'))
61
f = sftp.translateErrors(f)
63
return self.assertFailure(d, SFTPError)
65
def testTranslateDeferredError2(self):
66
"""If the decorated function returns a Deferred that hasn't fired
67
immediately, the error should still be translated."""
71
f = sftp.translateErrors(f)
73
d.errback(ivfs.VFSError("foo"))
74
return self.assertFailure(d2, SFTPError)
77
class SFTPAdapterTest(unittest.TestCase):
80
return inmem.FakeDirectory()
85
# Create a subdirectory 'ned'
86
self.ned = ned = root.createDirectory('ned')
88
# Create a file 'file.txt'
89
self.f = f = root.createFile('file.txt')
91
flags |= getattr(os, 'O_BINARY', 0) # for windows
92
f.open(flags).writeChunk(0, 'wobble\n')
96
self.avatar = sftp.VFSConchUser('radix', root)
97
self.sftp = sftp.AdaptFileSystemUserToISFTP(self.avatar)
99
def _assertNodes(self, dir, mynodes):
100
nodes = [x[0] for x in pathutils.fetch(self.root, dir).children()]
103
return self.assertEquals(nodes, mynodes)
105
def test_openFile(self):
106
child = self.sftp.openFile('file.txt', 0, None)
107
self.failUnless(ISFTPFile.providedBy(child))
109
def test_openNewFile(self):
110
# Opening a new file with FXF_READ alone should fail with
112
e = self.assertRaises(SFTPError,
113
self.sftp.openFile, 'new file.txt', FXF_READ, None)
114
self.assertEqual(FX_NO_SUCH_FILE, e.code)
116
def test_openNewFileCreate(self):
117
# Opening a new file should work if FXF_CREAT is passed.
118
child = self.sftp.openFile('new file.txt', FXF_READ|FXF_CREAT, None)
119
self.failUnless(ISFTPFile.providedBy(child))
121
def test_openNewFileWrite(self):
122
# The FXF_WRITE flag alone can create a file.
123
child = self.sftp.openFile('new file.txt', FXF_WRITE, None)
124
self.failUnless(ISFTPFile.providedBy(child))
126
def test_openNewFileReadWrite(self):
127
# So, of course FXF_WRITE plus FXF_READ can create a file too.
128
child = self.sftp.openFile('new file.txt', FXF_WRITE|FXF_READ, None)
129
self.failUnless(ISFTPFile.providedBy(child))
131
def test_openNewFileAppend(self):
132
# The FXF_APPEND flag alone can create a file.
133
child = self.sftp.openFile('new file.txt', FXF_APPEND, None)
134
self.failUnless(ISFTPFile.providedBy(child))
136
def test_openNewFileExclusive(self):
137
flags = FXF_WRITE|FXF_CREAT|FXF_EXCL
138
# But if the file doesn't exist, then it should work.
139
child = self.sftp.openFile('new file.txt', flags, None)
140
self.failUnless(ISFTPFile.providedBy(child))
142
def test_openExistingFileExclusive(self):
143
# Creating a file should fail if the FXF_EXCL flag is given and the file
144
# already exists. This fails with FX_FILE_ALREADY_EXISTS (which is
145
# actually just FX_FAILURE for now, see the comment in
146
# twisted/conch/ssh/filetransfer.py).
147
flags = FXF_WRITE|FXF_CREAT|FXF_EXCL
148
e = self.assertRaises(SFTPError,
149
self.sftp.openFile, 'file.txt', flags, None)
150
self.assertEqual(FX_FILE_ALREADY_EXISTS, e.code)
152
def test_openFileTrunc(self):
153
# The FXF_TRUNC flag causes an existing file to be truncated.
154
child = self.sftp.openFile('file.txt', FXF_WRITE|FXF_TRUNC, None)
155
self.failUnless(ISFTPFile.providedBy(child))
157
# The file should have been truncated to 0 size.
158
attrs = child.getAttrs()
159
self.failUnlessEqual(0, attrs['size'])
161
def test_removeFile(self):
162
self.sftp.removeFile('/file.txt')
163
self._assertNodes('/', ['.', '..', 'ned'])
165
def test_removeFileMissing(self):
166
# Trying to remove a file that doesn't exist should fail with
168
e = self.assertRaises(SFTPError,
169
self.sftp.removeFile, 'file-that-does-not-exist.txt')
170
self.assertEqual(FX_NO_SUCH_FILE, e.code)
172
def test_renameFile(self):
173
self.sftp.renameFile('/file.txt', '/radixiscool.txt')
174
self._assertNodes('/', ['.', '..', 'ned', 'radixiscool.txt'])
176
def test_renameFileRelative(self):
177
self.sftp.renameFile('file.txt', 'radixiscool.txt')
178
self._assertNodes('/', ['.', '..', 'ned', 'radixiscool.txt'])
180
def test_renameToDirectory(self):
181
self.sftp.renameFile('/file.txt', '/ned')
182
self._assertNodes('/', ['.', '..', 'ned'])
183
self._assertNodes('/ned', ['.', '..', 'file.txt'])
185
def test_renameInDirectory(self):
186
self.sftp.renameFile('/file.txt', '/ned')
187
self._assertNodes('/', ['.', '..', 'ned'])
188
self._assertNodes('/ned', ['.', '..', 'file.txt'])
189
self.sftp.renameFile('/ned/file.txt', '/ned/file2.txt')
190
self._assertNodes('/ned', ['.', '..', 'file2.txt'])
192
def test_makeDirectory(self):
193
self.sftp.makeDirectory('/dir', None)
194
self._assertNodes('/', ['.', '..', 'file.txt', 'ned', 'dir'])
195
self._assertNodes('/dir', ['.', '..'])
197
def test_makeSubDirectory(self):
198
self.sftp.makeDirectory('/dir', None)
199
self.sftp.makeDirectory('/dir/subdir', None)
200
self._assertNodes('/', ['.', '..', 'file.txt', 'ned', 'dir'])
201
self._assertNodes('/dir', ['.', '..', 'subdir'])
202
self._assertNodes('/dir/subdir', ['.', '..'])
204
def test_removeDirectory(self):
205
self.sftp.makeDirectory('/dir', None)
206
self.sftp.removeDirectory('/dir')
207
self._assertNodes('/', ['.', '..', 'file.txt', 'ned'])
209
def test_openDirectory(self):
210
for name, lsline, attrs in self.sftp.openDirectory('/ned'):
213
self.failUnless(sftpAttrs, keys)
215
def test_getAttrsPath(self):
216
# getAttrs by path name
217
attrs = self.sftp.getAttrs('/ned', None).keys()
219
self.failUnless(sftpAttrs, attrs)
221
def test_getAttrsFile(self):
222
# getAttrs on an open file
224
child = self.sftp.openFile(path, 0, None)
225
attrs = child.getAttrs()
226
self.failUnlessEqual(7, attrs['size'])
228
def test_setAttrsPath(self):
229
# setAttrs on a path name
230
for mtime in [86401, 200000, int(time.time())]:
232
self.sftp.setAttrs('/file.txt', {'mtime': mtime})
234
if e.code == FX_OP_UNSUPPORTED:
235
raise unittest.SkipTest(
236
"The VFS backend %r doesn't support setAttrs"
242
mtime, self.sftp.getAttrs('/file.txt', False)['mtime'])
244
def test_setAttrsFile(self):
245
# setAttrs on an open file
246
file = self.sftp.openFile('file.txt', 0, None)
247
for mtime in [86401, 200000, int(time.time())]:
249
file.setAttrs({'mtime': mtime})
250
except NotImplementedError:
251
raise unittest.SkipTest(
252
"The VFS backend %r doesn't support setAttrs"
256
mtime, file.getAttrs()['mtime'])
258
def test_dirlistWithoutAttrs(self):
259
self.ned.getMetadata = self.f.getMetadata = lambda: {}
260
for name, lsline, attrs in self.sftp.openDirectory('/'):
263
self.failUnless(sftpAttrs, keys)
265
def test_openDirectoryAsFile(self):
266
# http://www.ietf.org/internet-drafts/draft-ietf-secsh-filexfer-12.txt
267
# 8.1.1.1 says: "If 'filename' is a directory file, the server MUST
268
# return an SSH_FX_FILE_IS_A_DIRECTORY error."
269
e = self.assertRaises(SFTPError, self.sftp.openFile, 'ned', 0, None)
270
self.assertEqual(FX_FILE_IS_A_DIRECTORY, e.code)
272
def test_openFileAsDirectory(self):
273
# 8.1.2: "If 'path' does not refer to a directory, the server MUST
274
# return SSH_FX_NOT_A_DIRECTORY."
275
e = self.assertRaises(SFTPError, self.sftp.openDirectory, 'file.txt')
276
self.assertEqual(FX_NOT_A_DIRECTORY, e.code)
278
def test_removeDirectoryAsFile(self):
279
# 8.3: "This request cannot be used to remove directories. The server
280
# MUST return SSH_FX_FILE_IS_A_DIRECTORY in this case."
281
e = self.assertRaises(SFTPError, self.sftp.removeFile, 'ned')
282
self.assertEqual(FX_FILE_IS_A_DIRECTORY, e.code)
284
def test_removeDirectoryMissing(self):
285
# Trying to remove a directory that doesn't exist should give
287
e = self.assertRaises(SFTPError, self.sftp.removeDirectory, 'missing')
288
self.assertEqual(FX_NO_SUCH_FILE, e.code)
290
def test_getAttrsMissing(self):
291
# getAttrs on a file that doesn't exist gives FX_NO_SUCH_FILE.
292
e = self.assertRaises(SFTPError, self.sftp.getAttrs, 'missing', None)
293
self.assertEqual(FX_NO_SUCH_FILE, e.code)
295
def test_setAttrsMissing(self):
296
# setAttrs on a file that doesn't exist gives FX_NO_SUCH_FILE.
297
e = self.assertRaises(SFTPError, self.sftp.setAttrs, 'missing', {})
298
self.assertEqual(FX_NO_SUCH_FILE, e.code)
301
class SFTPAdapterOSFSTest(SFTPAdapterTest):
305
return osfs.OSDirectory(path)
308
class DummyDir(inmem.FakeDirectory):
309
def createDirectory(self, childName):
311
d2 = defer.maybeDeferred(inmem.FakeDirectory.createDirectory,
313
from twisted.internet import reactor
314
reactor.callLater(1, d2.chainDeferred, d)
317
class SFTPAdapterDeferredTestCase(unittest.TestCase):
320
filesystem = pathutils.FileSystem(root)
321
self.filesystem = filesystem
323
avatar = sftp.VFSConchUser('radix', root)
324
self.sftp = sftp.AdaptFileSystemUserToISFTP(avatar)
326
def _assertNodes(self, dir, mynodes):
327
nodes = [x[0] for x in self.filesystem.fetch(dir).children()]
330
return self.assertEquals(nodes, mynodes)
332
def test_makeDirectoryDeferred(self):
333
# Allow Deferreds to be returned from createDirectory
334
d = defer.maybeDeferred(self.sftp.makeDirectory, '/dir', None)
336
self._assertNodes('/', ['.', '..', 'dir'])
337
return d.addCallback(cb)