1
# (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.trace import mutter
37
import bzrlib.transactions as transactions
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib.upgrade import upgrade
42
from bzrlib.workingtree import WorkingTree
45
# TODO: Make a branch using basis branch, and check that it
46
# doesn't request any files that could have been avoided, by
47
# hooking into the Transport.
50
class TestCaseWithBranch(TestCaseWithTransport):
53
super(TestCaseWithBranch, self).setUp()
57
if self.branch is None:
58
self.branch = self.make_branch(None)
61
def make_branch(self, relpath):
    """Create a branch of the format under test at relpath.

    A repository is first made with self.make_repository(relpath); the
    branch is then initialized in that repository's bzrdir using
    self.branch_format.

    Raises TestSkipped when the branch format reports that it cannot be
    initialized.
    """
    repository = self.make_repository(relpath)
    # FIXME RBC 20060210: this isn't necessarily a fixable thing;
    # Skipped is the wrong exception to raise.
    try:
        new_branch = self.branch_format.initialize(repository.bzrdir)
    except errors.UninitializableFormat:
        raise TestSkipped('Uninitializable branch format')
    else:
        return new_branch
70
def make_repository(self, relpath, shared=False):
72
url = self.get_url(relpath)
73
segments = url.split('/')
74
if segments and segments[-1] not in ('', '.'):
75
parent = '/'.join(segments[:-1])
76
t = get_transport(parent)
81
made_control = self.bzrdir_format.initialize(url)
82
return made_control.create_repository(shared=shared)
83
except UninitializableFormat:
84
raise TestSkipped("Format %s is not initializable.")
87
class TestBranch(TestCaseWithBranch):
89
def test_append_revisions(self):
    """Test appending more than one revision"""
    br = self.get_branch()
    # A single revision id appended to the new branch becomes its history.
    br.append_revision("rev1")
    self.assertEqual(br.revision_history(), ["rev1"])
    # Several ids passed in one call are appended in the order given.
    br.append_revision("rev2", "rev3")
    self.assertEqual(br.revision_history(), ["rev1", "rev2", "rev3"])
97
def test_fetch_revisions(self):
98
"""Test fetch-revision operation."""
99
get_transport(self.get_url()).mkdir('b1')
100
get_transport(self.get_url()).mkdir('b2')
101
wt = self.make_branch_and_tree('b1')
103
b2 = self.make_branch('b2')
104
file('b1/foo', 'w').write('hello')
105
wt.add(['foo'], ['foo-id'])
106
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
108
mutter('start fetch')
109
self.assertEqual((1, []), b2.fetch(b1))
111
rev = b2.repository.get_revision('revision-1')
112
tree = b2.repository.revision_tree('revision-1')
113
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
115
def get_unbalanced_tree_pair(self):
116
"""Return two branches, a and b, with one file in a."""
117
get_transport(self.get_url()).mkdir('a')
118
tree_a = self.make_branch_and_tree('a')
119
file('a/b', 'wb').write('b')
121
tree_a.commit("silly commit", rev_id='A')
123
get_transport(self.get_url()).mkdir('b')
124
tree_b = self.make_branch_and_tree('b')
125
return tree_a, tree_b
127
def get_balanced_branch_pair(self):
    """Return (tree_a, tree_b) after fetching a's repository into b's.

    The pair comes from get_unbalanced_tree_pair(); tree_b's repository
    then fetches from tree_a's repository before both trees are returned.
    """
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    target_repository = target_tree.branch.repository
    target_repository.fetch(source_tree.branch.repository)
    return source_tree, target_tree
133
def test_clone_branch(self):
134
"""Copy the stores from one branch to another"""
135
tree_a, tree_b = self.get_balanced_branch_pair()
136
tree_b.commit("silly commit")
138
# this fails to test that the history from a was not used.
139
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
140
self.assertEqual(tree_a.branch.revision_history(),
141
dir_c.open_branch().revision_history())
143
def test_clone_partial(self):
144
"""Copy only part of the history of a branch."""
145
# TODO: RBC 20060208 test with a revision not on revision-history.
146
# what should that behaviour be ? Emailed the list.
147
wt_a = self.make_branch_and_tree('a')
148
self.build_tree(['a/one'])
150
wt_a.commit('commit one', rev_id='1')
151
self.build_tree(['a/two'])
153
wt_a.commit('commit two', rev_id='2')
154
repo_b = self.make_repository('b')
155
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
156
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
157
self.assertEqual(br_b.last_revision(), '1')
159
def test_sprout_partial(self):
160
# test sprouting with a prefix of the revision-history.
161
# also needs not-on-revision-history behaviour defined.
162
wt_a = self.make_branch_and_tree('a')
163
self.build_tree(['a/one'])
165
wt_a.commit('commit one', rev_id='1')
166
self.build_tree(['a/two'])
168
wt_a.commit('commit two', rev_id='2')
169
repo_b = self.make_repository('b')
170
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
171
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
172
self.assertEqual(br_b.last_revision(), '1')
174
def test_clone_branch_nickname(self):
    # Placeholder: should check that cloning always preserves the nick name.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
