~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/conch/test/test_cftp.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.conch.test.test_cftp -*-
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
 
# See LICENSE file for details.
4
 
 
5
 
import sys
6
 
 
7
 
try:
8
 
    from twisted.conch import unix
9
 
    from twisted.conch.scripts import cftp
10
 
    from twisted.conch.client import connect, default, options
11
 
except ImportError:
12
 
    unix = None
13
 
    try:
14
 
        del sys.modules['twisted.conch.unix'] # remove the bad import
15
 
    except KeyError:
16
 
        # In Python 2.4, the bad import has already been cleaned up for us.
17
 
        pass
18
 
 
19
 
try:
20
 
    import Crypto
21
 
except ImportError:
22
 
    Crypto = None
23
 
 
24
 
from twisted.cred import portal
25
 
from twisted.internet import reactor, protocol, interfaces, defer, error
26
 
from twisted.internet.utils import getProcessOutputAndValue
27
 
from twisted.python import log
28
 
from twisted.test import test_process
29
 
 
30
 
import test_ssh, test_conch
31
 
from test_filetransfer import SFTPTestBase, FileTransferTestAvatar
32
 
import sys, os, time, tempfile
33
 
 
34
 
class FileTransferTestRealm:
35
 
 
36
 
    def __init__(self, testDir):
37
 
        self.testDir = testDir
38
 
 
39
 
    def requestAvatar(self, avatarID, mind, *interfaces):
40
 
        a = FileTransferTestAvatar(self.testDir)
41
 
        return interfaces[0], a, lambda: None
42
 
 
43
 
 
44
 
class SFTPTestProcess(protocol.ProcessProtocol):
45
 
 
46
 
    def __init__(self):
47
 
        self.clearBuffer()
48
 
        self.connected = 0
49
 
 
50
 
    def connectionMade(self):
51
 
        self.connected = 1
52
 
 
53
 
    def clearBuffer(self):
54
 
        self.buffer = ''
55
 
 
56
 
    def outReceived(self, data):
57
 
        log.msg('got %s' % data)
58
 
        self.buffer += data
59
 
 
60
 
    def errReceived(self, data):
61
 
        log.msg('err: %s' % data)
62
 
 
63
 
    def connectionLost(self, reason):
64
 
        self.connected = 0
65
 
    def getBuffer(self):
66
 
        return self.buffer
67
 
 
68
 
class CFTPClientTestBase(SFTPTestBase):
69
 
 
70
 
    def setUpClass(self):
71
 
        open('dsa_test.pub','w').write(test_ssh.publicDSA_openssh)
72
 
        open('dsa_test','w').write(test_ssh.privateDSA_openssh)
73
 
        os.chmod('dsa_test', 33152)
74
 
        open('kh_test','w').write('127.0.0.1 '+test_ssh.publicRSA_openssh)
75
 
 
76
 
    def startServer(self):
77
 
        realm = FileTransferTestRealm(self.testDir)
78
 
        p = portal.Portal(realm)
79
 
        p.registerChecker(test_ssh.ConchTestPublicKeyChecker())
80
 
        fac = test_ssh.ConchTestServerFactory()
81
 
        fac.portal = p
82
 
        self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")
83
 
 
84
 
    def stopServer(self):
85
 
        if not hasattr(self.server.factory, 'proto'):
86
 
            return self._cbStopServer(None)
87
 
        self.server.factory.proto.expectedLoseConnection = 1
88
 
        d = defer.maybeDeferred(
89
 
            self.server.factory.proto.transport.loseConnection)
90
 
        d.addCallback(self._cbStopServer)
91
 
        return d
92
 
 
93
 
    def _cbStopServer(self, ignored):
94
 
        return defer.maybeDeferred(self.server.stopListening)
95
 
 
96
 
    def tearDownClass(self):
97
 
        for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
98
 
            try:
99
 
                os.remove(f)
100
 
            except:
101
 
                pass
102
 
 
103
 
class TestOurServerCmdLineClient(test_process.SignalMixin, CFTPClientTestBase):
104
 
 
105
 
    def setUpClass(self):
106
 
        if hasattr(self, 'skip'):
107
 
           return
108
 
        test_process.SignalMixin.setUpClass(self)
109
 
        CFTPClientTestBase.setUpClass(self)
