# Copyright (C) 2005-2010 Robey Pointer <robey@lag.net>
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
29
transport as _mod_transport,
32
from bzrlib.osutils import (
37
from bzrlib.tests import (
39
TestCaseWithTransport,
43
from bzrlib.tests.http_server import HttpServer
44
from bzrlib.transport import get_transport
45
import bzrlib.transport.http
47
if features.paramiko.available():
48
from bzrlib.transport import sftp as _mod_sftp
49
from bzrlib.tests import stub_sftp
51
from bzrlib.workingtree import WorkingTree
54
def set_test_transport_to_sftp(testcase):
    """Point a test case's transport servers at the stub SFTP servers.

    :param testcase: The test case instance to configure.  When it has no
        ``_get_remote_is_absolute`` attribute (or the attribute is None),
        the attribute is set to True first.

    ``transport_server`` becomes ``stub_sftp.SFTPAbsoluteServer`` when
    ``_get_remote_is_absolute`` is true and ``stub_sftp.SFTPHomeDirServer``
    otherwise.  ``transport_readonly_server`` is always ``HttpServer``.
    """
    if getattr(testcase, '_get_remote_is_absolute', None) is None:
        testcase._get_remote_is_absolute = True
    if testcase._get_remote_is_absolute:
        testcase.transport_server = stub_sftp.SFTPAbsoluteServer
    else:
        # NOTE(review): the `else:` was absent from the extracted source,
        # which made the assignment above dead code -- confirm against
        # upstream.
        testcase.transport_server = stub_sftp.SFTPHomeDirServer
    testcase.transport_readonly_server = HttpServer
65
class TestCaseWithSFTPServer(TestCaseWithTransport):
    """A test case base class that provides a sftp server on localhost."""

    def setUp(self):
        """Run the base setUp, require paramiko, then select SFTP servers.

        NOTE(review): the ``def setUp`` line was absent from the extracted
        source; it is restored here from the ``super(...).setUp()`` call.
        """
        super(TestCaseWithSFTPServer, self).setUp()
        self.requireFeature(features.paramiko)
        set_test_transport_to_sftp(self)
74
class SFTPLockTests(TestCaseWithSFTPServer):
    """Check the '.write-lock' files used by the SFTP transport's locks."""

    def test_sftp_locks(self):
        """lock_write() creates '<name>.write-lock' and refuses a 2nd lock."""
        from bzrlib.errors import LockError
        t = self.get_transport()

        lock = t.lock_write('bogus')
        self.failUnlessExists('bogus.write-lock')

        # Don't wait for the lock: locking an already locked file should
        # raise LockError.
        self.assertRaises(LockError, t.lock_write, 'bogus')

        # NOTE(review): the extracted source had no release between the
        # assertions above and below; unlock() is assumed to be the release
        # method of the object returned by lock_write() -- confirm.
        lock.unlock()
        self.failIf(lexists('bogus.write-lock'))

        # A lock file that already exists on disk must block the lock too.
        # Close the handle explicitly instead of relying on refcounting.
        f = open('something.write-lock', 'wb')
        try:
            f.write('fake lock\n')
        finally:
            f.close()
        self.assertRaises(LockError, t.lock_write, 'something')
        os.remove('something.write-lock')

        # Two different files can be locked at the same time.
        lock = t.lock_write('something')
        lock2 = t.lock_write('bogus')

        # Release both so no lock files outlive the test.
        lock.unlock()
        lock2.unlock()
102
class SFTPTransportTestRelative(TestCaseWithSFTPServer):
    """Check SFTPTransport._remote_path() against the test directory."""

    def test__remote_path(self):
        """Plain and '..' relative names map to paths around test_dir."""
        if sys.platform == 'darwin':
            # This test is about sftp absolute path handling. self.test_dir
            # is only a convenient absolute path to compare against, and
            # that comparison fails on OSX because the sftp server doesn't
            # resolve the links (and it doesn't have to). --vila 20070924
            self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,'
                              ' testing against self.test_dir'
                              ' is not appropriate')
        transport = self.get_transport()
        # This test requires a unix-like absolute path.
        base_dir = self.test_dir
        if sys.platform == 'win32':
            # Hack suggested by John Meinel.
            # TODO: write another mock server for this test
            #       and use absolute path without drive letter
            base_dir = '/' + base_dir
        # Try what is currently used:
        # remote path = self._abspath(relpath)
        expected_child = base_dir + '/relative'
        self.assertIsSameRealPath(expected_child,
                                  transport._remote_path('relative'))
        # We don't os.path.join because windows gives us the wrong path.
        segments = base_dir.split('/')
        segments.pop()
        parent_dir = '/'.join(segments)
        # '..' should be honoured
        expected_sibling = parent_dir + '/sibling'
        self.assertIsSameRealPath(expected_sibling,
                                  transport._remote_path('../sibling'))
        # '/' should be illegal ?
        ### FIXME decide and then test for all transports. RBC20051208
138
class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer):
    """Test the SFTP transport with homedir based relative paths."""

    def setUp(self):
        """Select the home-dir server before the base class sets up.

        NOTE(review): the ``def setUp`` line was absent from the extracted
        source; it is restored here from the ``super(...).setUp()`` call.
        """
        # Only SFTPHomeDirServer is tested here.  The flag has to be set
        # before the base setUp, which is where the server class is chosen.
        self._get_remote_is_absolute = False
        super(SFTPTransportTestRelativeRoot, self).setUp()

    def test__remote_path_relative_root(self):
        """Paths under a '/~/' base stay relative on the remote side."""
        # relative paths are preserved
        t = self.get_transport('')
        self.assertEqual('/~/', t._path)
        # the remote path should be relative to home dir
        # (i.e. not beginning with a '/')
        self.assertEqual('a', t._remote_path('a'))