178
def test_clone_branch_parent(self):
    # Placeholder: should check that cloning always preserves the parent.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
182
def test_sprout_branch_nickname(self):
    # Placeholder: should check that sprouting always resets the nick name.
    reason = 'XXX branch sprouting is not yet tested..'
    raise TestSkipped(reason)
186
def test_sprout_branch_parent(self):
    # The branch opened from a sprouted bzrdir must report the source's
    # root transport base as its parent.
    source = self.make_branch('source')
    source_dir = source.bzrdir
    sprouted_dir = source_dir.sprout(self.get_url('target'))
    target = sprouted_dir.open_branch()
    expected_parent = source.bzrdir.root_transport.base
    self.assertEqual(expected_parent, target.get_parent())
191
def test_record_initial_ghost_merge(self):
192
"""A pending merge with no revision present is still a merge."""
193
wt = self.make_branch_and_tree('.')
195
wt.add_pending_merge('non:existent@rev--ision--0--2')
196
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
197
rev = branch.repository.get_revision(branch.last_revision())
198
self.assertEqual(len(rev.parent_ids), 1)
199
# parent_sha1s is not populated now, WTF. rbc 20051003
200
self.assertEqual(len(rev.parent_sha1s), 0)
201
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
203
def test_bad_revision(self):
204
self.assertRaises(errors.InvalidRevisionId,
205
self.get_branch().repository.get_revision,
209
# compare the gpg-to-sign info for a commit with a ghost and
210
# an identical tree without a ghost
211
# fetch missing should rewrite the TOC of weaves to list newly available parents.
213
def test_pending_merges(self):
214
"""Tracking pending-merged revisions."""
215
wt = self.make_branch_and_tree('.')
217
self.assertEquals(wt.pending_merges(), [])
218
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
219
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
220
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
221
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
222
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
223
self.assertEquals(wt.pending_merges(),
224
['foo@azkhazan-123123-abcabc',
225
'wibble@fofof--20050401--1928390812'])
226
wt.commit("commit from base with two merges")
227
rev = b.repository.get_revision(b.revision_history()[0])
228
self.assertEquals(len(rev.parent_ids), 2)
229
self.assertEquals(rev.parent_ids[0],
230
'foo@azkhazan-123123-abcabc')
231
self.assertEquals(rev.parent_ids[1],
232
'wibble@fofof--20050401--1928390812')
233
# list should be cleared when we do a commit
234
self.assertEquals(wt.pending_merges(), [])
236
def test_sign_existing_revision(self):
237
wt = self.make_branch_and_tree('.')
239
wt.commit("base", allow_pointless=True, rev_id='A')
240
from bzrlib.testament import Testament
241
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
242
branch.repository.sign_revision('A', strategy)
243
self.assertEqual(Testament.from_revision(branch.repository,
244
'A').as_short_text(),
245
branch.repository.revision_store.get('A',
248
def test_store_signature(self):
249
branch = self.get_branch()
250
branch.repository.store_revision_signature(
251
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
252
self.assertEqual('FOO',
253
branch.repository.revision_store.get('A',
256
def test_branch_keeps_signatures(self):
257
wt = self.make_branch_and_tree('source')
258
wt.commit('A', allow_pointless=True, rev_id='A')
259
wt.branch.repository.sign_revision('A',
260
bzrlib.gpg.LoopbackGPGStrategy(None))
261
#FIXME: clone should work to urls,
262
# wt.clone should work to disks.
263
self.build_tree(['target/'])
264
d2 = wt.bzrdir.clone('target')
265
self.assertEqual(wt.branch.repository.revision_store.get('A',
267
d2.open_repository().revision_store.get('A',
270
def test_nicks(self):
271
"""Branch nicknames"""
272
t = get_transport(self.get_url())
274
branch = self.make_branch('bzr.dev')
275
self.assertEqual(branch.nick, 'bzr.dev')
276
t.move('bzr.dev', 'bzr.ab')
277
branch = Branch.open(self.get_url('bzr.ab'))
278
self.assertEqual(branch.nick, 'bzr.ab')
279
branch.nick = "Aaron's branch"
280
branch.nick = "Aaron's branch"
284
branch.control_files.controlfilename("branch.conf")
288
self.assertEqual(branch.nick, "Aaron's branch")
289
t.move('bzr.ab', 'integration')
290
branch = Branch.open(self.get_url('integration'))
291
self.assertEqual(branch.nick, "Aaron's branch")
292
branch.nick = u"\u1234"
293
self.assertEqual(branch.nick, u"\u1234")
295
def test_commit_nicks(self):
296
"""Nicknames are committed to the revision"""
297
get_transport(self.get_url()).mkdir('bzr.dev')
298
wt = self.make_branch_and_tree('bzr.dev')
300
branch.nick = "My happy branch"
301
wt.commit('My commit respect da nick.')
302
committed = branch.repository.get_revision(branch.last_revision())
303
self.assertEqual(committed.properties["branch-nick"],
306
def test_create_open_branch_uses_repository(self):
308
repo = self.make_repository('.', shared=True)
309
except errors.IncompatibleFormat:
311
repo.bzrdir.root_transport.mkdir('child')
312
child_dir = self.bzrdir_format.initialize('child')
314
child_branch = self.branch_format.initialize(child_dir)
315
except errors.UninitializableFormat:
316
# branch references are not default init'able.
318
self.assertEqual(repo.bzrdir.root_transport.base,
319
child_branch.repository.bzrdir.root_transport.base)
320
child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
321
self.assertEqual(repo.bzrdir.root_transport.base,
322
child_branch.repository.bzrdir.root_transport.base)
325
class ChrootedTests(TestCaseWithBranch):
326
"""A support class that provides readonly urls outside the local namespace.
328
This is done by checking if self.transport_server is a MemoryServer. if it
329
is then we are chrooted already, if it is not then an HttpServer is used
334
super(ChrootedTests, self).setUp()
335
if not self.transport_server == MemoryServer:
336
self.transport_readonly_server = HttpServer
338
def test_open_containing(self):
    probe_paths = ('', 'g/p/q')
    # Before any branch is made, open_containing must raise NotBranchError
    # for the root url as well as for a nested path.
    for path in probe_paths:
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_readonly_url(path))
    branch = self.make_branch('.')
    # With a branch at the root, open_containing succeeds for the same urls
    # and hands back the requested path as relpath.
    for path in probe_paths:
        branch, relpath = Branch.open_containing(self.get_readonly_url(path))
        self.assertEqual(path, relpath)
349
# TODO: rewrite this as a regular unittest, without relying on the displayed output
350
# >>> from bzrlib.commit import commit
351
# >>> bzrlib.trace.silent = True
352
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
353
# >>> br1.working_tree().add('foo')
354
# >>> br1.working_tree().add('bar')
355
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
356
# >>> br2 = ScratchBranch()
357
# >>> br2.update_revisions(br1)
359
# Added 1 inventories.
361
# >>> br2.revision_history()
363
# >>> br2.update_revisions(br1)
365
# >>> br1.text_store.total_size() == br2.text_store.total_size()
368
class InstrumentedTransaction(object):
371
self.calls.append('finish')
377
class TestDecorator(object):
383
self._calls.append('lr')
385
def lock_write(self):
386
self._calls.append('lw')
389
self._calls.append('ul')
392
def do_with_read(self):
396
def except_with_read(self):
400
def do_with_write(self):
404
def except_with_write(self):
408
class TestDecorators(TestCase):
    """Check the calls recorded by TestDecorator's read and write methods."""

    def test_needs_read_lock(self):
        # The read method returns 1 and records 'lr' followed by 'ul'.
        decorated = TestDecorator()
        outcome = decorated.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        # 'ul' is still recorded when the read method raises.
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        # The write method returns 2 and records 'lw' followed by 'ul'.
        decorated = TestDecorator()
        outcome = decorated.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        # 'ul' is still recorded when the write method raises.
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
431
class TestBranchTransaction(TestCaseWithBranch):
434
super(TestBranchTransaction, self).setUp()
437
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    is_passthrough = isinstance(current, transactions.PassThroughTransaction)
    self.failUnless(is_passthrough)
442
def test__set_new_transaction(self):
    # Installing a transaction on a fresh branch is expected not to raise;
    # there is no further assertion.
    set_transaction = self.get_branch()._set_transaction
    set_transaction(transactions.ReadOnlyTransaction())
445
def test__set_over_existing_transaction_raises(self):
    # Install one transaction, then check a second install raises LockError.
    first_set = self.get_branch()._set_transaction
    first_set(transactions.ReadOnlyTransaction())
    second_set = self.get_branch()._set_transaction
    replacement = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError, second_set, replacement)
451
def test_finish_no_transaction_raises(self):
    # Finishing when no transaction has been set must raise LockError.
    finish = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finish)
454
def test_finish_readonly_transaction_works(self):
    # Set a read-only transaction, finish it, and check that the control
    # files no longer hold a transaction.
    set_transaction = self.get_branch()._set_transaction
    set_transaction(transactions.ReadOnlyTransaction())
    self.get_branch()._finish_transaction()
    remaining = self.get_branch().control_files._transaction
    self.assertEqual(None, remaining)
459
def test_unlock_calls_finish(self):
    # Swap an instrumented transaction in while read-locked; unlocking
    # must leave exactly one 'finish' entry in its call log.
    self.get_branch().lock_read()
    probe = InstrumentedTransaction()
    control_files = self.get_branch().control_files
    control_files._transaction = probe
    self.get_branch().unlock()
    self.assertEqual(['finish'], probe.calls)
466
def test_lock_read_acquires_ro_transaction(self):
    # While the branch is read-locked, get_transaction() must return a
    # ReadOnlyTransaction.
    self.get_branch().lock_read()
    try:
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failing
        # test does not leave the branch locked.
        self.get_branch().unlock()
472
def test_lock_write_acquires_passthrough_transaction(self):
    # While the branch is write-locked, the transaction stored on the
    # control files must be a PassThroughTransaction.
    self.get_branch().lock_write()
    try:
        # cannot use get_transaction as it's magic
        self.failUnless(isinstance(
            self.get_branch().control_files._transaction,
            transactions.PassThroughTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failing
        # test does not leave the branch locked.
        self.get_branch().unlock()
480
class TestBranchPushLocations(TestCaseWithBranch):
482
def test_get_push_location_unset(self):
    # A branch with no push location configured must report None.
    location = self.get_branch().get_push_location()
    self.assertEqual(None, location)
485
def test_get_push_location_exact(self):
    # Write a branches-config section named after the branch base (minus
    # its last character) and check that push_location is read back.
    from bzrlib.config import (branches_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = branches_config_filename()
    config_file = open(fn, 'wt')
    try:
        # Same bytes the original "print >>" produced, including the
        # trailing newline that print appended.
        config_file.write("[%s]\n"
                          "push_location=foo\n" %
                          self.get_branch().base[:-1])
    finally:
        # Close explicitly so the contents are flushed before the config
        # is read back; the original relied on garbage collection of the
        # anonymous file object to do this.
        config_file.close()
    self.assertEqual("foo", self.get_branch().get_push_location())
495
def test_set_push_location(self):
496
from bzrlib.config import (branches_config_filename,
497
ensure_config_dir_exists)
498
ensure_config_dir_exists()
499
fn = branches_config_filename()
500
self.get_branch().set_push_location('foo')
501
self.assertFileEqual("[%s]\n"
502
"push_location = foo" % self.get_branch().base[:-1],
505
# TODO RBC 20051029 test getting a push location from a branch in a
506
# recursive section - that is, it appends the branch name.
509
class TestFormat(TestCaseWithBranch):
510
"""Tests for the format itself."""
512
def test_format_initialize_find_open(self):
513
# loopback test to check the current format initializes to itself.
514
if not self.branch_format.is_supported():
515
# unsupported formats are not loopback testable
516
# because the default open will not open them and
517
# they may not be initializable.
519
# supported formats must be able to init and open
520
t = get_transport(self.get_url())
521
readonly_t = get_transport(self.get_readonly_url())
522
made_branch = self.make_branch('.')
523
self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
525
# find it via bzrdir opening:
526
opened_control = bzrdir.BzrDir.open(readonly_t.base)
527
direct_opened_branch = opened_control.open_branch()
528
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
529
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
530
self.failUnless(isinstance(direct_opened_branch._format,
531
self.branch_format.__class__))
533
# find it via Branch.open
534
opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
535
self.failUnless(isinstance(opened_branch, made_branch.__class__))
536
self.assertEqual(made_branch._format.__class__,
537
opened_branch._format.__class__)
538
# if it has a unique id string, can we probe for it ?
540
self.branch_format.get_format_string()
541
except NotImplementedError:
543
self.assertEqual(self.branch_format,
544
bzrlib.branch.BranchFormat.find_format(opened_control))