~bzr/ubuntu/lucid/bzr/beta-ppa

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_sftp_transport.py

  • Committer: Martin Pool
  • Date: 2010-07-02 07:29:40 UTC
  • mfrom: (129.1.7 packaging-karmic)
  • Revision ID: mbp@sourcefrog.net-20100702072940-hpzq5elg8wjve8rh
* PPA rebuild.
* PPA rebuild for Karmic.
* PPA rebuild for Jaunty.
* PPA rebuild for Hardy.
* From postinst, actually remove the example bash completion scripts.
  (LP: #249452)
* New upstream release.
* New upstream release.
* New upstream release.
* Revert change to Build-depends: Dapper does not have python-central.
  Should be python-support..
* Target ppa..
* Target ppa..
* Target ppa..
* Target ppa..
* New upstream release.
* Switch to dpkg-source 3.0 (quilt) format.
* Bump standards version to 3.8.4.
* Remove embedded copy of python-configobj. Closes: #555336
* Remove embedded copy of python-elementtree. Closes: #555343
* Change section from 'Devel' to 'Vcs'..
* Change section from 'Devel' to 'Vcs'..
* Change section from 'Devel' to 'Vcs'..
* Change section from 'Devel' to 'Vcs'..
* Change section from 'Devel' to 'Vcs'..
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* debian/control: Fix obsolete-relation-form-in-source
  lintian warning. 
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Split out docs into bzr-doc package.
* New upstream release.
* Added John Francesco Ferlito to Uploaders.
* Fix install path to quick-reference guide
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Fix FTBFS due to path changes, again.
* Fix FTBFS due to doc paths changing
* New upstream release.
* Fix FTBFS due to path changes, again.
* Fix FTBFS due to doc paths changing
* New upstream release.
* Fix FTBFS due to path changes, again.
* Fix FTBFS due to doc paths changing
* New upstream release.
* Fix FTBFS due to path changes, again, again.
* Fix FTBFS due to path changes, again.
* Fix FTBFS due to path changes.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Bump standards version to 3.8.3.
* Remove unused patch system.
* New upstream release.
* New upstream release.
* New upstream release.
* Fix copy and paste tab error in .install file
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
 + Fixes compatibility with Python 2.4. Closes: #537708
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream version.
* Bump standards version to 3.8.2.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Add python-pyrex to build-deps to ensure C extensions are always build.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Split documentation into bzr-doc package. ((LP: #385074)
* Multiple packaging changes to make us more linitan clean.
* New upstream release.
* Split documentation into bzr-doc package. ((LP: #385074)
* Multiple packaging changes to make us more linitan clean.
* New upstream release.
* Split documentation into bzr-doc package. ((LP: #385074)
* Multiple packaging changes to make us more linitan clean.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Fix API compatibility version. (Closes: #526233)
* New upstream release.
  + Fixes default format for upgrade command. (Closes: #464688)
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Add missing dependency on zlib development library. (Closes:
  #523595)
* Add zlib build-depends.
* Add zlib build-depends.
* Add zlib build-depends.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Move to section vcs.
* Bump standards version to 3.8.1.
* New upstream release.
* Remove temporary patch for missing .c files from distribution
* New upstream release.
* Remove temporary patch for missing .c files from distribution
* New upstream release.
* Remove temporary patch for missing .c files from distribution
* Add temporary patch for missing .c files from distribution
* Add temporary patch for missing .c files from distribution
* Add temporary patch for missing .c files from distribution
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Recommend ca-certificates. (Closes: #452024)
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Update watch file. bazaar now uses launchpad to host its sources.
* Remove patch for inventory root revision copy, applied upstream.
* New upstream release.
* New upstream release.
* New upstream release
* Force removal of files installed in error to /etc/bash_completion.d/
  (LP: #249452)
* New upstream release.
* New upstream release
* New upstream release.
* Bump standards version.
* Include patch for inventory root revision copy, required for bzr-svn.
* New upstream release.
* Remove unused lintian overrides.
* Correct the package version not to be native.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* New upstream release.
* Final 1.5 release.
* New upstream release.
* New upstream release.
* New upstream release.
* Add myself as a co-maintainer.
* Add a Dm-Upload-Allowed: yes header.
* New upstream bugfix release.
* New upstream release.
* Final 1.3 release.
* New upstream release.
* First release candidate of the upcoming 1.3 release.
* Rebuild to fix the problem caused by a build with a broken python-central.
* New upstream release.
* Rebuild for dapper PPA.
* Apply Lamont's patches to fix build-dependencies on dapper.
  (See: https://bugs.launchpad.net/bzr/+bug/189915)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
 
2
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 
 
18
import os
 
19
import socket
 
20
import sys
 
21
import threading
 
22
import time
 
23
 
 
24
from bzrlib import (
 
25
    bzrdir,
 
26
    config,
 
27
    errors,
 
28
    tests,
 
29
    transport as _mod_transport,
 
30
    ui,
 
31
    )
 
32
from bzrlib.osutils import (
 
33
    pathjoin,
 
34
    lexists,
 
35
    set_or_unset_env,
 
36
    )
 
37
from bzrlib.tests import (
 
38
    features,
 
39
    TestCaseWithTransport,
 
40
    TestCase,
 
41
    TestSkipped,
 
42
    )
 
43
from bzrlib.tests.http_server import HttpServer
 
44
from bzrlib.transport import get_transport
 
45
import bzrlib.transport.http
 
46
 
 
47
if features.paramiko.available():
 
48
    from bzrlib.transport import sftp as _mod_sftp
 
49
    from bzrlib.tests import stub_sftp
 
50
 
 
51
from bzrlib.workingtree import WorkingTree
 
52
 
 
53
 
 
54
def set_test_transport_to_sftp(testcase):
 
55
    """A helper to set transports on test case instances."""
 
56
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
 
57
        testcase._get_remote_is_absolute = True
 
58
    if testcase._get_remote_is_absolute:
 
59
        testcase.transport_server = stub_sftp.SFTPAbsoluteServer
 
60
    else:
 
61
        testcase.transport_server = stub_sftp.SFTPHomeDirServer
 
62
    testcase.transport_readonly_server = HttpServer
 
63
 
 
64
 
 
65
class TestCaseWithSFTPServer(TestCaseWithTransport):
 
66
    """A test case base class that provides a sftp server on localhost."""
 
67
 
 
68
    def setUp(self):
 
69
        super(TestCaseWithSFTPServer, self).setUp()
 
70
        self.requireFeature(features.paramiko)
 
71
        set_test_transport_to_sftp(self)
 
72
 
 
73
 
 
74
class SFTPLockTests(TestCaseWithSFTPServer):
 
75
 
 
76
    def test_sftp_locks(self):
 
77
        from bzrlib.errors import LockError
 
78
        t = self.get_transport()
 
79
 
 
80
        l = t.lock_write('bogus')
 
81
        self.failUnlessExists('bogus.write-lock')
 
82
 
 
83
        # Don't wait for the lock, locking an already locked
 
84
        # file should raise an assert
 
85
        self.assertRaises(LockError, t.lock_write, 'bogus')
 
86
 
 
87
        l.unlock()
 
88
        self.failIf(lexists('bogus.write-lock'))
 
89
 
 
90
        open('something.write-lock', 'wb').write('fake lock\n')
 
91
        self.assertRaises(LockError, t.lock_write, 'something')
 
92
        os.remove('something.write-lock')
 
93
 
 
94
        l = t.lock_write('something')
 
95
 
 
96
        l2 = t.lock_write('bogus')
 
97
 
 
98
        l.unlock()
 
99
        l2.unlock()
 
100
 
 
101
 
 
102
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
 
103
    """Test the SFTP transport with homedir based relative paths."""
 
104
 
 
105
    def test__remote_path(self):
 
106
        if sys.platform == 'darwin':
 
107
            # This test is about sftp absolute path handling. There is already
 
108
            # (in this test) a TODO about windows needing an absolute path
 
109
            # without drive letter. To me, using self.test_dir is a trick to
 
110
            # get an absolute path for comparison purposes.  That fails for OSX
 
111
            # because the sftp server doesn't resolve the links (and it doesn't
 
112
            # have to). --vila 20070924
 
113
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
 
114
                              ' testing against self.test_dir'
 
115
                              ' is not appropriate')
 
116
        t = self.get_transport()
 
117
        # This test require unix-like absolute path
 
118
        test_dir = self.test_dir
 
119
        if sys.platform == 'win32':
 
120
            # using hack suggested by John Meinel.
 
121
            # TODO: write another mock server for this test
 
122
            #       and use absolute path without drive letter
 
123
            test_dir = '/' + test_dir
 
124
        # try what is currently used:
 
125
        # remote path = self._abspath(relpath)
 
126
        self.assertIsSameRealPath(test_dir + '/relative',
 
127
                                  t._remote_path('relative'))
 
128
        # we dont os.path.join because windows gives us the wrong path
 
129
        root_segments = test_dir.split('/')
 
130
        root_parent = '/'.join(root_segments[:-1])
 
131
        # .. should be honoured
 
132
        self.assertIsSameRealPath(root_parent + '/sibling',
 
133
                                  t._remote_path('../sibling'))
 
134
        # /  should be illegal ?
 
135
        ### FIXME decide and then test for all transports. RBC20051208
 
136
 
 
137
 
 
138
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
 
139
    """Test the SFTP transport with homedir based relative paths."""
 
140
 
 
141
    def setUp(self):
 
142
        # Only SFTPHomeDirServer is tested here
 
143
        self._get_remote_is_absolute = False
 
144
        super(SFTPTransportTestRelativeRoot, self).setUp()
 
145
 
 
146
    def test__remote_path_relative_root(self):
 
147
        # relative paths are preserved
 
148
        t = self.get_transport('')
 
149
        self.assertEqual('/~/', t._path)
 
150
        # the remote path should be relative to home dir
 
151
        # (i.e. not begining with a '/')
 
152
        self.assertEqual('a', t._remote_path('a'))
 
153
 
 
154
 
 
155
class SFTPNonServerTest(TestCase):
 
156
    def setUp(self):
 
157
        TestCase.setUp(self)
 
158
        self.requireFeature(features.paramiko)
 
159
 
 
160
    def test_parse_url_with_home_dir(self):
 
161
        s = _mod_sftp.SFTPTransport(
 
162
            'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
 
163
        self.assertEquals(s._host, 'example.com')
 
164
        self.assertEquals(s._port, 2222)
 
165
        self.assertEquals(s._user, 'robey')
 
166
        self.assertEquals(s._password, 'h@t')
 
167
        self.assertEquals(s._path, '/~/relative/')
 
168
 
 
169
    def test_relpath(self):
 
170
        s = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
 
171
        self.assertRaises(errors.PathNotChild, s.relpath,
 
172
                          'sftp://user@host.com/~/rel/path/sub')
 
173
 
 
174
    def test_get_paramiko_vendor(self):
 
175
        """Test that if no 'ssh' is available we get builtin paramiko"""
 
176
        from bzrlib.transport import ssh
 
177
        # set '.' as the only location in the path, forcing no 'ssh' to exist
 
178
        orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
 
179
        orig_path = set_or_unset_env('PATH', '.')
 
180
        try:
 
181
            # No vendor defined yet, query for one
 
182
            ssh._ssh_vendor_manager.clear_cache()
 
183
            vendor = ssh._get_ssh_vendor()
 
184
            self.assertIsInstance(vendor, ssh.ParamikoVendor)
 
185
        finally:
 
186
            set_or_unset_env('PATH', orig_path)
 
187
            ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
 
188
 
 
189
    def test_abspath_root_sibling_server(self):
 
190
        server = stub_sftp.SFTPSiblingAbsoluteServer()
 
191
        server.start_server()
 
192
        try:
 
193
            transport = get_transport(server.get_url())
 
194
            self.assertFalse(transport.abspath('/').endswith('/~/'))
 
195
            self.assertTrue(transport.abspath('/').endswith('/'))
 
196
            del transport
 
197
        finally:
 
198
            server.stop_server()
 
199
 
 
200
 
 
201
class SFTPBranchTest(TestCaseWithSFTPServer):
 
202
    """Test some stuff when accessing a bzr Branch over sftp"""
 
203
 
 
204
    def test_lock_file(self):
 
205
        # old format branches use a special lock file on sftp.
 
206
        b = self.make_branch('', format=bzrdir.BzrDirFormat6())
 
207
        b = bzrlib.branch.Branch.open(self.get_url())
 
208
        self.failUnlessExists('.bzr/')
 
209
        self.failUnlessExists('.bzr/branch-format')
 
210
        self.failUnlessExists('.bzr/branch-lock')
 
211
 
 
212
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
213
        b.lock_write()
 
214
        self.failUnlessExists('.bzr/branch-lock.write-lock')
 
215
        b.unlock()
 
216
        self.failIf(lexists('.bzr/branch-lock.write-lock'))
 
217
 
 
218
    def test_push_support(self):
 
219
        self.build_tree(['a/', 'a/foo'])
 
220
        t = bzrdir.BzrDir.create_standalone_workingtree('a')
 
221
        b = t.branch
 
222
        t.add('foo')
 
223
        t.commit('foo', rev_id='a1')
 
224
 
 
225
        b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
 
226
        b2.pull(b)
 
227
 
 
228
        self.assertEquals(b2.revision_history(), ['a1'])
 
229
 
 
230
        open('a/foo', 'wt').write('something new in foo\n')
 
231
        t.commit('new', rev_id='a2')
 
232
        b2.pull(b)
 
233
 
 
234
        self.assertEquals(b2.revision_history(), ['a1', 'a2'])
 
235
 
 
236
 
 
237
class SSHVendorConnection(TestCaseWithSFTPServer):
 
238
    """Test that the ssh vendors can all connect.
 
239
 
 
240
    Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
 
241
 
 
242
    We have 3 sftp implementations in the test suite:
 
243
      'loopback': Doesn't use ssh, just uses a local socket. Most tests are
 
244
                  done this way to save the handshaking time, so it is not
 
245
                  tested again here
 
246
      'none':     This uses paramiko's built-in ssh client and server, and layers
 
247
                  sftp on top of it.
 
248
      None:       If 'ssh' exists on the machine, then it will be spawned as a
 
249
                  child process.
 
250
    """
 
251
 
 
252
    def setUp(self):
 
253
        super(SSHVendorConnection, self).setUp()
 
254
 
 
255
        def create_server():
 
256
            """Just a wrapper so that when created, it will set _vendor"""
 
257
            # SFTPFullAbsoluteServer can handle any vendor,
 
258
            # it just needs to be set between the time it is instantiated
 
259
            # and the time .setUp() is called
 
260
            server = stub_sftp.SFTPFullAbsoluteServer()
 
261
            server._vendor = self._test_vendor
 
262
            return server
 
263
        self._test_vendor = 'loopback'
 
264
        self.vfs_transport_server = create_server
 
265
        f = open('a_file', 'wb')
 
266
        try:
 
267
            f.write('foobar\n')
 
268
        finally:
 
269
            f.close()
 
270
 
 
271
    def set_vendor(self, vendor):
 
272
        self._test_vendor = vendor
 
273
 
 
274
    def test_connection_paramiko(self):
 
275
        from bzrlib.transport import ssh
 
276
        self.set_vendor(ssh.ParamikoVendor())
 
277
        t = self.get_transport()
 
278
        self.assertEqual('foobar\n', t.get('a_file').read())
 
279
 
 
280
    def test_connection_vendor(self):
 
281
        raise TestSkipped("We don't test spawning real ssh,"
 
282
                          " because it prompts for a password."
 
283
                          " Enable this test if we figure out"
 
284
                          " how to prevent this.")
 
285
        self.set_vendor(None)
 
286
        t = self.get_transport()
 
287
        self.assertEqual('foobar\n', t.get('a_file').read())
 
288
 
 
289
 
 
290
class SSHVendorBadConnection(TestCaseWithTransport):
 
291
    """Test that the ssh vendors handle bad connection properly
 
292
 
 
293
    We don't subclass TestCaseWithSFTPServer, because we don't actually
 
294
    need an SFTP connection.
 
295
    """
 
296
 
 
297
    def setUp(self):
 
298
        self.requireFeature(features.paramiko)
 
299
        super(SSHVendorBadConnection, self).setUp()
 
300
        import bzrlib.transport.ssh
 
301
 
 
302
        # open a random port, so we know nobody else is using it
 
303
        # but don't actually listen on the port.
 
304
        s = socket.socket()
 
305
        s.bind(('localhost', 0))
 
306
        self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
 
307
 
 
308
        orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
 
309
        def reset():
 
310
            bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
 
311
            s.close()
 
312
        self.addCleanup(reset)
 
313
 
 
314
    def set_vendor(self, vendor):
 
315
        import bzrlib.transport.ssh
 
316
        bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = vendor
 
317
 
 
318
    def test_bad_connection_paramiko(self):
 
319
        """Test that a real connection attempt raises the right error"""
 
320
        from bzrlib.transport import ssh
 
321
        self.set_vendor(ssh.ParamikoVendor())
 
322
        t = bzrlib.transport.get_transport(self.bogus_url)
 
323
        self.assertRaises(errors.ConnectionError, t.get, 'foobar')
 
324
 
 
325
    def test_bad_connection_ssh(self):
 
326
        """None => auto-detect vendor"""
 
327
        self.set_vendor(None)
 
328
        # This is how I would normally test the connection code
 
329
        # it makes it very clear what we are testing.
 
330
        # However, 'ssh' will create stipple on the output, so instead
 
331
        # I'm using run_bzr_subprocess, and parsing the output
 
332
        # try:
 
333
        #     t = bzrlib.transport.get_transport(self.bogus_url)
 
334
        # except errors.ConnectionError:
 
335
        #     # Correct error
 
336
        #     pass
 
337
        # except errors.NameError, e:
 
338
        #     if 'SSHException' in str(e):
 
339
        #         raise TestSkipped('Known NameError bug in paramiko 1.6.1')
 
340
        #     raise
 
341
        # else:
 
342
        #     self.fail('Excepted ConnectionError to be raised')
 
343
 
 
344
        out, err = self.run_bzr_subprocess(['log', self.bogus_url], retcode=3)
 
345
        self.assertEqual('', out)
 
346
        if "NameError: global name 'SSHException'" in err:
 
347
            # We aren't fixing this bug, because it is a bug in
 
348
            # paramiko, but we know about it, so we don't have to
 
349
            # fail the test
 
350
            raise TestSkipped('Known NameError bug with paramiko-1.6.1')
 
351
        self.assertContainsRe(err, r'bzr: ERROR: Unable to connect to SSH host'
 
352
                                   r' 127\.0\.0\.1:\d+; ')
 
353
 
 
354
 
 
355
class SFTPLatencyKnob(TestCaseWithSFTPServer):
 
356
    """Test that the testing SFTPServer's latency knob works."""
 
357
 
 
358
    def test_latency_knob_slows_transport(self):
 
359
        # change the latency knob to 500ms. We take about 40ms for a
 
360
        # loopback connection ordinarily.
 
361
        start_time = time.time()
 
362
        self.get_server().add_latency = 0.5
 
363
        transport = self.get_transport()
 
364
        transport.has('not me') # Force connection by issuing a request
 
365
        with_latency_knob_time = time.time() - start_time
 
366
        self.assertTrue(with_latency_knob_time > 0.4)
 
367
 
 
368
    def test_default(self):
 
369
        # This test is potentially brittle: under extremely high machine load
 
370
        # it could fail, but that is quite unlikely
 
371
        raise TestSkipped('Timing-sensitive test')
 
372
        start_time = time.time()
 
373
        transport = self.get_transport()
 
374
        transport.has('not me') # Force connection by issuing a request
 
375
        regular_time = time.time() - start_time
 
376
        self.assertTrue(regular_time < 0.5)
 
377
 
 
378
 
 
379
class FakeSocket(object):
 
380
    """Fake socket object used to test the SocketDelay wrapper without
 
381
    using a real socket.
 
382
    """
 
383
 
 
384
    def __init__(self):
 
385
        self._data = ""
 
386
 
 
387
    def send(self, data, flags=0):
 
388
        self._data += data
 
389
        return len(data)
 
390
 
 
391
    def sendall(self, data, flags=0):
 
392
        self._data += data
 
393
        return len(data)
 
394
 
 
395
    def recv(self, size, flags=0):
 
396
        if size < len(self._data):
 
397
            result = self._data[:size]
 
398
            self._data = self._data[size:]
 
399
            return result
 
400
        else:
 
401
            result = self._data
 
402
            self._data = ""
 
403
            return result
 
404
 
 
405
 
 
406
class TestSocketDelay(TestCase):
 
407
 
 
408
    def setUp(self):
 
409
        TestCase.setUp(self)
 
410
        self.requireFeature(features.paramiko)
 
411
 
 
412
    def test_delay(self):
 
413
        sending = FakeSocket()
 
414
        receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
 
415
                                          really_sleep=False)
 
416
        # check that simulated time is charged only per round-trip:
 
417
        t1 = stub_sftp.SocketDelay.simulated_time
 
418
        receiving.send("connect1")
 
419
        self.assertEqual(sending.recv(1024), "connect1")
 
420
        t2 = stub_sftp.SocketDelay.simulated_time
 
421
        self.assertAlmostEqual(t2 - t1, 0.1)
 
422
        receiving.send("connect2")
 
423
        self.assertEqual(sending.recv(1024), "connect2")
 
424
        sending.send("hello")
 
425
        self.assertEqual(receiving.recv(1024), "hello")
 
426
        t3 = stub_sftp.SocketDelay.simulated_time
 
427
        self.assertAlmostEqual(t3 - t2, 0.1)
 
428
        sending.send("hello")
 
429
        self.assertEqual(receiving.recv(1024), "hello")
 
430
        sending.send("hello")
 
431
        self.assertEqual(receiving.recv(1024), "hello")
 
432
        sending.send("hello")
 
433
        self.assertEqual(receiving.recv(1024), "hello")
 
434
        t4 = stub_sftp.SocketDelay.simulated_time
 
435
        self.assertAlmostEqual(t4, t3)
 
436
 
 
437
    def test_bandwidth(self):
 
438
        sending = FakeSocket()
 
439
        receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
 
440
                                          really_sleep=False)
 
441
        # check that simulated time is charged only per round-trip:
 
442
        t1 = stub_sftp.SocketDelay.simulated_time
 
443
        receiving.send("connect")
 
444
        self.assertEqual(sending.recv(1024), "connect")
 
445
        sending.send("a" * 100)
 
446
        self.assertEqual(receiving.recv(1024), "a" * 100)
 
447
        t2 = stub_sftp.SocketDelay.simulated_time
 
448
        self.assertAlmostEqual(t2 - t1, 100 + 7)
 
449
 
 
450
 
 
451
class ReadvFile(object):
 
452
    """An object that acts like Paramiko's SFTPFile.readv()"""
 
453
 
 
454
    def __init__(self, data):
 
455
        self._data = data
 
456
 
 
457
    def readv(self, requests):
 
458
        for start, length in requests:
 
459
            yield self._data[start:start+length]
 
460
 
 
461
 
 
462
def _null_report_activity(*a, **k):
 
463
    pass
 
464
 
 
465
 
 
466
class Test_SFTPReadvHelper(tests.TestCase):
 
467
 
 
468
    def checkGetRequests(self, expected_requests, offsets):
 
469
        self.requireFeature(features.paramiko)
 
470
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
 
471
            _null_report_activity)
 
472
        self.assertEqual(expected_requests, helper._get_requests())
 
473
 
 
474
    def test__get_requests(self):
 
475
        # Small single requests become a single readv request
 
476
        self.checkGetRequests([(0, 100)],
 
477
                              [(0, 20), (30, 50), (20, 10), (80, 20)])
 
478
        # Non-contiguous ranges are given as multiple requests
 
479
        self.checkGetRequests([(0, 20), (30, 50)],
 
480
                              [(10, 10), (30, 20), (0, 10), (50, 30)])
 
481
        # Ranges larger than _max_request_size (32kB) are broken up into
 
482
        # multiple requests, even if it actually spans multiple logical
 
483
        # requests
 
484
        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
 
485
                              [(0, 40000), (40000, 100), (40100, 1900),
 
486
                               (42000, 24000)])
 
487
 
 
488
    def checkRequestAndYield(self, expected, data, offsets):
 
489
        self.requireFeature(features.paramiko)
 
490
        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
 
491
            _null_report_activity)
 
492
        data_f = ReadvFile(data)
 
493
        result = list(helper.request_and_yield_offsets(data_f))
 
494
        self.assertEqual(expected, result)
 
495
 
 
496
    def test_request_and_yield_offsets(self):
 
497
        data = 'abcdefghijklmnopqrstuvwxyz'
 
498
        self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data,
 
499
                                  [(0, 1), (5, 1), (10, 3)])
 
500
        # Should combine requests, and split them again
 
501
        self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data,
 
502
                                  [(0, 1), (1, 1), (10, 3)])
 
503
        # Out of order requests. The requests should get combined, but then be
 
504
        # yielded out-of-order. We also need one that is at the end of a
 
505
        # previous range. See bug #293746
 
506
        self.checkRequestAndYield([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
 
507
                                  data, [(0, 1), (10, 1), (4, 3), (1, 3)])
 
508
 
 
509
 
 
510
class TestUsesAuthConfig(TestCaseWithSFTPServer):
 
511
    """Test that AuthenticationConfig can supply default usernames."""
 
512
 
 
513
    def get_transport_for_connection(self, set_config):
 
514
        port = self.get_server()._listener.port
 
515
        if set_config:
 
516
            conf = config.AuthenticationConfig()
 
517
            conf._get_config().update(
 
518
                {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
 
519
            conf._save()
 
520
        t = get_transport('sftp://localhost:%d' % port)
 
521
        # force a connection to be performed.
 
522
        t.has('foo')
 
523
        return t
 
524
 
 
525
    def test_sftp_uses_config(self):
 
526
        t = self.get_transport_for_connection(set_config=True)
 
527
        self.assertEqual('bar', t._get_credentials()[0])
 
528
 
 
529
    def test_sftp_is_none_if_no_config(self):
 
530
        t = self.get_transport_for_connection(set_config=False)
 
531
        self.assertIs(None, t._get_credentials()[0])
 
532
 
 
533
    def test_sftp_doesnt_prompt_username(self):
 
534
        stdout = tests.StringIOWrapper()
 
535
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout)
 
536
        t = self.get_transport_for_connection(set_config=False)
 
537
        self.assertIs(None, t._get_credentials()[0])
 
538
        # No prompts should've been printed, stdin shouldn't have been read
 
539
        self.assertEquals("", stdout.getvalue())
 
540
        self.assertEquals(0, ui.ui_factory.stdin.tell())