~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/conch/test/test_filetransfer.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.conch.test.test_filetransfer -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE file for details.
 
4
 
 
5
import os
 
6
import sys
 
7
 
 
8
from twisted.trial import unittest
 
9
try:
 
10
    from twisted.conch import unix
 
11
except ImportError:
 
12
    unix = None
 
13
    try:
 
14
        del sys.modules['twisted.conch.unix'] # remove the bad import
 
15
    except KeyError:
 
16
        # In Python 2.4, the bad import has already been cleaned up for us.
 
17
        # Hooray.
 
18
        pass
 
19
 
 
20
from twisted.conch import avatar
 
21
from twisted.conch.ssh import filetransfer, session
 
22
from twisted.internet import defer, reactor
 
23
from twisted.protocols import loopback
 
24
from twisted.python import components, log
 
25
 
 
26
 
 
27
class FileTransferTestAvatar(avatar.ConchUser):
 
28
 
 
29
    def __init__(self, homeDir):
 
30
        avatar.ConchUser.__init__(self)
 
31
        self.channelLookup['session'] = session.SSHSession
 
32
        self.subsystemLookup['sftp'] = filetransfer.FileTransferServer
 
33
        self.homeDir = homeDir
 
34
 
 
35
    def _runAsUser(self, f, *args, **kw):
 
36
        try:
 
37
            f = iter(f)
 
38
        except TypeError:
 
39
            f = [(f, args, kw)]
 
40
        for i in f:
 
41
            func = i[0]
 
42
            args = len(i)>1 and i[1] or ()
 
43
            kw = len(i)>2 and i[2] or {}
 
44
            r = func(*args, **kw)
 
45
        return r
 
46
 
 
47
    def getHomeDir(self):
 
48
        return os.path.join(os.getcwd(), self.homeDir)
 
49
 
 
50
class ConchSessionForTestAvatar:
 
51
 
 
52
    def __init__(self, avatar):
 
53
        self.avatar = avatar
 
54
 
 
55
if unix:
 
56
    if not hasattr(unix, 'SFTPServerForUnixConchUser'):
 
57
        # unix should either be a fully working module, or None.  I'm not sure
 
58
        # how this happens, but on win32 it does.  Try to cope.  --spiv.
 
59
        import warnings
 
60
        warnings.warn(("twisted.conch.unix imported %r, "
 
61
                       "but doesn't define SFTPServerForUnixConchUser'")
 
62
                      % (unix,))
 
63
        unix = None
 
64
    else:
 
65
        class FileTransferForTestAvatar(unix.SFTPServerForUnixConchUser):
 
66
 
 
67
            def gotVersion(self, version, otherExt):
 
68
                return {'conchTest' : 'ext data'}
 
69
 
 
70
            def extendedRequest(self, extName, extData):
 
71
                if extName == 'testExtendedRequest':
 
72
                    return 'bar'
 
73
                raise NotImplementedError
 
74
 
 
75
        components.registerAdapter(FileTransferForTestAvatar,
 
76
                                   FileTransferTestAvatar,
 
77
                                   filetransfer.ISFTPServer)
 
78
 
 
79
class SFTPTestBase(unittest.TestCase):
 
80
 
 
81
    def setUp(self):
 
82
        self.testDir = self.mktemp()
 
83
        # Give the testDir another level so we can safely "cd .." from it in
 
84
        # tests.
 
85
        self.testDir = os.path.join(self.testDir, 'extra')
 
86
        os.makedirs(os.path.join(self.testDir, 'testDirectory'))
 
87
 
 
88
        f = file(os.path.join(self.testDir, 'testfile1'),'w')
 
89
        f.write('a'*10+'b'*10)
 
90
        f.write(file('/dev/urandom').read(1024*64)) # random data
 
91
        os.chmod(os.path.join(self.testDir, 'testfile1'), 0644)
 
92
        file(os.path.join(self.testDir, 'testRemoveFile'), 'w').write('a')
 
93
        file(os.path.join(self.testDir, 'testRenameFile'), 'w').write('a')
 
94
        file(os.path.join(self.testDir, '.testHiddenFile'), 'w').write('a')
 