155
class SFTPNonServerTest(TestCase):
158
self.requireFeature(features.paramiko)
160
def test_parse_url_with_home_dir(self):
    """An sftp URL is split into host, port, user, password and path."""
    transport = _mod_sftp.SFTPTransport(
        'sftp://ro%62ey:h%40t@example.com:2222/~/relative')
    # (attribute, expected value) pairs, checked in this order.
    checks = [
        ('_host', 'example.com'),
        ('_port', 2222),
        ('_user', 'robey'),
        ('_password', 'h@t'),
        ('_path', '/~/relative/'),
    ]
    for attr, expected in checks:
        self.assertEquals(getattr(transport, attr), expected)
169
def test_relpath(self):
    """relpath() raises PathNotChild for a '/~/' URL under an abs base."""
    base = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path')
    outside_url = 'sftp://user@host.com/~/rel/path/sub'
    self.assertRaises(errors.PathNotChild, base.relpath, outside_url)
174
def test_get_paramiko_vendor(self):
    """Test that if no 'ssh' is available we get builtin paramiko"""
    from bzrlib.transport import ssh
    # set '.' as the only location in the path, forcing no 'ssh' to exist
    orig_vendor = ssh._ssh_vendor_manager._cached_ssh_vendor
    orig_path = set_or_unset_env('PATH', '.')
    try:
        # No vendor defined yet, query for one
        ssh._ssh_vendor_manager.clear_cache()
        vendor = ssh._get_ssh_vendor()
        self.assertIsInstance(vendor, ssh.ParamikoVendor)
    finally:
        # Restore the global state even when the assertion fails, so a
        # failure here cannot leak a bogus PATH or vendor into later tests.
        set_or_unset_env('PATH', orig_path)
        ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
189
def test_abspath_root_sibling_server(self):
    """abspath('/') on the sibling server ends with '/' but not '/~/'."""
    server = stub_sftp.SFTPSiblingAbsoluteServer()
    server.start_server()
    try:
        transport = get_transport(server.get_url())
        root_url = transport.abspath('/')
        self.assertFalse(root_url.endswith('/~/'))
        self.assertTrue(root_url.endswith('/'))
    finally:
        # The server was started by this test, so shut it down here.
        # NOTE(review): stop_server() is assumed to be the counterpart of
        # start_server() -- confirm against the test server API.
        server.stop_server()
201
class SFTPBranchTest(TestCaseWithSFTPServer):
202
"""Test some stuff when accessing a bzr Branch over sftp"""
204
def test_lock_file(self):
    """A format 6 branch shows a write-lock file only while locked."""
    # old format branches use a special lock file on sftp.
    b = self.make_branch('', format=bzrdir.BzrDirFormat6())
    b = bzrlib.branch.Branch.open(self.get_url())
    self.failUnlessExists('.bzr/')
    self.failUnlessExists('.bzr/branch-format')
    self.failUnlessExists('.bzr/branch-lock')

    self.failIf(lexists('.bzr/branch-lock.write-lock'))
    # NOTE(review): the extracted source had nothing between the three
    # lock-file assertions, so they could not all hold.  lock_write() and
    # unlock() are assumed to be the calls that were lost -- confirm.
    b.lock_write()
    self.failUnlessExists('.bzr/branch-lock.write-lock')
    b.unlock()
    self.failIf(lexists('.bzr/branch-lock.write-lock'))
