~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/conch/test/test_cftp.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.conch.test.test_cftp -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE file for details.
 
4
 
 
5
import sys
 
6
 
 
7
try:
 
8
    from twisted.conch import unix
 
9
    from twisted.conch.scripts import cftp
 
10
    from twisted.conch.client import connect, default, options
 
11
except ImportError:
 
12
    unix = None
 
13
    try:
 
14
        del sys.modules['twisted.conch.unix'] # remove the bad import
 
15
    except KeyError:
 
16
        # In Python 2.4, the bad import has already been cleaned up for us.
 
17
        pass
 
18
 
 
19
try:
 
20
    import Crypto
 
21
except ImportError:
 
22
    Crypto = None
 
23
 
 
24
from twisted.cred import portal
 
25
from twisted.internet import reactor, protocol, interfaces, defer, error
 
26
from twisted.internet.utils import getProcessOutputAndValue
 
27
from twisted.python import log
 
28
from twisted.test import test_process
 
29
 
 
30
import test_ssh, test_conch
 
31
from test_filetransfer import SFTPTestBase, FileTransferTestAvatar
 
32
import sys, os, time, tempfile
 
33
 
 
34
class FileTransferTestRealm:
 
35
 
 
36
    def __init__(self, testDir):
 
37
        self.testDir = testDir
 
38
 
 
39
    def requestAvatar(self, avatarID, mind, *interfaces):
 
40
        a = FileTransferTestAvatar(self.testDir)
 
41
        return interfaces[0], a, lambda: None
 
42
 
 
43
 
 
44
class SFTPTestProcess(protocol.ProcessProtocol):
 
45
 
 
46
    def __init__(self):
 
47
        self.clearBuffer()
 
48
        self.connected = 0
 
49
 
 
50
    def connectionMade(self):
 
51
        self.connected = 1
 
52
 
 
53
    def clearBuffer(self):
 
54
        self.buffer = ''
 
55
 
 
56
    def outReceived(self, data):
 
57
        log.msg('got %s' % data)
 
58
        self.buffer += data
 
59
 
 
60
    def errReceived(self, data):
 
61
        log.msg('err: %s' % data)
 
62
 
 
63
    def connectionLost(self, reason):
 
64
        self.connected = 0
 
65
    def getBuffer(self):
 
66
        return self.buffer
 
67
 
 
68
class CFTPClientTestBase(SFTPTestBase):
 
69
 
 
70
    def setUpClass(self):
 
71
        open('dsa_test.pub','w').write(test_ssh.publicDSA_openssh)
 
72
        open('dsa_test','w').write(test_ssh.privateDSA_openssh)
 
73
        os.chmod('dsa_test', 33152)
 
74
        open('kh_test','w').write('127.0.0.1 '+test_ssh.publicRSA_openssh)
 
75
 
 
76
    def startServer(self):
 
77
        realm = FileTransferTestRealm(self.testDir)
 
78
        p = portal.Portal(realm)
 
79
        p.registerChecker(test_ssh.ConchTestPublicKeyChecker())
 
80
        fac = test_ssh.ConchTestServerFactory()
 
81
        fac.portal = p
 
82
        self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")
 
83
 
 
84
    def stopServer(self):
 
85
        if not hasattr(self.server.factory, 'proto'):
 
86
            return self._cbStopServer(None)
 
87
        self.server.factory.proto.expectedLoseConnection = 1
 
88
        d = defer.maybeDeferred(
 
89
            self.server.factory.proto.transport.loseConnection)
 
90
        d.addCallback(self._cbStopServer)
 
91
        return d
 
92
 
 
93
    def _cbStopServer(self, ignored):
 
94
        return defer.maybeDeferred(self.server.stopListening)
 
95
 
 
96
    def tearDownClass(self):
 
97
        for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
 
98
            try:
 
99
                os.remove(f)
 
100
            except:
 
101
                pass
 
102
 
 
103
class TestOurServerCmdLineClient(test_process.SignalMixin, CFTPClientTestBase):
 
104
 
 
105
    def setUpClass(self):
 
106
        if hasattr(self, 'skip'):
 
107
           return
 
108
        test_process.SignalMixin.setUpClass(self)
 
109
        CFTPClientTestBase.setUpClass(self)
 
110
 
 
111
    def setUp(self):
 