95
 
 
96
 
 
97
class TestOurServerOurClient(SFTPTestBase):
 
98
 
 
99
    if not unix:
 
100
        skip = "can't run on non-posix computers"
 
101
 
 
102
    def setUp(self):
 
103
        SFTPTestBase.setUp(self)
 
104
 
 
105
        self.avatar = FileTransferTestAvatar(self.testDir)
 
106
        self.server = filetransfer.FileTransferServer(avatar=self.avatar)
 
107
        clientTransport = loopback.LoopbackRelay(self.server)
 
108
 
 
109
        self.client = filetransfer.FileTransferClient()
 
110
        self._serverVersion = None
 
111
        self._extData = None
 
112
        def _(serverVersion, extData):
 
113
            self._serverVersion = serverVersion
 
114
            self._extData = extData
 
115
        self.client.gotServerVersion = _
 
116
        serverTransport = loopback.LoopbackRelay(self.client)
 
117
        self.client.makeConnection(clientTransport)
 
118
        self.server.makeConnection(serverTransport)
 
119
 
 
120
        self.clientTransport = clientTransport
 
121
        self.serverTransport = serverTransport
 
122
 
 
123
        self._emptyBuffers()
 
124
 
 
125
    def _emptyBuffers(self):
 
126
        while self.serverTransport.buffer or self.clientTransport.buffer:
 
127
            self.serverTransport.clearBuffer()
 
128
            self.clientTransport.clearBuffer()
 
129
 
 
130
    def _delayedEmptyBuffers(self):
 
131
        reactor.callLater(0.1, self._emptyBuffers)
 
132
 
 
133
    def testServerVersion(self):
 
134
        self.failUnlessEqual(self._serverVersion, 3)
 
135
        self.failUnlessEqual(self._extData, {'conchTest' : 'ext data'})
 
136
 
 
137
    def testOpenFileIO(self):
 
138
        d = self.client.openFile("testfile1", filetransfer.FXF_READ |
 
139
                                 filetransfer.FXF_WRITE, {})
 
140
        self._emptyBuffers()
 
141
 
 
142
        def _fileOpened(openFile):
 
143
            self.failUnlessEqual(openFile, filetransfer.ISFTPFile(openFile))
 
144
            d = _readChunk(openFile)
 
145
            d.addCallback(_writeChunk, openFile)
 
146
            return d
 
147
 
 
148
        def _readChunk(openFile):
 
149
            d = openFile.readChunk(0, 20)
 
150
            self._emptyBuffers()
 
151
            d.addCallback(self.failUnlessEqual, 'a'*10 + 'b'*10)
 
152
            return d
 
153
 
 
154
        def _writeChunk(_, openFile):
 
155
            d = openFile.writeChunk(20, 'c'*10)
 
156
            self._emptyBuffers()
 
157
            d.addCallback(_readChunk2, openFile)
 
158
            return d
 
159
 
 
160
        def _readChunk2(_, openFile):
 
161
            d = openFile.readChunk(0, 30)
 
162
            self._emptyBuffers()
 
163
            d.addCallback(self.failUnlessEqual, 'a'*10 + 'b'*10 + 'c'*10)
 
164
            return d
 
165
 
 
166
        d.addCallback(_fileOpened)
 
167
        return d
 
168
 
 
169
    def testClosedFileGetAttrs(self):
 
170
        d = self.client.openFile("testfile1", filetransfer.FXF_READ |
 
171
                                 filetransfer.FXF_WRITE, {})
 
172
        self._emptyBuffers()
 
173
 
 
174
        def _getAttrs(_, openFile):
 
175
            d = openFile.getAttrs()
 
176
            self._emptyBuffers()
 
177
            return d
 
178
 
 
179
        def _err(f):
 
180
            log.flushErrors()
 
181
            return f
 
182
 
 
183
        def _close(openFile):
 
184
            d = openFile.close()
 
185
            self._emptyBuffers()
 
186
            d.addCallback(_getAttrs, openFile)
 
187
            d.addErrback(_err)
 
188
            return self.assertFailure(d, filetransfer.SFTPError)
 