110
 
 
111
 
    def setUp(self):
112
 
        CFTPClientTestBase.setUp(self)
113
 
 
114
 
        self.startServer()
115
 
        cmds = ('-p %i -l testuser '
116
 
               '--known-hosts kh_test '
117
 
               '--user-authentications publickey '
118
 
               '--host-key-algorithms ssh-rsa '
119
 
               '-K direct '
120
 
               '-i dsa_test '
121
 
               '-a --nocache '
122
 
               '-v '
123
 
               '127.0.0.1')
124
 
        port = self.server.getHost().port
125
 
        cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
126
 
        log.msg('running %s %s' % (sys.executable, cmds))
127
 
        self.processProtocol = SFTPTestProcess()
128
 
 
129
 
        env = os.environ.copy()
130
 
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
131
 
        reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
132
 
                             env=env)
133
 
 
134
 
        timeout = time.time() + 10
135
 
        while (not self.processProtocol.buffer) and (time.time() < timeout):
136
 
            reactor.iterate(0.1)
137
 
        if time.time() > timeout:
138
 
            self.skip = "couldn't start process"
139
 
        else:
140
 
            self.processProtocol.clearBuffer()
141
 
 
142
 
    def tearDownClass(self):
143
 
        if hasattr(self, 'skip'):
144
 
            return
145
 
        test_process.SignalMixin.tearDownClass(self)
146
 
        CFTPClientTestBase.tearDownClass(self)
147
 
 
148
 
    def tearDown(self):
149
 
        d = self.stopServer()
150
 
        d.addCallback(self._killProcess)
151
 
        return d
152
 
 
153
 
    def _killProcess(self, ignored):
154
 
        try:
155
 
            self.processProtocol.transport.signalProcess('KILL')
156
 
        except error.ProcessExitedAlready:
157
 
            pass
158
 
 
159
 
    def _getCmdResult(self, cmd):
160
 
        self.processProtocol.clearBuffer()
161
 
        self.processProtocol.transport.write(cmd+'\n')
162
 
        timeout = time.time() + 10
163
 
        while (self.processProtocol.buffer.find('cftp> ') == -1) and (time.time() < timeout):
164
 
            reactor.iterate(0.1)
165
 
        self.failIf(time.time() > timeout, "timeout")
166
 
        if self.processProtocol.buffer.startswith('cftp> '):
167
 
            self.processProtocol.buffer = self.processProtocol.buffer[6:]
168
 
        return self.processProtocol.buffer[:-6].strip()
169
 
 
170
 
    def testCdPwd(self):
171
 
        homeDir = os.path.join(os.getcwd(), self.testDir)
172
 
        pwdRes = self._getCmdResult('pwd')
173
 
        lpwdRes = self._getCmdResult('lpwd')
174
 
        cdRes = self._getCmdResult('cd testDirectory')
175
 
        self._getCmdResult('cd ..')
176
 
        pwd2Res = self._getCmdResult('pwd')
177
 
        self.failUnlessEqual(pwdRes, homeDir)
178
 
        self.failUnlessEqual(lpwdRes, os.getcwd())
179
 
        self.failUnlessEqual(cdRes, '')
180
 
        self.failUnlessEqual(pwd2Res, pwdRes)
181
 
 
182
 
    def testChAttrs(self):
183
 
       lsRes = self._getCmdResult('ls -l testfile1')
184
 
       self.failUnless(lsRes.startswith('-rw-r--r--'), lsRes)
185
 
       self.failIf(self._getCmdResult('chmod 0 testfile1'))
186
 
       lsRes = self._getCmdResult('ls -l testfile1')
187
 
       self.failUnless(lsRes.startswith('----------'), lsRes)
188
 
       self.failIf(self._getCmdResult('chmod 644 testfile1'))
189
 
       log.flushErrors()
190
 
       # XXX test chgrp/own
191
 
 
192
 
    def testList(self):
193
 
        lsRes = self._getCmdResult('ls').split('\n')
194
 
        self.failUnlessEqual(lsRes, ['testDirectory', 'testRemoveFile', \
195
 
                'testRenameFile', 'testfile1'])
196
 
        lsRes = self._getCmdResult(
197
 
                'ls ../' + os.path.basename(self.testDir)).split('\n')