112
        CFTPClientTestBase.setUp(self)
 
113
 
 
114
        self.startServer()
 
115
        cmds = ('-p %i -l testuser '
 
116
               '--known-hosts kh_test '
 
117
               '--user-authentications publickey '
 
118
               '--host-key-algorithms ssh-rsa '
 
119
               '-K direct '
 
120
               '-i dsa_test '
 
121
               '-a --nocache '
 
122
               '-v '
 
123
               '127.0.0.1')
 
124
        port = self.server.getHost().port
 
125
        cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
 
126
        log.msg('running %s %s' % (sys.executable, cmds))
 
127
        self.processProtocol = SFTPTestProcess()
 
128
 
 
129
        env = os.environ.copy()
 
130
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
 
131
        reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
 
132
                             env=env)
 
133
 
 
134
        timeout = time.time() + 10
 
135
        while (not self.processProtocol.buffer) and (time.time() < timeout):
 
136
            reactor.iterate(0.1)
 
137
        if time.time() > timeout:
 
138
            self.skip = "couldn't start process"
 
139
        else:
 
140
            self.processProtocol.clearBuffer()
 
141
 
 
142
    def tearDownClass(self):
 
143
        if hasattr(self, 'skip'):
 
144
            return
 
145
        test_process.SignalMixin.tearDownClass(self)
 
146
        CFTPClientTestBase.tearDownClass(self)
 
147
 
 
148
    def tearDown(self):
 
149
        d = self.stopServer()
 
150
        d.addCallback(self._killProcess)
 
151
        return d
 
152
 
 
153
    def _killProcess(self, ignored):
 
154
        try:
 
155
            self.processProtocol.transport.signalProcess('KILL')
 
156
        except error.ProcessExitedAlready:
 
157
            pass
 
158
 
 
159
    def _getCmdResult(self, cmd):
 
160
        self.processProtocol.clearBuffer()
 
161
        self.processProtocol.transport.write(cmd+'\n')
 
162
        timeout = time.time() + 10
 
163
        while (self.processProtocol.buffer.find('cftp> ') == -1) and (time.time() < timeout):
 
164
            reactor.iterate(0.1)
 
165
        self.failIf(time.time() > timeout, "timeout")
 
166
        if self.processProtocol.buffer.startswith('cftp> '):
 
167
            self.processProtocol.buffer = self.processProtocol.buffer[6:]
 
168
        return self.processProtocol.buffer[:-6].strip()
 
169
 
 
170
    def testCdPwd(self):
 
171
        homeDir = os.path.join(os.getcwd(), self.testDir)
 
172
        pwdRes = self._getCmdResult('pwd')
 
173
        lpwdRes = self._getCmdResult('lpwd')
 
174
        cdRes = self._getCmdResult('cd testDirectory')
 
175
        self._getCmdResult('cd ..')
 
176
        pwd2Res = self._getCmdResult('pwd')
 
177
        self.failUnlessEqual(pwdRes, homeDir)
 
178
        self.failUnlessEqual(lpwdRes, os.getcwd())
 
179
        self.failUnlessEqual(cdRes, '')
 
180
        self.failUnlessEqual(pwd2Res, pwdRes)
 
181
 
 
182
    def testChAttrs(self):
 
183
       lsRes = self._getCmdResult('ls -l testfile1')
 
184
       self.failUnless(lsRes.startswith('-rw-r--r--'), lsRes)
 
185
       self.failIf(self._getCmdResult('chmod 0 testfile1'))
 
186
       lsRes = self._getCmdResult('ls -l testfile1')
 
187
       self.failUnless(lsRes.startswith('----------'), lsRes)
 
188
       self.failIf(self._getCmdResult('chmod 644 testfile1'))
 
189
       log.flushErrors()
 
190
       # XXX test chgrp/own
 
191
 
 
192
    def testList(self):
 
193
        lsRes = self._getCmdResult('ls').split('\n')
 
194
        self.failUnlessEqual(lsRes, ['testDirectory', 'testRemoveFile', \
 
195
                'testRenameFile', 'testfile1'])
 
196
        lsRes = self._getCmdResult(
 
197
                'ls ../' + os.path.basename(self.testDir)).split('\n')
 