189
 
 
190
        d.addCallback(_close)
 
191
        return d
 
192
 
 
193
    def testOpenFileAttributes(self):
 
194
        d = self.client.openFile("testfile1", filetransfer.FXF_READ |
 
195
                                 filetransfer.FXF_WRITE, {})
 
196
        self._emptyBuffers()
 
197
 
 
198
        def _getAttrs(openFile):
 
199
            d = openFile.getAttrs()
 
200
            self._emptyBuffers()
 
201
            d.addCallback(_getAttrs2)
 
202
            return d
 
203
 
 
204
        def _getAttrs2(attrs1):
 
205
            d = self.client.getAttrs('testfile1')
 
206
            self._emptyBuffers()
 
207
            d.addCallback(self.failUnlessEqual, attrs1)
 
208
            return d
 
209
 
 
210
        return d.addCallback(_getAttrs)
 
211
 
 
212
 
 
213
    def testOpenFileSetAttrs(self):
 
214
        # XXX test setAttrs
 
215
        # Ok, how about this for a start?  It caught a bug :)  -- spiv.
 
216
        d = self.client.openFile("testfile1", filetransfer.FXF_READ |
 
217
                                 filetransfer.FXF_WRITE, {})
 
218
        self._emptyBuffers()
 
219
 
 
220
        def _getAttrs(openFile):
 
221
            d = openFile.getAttrs()
 
222
            self._emptyBuffers()
 
223
            d.addCallback(_setAttrs)
 
224
            return d
 
225
 
 
226
        def _setAttrs(attrs):
 
227
            attrs['atime'] = 0
 
228
            d = self.client.setAttrs('testfile1', attrs)
 
229
            self._emptyBuffers()
 
230
            d.addCallback(_getAttrs2)
 
231
            d.addCallback(self.failUnlessEqual, attrs)
 
232
            return d
 
233
 
 
234
        def _getAttrs2(_):
 
235
            d = self.client.getAttrs('testfile1')
 
236
            self._emptyBuffers()
 
237
            return d
 
238
 
 
239
        d.addCallback(_getAttrs)
 
240
        return d
 
241
 
 
242
    def testRemoveFile(self):
 
243
        d = self.client.getAttrs("testRemoveFile")
 
244
        self._emptyBuffers()
 
245
        def _removeFile(ignored):
 
246
            d = self.client.removeFile("testRemoveFile")
 
247
            self._emptyBuffers()
 
248
            return d
 
249
        d.addCallback(_removeFile)
 
250
        d.addCallback(_removeFile)
 
251
        return self.assertFailure(d, filetransfer.SFTPError)
 
252
 
 
253
    def testRenameFile(self):
 
254
        d = self.client.getAttrs("testRenameFile")
 
255
        self._emptyBuffers()
 
256
        def _rename(attrs):
 
257
            d = self.client.renameFile("testRenameFile", "testRenamedFile")
 
258
            self._emptyBuffers()
 
259
            d.addCallback(_testRenamed, attrs)
 
260
            return d
 
261
        def _testRenamed(_, attrs):
 
262
            d = self.client.getAttrs("testRenamedFile")
 
263
            self._emptyBuffers()
 
264
            d.addCallback(self.failUnlessEqual, attrs)
 
265
        return d.addCallback(_rename)
 
266
 
 
267
    def testDirectoryBad(self):
 
268
        d = self.client.getAttrs("testMakeDirectory")
 
269
        self._emptyBuffers()
 
270
        return self.assertFailure(d, filetransfer.SFTPError)
 
271
 
 
272
    def testDirectoryCreation(self):
 
273
        d = self.client.makeDirectory("testMakeDirectory", {})
 
274
        self._emptyBuffers()
 
275
 
 
276
        def _getAttrs(_):
 
277
            d = self.client.getAttrs("testMakeDirectory")
 
278
            self._emptyBuffers()
 
279
            return d
 
280
 
 
281
        # XXX not until version 4/5
 
282
        # self.failUnlessEqual(filetransfer.FILEXFER_TYPE_DIRECTORY&attrs['type'],
 