198
 
        self.failUnlessEqual(lsRes, ['testDirectory', 'testRemoveFile', \
199
 
                'testRenameFile', 'testfile1'])
200
 
        lsRes = self._getCmdResult('ls *File').split('\n')
201
 
        self.failUnlessEqual(lsRes, ['testRemoveFile', 'testRenameFile'])
202
 
        lsRes = self._getCmdResult('ls -a *File').split('\n')
203
 
        self.failUnlessEqual(lsRes, ['.testHiddenFile', 'testRemoveFile', 'testRenameFile'])
204
 
        lsRes = self._getCmdResult('ls -l testDirectory')
205
 
        self.failIf(lsRes)
206
 
        # XXX test lls in a way that doesn't depend on local semantics
207
 
 
208
 
    def testHelp(self):
209
 
        helpRes = self._getCmdResult('?')
210
 
        self.failUnlessEqual(helpRes, cftp.StdioClient(None).cmd_HELP('').strip())
211
 
 
212
 
    def _failUnlessFilesEqual(self, name1, name2, msg=None):
213
 
        f1 = file(name1).read()
214
 
        f2 = file(name2).read()
215
 
        self.failUnlessEqual(f1, f2, msg)
216
 
 
217
 
    def testGet(self):
218
 
        getRes = self._getCmdResult(
219
 
            'get testfile1 "%s/test file2"' % (self.testDir,))
220
 
        self._failUnlessFilesEqual(
221
 
            self.testDir + '/testfile1',
222
 
            self.testDir + '/test file2', "get failed")
223
 
        self.failUnless(
224
 
            getRes.endswith("Transferred %s/%s/testfile1 to %s/test file2"
225
 
                            % (os.getcwd(), self.testDir, self.testDir)))
226
 
        self.failIf(self._getCmdResult('rm "test file2"'))
227
 
        self.failIf(os.path.exists(self.testDir + '/test file2'))
228
 
 
229
 
    def testWildcardGet(self):
230
 
        getRes = self._getCmdResult('get testR*')
231
 
        self._failUnlessFilesEqual(
232
 
            self.testDir + '/testRemoveFile',
233
 
            'testRemoveFile', 'testRemoveFile get failed')
234
 
        self._failUnlessFilesEqual(
235
 
            self.testDir + '/testRenameFile',
236
 
            'testRenameFile', 'testRenameFile get failed')
237
 
 
238
 
    def testPut(self):
239
 
        putRes = self._getCmdResult(
240
 
            'put %s/testfile1 "test\\"file2"' % (self.testDir,))
241
 
        f1 = file(self.testDir + '/testfile1').read()
242
 
        f2 = file(self.testDir + '/test"file2').read()
243
 
        self.failUnlessEqual(f1, f2, "put failed")
244
 
        self.failUnless(
245
 
            putRes.endswith('Transferred %s/testfile1 to %s/%s/test"file2'
246
 
                            % (self.testDir, os.getcwd(), self.testDir)))
247
 
        self.failIf(self._getCmdResult('rm "test\\"file2"'))
248
 
        self.failIf(os.path.exists(self.testDir + '/test"file2'))
249
 
 
250
 
    def testWildcardPut(self):
251
 
        self.failIf(self._getCmdResult('cd ..'))
252
 
        getRes = self._getCmdResult('put %s/testR*' % (self.testDir,))
253
 
        self._failUnlessFilesEqual(
254
 
            self.testDir + '/testRemoveFile',
255
 
            self.testDir + '/../testRemoveFile', 'testRemoveFile get failed')
256
 
        self._failUnlessFilesEqual(
257
 
            self.testDir + '/testRenameFile',
258
 
            self.testDir + '/../testRenameFile', 'testRenameFile get failed')
259
 
        self.failIf(self._getCmdResult('cd ' + os.path.basename(self.testDir)))
260
 
 
261
 
    def testLink(self):
262
 
        linkRes = self._getCmdResult('ln testLink testfile1')
263
 
        self.failIf(linkRes)
264
 
        lslRes = self._getCmdResult('ls -l testLink')
265
 
        log.flushErrors()
266
 
        self.failUnless(lslRes.startswith('l'), 'link failed')
267
 
        self.failIf(self._getCmdResult('rm testLink'))
268
 
 
269
 
    def testDirectory(self):
270
 
        self.failIf(self._getCmdResult('mkdir testMakeDirectory'))
