28
from StringIO import StringIO
28
from cStringIO import StringIO
32
from bzrlib import bzrdir, errors, pack, smart, tests
39
from bzrlib.branch import BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir
42
import bzrlib.smart.repository
33
43
from bzrlib.smart.request import (
34
44
FailedSmartServerResponse,
35
46
SmartServerResponse,
36
47
SuccessfulSmartServerResponse,
38
import bzrlib.smart.bzrdir
39
import bzrlib.smart.branch
40
import bzrlib.smart.repository
49
from bzrlib.tests import (
54
from bzrlib.transport import chroot, get_transport
41
55
from bzrlib.util import bencode
58
def load_tests(standard_tests, module, loader):
59
"""Multiply tests version and protocol consistency."""
60
# FindRepository tests.
61
bzrdir_mod = bzrlib.smart.bzrdir
62
applier = TestScenarioApplier()
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
66
("find_repositoryV2", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
for test in iter_suite_tests(v1_and_2):
74
result.addTests(applier.adapt(test))
75
del applier.scenarios[0]
76
for test in iter_suite_tests(v2_only):
77
result.addTests(applier.adapt(test))
81
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
84
tests.TestCaseWithTransport.setUp(self)
85
self._chroot_server = None
87
def get_transport(self, relpath=None):
88
if self._chroot_server is None:
89
backing_transport = tests.TestCaseWithTransport.get_transport(self)
90
self._chroot_server = chroot.ChrootServer(backing_transport)
91
self._chroot_server.setUp()
92
self.addCleanup(self._chroot_server.tearDown)
93
t = get_transport(self._chroot_server.get_url())
94
if relpath is not None:
44
99
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
70
128
self.assertNotEqual(None,
71
129
SmartServerResponse(('ok', )))
74
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
131
def test__str__(self):
132
"""SmartServerResponses can be stringified."""
134
"<SmartServerResponse status=OK args=('args',) body='body'>",
135
str(SuccessfulSmartServerResponse(('args',), 'body')))
137
"<SmartServerResponse status=ERR args=('args',) body='body'>",
138
str(FailedSmartServerResponse(('args',), 'body')))
141
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
143
def test_translate_client_path(self):
144
transport = self.get_transport()
145
request = SmartServerRequest(transport, 'foo/')
146
self.assertEqual('./', request.translate_client_path('foo/'))
148
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
150
errors.PathNotChild, request.translate_client_path, '/')
152
errors.PathNotChild, request.translate_client_path, 'bar/')
153
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
155
def test_transport_from_client_path(self):
156
transport = self.get_transport()
157
request = SmartServerRequest(transport, 'foo/')
160
request.transport_from_client_path('foo/').base)
163
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
75
164
"""Tests for BzrDir.find_repository."""
77
166
def test_no_repository(self):
78
167
"""When there is no repository to be found, ('norepository', ) is returned."""
79
168
backing = self.get_transport()
80
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
169
request = self._request_class(backing)
81
170
self.make_bzrdir('.')
82
171
self.assertEqual(SmartServerResponse(('norepository', )),
83
request.execute(backing.local_abspath('')))
85
174
def test_nonshared_repository(self):
86
175
# nonshared repositories only allow 'find' to return a handle when the
87
176
# path the repository is being searched on is the same as that that
88
177
# the repository is at.
89
178
backing = self.get_transport()
90
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
179
request = self._request_class(backing)
91
180
result = self._make_repository_and_result()
92
self.assertEqual(result, request.execute(backing.local_abspath('')))
181
self.assertEqual(result, request.execute(''))
93
182
self.make_bzrdir('subdir')
94
183
self.assertEqual(SmartServerResponse(('norepository', )),
95
request.execute(backing.local_abspath('subdir')))
184
request.execute('subdir'))
97
186
def _make_repository_and_result(self, shared=False, format=None):
98
187
"""Convenience function to setup a repository.
111
return SmartServerResponse(('ok', '', rich_root, subtrees))
200
if (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
201
self._request_class):
202
# All tests so far are on formats, and for non-external
204
return SuccessfulSmartServerResponse(
205
('ok', '', rich_root, subtrees, 'no'))
207
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
113
209
def test_shared_repository(self):
114
210
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
115
211
backing = self.get_transport()
116
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
212
request = self._request_class(backing)
117
213
result = self._make_repository_and_result(shared=True)
118
self.assertEqual(result, request.execute(backing.local_abspath('')))
214
self.assertEqual(result, request.execute(''))
119
215
self.make_bzrdir('subdir')
120
216
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
121
217
self.assertEqual(result2,
122
request.execute(backing.local_abspath('subdir')))
218
request.execute('subdir'))
123
219
self.make_bzrdir('subdir/deeper')
124
220
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
125
221
self.assertEqual(result3,
126
request.execute(backing.local_abspath('subdir/deeper')))
222
request.execute('subdir/deeper'))
128
224
def test_rich_root_and_subtree_encoding(self):
129
225
"""Test for the format attributes for rich root and subtree support."""
130
226
backing = self.get_transport()
131
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
227
request = self._request_class(backing)
132
228
result = self._make_repository_and_result(format='dirstate-with-subtree')
133
229
# check the test will be valid
134
230
self.assertEqual('yes', result.args[2])
135
231
self.assertEqual('yes', result.args[3])
136
self.assertEqual(result, request.execute(backing.local_abspath('')))
139
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
232
self.assertEqual(result, request.execute(''))
234
def test_supports_external_lookups_no_v2(self):
235
"""Test for the supports_external_lookups attribute."""
236
backing = self.get_transport()
237
request = self._request_class(backing)
238
result = self._make_repository_and_result(format='dirstate-with-subtree')
239
# check the test will be valid
240
self.assertEqual('no', result.args[4])
241
self.assertEqual(result, request.execute(''))
244
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
141
246
def test_empty_dir(self):
142
247
"""Initializing an empty dir should succeed and do it."""
143
248
backing = self.get_transport()
144
249
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
145
250
self.assertEqual(SmartServerResponse(('ok', )),
146
request.execute(backing.local_abspath('.')))
147
252
made_dir = bzrdir.BzrDir.open_from_transport(backing)
148
253
# no branch, tree or repository is expected with the current
149
254
# default format.
191
296
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
192
297
branch = self.make_branch('branch')
193
298
checkout = branch.create_checkout('reference',lightweight=True)
194
# TODO: once we have an API to probe for references of any sort, we
196
reference_url = backing.abspath('branch') + '/'
299
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
197
300
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
198
301
self.assertEqual(SmartServerResponse(('ok', reference_url)),
199
request.execute(backing.local_abspath('reference')))
202
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
302
request.execute('reference'))
305
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
204
307
def test_empty(self):
205
308
"""For an empty branch, the body is empty."""
369
472
self.assertEqual(
370
473
SmartServerResponse(('ok',)),
372
backing.local_abspath(''), branch_token, repo_token,
475
'', branch_token, repo_token,
374
477
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
376
479
tree.branch.unlock()
379
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
482
class TestSmartServerBranchRequestSetLastRevisionInfo(tests.TestCaseWithTransport):
484
def lock_branch(self, branch):
    """Write-lock ``branch`` and return its lock tokens.

    The branch is write-locked, then its repository is write-locked to
    obtain the repository token and unlocked again straight away.  The
    branch's own ``unlock`` is registered with ``addCleanup`` rather than
    being called here.

    :param branch: the branch object to lock.
    :return: a ``(branch_token, repo_token)`` tuple.
    """
    branch_token = branch.lock_write()
    repo_token = branch.repository.lock_write()
    # Only the token is needed from the repository lock, so release it now.
    branch.repository.unlock()
    self.addCleanup(branch.unlock)
    return branch_token, repo_token
491
def make_locked_branch(self, format=None):
    """Create a branch at '.' and lock it via ``lock_branch``.

    :param format: passed through unchanged to ``make_branch``.
    :return: a ``(branch, branch_token, repo_token)`` tuple.
    """
    branch = self.make_branch('.', format=format)
    branch_token, repo_token = self.lock_branch(branch)
    return branch, branch_token, repo_token
496
def test_empty(self):
497
"""An empty branch can have its last revision set to 'null:'."""
498
b, branch_token, repo_token = self.make_locked_branch()
499
backing = self.get_transport()
500
request = smart.branch.SmartServerBranchRequestSetLastRevisionInfo(
502
response = request.execute('', branch_token, repo_token, '0', 'null:')
503
self.assertEqual(SmartServerResponse(('ok',)), response)
505
def assertBranchLastRevisionInfo(self, expected_info, branch_relpath):
    """Assert that the branch at ``branch_relpath`` has ``expected_info``.

    The branch is opened afresh through ``bzrdir.BzrDir.open`` and the
    value of its ``last_revision_info()`` is compared with
    ``expected_info``.

    :param expected_info: the value ``last_revision_info()`` should return.
    :param branch_relpath: location handed to ``bzrdir.BzrDir.open``.
    """
    branch = bzrdir.BzrDir.open(branch_relpath).open_branch()
    self.assertEqual(expected_info, branch.last_revision_info())
509
def test_branch_revision_info_is_updated(self):
510
"""This method really does update the branch last revision info."""
511
tree = self.make_branch_and_memory_tree('.')
514
tree.commit('First commit', rev_id='revision-1')
515
tree.commit('Second commit', rev_id='revision-2')
519
branch_token, repo_token = self.lock_branch(branch)
520
backing = self.get_transport()
521
request = smart.branch.SmartServerBranchRequestSetLastRevisionInfo(
523
self.assertBranchLastRevisionInfo((2, 'revision-2'), '.')
524
response = request.execute(
525
'', branch_token, repo_token, '1', 'revision-1')
526
self.assertEqual(SmartServerResponse(('ok',)), response)
527
self.assertBranchLastRevisionInfo((1, 'revision-1'), '.')
529
def test_not_present_revid(self):
530
"""Some branch formats will check that the revision is present in the
531
repository. When that check fails, a NoSuchRevision error is returned
534
# Make a knit format branch, because that format checks the values
535
# given to set_last_revision_info.
536
b, branch_token, repo_token = self.make_locked_branch(format='knit')
537
backing = self.get_transport()
538
request = smart.branch.SmartServerBranchRequestSetLastRevisionInfo(
540
response = request.execute(
541
'', branch_token, repo_token, '1', 'not-present')
543
SmartServerResponse(('NoSuchRevision', 'not-present')), response)
546
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
382
tests.TestCaseWithTransport.setUp(self)
383
self.reduceLockdirTimeout()
549
tests.TestCaseWithMemoryTransport.setUp(self)
385
551
def test_lock_write_on_unlocked_branch(self):
386
552
backing = self.get_transport()
387
553
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
388
554
branch = self.make_branch('.', format='knit')
389
555
repository = branch.repository
390
response = request.execute(backing.local_abspath(''))
556
response = request.execute('')
391
557
branch_nonce = branch.control_files._lock.peek().get('nonce')
392
558
repository_nonce = repository.control_files._lock.peek().get('nonce')
393
559
self.assertEqual(
454
620
backing = self.get_readonly_transport()
455
621
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
456
622
branch = self.make_branch('.')
457
response = request.execute('')
623
root = self.get_transport().clone('/')
624
path = urlutils.relative_url(root.base, self.get_transport().base)
625
response = request.execute(path)
458
626
error_name, lock_str, why_str = response.args
459
627
self.assertFalse(response.is_successful())
460
628
self.assertEqual('LockFailed', error_name)
463
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
631
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
466
tests.TestCaseWithTransport.setUp(self)
467
self.reduceLockdirTimeout()
634
tests.TestCaseWithMemoryTransport.setUp(self)
469
636
def test_unlock_on_locked_branch_and_repo(self):
470
637
backing = self.get_transport()
701
868
request = smart.repository.SmartServerRepositoryIsShared(backing)
702
869
self.make_repository('.', shared=False)
703
870
self.assertEqual(SmartServerResponse(('no', )),
704
request.execute(backing.local_abspath(''), ))
707
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
871
request.execute('', ))
874
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
710
tests.TestCaseWithTransport.setUp(self)
711
self.reduceLockdirTimeout()
877
tests.TestCaseWithMemoryTransport.setUp(self)
713
879
def test_lock_write_on_unlocked_repo(self):
714
880
backing = self.get_transport()
715
881
request = smart.repository.SmartServerRepositoryLockWrite(backing)
716
882
repository = self.make_repository('.', format='knit')
717
response = request.execute(backing.local_abspath(''))
883
response = request.execute('')
718
884
nonce = repository.control_files._lock.peek().get('nonce')
719
885
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
720
886
# The repository is now locked. Verify that with a new repository
918
1081
smart.request.request_handlers.get('Branch.set_last_revision'),
919
1082
smart.branch.SmartServerBranchRequestSetLastRevision)
920
1083
self.assertEqual(
1084
smart.request.request_handlers.get('Branch.set_last_revision_info'),
1085
smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
921
1087
smart.request.request_handlers.get('Branch.unlock'),
922
1088
smart.branch.SmartServerBranchRequestUnlock)
923
1089
self.assertEqual(
924
1090
smart.request.request_handlers.get('BzrDir.find_repository'),
925
smart.bzrdir.SmartServerRequestFindRepository)
1091
smart.bzrdir.SmartServerRequestFindRepositoryV1)
1093
smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
1094
smart.bzrdir.SmartServerRequestFindRepositoryV2)
926
1095
self.assertEqual(
927
1096
smart.request.request_handlers.get('BzrDirFormat.initialize'),
928
1097
smart.bzrdir.SmartServerRequestInitializeBzrDir)