218
def test_push_support(self):
219
self.build_tree(['a/', 'a/foo'])
220
t = bzrdir.BzrDir.create_standalone_workingtree('a')
223
t.commit('foo', rev_id='a1')
225
b2 = bzrdir.BzrDir.create_branch_and_repo(self.get_url('/b'))
228
self.assertEquals(b2.revision_history(), ['a1'])
230
open('a/foo', 'wt').write('something new in foo\n')
231
t.commit('new', rev_id='a2')
234
self.assertEquals(b2.revision_history(), ['a1', 'a2'])
237
class SSHVendorConnection(TestCaseWithSFTPServer):
238
"""Test that the ssh vendors can all connect.
240
Verify that a full-handshake (SSH over loopback TCP) sftp connection works.
242
We have 3 sftp implementations in the test suite:
243
'loopback': Doesn't use ssh, just uses a local socket. Most tests are
244
done this way to save the handshaking time, so it is not
246
'none': This uses paramiko's built-in ssh client and server, and layers
248
None: If 'ssh' exists on the machine, then it will be spawned as a
253
super(SSHVendorConnection, self).setUp()
256
"""Just a wrapper so that when created, it will set _vendor"""
257
# SFTPFullAbsoluteServer can handle any vendor,
258
# it just needs to be set between the time it is instantiated
259
# and the time .setUp() is called
260
server = stub_sftp.SFTPFullAbsoluteServer()
261
server._vendor = self._test_vendor
263
self._test_vendor = 'loopback'
264
self.vfs_transport_server = create_server
265
f = open('a_file', 'wb')
271
def set_vendor(self, vendor):
    """Store ``vendor`` in ``self._test_vendor``."""
    self._test_vendor = vendor
274
def test_connection_paramiko(self):
    """With ParamikoVendor selected, 'a_file' reads back as 'foobar\\n'."""
    from bzrlib.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    transport = self.get_transport()
    content = transport.get('a_file').read()
    self.assertEqual('foobar\n', content)
280
def test_connection_vendor(self):
    """Placeholder for the auto-detected vendor test; always skipped."""
    reason = ("We don't test spawning real ssh,"
              " because it prompts for a password."
              " Enable this test if we figure out"
              " how to prevent this.")
    raise TestSkipped(reason)
    # Unreachable while the skip above is in place; kept so the test can
    # be re-enabled by removing the raise.
    self.set_vendor(None)
    transport = self.get_transport()
    self.assertEqual('foobar\n', transport.get('a_file').read())
290
class SSHVendorBadConnection(TestCaseWithTransport):
291
"""Test that the ssh vendors handle bad connection properly
293
We don't subclass TestCaseWithSFTPServer, because we don't actually
294
need an SFTP connection.
298
self.requireFeature(features.paramiko)
299
super(SSHVendorBadConnection, self).setUp()
300
import bzrlib.transport.ssh
302
# open a random port, so we know nobody else is using it
303
# but don't actually listen on the port.
305
s.bind(('localhost', 0))
306
self.bogus_url = 'sftp://%s:%s/' % s.getsockname()
308
orig_vendor = bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor
310
bzrlib.transport.ssh._ssh_vendor_manager._cached_ssh_vendor = orig_vendor
312
self.addCleanup(reset)
314
def set_vendor(self, vendor):
    """Overwrite the ssh vendor manager's cached vendor with ``vendor``."""
    import bzrlib.transport.ssh
    manager = bzrlib.transport.ssh._ssh_vendor_manager
    manager._cached_ssh_vendor = vendor
318
def test_bad_connection_paramiko(self):
    """Reading via ParamikoVendor from bogus_url raises ConnectionError."""
    from bzrlib.transport import ssh
    paramiko_vendor = ssh.ParamikoVendor()
    self.set_vendor(paramiko_vendor)
    transport = bzrlib.transport.get_transport(self.bogus_url)
    self.assertRaises(errors.ConnectionError, transport.get, 'foobar')