198
        self.failUnlessEqual(lsRes, ['testDirectory', 'testRemoveFile', \
 
199
                'testRenameFile', 'testfile1'])
 
200
        lsRes = self._getCmdResult('ls *File').split('\n')
 
201
        self.failUnlessEqual(lsRes, ['testRemoveFile', 'testRenameFile'])
 
202
        lsRes = self._getCmdResult('ls -a *File').split('\n')
 
203
        self.failUnlessEqual(lsRes, ['.testHiddenFile', 'testRemoveFile', 'testRenameFile'])
 
204
        lsRes = self._getCmdResult('ls -l testDirectory')
 
205
        self.failIf(lsRes)
 
206
        # XXX test lls in a way that doesn't depend on local semantics
 
207
 
 
208
    def testHelp(self):
 
209
        helpRes = self._getCmdResult('?')
 
210
        self.failUnlessEqual(helpRes, cftp.StdioClient(None).cmd_HELP('').strip())
 
211
 
 
212
    def _failUnlessFilesEqual(self, name1, name2, msg=None):
 
213
        f1 = file(name1).read()
 
214
        f2 = file(name2).read()
 
215
        self.failUnlessEqual(f1, f2, msg)
 
216
 
 
217
    def testGet(self):
 
218
        getRes = self._getCmdResult(
 
219
            'get testfile1 "%s/test file2"' % (self.testDir,))
 
220
        self._failUnlessFilesEqual(
 
221
            self.testDir + '/testfile1',
 
222
            self.testDir + '/test file2', "get failed")
 
223
        self.failUnless(
 
224
            getRes.endswith("Transferred %s/%s/testfile1 to %s/test file2"
 
225
                            % (os.getcwd(), self.testDir, self.testDir)))
 
226
        self.failIf(self._getCmdResult('rm "test file2"'))
 
227
        self.failIf(os.path.exists(self.testDir + '/test file2'))
 
228
 
 
229
    def testWildcardGet(self):
 
230
        getRes = self._getCmdResult('get testR*')
 
231
        self._failUnlessFilesEqual(
 
232
            self.testDir + '/testRemoveFile',
 
233
            'testRemoveFile', 'testRemoveFile get failed')
 
234
        self._failUnlessFilesEqual(
 
235
            self.testDir + '/testRenameFile',
 
236
            'testRenameFile', 'testRenameFile get failed')
 
237
 
 
238
    def testPut(self):
 
239
        putRes = self._getCmdResult(
 
240
            'put %s/testfile1 "test\\"file2"' % (self.testDir,))
 
241
        f1 = file(self.testDir + '/testfile1').read()
 
242
        f2 = file(self.testDir + '/test"file2').read()
 
243
        self.failUnlessEqual(f1, f2, "put failed")
 
244
        self.failUnless(
 
245
            putRes.endswith('Transferred %s/testfile1 to %s/%s/test"file2'
 
246
                            % (self.testDir, os.getcwd(), self.testDir)))
 
247
        self.failIf(self._getCmdResult('rm "test\\"file2"'))
 
248
        self.failIf(os.path.exists(self.testDir + '/test"file2'))
 
249
 
 
250
    def testWildcardPut(self):
 
251
        self.failIf(self._getCmdResult('cd ..'))
 
252
        getRes = self._getCmdResult('put %s/testR*' % (self.testDir,))
 
253
        self._failUnlessFilesEqual(
 
254
            self.testDir + '/testRemoveFile',
 
255
            self.testDir + '/../testRemoveFile', 'testRemoveFile get failed')
 
256
        self._failUnlessFilesEqual(
 
257
            self.testDir + '/testRenameFile',
 
258
            self.testDir + '/../testRenameFile', 'testRenameFile get failed')
 
259
        self.failIf(self._getCmdResult('cd ' + os.path.basename(self.testDir)))
 
260
 
 
261
    def testLink(self):
 
262
        linkRes = self._getCmdResult('ln testLink testfile1')
 
263
        self.failIf(linkRes)
 
264
        lslRes = self._getCmdResult('ls -l testLink')
 
265
        log.flushErrors()
 
266
        self.failUnless(lslRes.startswith('l'), 'link failed')
 
267
        self.failIf(self._getCmdResult('rm testLink'))
 
268
 
 
269
    def testDirectory(self):
 
270
        self.failIf(self._getCmdResult('mkdir testMakeDirectory'))
 