283
        #                     filetransfer.FILEXFER_TYPE_DIRECTORY)
 
284
 
 
285
        def _removeDirectory(_):
 
286
            d = self.client.removeDirectory("testMakeDirectory")
 
287
            self._emptyBuffers()
 
288
            return d
 
289
 
 
290
        d.addCallback(_getAttrs)
 
291
        d.addCallback(_removeDirectory)
 
292
        d.addCallback(_getAttrs)
 
293
        return self.assertFailure(d, filetransfer.SFTPError)
 
294
 
 
295
    def testOpenDirectory(self):
 
296
        d = self.client.openDirectory('')
 
297
        self._emptyBuffers()
 
298
        files = []
 
299
 
 
300
        def _getFiles(openDir):
 
301
            def append(f):
 
302
                files.append(f)
 
303
                return openDir
 
304
            d = defer.maybeDeferred(openDir.next)
 
305
            self._emptyBuffers()
 
306
            d.addCallback(append)
 
307
            d.addCallback(_getFiles)
 
308
            d.addErrback(_close, openDir)
 
309
            return d
 
310
 
 
311
        def _checkFiles(ignored):
 
312
            fs = list(zip(*files)[0])
 
313
            fs.sort()
 
314
            self.failUnlessEqual(fs,
 
315
                                 ['.testHiddenFile', 'testDirectory',
 
316
                                  'testRemoveFile', 'testRenameFile',
 
317
                                  'testfile1'])
 
318
 
 
319
        def _close(_, openDir):
 
320
            d = openDir.close()
 
321
            self._emptyBuffers()
 
322
            return d
 
323
 
 
324
        d.addCallback(_getFiles)
 
325
        d.addCallback(_checkFiles)
 
326
        return d
 
327
 
 
328
    def testLinkDoesntExist(self):
 
329
        d = self.client.getAttrs('testLink')
 
330
        self._emptyBuffers()
 
331
        return self.assertFailure(d, filetransfer.SFTPError)
 
332
 
 
333
    def testLinkSharesAttrs(self):
 
334
        d = self.client.makeLink('testLink', 'testfile1')
 
335
        self._emptyBuffers()
 
336
        def _getFirstAttrs(_):
 
337
            d = self.client.getAttrs('testLink', 1)
 
338
            self._emptyBuffers()
 
339
            return d
 
340
        def _getSecondAttrs(firstAttrs):
 
341
            d = self.client.getAttrs('testfile1')
 
342
            self._emptyBuffers()
 
343
            d.addCallback(self.assertEqual, firstAttrs)
 
344
            return d
 
345
        d.addCallback(_getFirstAttrs)
 
346
        return d.addCallback(_getSecondAttrs)
 
347
 
 
348
    def testLinkPath(self):
 
349
        d = self.client.makeLink('testLink', 'testfile1')
 
350
        self._emptyBuffers()
 
351
        def _readLink(_):
 
352
            d = self.client.readLink('testLink')
 
353
            self._emptyBuffers()
 
354
            d.addCallback(self.failUnlessEqual,
 
355
                          os.path.join(os.getcwd(), self.testDir, 'testfile1'))
 
356
            return d
 
357
        def _realPath(_):
 
358
            d = self.client.realPath('testLink')
 
359
            self._emptyBuffers()
 
360
            d.addCallback(self.failUnlessEqual,
 
361
                          os.path.join(os.getcwd(), self.testDir, 'testfile1'))
 
362
            return d
 
363
        d.addCallback(_readLink)
 
364
        d.addCallback(_realPath)
 
365
        return d
 
366
 
 
367
    def testExtendedRequest(self):
 
368
        d = self.client.extendedRequest('testExtendedRequest', 'foo')
 
369
        self._emptyBuffers()
 
370
        d.addCallback(self.failUnlessEqual, 'bar')
 
371
        d.addCallback(self._cbTestExtendedRequest)
 
372
        return d
 
373
 
 
374
    def _cbTestExtendedRequest(self, ignored):
 
375
        d = self.client.extendedRequest('testBadRequest', '')
 
376
        self._emptyBuffers()
 
377
        return self.assertFailure(d, NotImplementedError)