1
# -*- test-case-name: twisted.conch.test.test_filetransfer -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE file for details.
8
from twisted.trial import unittest
10
from twisted.conch import unix
14
del sys.modules['twisted.conch.unix'] # remove the bad import
16
# In Python 2.4, the bad import has already been cleaned up for us.
20
from twisted.conch import avatar
21
from twisted.conch.ssh import filetransfer, session
22
from twisted.internet import defer, reactor
23
from twisted.protocols import loopback
24
from twisted.python import components, log
27
class FileTransferTestAvatar(avatar.ConchUser):
29
def __init__(self, homeDir):
30
avatar.ConchUser.__init__(self)
31
self.channelLookup['session'] = session.SSHSession
32
self.subsystemLookup['sftp'] = filetransfer.FileTransferServer
33
self.homeDir = homeDir
35
def _runAsUser(self, f, *args, **kw):
42
args = len(i)>1 and i[1] or ()
43
kw = len(i)>2 and i[2] or {}
48
return os.path.join(os.getcwd(), self.homeDir)
50
class ConchSessionForTestAvatar:
52
def __init__(self, avatar):
56
if not hasattr(unix, 'SFTPServerForUnixConchUser'):
57
# unix should either be a fully working module, or None. I'm not sure
58
# how this happens, but on win32 it does. Try to cope. --spiv.
60
warnings.warn(("twisted.conch.unix imported %r, "
61
"but doesn't define SFTPServerForUnixConchUser'")
65
class FileTransferForTestAvatar(unix.SFTPServerForUnixConchUser):
67
def gotVersion(self, version, otherExt):
68
return {'conchTest' : 'ext data'}
70
def extendedRequest(self, extName, extData):
71
if extName == 'testExtendedRequest':
73
raise NotImplementedError
75
components.registerAdapter(FileTransferForTestAvatar,
76
FileTransferTestAvatar,
77
filetransfer.ISFTPServer)
79
class SFTPTestBase(unittest.TestCase):
82
self.testDir = self.mktemp()
83
# Give the testDir another level so we can safely "cd .." from it in
85
self.testDir = os.path.join(self.testDir, 'extra')
86
os.makedirs(os.path.join(self.testDir, 'testDirectory'))
88
f = file(os.path.join(self.testDir, 'testfile1'),'w')
89
f.write('a'*10+'b'*10)
90
f.write(file('/dev/urandom').read(1024*64)) # random data
91
os.chmod(os.path.join(self.testDir, 'testfile1'), 0644)
92
file(os.path.join(self.testDir, 'testRemoveFile'), 'w').write('a')
93
file(os.path.join(self.testDir, 'testRenameFile'), 'w').write('a')
94
file(os.path.join(self.testDir, '.testHiddenFile'), 'w').write('a')
97
class TestOurServerOurClient(SFTPTestBase):
100
skip = "can't run on non-posix computers"
103
SFTPTestBase.setUp(self)
105
self.avatar = FileTransferTestAvatar(self.testDir)
106
self.server = filetransfer.FileTransferServer(avatar=self.avatar)
107
clientTransport = loopback.LoopbackRelay(self.server)
109
self.client = filetransfer.FileTransferClient()
110
self._serverVersion = None
112
def _(serverVersion, extData):
113
self._serverVersion = serverVersion
114
self._extData = extData
115
self.client.gotServerVersion = _
116
serverTransport = loopback.LoopbackRelay(self.client)
117
self.client.makeConnection(clientTransport)
118
self.server.makeConnection(serverTransport)
120
self.clientTransport = clientTransport
121
self.serverTransport = serverTransport
125
def _emptyBuffers(self):
126
while self.serverTransport.buffer or self.clientTransport.buffer:
127
self.serverTransport.clearBuffer()
128
self.clientTransport.clearBuffer()
130
def _delayedEmptyBuffers(self):
131
reactor.callLater(0.1, self._emptyBuffers)
133
def testServerVersion(self):
134
self.failUnlessEqual(self._serverVersion, 3)
135
self.failUnlessEqual(self._extData, {'conchTest' : 'ext data'})
137
def testOpenFileIO(self):
138
d = self.client.openFile("testfile1", filetransfer.FXF_READ |
139
filetransfer.FXF_WRITE, {})
142
def _fileOpened(openFile):
143
self.failUnlessEqual(openFile, filetransfer.ISFTPFile(openFile))
144
d = _readChunk(openFile)
145
d.addCallback(_writeChunk, openFile)
148
def _readChunk(openFile):
149
d = openFile.readChunk(0, 20)
151
d.addCallback(self.failUnlessEqual, 'a'*10 + 'b'*10)
154
def _writeChunk(_, openFile):
155
d = openFile.writeChunk(20, 'c'*10)
157
d.addCallback(_readChunk2, openFile)
160
def _readChunk2(_, openFile):
161
d = openFile.readChunk(0, 30)
163
d.addCallback(self.failUnlessEqual, 'a'*10 + 'b'*10 + 'c'*10)
166
d.addCallback(_fileOpened)
169
def testClosedFileGetAttrs(self):
170
d = self.client.openFile("testfile1", filetransfer.FXF_READ |
171
filetransfer.FXF_WRITE, {})
174
def _getAttrs(_, openFile):
175
d = openFile.getAttrs()
183
def _close(openFile):
186
d.addCallback(_getAttrs, openFile)
188
return self.assertFailure(d, filetransfer.SFTPError)
190
d.addCallback(_close)
193
def testOpenFileAttributes(self):
194
d = self.client.openFile("testfile1", filetransfer.FXF_READ |
195
filetransfer.FXF_WRITE, {})
198
def _getAttrs(openFile):
199
d = openFile.getAttrs()
201
d.addCallback(_getAttrs2)
204
def _getAttrs2(attrs1):
205
d = self.client.getAttrs('testfile1')
207
d.addCallback(self.failUnlessEqual, attrs1)
210
return d.addCallback(_getAttrs)
213
def testOpenFileSetAttrs(self):
215
# Ok, how about this for a start? It caught a bug :) -- spiv.
216
d = self.client.openFile("testfile1", filetransfer.FXF_READ |
217
filetransfer.FXF_WRITE, {})
220
def _getAttrs(openFile):
221
d = openFile.getAttrs()
223
d.addCallback(_setAttrs)
226
def _setAttrs(attrs):
228
d = self.client.setAttrs('testfile1', attrs)
230
d.addCallback(_getAttrs2)
231
d.addCallback(self.failUnlessEqual, attrs)
235
d = self.client.getAttrs('testfile1')
239
d.addCallback(_getAttrs)
242
def testRemoveFile(self):
243
d = self.client.getAttrs("testRemoveFile")
245
def _removeFile(ignored):
246
d = self.client.removeFile("testRemoveFile")
249
d.addCallback(_removeFile)
250
d.addCallback(_removeFile)
251
return self.assertFailure(d, filetransfer.SFTPError)
253
def testRenameFile(self):
254
d = self.client.getAttrs("testRenameFile")
257
d = self.client.renameFile("testRenameFile", "testRenamedFile")
259
d.addCallback(_testRenamed, attrs)
261
def _testRenamed(_, attrs):
262
d = self.client.getAttrs("testRenamedFile")
264
d.addCallback(self.failUnlessEqual, attrs)
265
return d.addCallback(_rename)
267
def testDirectoryBad(self):
268
d = self.client.getAttrs("testMakeDirectory")
270
return self.assertFailure(d, filetransfer.SFTPError)
272
def testDirectoryCreation(self):
273
d = self.client.makeDirectory("testMakeDirectory", {})
277
d = self.client.getAttrs("testMakeDirectory")
281
# XXX not until version 4/5
282
# self.failUnlessEqual(filetransfer.FILEXFER_TYPE_DIRECTORY&attrs['type'],
283
# filetransfer.FILEXFER_TYPE_DIRECTORY)
285
def _removeDirectory(_):
286
d = self.client.removeDirectory("testMakeDirectory")
290
d.addCallback(_getAttrs)
291
d.addCallback(_removeDirectory)
292
d.addCallback(_getAttrs)
293
return self.assertFailure(d, filetransfer.SFTPError)
295
def testOpenDirectory(self):
296
d = self.client.openDirectory('')
300
def _getFiles(openDir):
304
d = defer.maybeDeferred(openDir.next)
306
d.addCallback(append)
307
d.addCallback(_getFiles)
308
d.addErrback(_close, openDir)
311
def _checkFiles(ignored):
312
fs = list(zip(*files)[0])
314
self.failUnlessEqual(fs,
315
['.testHiddenFile', 'testDirectory',
316
'testRemoveFile', 'testRenameFile',
319
def _close(_, openDir):
324
d.addCallback(_getFiles)
325
d.addCallback(_checkFiles)
328
def testLinkDoesntExist(self):
329
d = self.client.getAttrs('testLink')
331
return self.assertFailure(d, filetransfer.SFTPError)
333
def testLinkSharesAttrs(self):
334
d = self.client.makeLink('testLink', 'testfile1')
336
def _getFirstAttrs(_):
337
d = self.client.getAttrs('testLink', 1)
340
def _getSecondAttrs(firstAttrs):
341
d = self.client.getAttrs('testfile1')
343
d.addCallback(self.assertEqual, firstAttrs)
345
d.addCallback(_getFirstAttrs)
346
return d.addCallback(_getSecondAttrs)
348
def testLinkPath(self):
349
d = self.client.makeLink('testLink', 'testfile1')
352
d = self.client.readLink('testLink')
354
d.addCallback(self.failUnlessEqual,
355
os.path.join(os.getcwd(), self.testDir, 'testfile1'))
358
d = self.client.realPath('testLink')
360
d.addCallback(self.failUnlessEqual,
361
os.path.join(os.getcwd(), self.testDir, 'testfile1'))
363
d.addCallback(_readLink)
364
d.addCallback(_realPath)
367
def testExtendedRequest(self):
368
d = self.client.extendedRequest('testExtendedRequest', 'foo')
370
d.addCallback(self.failUnlessEqual, 'bar')
371
d.addCallback(self._cbTestExtendedRequest)
374
def _cbTestExtendedRequest(self, ignored):
375
d = self.client.extendedRequest('testBadRequest', '')
377
return self.assertFailure(d, NotImplementedError)