271
        lslRes = self._getCmdResult('ls -l testMakeDirector?')
 
272
        self.failUnless(lslRes.startswith('d'), lslRes)
 
273
        self.failIf(self._getCmdResult('rmdir testMakeDirectory'))
 
274
        self.failIf(self._getCmdResult(
 
275
            'lmkdir %s/testLocalDirectory' % (self.testDir,)))
 
276
        self.failIf(self._getCmdResult('rmdir testLocalDirectory'))
 
277
 
 
278
    def testRename(self):
 
279
        self.failIf(self._getCmdResult('rename testfile1 testfile2'))
 
280
        lsRes = self._getCmdResult('ls testfile?').split('\n')
 
281
        self.failUnlessEqual(lsRes, ['testfile2'])
 
282
        self.failIf(self._getCmdResult('rename testfile2 testfile1'))
 
283
 
 
284
    def testCommand(self):
 
285
        cmdRes = self._getCmdResult('!echo hello')
 
286
        self.failUnlessEqual(cmdRes, 'hello')
 
287
 
 
288
class TestOurServerBatchFile(test_process.SignalMixin, CFTPClientTestBase):
 
289
 
 
290
    def setUpClass(self):
 
291
        test_process.SignalMixin.setUpClass(self)
 
292
        CFTPClientTestBase.setUpClass(self)
 
293
 
 
294
    def setUp(self):
 
295
        CFTPClientTestBase.setUp(self)
 
296
        self.startServer()
 
297
 
 
298
    def tearDown(self):
 
299
        CFTPClientTestBase.tearDown(self)
 
300
        return self.stopServer()
 
301
 
 
302
    def tearDownClass(self):
 
303
        test_process.SignalMixin.tearDownClass(self)
 
304
        CFTPClientTestBase.tearDownClass(self)
 
305
 
 
306
    def _getBatchOutput(self, f):
 
307
        fn = tempfile.mktemp()
 
308
        open(fn, 'w').write(f)
 
309
        l = []
 
310
        port = self.server.getHost().port
 
311
        cmds = ('-p %i -l testuser '
 
312
                    '--known-hosts kh_test '
 
313
                    '--user-authentications publickey '
 
314
                    '--host-key-algorithms ssh-rsa '
 
315
                    '-K direct '
 
316
                    '-i dsa_test '
 
317
                    '-a --nocache '
 
318
                    '-v -b %s 127.0.0.1') % (port, fn)
 
319
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
 
320
        log.msg('running %s %s' % (sys.executable, cmds))
 
321
        env = os.environ.copy()
 
322
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
 
323
 
 
324
        self.server.factory.expectedLoseConnection = 1
 
325
 
 
326
        d = getProcessOutputAndValue(sys.executable, cmds, env=env)
 
327
 
 
328
        def _cleanup(res):
 
329
            os.remove(fn)
 
330
            return res
 
331
 
 
332
        d.addCallback(lambda res: res[0])
 
333
        d.addBoth(_cleanup)
 
334
 
 
335
        return d
 
336
 
 
337
    def testBatchFile(self):
 
338
        """Test whether batch file function of cftp ('cftp -b batchfile').
 
339
        This works by treating the file as a list of commands to be run.
 
340
        """
 
341
        cmds = """pwd
 
342
ls
 
343
exit
 
344
"""
 
345
        def _cbCheckResult(res):
 
346
            res = res.split('\n')
 
347
            log.msg('RES %s' % str(res))
 
348
            self.failUnless(res[1].find(self.testDir) != -1, repr(res))
 
349
            self.failUnlessEqual(res[3:-2], ['testDirectory', 'testRemoveFile',
 
350
                                             'testRenameFile', 'testfile1'])
 
351
 
 
352
        d = self._getBatchOutput(cmds)
 
353
        d.addCallback(_cbCheckResult)
 
354
        return d
 
355
 
 
356
    def testError(self):
 
357
        """Test that an error in the batch file stops running the batch.
 
358
        """
 
359
        cmds = """chown 0 missingFile
 
360
pwd
 
361
exit
 
362
"""
 
363
        def _cbCheckResult(res):
 
364
            self.failIf(res.find(self.testDir) != -1)
 
365
 
 
366
        d = self._getBatchOutput(cmds)
 