271
 
        lslRes = self._getCmdResult('ls -l testMakeDirector?')
272
 
        self.failUnless(lslRes.startswith('d'), lslRes)
273
 
        self.failIf(self._getCmdResult('rmdir testMakeDirectory'))
274
 
        self.failIf(self._getCmdResult(
275
 
            'lmkdir %s/testLocalDirectory' % (self.testDir,)))
276
 
        self.failIf(self._getCmdResult('rmdir testLocalDirectory'))
277
 
 
278
 
    def testRename(self):
279
 
        self.failIf(self._getCmdResult('rename testfile1 testfile2'))
280
 
        lsRes = self._getCmdResult('ls testfile?').split('\n')
281
 
        self.failUnlessEqual(lsRes, ['testfile2'])
282
 
        self.failIf(self._getCmdResult('rename testfile2 testfile1'))
283
 
 
284
 
    def testCommand(self):
285
 
        cmdRes = self._getCmdResult('!echo hello')
286
 
        self.failUnlessEqual(cmdRes, 'hello')
287
 
 
288
 
class TestOurServerBatchFile(test_process.SignalMixin, CFTPClientTestBase):
289
 
 
290
 
    def setUpClass(self):
291
 
        test_process.SignalMixin.setUpClass(self)
292
 
        CFTPClientTestBase.setUpClass(self)
293
 
 
294
 
    def setUp(self):
295
 
        CFTPClientTestBase.setUp(self)
296
 
        self.startServer()
297
 
 
298
 
    def tearDown(self):
299
 
        CFTPClientTestBase.tearDown(self)
300
 
        return self.stopServer()
301
 
 
302
 
    def tearDownClass(self):
303
 
        test_process.SignalMixin.tearDownClass(self)
304
 
        CFTPClientTestBase.tearDownClass(self)
305
 
 
306
 
    def _getBatchOutput(self, f):
307
 
        fn = tempfile.mktemp()
308
 
        open(fn, 'w').write(f)
309
 
        l = []
310
 
        port = self.server.getHost().port
311
 
        cmds = ('-p %i -l testuser '
312
 
                    '--known-hosts kh_test '
313
 
                    '--user-authentications publickey '
314
 
                    '--host-key-algorithms ssh-rsa '
315
 
                    '-K direct '
316
 
                    '-i dsa_test '
317
 
                    '-a --nocache '
318
 
                    '-v -b %s 127.0.0.1') % (port, fn)
319
 
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
320
 
        log.msg('running %s %s' % (sys.executable, cmds))
321
 
        env = os.environ.copy()
322
 
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
323
 
 
324
 
        self.server.factory.expectedLoseConnection = 1
325
 
 
326
 
        d = getProcessOutputAndValue(sys.executable, cmds, env=env)
327
 
 
328
 
        def _cleanup(res):
329
 
            os.remove(fn)
330
 
            return res
331
 
 
332
 
        d.addCallback(lambda res: res[0])
333
 
        d.addBoth(_cleanup)
334
 
 
335
 
        return d
336
 
 
337
 
    def testBatchFile(self):
338
 
        """Test whether batch file function of cftp ('cftp -b batchfile').
339
 
        This works by treating the file as a list of commands to be run.
340
 
        """
341
 
        cmds = """pwd
342
 
ls
343
 
exit
344
 
"""
345
 
        def _cbCheckResult(res):
346
 
            res = res.split('\n')
347
 
            log.msg('RES %s' % str(res))
348
 
            self.failUnless(res[1].find(self.testDir) != -1, repr(res))
349
 
            self.failUnlessEqual(res[3:-2], ['testDirectory', 'testRemoveFile',
350
 
                                             'testRenameFile', 'testfile1'])
351
 
 
352
 
        d = self._getBatchOutput(cmds)
353
 
        d.addCallback(_cbCheckResult)
354
 
        return d
355
 
 
356
 
    def testError(self):
357
 
        """Test that an error in the batch file stops running the batch.
358
 
        """
359
 
        cmds = """chown 0 missingFile
360
 
pwd
361
 
exit
362
 
"""
363
 
        def _cbCheckResult(res):
364
 
            self.failIf(res.find(self.testDir) != -1)
365
 
 
366
 
        d = self._getBatchOutput(cmds)
