1
# -*- test-case-name: twisted.conch.test.test_cftp -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE file for details.
8
from twisted.conch import unix
9
from twisted.conch.scripts import cftp
10
from twisted.conch.client import connect, default, options
14
del sys.modules['twisted.conch.unix'] # remove the bad import
16
# In Python 2.4, the bad import has already been cleaned up for us.
24
from twisted.cred import portal
25
from twisted.internet import reactor, protocol, interfaces, defer, error
26
from twisted.internet.utils import getProcessOutputAndValue
27
from twisted.python import log
28
from twisted.test import test_process
30
import test_ssh, test_conch
31
from test_filetransfer import SFTPTestBase, FileTransferTestAvatar
32
import sys, os, time, tempfile
34
class FileTransferTestRealm:
36
def __init__(self, testDir):
37
self.testDir = testDir
39
def requestAvatar(self, avatarID, mind, *interfaces):
40
a = FileTransferTestAvatar(self.testDir)
41
return interfaces[0], a, lambda: None
44
class SFTPTestProcess(protocol.ProcessProtocol):
50
def connectionMade(self):
53
def clearBuffer(self):
56
def outReceived(self, data):
57
log.msg('got %s' % data)
60
def errReceived(self, data):
61
log.msg('err: %s' % data)
63
def connectionLost(self, reason):
68
class CFTPClientTestBase(SFTPTestBase):
71
open('dsa_test.pub','w').write(test_ssh.publicDSA_openssh)
72
open('dsa_test','w').write(test_ssh.privateDSA_openssh)
73
os.chmod('dsa_test', 33152)
74
open('kh_test','w').write('127.0.0.1 '+test_ssh.publicRSA_openssh)
76
def startServer(self):
77
realm = FileTransferTestRealm(self.testDir)
78
p = portal.Portal(realm)
79
p.registerChecker(test_ssh.ConchTestPublicKeyChecker())
80
fac = test_ssh.ConchTestServerFactory()
82
self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")
85
if not hasattr(self.server.factory, 'proto'):
86
return self._cbStopServer(None)
87
self.server.factory.proto.expectedLoseConnection = 1
88
d = defer.maybeDeferred(
89
self.server.factory.proto.transport.loseConnection)
90
d.addCallback(self._cbStopServer)
93
def _cbStopServer(self, ignored):
94
return defer.maybeDeferred(self.server.stopListening)
96
def tearDownClass(self):
97
for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
103
class TestOurServerCmdLineClient(test_process.SignalMixin, CFTPClientTestBase):
105
def setUpClass(self):
106
if hasattr(self, 'skip'):
108
test_process.SignalMixin.setUpClass(self)
109
CFTPClientTestBase.setUpClass(self)
112
CFTPClientTestBase.setUp(self)
115
cmds = ('-p %i -l testuser '
116
'--known-hosts kh_test '
117
'--user-authentications publickey '
118
'--host-key-algorithms ssh-rsa '
124
port = self.server.getHost().port
125
cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
126
log.msg('running %s %s' % (sys.executable, cmds))
127
self.processProtocol = SFTPTestProcess()
129
env = os.environ.copy()
130
env['PYTHONPATH'] = os.pathsep.join(sys.path)
131
reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
134
timeout = time.time() + 10
135
while (not self.processProtocol.buffer) and (time.time() < timeout):
137
if time.time() > timeout:
138
self.skip = "couldn't start process"
140
self.processProtocol.clearBuffer()
142
def tearDownClass(self):
143
if hasattr(self, 'skip'):
145
test_process.SignalMixin.tearDownClass(self)
146
CFTPClientTestBase.tearDownClass(self)
149
d = self.stopServer()
150
d.addCallback(self._killProcess)
153
def _killProcess(self, ignored):
155
self.processProtocol.transport.signalProcess('KILL')
156
except error.ProcessExitedAlready:
159
def _getCmdResult(self, cmd):
160
self.processProtocol.clearBuffer()
161
self.processProtocol.transport.write(cmd+'\n')
162
timeout = time.time() + 10
163
while (self.processProtocol.buffer.find('cftp> ') == -1) and (time.time() < timeout):
165
self.failIf(time.time() > timeout, "timeout")
166
if self.processProtocol.buffer.startswith('cftp> '):
167
self.processProtocol.buffer = self.processProtocol.buffer[6:]
168
return self.processProtocol.buffer[:-6].strip()
171
homeDir = os.path.join(os.getcwd(), self.testDir)
172
pwdRes = self._getCmdResult('pwd')
173
lpwdRes = self._getCmdResult('lpwd')
174
cdRes = self._getCmdResult('cd testDirectory')
175
self._getCmdResult('cd ..')
176
pwd2Res = self._getCmdResult('pwd')
177
self.failUnlessEqual(pwdRes, homeDir)
178
self.failUnlessEqual(lpwdRes, os.getcwd())
179
self.failUnlessEqual(cdRes, '')
180
self.failUnlessEqual(pwd2Res, pwdRes)
182
def testChAttrs(self):
183
lsRes = self._getCmdResult('ls -l testfile1')
184
self.failUnless(lsRes.startswith('-rw-r--r--'), lsRes)
185
self.failIf(self._getCmdResult('chmod 0 testfile1'))
186
lsRes = self._getCmdResult('ls -l testfile1')
187
self.failUnless(lsRes.startswith('----------'), lsRes)
188
self.failIf(self._getCmdResult('chmod 644 testfile1'))
193
lsRes = self._getCmdResult('ls').split('\n')
194
self.failUnlessEqual(lsRes, ['testDirectory', 'testRemoveFile', \
195
'testRenameFile', 'testfile1'])
196
lsRes = self._getCmdResult(
197
'ls ../' + os.path.basename(self.testDir)).split('\n')
198
self.failUnlessEqual(lsRes, ['testDirectory', 'testRemoveFile', \
199
'testRenameFile', 'testfile1'])
200
lsRes = self._getCmdResult('ls *File').split('\n')
201
self.failUnlessEqual(lsRes, ['testRemoveFile', 'testRenameFile'])
202
lsRes = self._getCmdResult('ls -a *File').split('\n')
203
self.failUnlessEqual(lsRes, ['.testHiddenFile', 'testRemoveFile', 'testRenameFile'])
204
lsRes = self._getCmdResult('ls -l testDirectory')
206
# XXX test lls in a way that doesn't depend on local semantics
209
helpRes = self._getCmdResult('?')
210
self.failUnlessEqual(helpRes, cftp.StdioClient(None).cmd_HELP('').strip())
212
def _failUnlessFilesEqual(self, name1, name2, msg=None):
213
f1 = file(name1).read()
214
f2 = file(name2).read()
215
self.failUnlessEqual(f1, f2, msg)
218
getRes = self._getCmdResult(
219
'get testfile1 "%s/test file2"' % (self.testDir,))
220
self._failUnlessFilesEqual(
221
self.testDir + '/testfile1',
222
self.testDir + '/test file2', "get failed")
224
getRes.endswith("Transferred %s/%s/testfile1 to %s/test file2"
225
% (os.getcwd(), self.testDir, self.testDir)))
226
self.failIf(self._getCmdResult('rm "test file2"'))
227
self.failIf(os.path.exists(self.testDir + '/test file2'))
229
def testWildcardGet(self):
230
getRes = self._getCmdResult('get testR*')
231
self._failUnlessFilesEqual(
232
self.testDir + '/testRemoveFile',
233
'testRemoveFile', 'testRemoveFile get failed')
234
self._failUnlessFilesEqual(
235
self.testDir + '/testRenameFile',
236
'testRenameFile', 'testRenameFile get failed')
239
putRes = self._getCmdResult(
240
'put %s/testfile1 "test\\"file2"' % (self.testDir,))
241
f1 = file(self.testDir + '/testfile1').read()
242
f2 = file(self.testDir + '/test"file2').read()
243
self.failUnlessEqual(f1, f2, "put failed")
245
putRes.endswith('Transferred %s/testfile1 to %s/%s/test"file2'
246
% (self.testDir, os.getcwd(), self.testDir)))
247
self.failIf(self._getCmdResult('rm "test\\"file2"'))
248
self.failIf(os.path.exists(self.testDir + '/test"file2'))
250
def testWildcardPut(self):
251
self.failIf(self._getCmdResult('cd ..'))
252
getRes = self._getCmdResult('put %s/testR*' % (self.testDir,))
253
self._failUnlessFilesEqual(
254
self.testDir + '/testRemoveFile',
255
self.testDir + '/../testRemoveFile', 'testRemoveFile get failed')
256
self._failUnlessFilesEqual(
257
self.testDir + '/testRenameFile',
258
self.testDir + '/../testRenameFile', 'testRenameFile get failed')
259
self.failIf(self._getCmdResult('cd ' + os.path.basename(self.testDir)))
262
linkRes = self._getCmdResult('ln testLink testfile1')
264
lslRes = self._getCmdResult('ls -l testLink')
266
self.failUnless(lslRes.startswith('l'), 'link failed')
267
self.failIf(self._getCmdResult('rm testLink'))
269
def testDirectory(self):
270
self.failIf(self._getCmdResult('mkdir testMakeDirectory'))
271
lslRes = self._getCmdResult('ls -l testMakeDirector?')
272
self.failUnless(lslRes.startswith('d'), lslRes)
273
self.failIf(self._getCmdResult('rmdir testMakeDirectory'))
274
self.failIf(self._getCmdResult(
275
'lmkdir %s/testLocalDirectory' % (self.testDir,)))
276
self.failIf(self._getCmdResult('rmdir testLocalDirectory'))
278
def testRename(self):
279
self.failIf(self._getCmdResult('rename testfile1 testfile2'))
280
lsRes = self._getCmdResult('ls testfile?').split('\n')
281
self.failUnlessEqual(lsRes, ['testfile2'])
282
self.failIf(self._getCmdResult('rename testfile2 testfile1'))
284
def testCommand(self):
285
cmdRes = self._getCmdResult('!echo hello')
286
self.failUnlessEqual(cmdRes, 'hello')
288
class TestOurServerBatchFile(test_process.SignalMixin, CFTPClientTestBase):
290
def setUpClass(self):
291
test_process.SignalMixin.setUpClass(self)
292
CFTPClientTestBase.setUpClass(self)
295
CFTPClientTestBase.setUp(self)
299
CFTPClientTestBase.tearDown(self)
300
return self.stopServer()
302
def tearDownClass(self):
303
test_process.SignalMixin.tearDownClass(self)
304
CFTPClientTestBase.tearDownClass(self)
306
def _getBatchOutput(self, f):
307
fn = tempfile.mktemp()
308
open(fn, 'w').write(f)
310
port = self.server.getHost().port
311
cmds = ('-p %i -l testuser '
312
'--known-hosts kh_test '
313
'--user-authentications publickey '
314
'--host-key-algorithms ssh-rsa '
318
'-v -b %s 127.0.0.1') % (port, fn)
319
cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
320
log.msg('running %s %s' % (sys.executable, cmds))
321
env = os.environ.copy()
322
env['PYTHONPATH'] = os.pathsep.join(sys.path)
324
self.server.factory.expectedLoseConnection = 1
326
d = getProcessOutputAndValue(sys.executable, cmds, env=env)
332
d.addCallback(lambda res: res[0])
337
def testBatchFile(self):
338
"""Test whether batch file function of cftp ('cftp -b batchfile').
339
This works by treating the file as a list of commands to be run.
345
def _cbCheckResult(res):
346
res = res.split('\n')
347
log.msg('RES %s' % str(res))
348
self.failUnless(res[1].find(self.testDir) != -1, repr(res))
349
self.failUnlessEqual(res[3:-2], ['testDirectory', 'testRemoveFile',
350
'testRenameFile', 'testfile1'])
352
d = self._getBatchOutput(cmds)
353
d.addCallback(_cbCheckResult)
357
"""Test that an error in the batch file stops running the batch.
359
cmds = """chown 0 missingFile
363
def _cbCheckResult(res):
364
self.failIf(res.find(self.testDir) != -1)
366
d = self._getBatchOutput(cmds)
367
d.addCallback(_cbCheckResult)
370
def testIgnoredError(self):
371
"""Test that a minus sign '-' at the front of a line ignores
374
cmds = """-chown 0 missingFile
378
def _cbCheckResult(res):
379
self.failIf(res.find(self.testDir) == -1)
381
d = self._getBatchOutput(cmds)
382
d.addCallback(_cbCheckResult)
385
class TestOurServerUnixClient(test_process.SignalMixin, CFTPClientTestBase):
387
def setUpClass(self):
388
if hasattr(self, 'skip'):
390
test_process.SignalMixin.setUpClass(self)
391
CFTPClientTestBase.setUpClass(self)
394
CFTPClientTestBase.setUp(self)
396
cmd1 = ('-p %i -l testuser '
397
'--known-hosts kh_test '
398
'--host-key-algorithms ssh-rsa '
404
port = self.server.getHost().port
405
cmds1 = (cmd1 % port).split()
406
o = options.ConchOptions()
410
o.parseOptions(cmds1)
411
vhk = default.verifyHostKey
412
self.conn = conn = test_conch.SSHTestConnectionForUnix(None)
413
uao = default.SSHUserAuthClient(o['user'], o, conn)
414
return connect.connect(o['host'], int(o['port']), o, vhk, uao)
416
def tearDownClass(self):
417
test_process.SignalMixin.tearDownClass(self)
418
CFTPClientTestBase.tearDownClass(self)
421
d = defer.maybeDeferred(self.conn.transport.loseConnection)
422
d.addCallback(lambda x : self.stopServer())
425
def _getBatchOutput(self, f):
426
fn = tempfile.mktemp()
427
open(fn, 'w').write(f)
428
port = self.server.getHost().port
429
cmds = ('-p %i -l testuser '
432
'-v -b %s 127.0.0.1') % (port, fn)
433
cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
434
log.msg('running %s %s' % (sys.executable, cmds))
435
env = os.environ.copy()
436
env['PYTHONPATH'] = os.pathsep.join(sys.path)
438
self.server.factory.expectedLoseConnection = 1
440
d = getProcessOutputAndValue(sys.executable, cmds, env=env)
446
d.addCallback(lambda res: res[0])
451
def testBatchFile(self):
452
"""Test that the client works even over a UNIX connection.
457
d = self._getBatchOutput(cmds)
459
lambda res : self.failIf(res.find(self.testDir) == -1,
460
"%s not in %r" % (self.testDir, res)))
464
if not unix or not Crypto or not interfaces.IReactorProcess(reactor, None):
465
TestOurServerCmdLineClient.skip = "don't run w/o spawnprocess or PyCrypto"
466
TestOurServerBatchFile.skip = "don't run w/o spawnProcess or PyCrypto"
467
TestOurServerUnixClient.skip = "don't run w/o spawnProcess or PyCrypto"