325
def test_bad_connection_ssh(self):
    """None => auto-detect vendor"""
    self.set_vendor(None)
    # Calling get_transport(self.bogus_url).get() in-process and expecting
    # errors.ConnectionError would make it clearer what is being tested.
    # However, 'ssh' will create stipple on the output, so instead bzr is
    # run in a subprocess and its output is parsed.
    command = ['log', self.bogus_url]
    out, err = self.run_bzr_subprocess(command, retcode=3)
    self.assertEqual('', out)
    hit_paramiko_bug = "NameError: global name 'SSHException'" in err
    if hit_paramiko_bug:
        # We aren't fixing this bug, because it is a bug in paramiko, but
        # we know about it, so the test is skipped rather than failed.
        raise TestSkipped('Known NameError bug with paramiko-1.6.1')
    expected_error = (r'bzr: ERROR: Unable to connect to SSH host'
                      r' 127\.0\.0\.1:\d+; ')
    self.assertContainsRe(err, expected_error)
355
class SFTPLatencyKnob(TestCaseWithSFTPServer):
    """Check the ``add_latency`` setting of the testing SFTP server."""

    def test_latency_knob_slows_transport(self):
        """With add_latency = 0.5 the first request takes over 0.4s."""
        # Change the latency knob to 500ms. We take about 40ms for a
        # loopback connection ordinarily.
        began = time.time()
        self.get_server().add_latency = 0.5
        transport = self.get_transport()
        # Force connection by issuing a request
        transport.has('not me')
        elapsed = time.time() - began
        self.assertTrue(elapsed > 0.4)

    def test_default(self):
        """Placeholder for the no-latency timing check; always skipped."""
        # This test is potentially brittle: under extremely high machine
        # load it could fail, but that is quite unlikely
        raise TestSkipped('Timing-sensitive test')
        # Unreachable while the skip above is in place.
        began = time.time()
        transport = self.get_transport()
        # Force connection by issuing a request
        transport.has('not me')
        elapsed = time.time() - began
        self.assertTrue(elapsed < 0.5)
379
class FakeSocket(object):
380
"""Fake socket object used to test the SocketDelay wrapper without
387
def send(self, data, flags=0):
391
def sendall(self, data, flags=0):
395
def recv(self, size, flags=0):
396
if size < len(self._data):
397
result = self._data[:size]
398
self._data = self._data[size:]
406
class TestSocketDelay(TestCase):
410
self.requireFeature(features.paramiko)
412
def test_delay(self):
413
sending = FakeSocket()
414
receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000,
416
# check that simulated time is charged only per round-trip:
417
t1 = stub_sftp.SocketDelay.simulated_time
418
receiving.send("connect1")
419
self.assertEqual(sending.recv(1024), "connect1")
420
t2 = stub_sftp.SocketDelay.simulated_time
421
self.assertAlmostEqual(t2 - t1, 0.1)
422
receiving.send("connect2")
423
self.assertEqual(sending.recv(1024), "connect2")
424
sending.send("hello")
425
self.assertEqual(receiving.recv(1024), "hello")
426
t3 = stub_sftp.SocketDelay.simulated_time
427
self.assertAlmostEqual(t3 - t2, 0.1)
428
sending.send("hello")
429
self.assertEqual(receiving.recv(1024), "hello")
430
sending.send("hello")
431
self.assertEqual(receiving.recv(1024), "hello")
432
sending.send("hello")
433
self.assertEqual(receiving.recv(1024), "hello")
434
t4 = stub_sftp.SocketDelay.simulated_time
435
self.assertAlmostEqual(t4, t3)
437
def test_bandwidth(self):
438
sending = FakeSocket()
439
receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024),
441
# check that simulated time is charged only per round-trip:
442
t1 = stub_sftp.SocketDelay.simulated_time
443
receiving.send("connect")
444
self.assertEqual(sending.recv(1024), "connect")
445
sending.send("a" * 100)
446
self.assertEqual(receiving.recv(1024), "a" * 100)
447
t2 = stub_sftp.SocketDelay.simulated_time
448
self.assertAlmostEqual(t2 - t1, 100 + 7)
451
class ReadvFile(object):
    """An object that acts like Paramiko's SFTPFile.readv()"""

    def __init__(self, data):
        """Keep ``data``, the sliceable content that readv() serves from."""
        # NOTE(review): this assignment was absent from the extracted
        # source; it is restored because readv() reads self._data.
        self._data = data

    def readv(self, requests):
        """Yield one slice of the data for each requested range.

        :param requests: An iterable of (start, length) pairs.
        :return: A generator yielding ``data[start:start+length]`` for each
            pair, in the order the pairs were given.
        """
        for start, length in requests:
            yield self._data[start:start+length]