367
 
        d.addCallback(_cbCheckResult)
368
 
        return d
369
 
 
370
 
    def testIgnoredError(self):
371
 
        """Test that a minus sign '-' at the front of a line ignores
372
 
        any errors.
373
 
        """
374
 
        cmds = """-chown 0 missingFile
375
 
pwd
376
 
exit
377
 
"""
378
 
        def _cbCheckResult(res):
379
 
            self.failIf(res.find(self.testDir) == -1)
380
 
 
381
 
        d = self._getBatchOutput(cmds)
382
 
        d.addCallback(_cbCheckResult)
383
 
        return d
384
 
 
385
 
class TestOurServerUnixClient(test_process.SignalMixin, CFTPClientTestBase):
386
 
 
387
 
    def setUpClass(self):
388
 
        if hasattr(self, 'skip'):
389
 
            return
390
 
        test_process.SignalMixin.setUpClass(self)
391
 
        CFTPClientTestBase.setUpClass(self)
392
 
 
393
 
    def setUp(self):
394
 
        CFTPClientTestBase.setUp(self)
395
 
        self.startServer()
396
 
        cmd1 = ('-p %i -l testuser '
397
 
                '--known-hosts kh_test '
398
 
                '--host-key-algorithms ssh-rsa '
399
 
                '-a '
400
 
                '-K direct '
401
 
                '-i dsa_test '
402
 
                '127.0.0.1'
403
 
                )
404
 
        port = self.server.getHost().port
405
 
        cmds1 = (cmd1 % port).split()
406
 
        o = options.ConchOptions()
407
 
        def _(host, *args):
408
 
            o['host'] = host
409
 
        o.parseArgs = _
410
 
        o.parseOptions(cmds1)
411
 
        vhk = default.verifyHostKey
412
 
        self.conn = conn = test_conch.SSHTestConnectionForUnix(None)
413
 
        uao = default.SSHUserAuthClient(o['user'], o, conn)
414
 
        return connect.connect(o['host'], int(o['port']), o, vhk, uao)
415
 
 
416
 
    def tearDownClass(self):
417
 
        test_process.SignalMixin.tearDownClass(self)
418
 
        CFTPClientTestBase.tearDownClass(self)
419
 
 
420
 
    def tearDown(self):
421
 
        d = defer.maybeDeferred(self.conn.transport.loseConnection)
422
 
        d.addCallback(lambda x : self.stopServer())
423
 
        return d
424
 
 
425
 
    def _getBatchOutput(self, f):
426
 
        fn = tempfile.mktemp()
427
 
        open(fn, 'w').write(f)
428
 
        port = self.server.getHost().port
429
 
        cmds = ('-p %i -l testuser '
430
 
                    '-K unix '
431
 
                    '-a '
432
 
                    '-v -b %s 127.0.0.1') % (port, fn)
433
 
        cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
434
 
        log.msg('running %s %s' % (sys.executable, cmds))
435
 
        env = os.environ.copy()
436
 
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
437
 
 
438
 
        self.server.factory.expectedLoseConnection = 1
439
 
 
440
 
        d = getProcessOutputAndValue(sys.executable, cmds, env=env)
441
 
 
442
 
        def _cleanup(res):
443
 
            os.remove(fn)
444
 
            return res
445
 
 
446
 
        d.addCallback(lambda res: res[0])
447
 
        d.addBoth(_cleanup)
448
 
 
449
 
        return d
450
 
 
451
 
    def testBatchFile(self):
452
 
        """Test that the client works even over a UNIX connection.
453
 
        """
454
 
        cmds = """pwd
455
 
exit
456
 
"""
457
 
        d = self._getBatchOutput(cmds)
458
 
        d.addCallback(
459
 
            lambda res : self.failIf(res.find(self.testDir) == -1,
460
 
                                     "%s not in %r" % (self.testDir, res)))
461
 
        return d
462
 
 
463
 
 
464
 
if not unix or not Crypto or not interfaces.IReactorProcess(reactor, None):
465
 
    TestOurServerCmdLineClient.skip = "don't run w/o spawnprocess or PyCrypto"
466
 
    TestOurServerBatchFile.skip = "don't run w/o spawnProcess or PyCrypto"
467
 
    TestOurServerUnixClient.skip = "don't run w/o spawnProcess or PyCrypto"