367
        d.addCallback(_cbCheckResult)
 
368
        return d
 
369
 
 
370
    def testIgnoredError(self):
 
371
        """Test that a minus sign '-' at the front of a line ignores
 
372
        any errors.
 
373
        """
 
374
        cmds = """-chown 0 missingFile
 
375
pwd
 
376
exit
 
377
"""
 
378
        def _cbCheckResult(res):
 
379
            self.failIf(res.find(self.testDir) == -1)
 
380
 
 
381
        d = self._getBatchOutput(cmds)
 
382
        d.addCallback(_cbCheckResult)
 
383
        return d
 
384
 
 
385
class TestOurServerUnixClient(test_process.SignalMixin, CFTPClientTestBase):
 
386
 
 
387
    def setUpClass(self):
 
388
        if hasattr(self, 'skip'):
 
389
            return
 
390
        test_process.SignalMixin.setUpClass(self)
 
391
        CFTPClientTestBase.setUpClass(self)
 
392
 
 
393
    def setUp(self):
 
394
        CFTPClientTestBase.setUp(self)
 
395
        self.startServer()
 
396
        cmd1 = ('-p %i -l testuser '
 
397
                '--known-hosts kh_test '
 
398
                '--host-key-algorithms ssh-rsa '
 
399
                '-a '
 
400
                '-K direct '
 
401
                '-i dsa_test '
 
402
                '127.0.0.1'
 
403
                )
 
404
        port = self.server.getHost().port
 
405
        cmds1 = (cmd1 % port).split()
 
406
        o = options.ConchOptions()
 
407
        def _(host, *args):
 
408
            o['host'] = host
 
409
        o.parseArgs = _
 
410
        o.parseOptions(cmds1)
 
411
        vhk = default.verifyHostKey
 
412
        self.conn = conn = test_conch.SSHTestConnectionForUnix(None)
 
413
        uao = default.SSHUserAuthClient(o['user'], o, conn)
 
414
        return connect.connect(o['host'], int(o['port']), o, vhk, uao)
 
415
 
 
416
    def tearDownClass(self):
 
417
        test_process.SignalMixin.tearDownClass(self)
 
418
        CFTPClientTestBase.tearDownClass(self)
 
419
 
 
420
    def tearDown(self):
 
421
        d = defer.maybeDeferred(self.conn.transport.loseConnection)
 
422
        d.addCallback(lambda x : self.stopServer())
 
423
        return d
 
424
 
 
425
    def _getBatchOutput(self, f):
 
426
        fn = tempfile.mktemp()
 
427
        open(fn, 'w').write(f)
 
428
        port = self.server.getHost().port
 
429
        cmds = ('-p %i -l testuser '
 
430
                    '-K unix '
 
431
                    '-a '
 
432
                    '-v -b %s 127.0.0.1') % (port, fn)
 
433
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
 
434
        log.msg('running %s %s' % (sys.executable, cmds))
 
435
        env = os.environ.copy()
 
436
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
 
437
 
 
438
        self.server.factory.expectedLoseConnection = 1
 
439
 
 
440
        d = getProcessOutputAndValue(sys.executable, cmds, env=env)
 
441
 
 
442
        def _cleanup(res):
 
443
            os.remove(fn)
 
444
            return res
 
445
 
 
446
        d.addCallback(lambda res: res[0])
 
447
        d.addBoth(_cleanup)
 
448
 
 
449
        return d
 
450
 
 
451
    def testBatchFile(self):
 
452
        """Test that the client works even over a UNIX connection.
 
453
        """
 
454
        cmds = """pwd
 
455
exit
 
456
"""
 
457
        d = self._getBatchOutput(cmds)
 
458
        d.addCallback(
 
459
            lambda res : self.failIf(res.find(self.testDir) == -1,
 
460
                                     "%s not in %r" % (self.testDir, res)))
 
461
        return d
 
462
 
 
463
 
 
464
if not unix or not Crypto or not interfaces.IReactorProcess(reactor, None):
 
465
    TestOurServerCmdLineClient.skip = "don't run w/o spawnprocess or PyCrypto"
 
466
    TestOurServerBatchFile.skip = "don't run w/o spawnProcess or PyCrypto"
 
467
    TestOurServerUnixClient.skip = "don't run w/o spawnProcess or PyCrypto"