462
def _null_report_activity(*a, **k):
466
class Test_SFTPReadvHelper(tests.TestCase):
    """Tests that drive _SFTPReadvHelper directly."""

    def _make_helper(self, offsets):
        """Return a _SFTPReadvHelper for ``offsets``; requires paramiko."""
        self.requireFeature(features.paramiko)
        return _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test',
                                          _null_report_activity)

    def checkGetRequests(self, expected_requests, offsets):
        """Assert that ``offsets`` are turned into ``expected_requests``.

        :param expected_requests: The list that ``_get_requests()`` must
            return.
        :param offsets: The offsets handed to the helper.
        """
        helper = self._make_helper(offsets)
        self.assertEqual(expected_requests, helper._get_requests())

    def test__get_requests(self):
        # Small single requests become a single readv request
        self.checkGetRequests([(0, 100)],
                              [(0, 20), (30, 50), (20, 10), (80, 20)])
        # Non-contiguous ranges are given as multiple requests
        self.checkGetRequests([(0, 20), (30, 50)],
                              [(10, 10), (30, 20), (0, 10), (50, 30)])
        # Ranges larger than _max_request_size (32kB) are broken up into
        # multiple requests, even if it actually spans multiple logical
        # ranges
        # NOTE(review): the end of this call was absent from the extracted
        # source.  (42000, 24000) is derived from the expected requests,
        # which cover 0..66000 while the visible offsets stop at 42000 --
        # confirm against upstream.
        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
                              [(0, 40000), (40000, 100), (40100, 1900),
                               (42000, 24000)])

    def checkRequestAndYield(self, expected, data, offsets):
        """Assert what the helper yields when reading ``data``.

        :param expected: The list that ``request_and_yield_offsets()`` must
            produce.
        :param data: The content served through a ReadvFile.
        :param offsets: The offsets handed to the helper.
        """
        helper = self._make_helper(offsets)
        data_f = ReadvFile(data)
        result = list(helper.request_and_yield_offsets(data_f))
        self.assertEqual(expected, result)

    def test_request_and_yield_offsets(self):
        data = 'abcdefghijklmnopqrstuvwxyz'
        self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data,
                                  [(0, 1), (5, 1), (10, 3)])
        # Should combine requests, and split them again
        self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data,
                                  [(0, 1), (1, 1), (10, 3)])
        # Out of order requests. The requests should get combined, but then
        # be yielded out-of-order. We also need one that is at the end of a
        # previous range. See bug #293746
        self.checkRequestAndYield(
            [(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')],
            data, [(0, 1), (10, 1), (4, 3), (1, 3)])
510
class TestUsesAuthConfig(TestCaseWithSFTPServer):
511
"""Test that AuthenticationConfig can supply default usernames."""
513
def get_transport_for_connection(self, set_config):
514
port = self.get_server()._listener.port
516
conf = config.AuthenticationConfig()
517
conf._get_config().update(
518
{'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}})
520
t = get_transport('sftp://localhost:%d' % port)
521
# force a connection to be performed.
525
def test_sftp_uses_config(self):
    """With set_config=True the first credential is 'bar'."""
    transport = self.get_transport_for_connection(set_config=True)
    username = transport._get_credentials()[0]
    self.assertEqual('bar', username)
529
def test_sftp_is_none_if_no_config(self):
    """With set_config=False the first credential is None."""
    transport = self.get_transport_for_connection(set_config=False)
    username = transport._get_credentials()[0]
    self.assertIs(None, username)
533
def test_sftp_doesnt_prompt_username(self):
    """Without config, no username is prompted for or read from stdin."""
    captured = tests.StringIOWrapper()
    ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=captured)
    transport = self.get_transport_for_connection(set_config=False)
    username = transport._get_credentials()[0]
    self.assertIs(None, username)
    # No prompts should've been printed, stdin shouldn't have been read
    self.assertEquals("", captured.getvalue())
    self.assertEquals(0, ui.ui_factory.stdin.tell())