1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for bzrdir implementations - tests a bzrdir format."""
19
from cStringIO import StringIO
21
from itertools import izip
23
from stat import S_ISDIR
34
revision as _mod_revision,
41
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
42
from bzrlib.errors import (FileExists,
45
UninitializableFormat,
48
import bzrlib.revision
49
from bzrlib.tests import (
52
TestCaseWithTransport,
56
from bzrlib.tests.per_bzrdir import TestCaseWithBzrDir
57
from bzrlib.trace import mutter
58
from bzrlib.transport.local import LocalTransport
59
from bzrlib.ui import (
62
from bzrlib.upgrade import upgrade
63
from bzrlib.remote import RemoteBzrDir, RemoteRepository
64
from bzrlib.repofmt import weaverepo
67
class TestBzrDir(TestCaseWithBzrDir):
68
# Many of these tests test for disk equality rather than checking
69
# for semantic equivalence. This works well for some tests but
70
# is not good at handling changes in representation or the addition
71
# or removal of control data. It would be nice to, for instance:
72
# sprout a new branch, check that the nickname has been reset by hand
73
# and then set the nickname to match the source branch, at which point
74
# a semantic equivalence should pass
76
def assertDirectoriesEqual(self, source, target, ignore_list=[]):
77
"""Assert that the content of source and target are identical.
79
paths in ignore list will be completely ignored.
81
We ignore paths that represent data which is allowed to change during
82
a clone or sprout: for instance, inventory.knit contains gzip fragments
83
which have timestamps in them, and as we have read the inventory from
84
the source knit, the already-read data is recompressed rather than
85
reading it again, which leads to changed timestamps. This is ok though,
86
because the inventory.kndx file is not ignored, and the integrity of
87
knit joins is tested by test_knit and test_versionedfile.
89
:seealso: Additionally, assertRepositoryHasSameItems provides value
90
rather than representation checking of repositories for
96
dir = directories.pop()
97
for path in set(source.list_dir(dir) + target.list_dir(dir)):
98
path = dir + '/' + path
99
if path in ignore_list:
102
stat = source.stat(path)
103
except errors.NoSuchFile:
104
self.fail('%s not in source' % path)
105
if S_ISDIR(stat.st_mode):
106
self.assertTrue(S_ISDIR(target.stat(path).st_mode))
107
directories.append(path)
109
self.assertEqualDiff(source.get(path).read(),
110
target.get(path).read(),
111
"text for file %r differs:\n" % path)
113
def assertRepositoryHasSameItems(self, left_repo, right_repo):
114
"""require left_repo and right_repo to contain the same data."""
115
# XXX: TODO: Doesn't work yet, because we need to be able to compare
116
# local repositories to remote ones... but this is an as-yet unsolved
117
# aspect of format management and the Remote protocols...
118
# self.assertEqual(left_repo._format.__class__,
119
# right_repo._format.__class__)
120
left_repo.lock_read()
122
right_repo.lock_read()
125
all_revs = left_repo.all_revision_ids()
126
self.assertEqual(left_repo.all_revision_ids(),
127
right_repo.all_revision_ids())
128
for rev_id in left_repo.all_revision_ids():
129
self.assertEqual(left_repo.get_revision(rev_id),
130
right_repo.get_revision(rev_id))
131
# Assert the revision trees (and thus the inventories) are equal
132
sort_key = lambda rev_tree: rev_tree.get_revision_id()
133
rev_trees_a = sorted(
134
left_repo.revision_trees(all_revs), key=sort_key)
135
rev_trees_b = sorted(
136
right_repo.revision_trees(all_revs), key=sort_key)
137
for tree_a, tree_b in zip(rev_trees_a, rev_trees_b):
138
self.assertEqual([], list(tree_a.iter_changes(tree_b)))
140
text_index = left_repo._generate_text_key_index()
141
self.assertEqual(text_index,
142
right_repo._generate_text_key_index())
144
for file_id, revision_id in text_index.iterkeys():
145
desired_files.append(
146
(file_id, revision_id, (file_id, revision_id)))
147
left_texts = list(left_repo.iter_files_bytes(desired_files))
148
right_texts = list(right_repo.iter_files_bytes(desired_files))
151
self.assertEqual(left_texts, right_texts)
153
for rev_id in all_revs:
155
left_text = left_repo.get_signature_text(rev_id)
156
except NoSuchRevision:
158
right_text = right_repo.get_signature_text(rev_id)
159
self.assertEqual(left_text, right_text)
165
def skipIfNoWorkingTree(self, a_bzrdir):
166
"""Raises TestSkipped if a_bzrdir doesn't have a working tree.
168
If the bzrdir does have a workingtree, this is a no-op.
171
a_bzrdir.open_workingtree()
172
except (errors.NotLocalUrl, errors.NoWorkingTree):
173
raise TestSkipped("bzrdir on transport %r has no working tree"
174
% a_bzrdir.transport)
176
def openWorkingTreeIfLocal(self, a_bzrdir):
177
"""If a_bzrdir is on a local transport, call open_workingtree() on it.
179
if not isinstance(a_bzrdir.root_transport, LocalTransport):
180
# it's not local, but that's ok
182
a_bzrdir.open_workingtree()
184
def createWorkingTreeOrSkip(self, a_bzrdir):
185
"""Create a working tree on a_bzrdir, or raise TestSkipped.
187
A simple wrapper for create_workingtree that translates NotLocalUrl into
188
TestSkipped. Returns the newly created working tree.
191
return a_bzrdir.create_workingtree()
192
except errors.NotLocalUrl:
193
raise TestSkipped("cannot make working tree with transport %r"
194
% a_bzrdir.transport)
196
def sproutOrSkip(self, from_bzrdir, to_url, revision_id=None,
197
force_new_repo=False, accelerator_tree=None,
198
create_tree_if_local=True):
199
"""Sprout from_bzrdir into to_url, or raise TestSkipped.
201
A simple wrapper for from_bzrdir.sprout that translates NotLocalUrl into
202
TestSkipped. Returns the newly sprouted bzrdir.
204
to_transport = transport.get_transport(to_url)
205
if not isinstance(to_transport, LocalTransport):
206
raise TestSkipped('Cannot sprout to remote bzrdirs.')
207
target = from_bzrdir.sprout(to_url, revision_id=revision_id,
208
force_new_repo=force_new_repo,
209
possible_transports=[to_transport],
210
accelerator_tree=accelerator_tree,
211
create_tree_if_local=create_tree_if_local)
214
def test_create_null_workingtree(self):
215
dir = self.make_bzrdir('dir1')
216
dir.create_repository()
219
wt = dir.create_workingtree(revision_id=bzrlib.revision.NULL_REVISION)
220
except errors.NotLocalUrl:
221
raise TestSkipped("cannot make working tree with transport %r"
223
self.assertEqual([], wt.get_parent_ids())
225
def test_destroy_workingtree(self):
226
tree = self.make_branch_and_tree('tree')
227
self.build_tree(['tree/file'])
229
tree.commit('first commit')
232
bzrdir.destroy_workingtree()
233
except errors.UnsupportedOperation:
234
raise TestSkipped('Format does not support destroying tree')
235
self.failIfExists('tree/file')
236
self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
237
bzrdir.create_workingtree()
238
self.failUnlessExists('tree/file')
239
bzrdir.destroy_workingtree_metadata()
240
self.failUnlessExists('tree/file')
241
self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
243
def test_destroy_branch(self):
244
branch = self.make_branch('branch')
245
bzrdir = branch.bzrdir
247
bzrdir.destroy_branch()
248
except (errors.UnsupportedOperation, errors.TransportNotPossible):
249
raise TestNotApplicable('Format does not support destroying branch')
250
self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
251
bzrdir.create_branch()
254
def test_destroy_repository(self):
255
repo = self.make_repository('repository')
258
bzrdir.destroy_repository()
259
except (errors.UnsupportedOperation, errors.TransportNotPossible):
260
raise TestNotApplicable('Format does not support destroying'
262
self.assertRaises(errors.NoRepositoryPresent, bzrdir.open_repository)
263
bzrdir.create_repository()
264
bzrdir.open_repository()
266
def test_open_workingtree_raises_no_working_tree(self):
267
"""BzrDir.open_workingtree() should raise NoWorkingTree (rather than
268
e.g. NotLocalUrl) if there is no working tree.
270
dir = self.make_bzrdir('source')
271
vfs_dir = bzrdir.BzrDir.open(self.get_vfs_only_url('source'))
272
if vfs_dir.has_workingtree():
273
# This BzrDir format doesn't support BzrDirs without working trees,
274
# so this test is irrelevant.
276
self.assertRaises(errors.NoWorkingTree, dir.open_workingtree)
278
def test_clone_on_transport(self):
    """clone_on_transport() onto a sibling transport copies the bzrdir.

    The clone must have a different base from the source and the same
    directory content, with merge-hashes excluded from the comparison.
    """
    source_dir = self.make_bzrdir('source')
    # Build the destination as a sibling of 'source': up one level, then
    # down into 'target'.
    parent_transport = source_dir.root_transport.clone('..')
    destination = parent_transport.clone('target')
    cloned_dir = source_dir.clone_on_transport(destination)
    self.assertNotEqual(source_dir.transport.base, cloned_dir.transport.base)
    ignored_paths = ['./.bzr/merge-hashes']
    self.assertDirectoriesEqual(
        source_dir.root_transport, cloned_dir.root_transport, ignored_paths)
286
def test_clone_bzrdir_empty(self):
    """Cloning an empty bzrdir by URL gives a distinct but identical copy.

    Directory content is compared with merge-hashes excluded.
    """
    source_dir = self.make_bzrdir('source')
    target_url = self.get_url('target')
    cloned_dir = source_dir.clone(target_url)
    self.assertNotEqual(source_dir.transport.base, cloned_dir.transport.base)
    ignored_paths = ['./.bzr/merge-hashes']
    self.assertDirectoriesEqual(
        source_dir.root_transport, cloned_dir.root_transport, ignored_paths)
293
def test_clone_bzrdir_empty_force_new_ignored(self):
    """force_new_repo=True makes no difference when cloning an empty bzrdir.

    The same assertions as the plain empty-clone test are made: the clone
    has a different base and the same content, merge-hashes excluded.
    """
    # An empty bzrdir has no repository to carry over, so the
    # force_new_repo parameter should have no effect on its clone logic.
    source_dir = self.make_bzrdir('source')
    target_url = self.get_url('target')
    cloned_dir = source_dir.clone(target_url, force_new_repo=True)
    self.assertNotEqual(source_dir.transport.base, cloned_dir.transport.base)
    ignored_paths = ['./.bzr/merge-hashes']
    self.assertDirectoriesEqual(
        source_dir.root_transport, cloned_dir.root_transport, ignored_paths)
302
def test_clone_bzrdir_repository(self):
303
tree = self.make_branch_and_tree('commit_tree')
304
self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
306
tree.commit('revision 1', rev_id='1')
307
dir = self.make_bzrdir('source')
308
repo = dir.create_repository()
309
repo.fetch(tree.branch.repository)
310
self.assertTrue(repo.has_revision('1'))
311
target = dir.clone(self.get_url('target'))
312
self.assertNotEqual(dir.transport.base, target.transport.base)
313
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
315
'./.bzr/merge-hashes',
318
self.assertRepositoryHasSameItems(tree.branch.repository,
319
target.open_repository())
321
def test_clone_bzrdir_repository_under_shared(self):
322
tree = self.make_branch_and_tree('commit_tree')
323
self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
325
tree.commit('revision 1', rev_id='1')
326
dir = self.make_bzrdir('source')
327
repo = dir.create_repository()
328
repo.fetch(tree.branch.repository)
329
self.assertTrue(repo.has_revision('1'))
331
self.make_repository('target', shared=True)
332
except errors.IncompatibleFormat:
334
target = dir.clone(self.get_url('target/child'))
335
self.assertNotEqual(dir.transport.base, target.transport.base)
336
self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
338
def test_clone_bzrdir_repository_branch_both_under_shared(self):
339
# Create a shared repository
341
shared_repo = self.make_repository('shared', shared=True)
342
except errors.IncompatibleFormat:
344
# Make a branch, 'commit_tree', and working tree outside of the shared
345
# repository, and commit some revisions to it.
346
tree = self.make_branch_and_tree('commit_tree')
347
self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
349
tree.commit('revision 1', rev_id='1')
350
tree.bzrdir.open_branch().set_revision_history([])
351
tree.set_parent_trees([])
352
tree.commit('revision 2', rev_id='2')
353
# Copy the content (i.e. revisions) from the 'commit_tree' branch's
354
# repository into the shared repository.
355
tree.branch.repository.copy_content_into(shared_repo)
356
# Make a branch 'source' inside the shared repository.
357
dir = self.make_bzrdir('shared/source')
359
# Clone 'source' to 'target', also inside the shared repository.
360
target = dir.clone(self.get_url('shared/target'))
361
# 'source', 'target', and the shared repo all have distinct bzrdirs.
362
self.assertNotEqual(dir.transport.base, target.transport.base)
363
self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
364
# The shared repository will contain revisions from the 'commit_tree'
365
# repository, even revisions that are not part of the history of the
366
# 'commit_tree' branch.
367
self.assertTrue(shared_repo.has_revision('1'))
369
def test_clone_bzrdir_repository_branch_only_source_under_shared(self):
371
shared_repo = self.make_repository('shared', shared=True)
372
except errors.IncompatibleFormat:
374
tree = self.make_branch_and_tree('commit_tree')
375
self.build_tree(['commit_tree/foo'])
377
tree.commit('revision 1', rev_id='1')
378
tree.branch.bzrdir.open_branch().set_revision_history([])
379
tree.set_parent_trees([])
380
tree.commit('revision 2', rev_id='2')
381
tree.branch.repository.copy_content_into(shared_repo)
382
if shared_repo.make_working_trees():
383
shared_repo.set_make_working_trees(False)
384
self.assertFalse(shared_repo.make_working_trees())
385
self.assertTrue(shared_repo.has_revision('1'))
386
dir = self.make_bzrdir('shared/source')
388
target = dir.clone(self.get_url('target'))
389
self.assertNotEqual(dir.transport.base, target.transport.base)
390
self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
391
branch = target.open_branch()
392
self.assertTrue(branch.repository.has_revision('1'))
393
self.assertFalse(branch.repository.make_working_trees())
394
self.assertTrue(branch.repository.is_shared())
396
def test_clone_bzrdir_repository_under_shared_force_new_repo(self):
397
tree = self.make_branch_and_tree('commit_tree')
398
self.build_tree(['commit_tree/foo'])
400
tree.commit('revision 1', rev_id='1')
401
dir = self.make_bzrdir('source')
402
repo = dir.create_repository()
403
repo.fetch(tree.branch.repository)
404
self.assertTrue(repo.has_revision('1'))
406
self.make_repository('target', shared=True)
407
except errors.IncompatibleFormat:
409
target = dir.clone(self.get_url('target/child'), force_new_repo=True)
410
self.assertNotEqual(dir.transport.base, target.transport.base)
411
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
412
['./.bzr/repository',
414
self.assertRepositoryHasSameItems(tree.branch.repository, repo)
416
def test_clone_bzrdir_repository_revision(self):
417
# test for revision limiting, [smoke test, not corner case checks].
418
# make a repository with some revisions,
419
# and clone it with a revision limit.
421
tree = self.make_branch_and_tree('commit_tree')
422
self.build_tree(['commit_tree/foo'])
424
tree.commit('revision 1', rev_id='1')
425
tree.branch.bzrdir.open_branch().set_revision_history([])
426
tree.set_parent_trees([])
427
tree.commit('revision 2', rev_id='2')
428
source = self.make_repository('source')
429
tree.branch.repository.copy_content_into(source)
431
target = dir.clone(self.get_url('target'), revision_id='2')
432
raise TestSkipped('revision limiting not strict yet')
434
def test_clone_bzrdir_branch_and_repo_fixed_user_id(self):
435
# Bug #430868 is about an email containing '.sig'
436
os.environ['BZR_EMAIL'] = 'murphy@host.sighup.org'
437
tree = self.make_branch_and_tree('commit_tree')
438
self.build_tree(['commit_tree/foo'])
440
rev1 = tree.commit('revision 1')
441
tree_repo = tree.branch.repository
442
tree_repo.lock_write()
443
tree_repo.start_write_group()
444
tree_repo.sign_revision(rev1, gpg.LoopbackGPGStrategy(None))
445
tree_repo.commit_write_group()
447
target = self.make_branch('target')
448
tree.branch.repository.copy_content_into(target.repository)
449
tree.branch.copy_content_into(target)
450
self.assertTrue(target.repository.has_revision(rev1))
452
tree_repo.get_signature_text(rev1),
453
target.repository.get_signature_text(rev1))
455
def test_clone_bzrdir_branch_and_repo(self):
456
tree = self.make_branch_and_tree('commit_tree')
457
self.build_tree(['commit_tree/foo'])
459
tree.commit('revision 1')
460
source = self.make_branch('source')
461
tree.branch.repository.copy_content_into(source.repository)
462
tree.branch.copy_content_into(source)
464
target = dir.clone(self.get_url('target'))
465
self.assertNotEqual(dir.transport.base, target.transport.base)
466
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
468
'./.bzr/basis-inventory-cache',
469
'./.bzr/checkout/stat-cache',
470
'./.bzr/merge-hashes',
474
self.assertRepositoryHasSameItems(
475
tree.branch.repository, target.open_repository())
477
def test_clone_bzrdir_branch_and_repo_into_shared_repo(self):
478
# by default cloning into a shared repo uses the shared repo.
479
tree = self.make_branch_and_tree('commit_tree')
480
self.build_tree(['commit_tree/foo'])
482
tree.commit('revision 1')
483
source = self.make_branch('source')
484
tree.branch.repository.copy_content_into(source.repository)
485
tree.branch.copy_content_into(source)
487
self.make_repository('target', shared=True)
488
except errors.IncompatibleFormat:
491
target = dir.clone(self.get_url('target/child'))
492
self.assertNotEqual(dir.transport.base, target.transport.base)
493
self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
494
self.assertEqual(source.revision_history(),
495
target.open_branch().revision_history())
497
def test_clone_bzrdir_branch_and_repo_into_shared_repo_force_new_repo(self):
498
# with force_new_repo=True, cloning into a shared repo must give the clone
# its own repository rather than using the shared one.
499
tree = self.make_branch_and_tree('commit_tree')
500
self.build_tree(['commit_tree/foo'])
502
tree.commit('revision 1')
503
source = self.make_branch('source')
504
tree.branch.repository.copy_content_into(source.repository)
505
tree.branch.copy_content_into(source)
507
self.make_repository('target', shared=True)
508
except errors.IncompatibleFormat:
511
target = dir.clone(self.get_url('target/child'), force_new_repo=True)
512
self.assertNotEqual(dir.transport.base, target.transport.base)
513
repo = target.open_repository()
514
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
515
['./.bzr/repository',
517
self.assertRepositoryHasSameItems(tree.branch.repository, repo)
519
def test_clone_bzrdir_branch_reference(self):
520
# cloning should preserve the reference status of the branch in a bzrdir
521
referenced_branch = self.make_branch('referencced')
522
dir = self.make_bzrdir('source')
524
reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
525
target_branch=referenced_branch)
526
except errors.IncompatibleFormat:
527
# this is ok too, not all formats have to support references.
529
target = dir.clone(self.get_url('target'))
530
self.assertNotEqual(dir.transport.base, target.transport.base)
531
self.assertDirectoriesEqual(dir.root_transport, target.root_transport)
533
def test_clone_bzrdir_branch_revision(self):
534
# test for revision limiting, [smoke test, not corner case checks].
535
# make a branch with some revisions,
536
# and clone it with a revision limit.
538
tree = self.make_branch_and_tree('commit_tree')
539
self.build_tree(['commit_tree/foo'])
541
tree.commit('revision 1', rev_id='1')
542
tree.commit('revision 2', rev_id='2', allow_pointless=True)
543
source = self.make_branch('source')
544
tree.branch.repository.copy_content_into(source.repository)
545
tree.branch.copy_content_into(source)
547
target = dir.clone(self.get_url('target'), revision_id='1')
548
self.assertEqual('1', target.open_branch().last_revision())
550
def test_clone_bzrdir_tree_branch_repo(self):
551
tree = self.make_branch_and_tree('source')
552
self.build_tree(['source/foo'])
554
tree.commit('revision 1')
556
target = dir.clone(self.get_url('target'))
557
self.skipIfNoWorkingTree(target)
558
self.assertNotEqual(dir.transport.base, target.transport.base)
559
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
560
['./.bzr/stat-cache',
561
'./.bzr/checkout/dirstate',
562
'./.bzr/checkout/stat-cache',
563
'./.bzr/checkout/merge-hashes',
564
'./.bzr/merge-hashes',
567
self.assertRepositoryHasSameItems(tree.branch.repository,
568
target.open_repository())
569
target.open_workingtree().revert()
571
def test_clone_on_transport_preserves_repo_format(self):
572
if self.bzrdir_format == bzrdir.format_registry.make_bzrdir('default'):
576
source_branch = self.make_branch('source', format=format)
577
# Ensure no format data is cached
578
a_dir = bzrlib.branch.Branch.open_from_transport(
579
self.get_transport('source')).bzrdir
580
target_transport = self.get_transport('target')
581
target_bzrdir = a_dir.clone_on_transport(target_transport)
582
target_repo = target_bzrdir.open_repository()
583
source_branch = bzrlib.branch.Branch.open(
584
self.get_vfs_only_url('source'))
585
if isinstance(target_repo, RemoteRepository):
586
target_repo._ensure_real()
587
target_repo = target_repo._real_repository
588
self.assertEqual(target_repo._format, source_branch.repository._format)
590
def test_revert_inventory(self):
591
tree = self.make_branch_and_tree('source')
592
self.build_tree(['source/foo'])
594
tree.commit('revision 1')
596
target = dir.clone(self.get_url('target'))
597
self.skipIfNoWorkingTree(target)
598
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
599
['./.bzr/stat-cache',
600
'./.bzr/checkout/dirstate',
601
'./.bzr/checkout/stat-cache',
602
'./.bzr/checkout/merge-hashes',
603
'./.bzr/merge-hashes',
606
self.assertRepositoryHasSameItems(tree.branch.repository,
607
target.open_repository())
609
target.open_workingtree().revert()
610
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
611
['./.bzr/stat-cache',
612
'./.bzr/checkout/dirstate',
613
'./.bzr/checkout/stat-cache',
614
'./.bzr/checkout/merge-hashes',
615
'./.bzr/merge-hashes',
618
self.assertRepositoryHasSameItems(tree.branch.repository,
619
target.open_repository())
621
def test_clone_bzrdir_tree_branch_reference(self):
622
# a tree with a branch reference (aka a checkout)
623
# should stay a checkout on clone.
624
referenced_branch = self.make_branch('referencced')
625
dir = self.make_bzrdir('source')
627
reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
628
target_branch=referenced_branch)
629
except errors.IncompatibleFormat:
630
# this is ok too, not all formats have to support references.
632
self.createWorkingTreeOrSkip(dir)
633
target = dir.clone(self.get_url('target'))
634
self.skipIfNoWorkingTree(target)
635
self.assertNotEqual(dir.transport.base, target.transport.base)
636
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
637
['./.bzr/stat-cache',
638
'./.bzr/checkout/stat-cache',
639
'./.bzr/checkout/merge-hashes',
640
'./.bzr/merge-hashes',
641
'./.bzr/repository/inventory.knit',
644
def test_clone_bzrdir_tree_revision(self):
645
# test for revision limiting, [smoke test, not corner case checks].
646
# make a tree with a revision with a last-revision
647
# and clone it with a revision limit.
648
# This smoke test just checks the revision-id is right. Tree specific
649
# tests will check corner cases.
650
tree = self.make_branch_and_tree('source')
651
self.build_tree(['source/foo'])
653
tree.commit('revision 1', rev_id='1')
654
tree.commit('revision 2', rev_id='2', allow_pointless=True)
656
target = dir.clone(self.get_url('target'), revision_id='1')
657
self.skipIfNoWorkingTree(target)
658
self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
660
def test_clone_bzrdir_into_notrees_repo(self):
661
"""Cloning into a no-trees repo should not create a working tree"""
662
tree = self.make_branch_and_tree('source')
663
self.build_tree(['source/foo'])
665
tree.commit('revision 1')
668
repo = self.make_repository('repo', shared=True)
669
except errors.IncompatibleFormat:
670
raise TestNotApplicable('must support shared repositories')
671
if repo.make_working_trees():
672
repo.set_make_working_trees(False)
673
self.assertFalse(repo.make_working_trees())
676
a_dir = dir.clone(self.get_url('repo/a'))
678
self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
680
def test_clone_respects_stacked(self):
    """Cloning with stacked_on= leaves the cloned branch stacked on that URL.

    The branch opened from the clone must report the parent branch's base
    as its stacked-on URL.
    """
    parent_branch = self.make_branch('parent')
    child_transport = self.get_transport('child')
    child_dir = parent_branch.bzrdir.clone_on_transport(
        child_transport, stacked_on=parent_branch.base)
    stacked_on_url = child_dir.open_branch().get_stacked_on_url()
    self.assertEqual(stacked_on_url, parent_branch.base)
687
def test_get_branch_reference_on_reference(self):
688
"""get_branch_reference should return the right url."""
689
referenced_branch = self.make_branch('referenced')
690
dir = self.make_bzrdir('source')
692
reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
693
target_branch=referenced_branch)
694
except errors.IncompatibleFormat:
695
# this is ok too, not all formats have to support references.
697
self.assertEqual(referenced_branch.bzrdir.root_transport.abspath('') + '/',
698
dir.get_branch_reference())
700
def test_get_branch_reference_on_non_reference(self):
    """get_branch_reference returns None when the branch is not a reference."""
    plain_branch = self.make_branch('referenced')
    reference = plain_branch.bzrdir.get_branch_reference()
    self.assertEqual(None, reference)
705
def test_get_branch_reference_no_branch(self):
706
"""get_branch_reference should not mask NotBranchErrors."""
707
dir = self.make_bzrdir('source')
709
# this format does not support branchless bzrdirs.
711
self.assertRaises(errors.NotBranchError, dir.get_branch_reference)
713
def test_sprout_bzrdir_empty(self):
714
dir = self.make_bzrdir('source')
715
target = dir.sprout(self.get_url('target'))
716
self.assertNotEqual(dir.transport.base, target.transport.base)
717
# creates a new repository branch and tree
718
target.open_repository()
720
self.openWorkingTreeIfLocal(target)
722
def test_sprout_bzrdir_empty_under_shared_repo(self):
723
# sprouting an empty dir into a repo uses the repo
724
dir = self.make_bzrdir('source')
726
self.make_repository('target', shared=True)
727
except errors.IncompatibleFormat:
729
target = dir.sprout(self.get_url('target/child'))
730
self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
733
target.open_workingtree()
734
except errors.NoWorkingTree:
735
# bzrdir's that never have working trees are allowed to pass;
736
# whitelist them for now.
737
self.assertIsInstance(target, RemoteBzrDir)
739
def test_sprout_bzrdir_empty_under_shared_repo_force_new(self):
740
# the force_new_repo parameter should force use of a new repo in an empty
741
# bzrdir's sprout logic
742
dir = self.make_bzrdir('source')
744
self.make_repository('target', shared=True)
745
except errors.IncompatibleFormat:
747
target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
748
target.open_repository()
750
self.openWorkingTreeIfLocal(target)
752
def test_sprout_bzrdir_repository(self):
753
tree = self.make_branch_and_tree('commit_tree')
754
self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
756
tree.commit('revision 1', rev_id='1')
757
dir = self.make_bzrdir('source')
758
repo = dir.create_repository()
759
repo.fetch(tree.branch.repository)
760
self.assertTrue(repo.has_revision('1'))
763
_mod_revision.is_null(_mod_revision.ensure_null(
764
dir.open_branch().last_revision())))
765
except errors.NotBranchError:
767
target = dir.sprout(self.get_url('target'))
768
self.assertNotEqual(dir.transport.base, target.transport.base)
769
# testing inventory isn't reasonable for repositories
770
self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
776
'./.bzr/repository/inventory.knit',
779
local_inventory = dir.transport.local_abspath('inventory')
780
except errors.NotLocalUrl:
783
# If we happen to have a tree, we'll guarantee everything
784
# except for the tree root is the same.
785
inventory_f = file(local_inventory, 'rb')
786
self.addCleanup(inventory_f.close)
787
self.assertContainsRe(inventory_f.read(),
788
'<inventory format="5">\n</inventory>\n')
790
if e.errno != errno.ENOENT:
793
def test_sprout_bzrdir_with_repository_to_shared(self):
794
tree = self.make_branch_and_tree('commit_tree')
795
self.build_tree(['commit_tree/foo'])
797
tree.commit('revision 1', rev_id='1')
798
tree.bzrdir.open_branch().set_revision_history([])
799
tree.set_parent_trees([])
800
tree.commit('revision 2', rev_id='2')
801
source = self.make_repository('source')
802
tree.branch.repository.copy_content_into(source)
805
shared_repo = self.make_repository('target', shared=True)
806
except errors.IncompatibleFormat:
808
target = dir.sprout(self.get_url('target/child'))
809
self.assertNotEqual(dir.transport.base, target.transport.base)
810
self.assertTrue(shared_repo.has_revision('1'))
812
def test_sprout_bzrdir_repository_branch_both_under_shared(self):
814
shared_repo = self.make_repository('shared', shared=True)
815
except errors.IncompatibleFormat:
817
tree = self.make_branch_and_tree('commit_tree')
818
self.build_tree(['commit_tree/foo'])
820
tree.commit('revision 1', rev_id='1')
821
tree.bzrdir.open_branch().set_revision_history([])
822
tree.set_parent_trees([])
823
tree.commit('revision 2', rev_id='2')
824
tree.branch.repository.copy_content_into(shared_repo)
825
dir = self.make_bzrdir('shared/source')
827
target = dir.sprout(self.get_url('shared/target'))
828
self.assertNotEqual(dir.transport.base, target.transport.base)
829
self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
830
self.assertTrue(shared_repo.has_revision('1'))
832
def test_sprout_bzrdir_repository_branch_only_source_under_shared(self):
834
shared_repo = self.make_repository('shared', shared=True)
835
except errors.IncompatibleFormat:
837
tree = self.make_branch_and_tree('commit_tree')
838
self.build_tree(['commit_tree/foo'])
840
tree.commit('revision 1', rev_id='1')
841
tree.bzrdir.open_branch().set_revision_history([])
842
tree.set_parent_trees([])
843
tree.commit('revision 2', rev_id='2')
844
tree.branch.repository.copy_content_into(shared_repo)
845
if shared_repo.make_working_trees():
846
shared_repo.set_make_working_trees(False)
847
self.assertFalse(shared_repo.make_working_trees())
848
self.assertTrue(shared_repo.has_revision('1'))
849
dir = self.make_bzrdir('shared/source')
851
target = dir.sprout(self.get_url('target'))
852
self.assertNotEqual(dir.transport.base, target.transport.base)
853
self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
854
branch = target.open_branch()
855
self.assertTrue(branch.repository.has_revision('1'))
856
if not isinstance(branch.bzrdir, RemoteBzrDir):
857
self.assertTrue(branch.repository.make_working_trees())
858
self.assertFalse(branch.repository.is_shared())
860
def test_sprout_bzrdir_repository_under_shared_force_new_repo(self):
861
tree = self.make_branch_and_tree('commit_tree')
862
self.build_tree(['commit_tree/foo'])
864
tree.commit('revision 1', rev_id='1')
865
tree.bzrdir.open_branch().set_revision_history([])
866
tree.set_parent_trees([])
867
tree.commit('revision 2', rev_id='2')
868
source = self.make_repository('source')
869
tree.branch.repository.copy_content_into(source)
872
shared_repo = self.make_repository('target', shared=True)
873
except errors.IncompatibleFormat:
875
target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
876
self.assertNotEqual(dir.transport.base, target.transport.base)
877
self.assertFalse(shared_repo.has_revision('1'))
879
def test_sprout_bzrdir_repository_revision(self):
    """Smoke-test sprouting a repository with a revision limit.

    Builds a repository with two revisions and sprouts it limited to
    revision '2'.  The test then always raises TestSkipped, because
    revision limiting is not strict yet.
    """
    tree = self.make_branch_and_tree('commit_tree')
    self.build_tree(['commit_tree/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    # Reset branch history and tree parents before committing revision '2'.
    tree.bzrdir.open_branch().set_revision_history([])
    tree.set_parent_trees([])
    tree.commit('revision 2', rev_id='2')
    source = self.make_repository('source')
    tree.branch.repository.copy_content_into(source)
    # NOTE(review): restored line; the sprout source is presumably the
    # bzrdir of the 'source' repository -- confirm vs. upstream.
    source_dir = source.bzrdir
    # The sprout result is not inspected: no strict assertion is possible.
    self.sproutOrSkip(source_dir, self.get_url('target'), revision_id='2')
    raise TestSkipped('revision limiting not strict yet')
897
def test_sprout_bzrdir_branch_and_repo(self):
    """Sprout a bzrdir holding a branch and a repository.

    The target repository must hold the same items as the source, and the
    two directories must be equal on disk apart from an ignore list.
    """
    tree = self.make_branch_and_tree('commit_tree')
    self.build_tree(['commit_tree/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1')
    source = self.make_branch('source')
    tree.branch.repository.copy_content_into(source.repository)
    tree.bzrdir.open_branch().copy_content_into(source)
    # NOTE(review): restored line; the sprout source is presumably the
    # bzrdir of the 'source' branch -- confirm vs. upstream.
    source_dir = source.bzrdir
    target = source_dir.sprout(self.get_url('target'))
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    target_repo = target.open_repository()
    self.assertRepositoryHasSameItems(source.repository, target_repo)
    # NOTE(review): the entries marked "restored" were missing from the
    # damaged source; confirm the ignore list against upstream.
    ignored_paths = [
        './.bzr/basis-inventory-cache',
        './.bzr/branch/branch.conf',
        './.bzr/branch/parent',
        './.bzr/checkout',  # restored
        './.bzr/checkout/inventory',
        './.bzr/checkout/stat-cache',
        './.bzr/inventory',  # restored
        './.bzr/parent',  # restored
        './.bzr/repository',  # restored
        './.bzr/stat-cache',  # restored
        './foo',  # restored
        ]
    self.assertDirectoriesEqual(source_dir.root_transport,
                                target.root_transport,
                                ignored_paths)
925
def test_sprout_bzrdir_branch_and_repo_shared(self):
    """Sprouting a branch with a repo into a shared repo uses the shared repo.

    After sprouting to 'target/child', the shared repository at 'target'
    must contain revision '1'.
    """
    tree = self.make_branch_and_tree('commit_tree')
    self.build_tree(['commit_tree/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    source = self.make_branch('source')
    tree.branch.repository.copy_content_into(source.repository)
    tree.bzrdir.open_branch().copy_content_into(source)
    # NOTE(review): restored line; the sprout source is presumably the
    # bzrdir of the 'source' branch -- confirm vs. upstream.
    source_dir = source.bzrdir
    try:
        shared_repo = self.make_repository('target', shared=True)
    except errors.IncompatibleFormat:
        # This format cannot create shared repositories; nothing to check.
        return
    source_dir.sprout(self.get_url('target/child'))
    self.assertTrue(shared_repo.has_revision('1'))
943
def test_sprout_bzrdir_branch_and_repo_shared_force_new_repo(self):
    """Sprout a branch with a repo beneath a shared repo, forcing a new repo.

    With ``force_new_repo=True`` the shared repository at 'target' must not
    receive revision '1'.
    """
    tree = self.make_branch_and_tree('commit_tree')
    self.build_tree(['commit_tree/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    source = self.make_branch('source')
    tree.branch.repository.copy_content_into(source.repository)
    tree.bzrdir.open_branch().copy_content_into(source)
    # NOTE(review): restored line; the sprout source is presumably the
    # bzrdir of the 'source' branch -- confirm vs. upstream.
    source_dir = source.bzrdir
    try:
        shared_repo = self.make_repository('target', shared=True)
    except errors.IncompatibleFormat:
        # This format cannot create shared repositories; nothing to check.
        return
    target = source_dir.sprout(self.get_url('target/child'),
                               force_new_repo=True)
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    self.assertFalse(shared_repo.has_revision('1'))
962
def test_sprout_bzrdir_branch_reference(self):
    """Sprouting a branch reference yields an in-place branch and a repo.

    Sprouting should create a repository if needed and a sprouted branch.
    """
    referenced_branch = self.make_branch('referenced')
    source_dir = self.make_bzrdir('source')
    try:
        bzrlib.branch.BranchReferenceFormat().initialize(
            source_dir, target_branch=referenced_branch)
    except errors.IncompatibleFormat:
        # this is ok too, not all formats have to support references.
        return
    self.assertRaises(errors.NoRepositoryPresent,
                      source_dir.open_repository)
    target = source_dir.sprout(self.get_url('target'))
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    # we want target to have a branch that is in-place.
    self.assertEqual(target, target.open_branch().bzrdir)
    # and as we don't support repositories being detached yet, a repository
    # must be openable at the target as well.
    target.open_repository()
981
def test_sprout_bzrdir_branch_reference_shared(self):
    """Sprout a branch reference into the area of a shared repository.

    The target gets an in-place branch but no repository of its own;
    revision '1' must land in the shared repository instead.
    """
    referenced_tree = self.make_branch_and_tree('referenced')
    referenced_tree.commit('1', rev_id='1', allow_pointless=True)
    source_dir = self.make_bzrdir('source')
    try:
        bzrlib.branch.BranchReferenceFormat().initialize(
            source_dir, target_branch=referenced_tree.branch)
    except errors.IncompatibleFormat:
        # this is ok too, not all formats have to support references.
        return
    self.assertRaises(errors.NoRepositoryPresent,
                      source_dir.open_repository)
    try:
        shared_repo = self.make_repository('target', shared=True)
    except errors.IncompatibleFormat:
        # This format cannot create shared repositories; nothing to check.
        return
    target = source_dir.sprout(self.get_url('target/child'))
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    # we want target to have a branch that is in-place.
    self.assertEqual(target, target.open_branch().bzrdir)
    # and we want no repository as the target is shared
    self.assertRaises(errors.NoRepositoryPresent,
                      target.open_repository)
    # and we want revision '1' in the shared repo
    self.assertTrue(shared_repo.has_revision('1'))
1007
def test_sprout_bzrdir_branch_reference_shared_force_new_repo(self):
    """Sprout a branch reference beneath a shared repo, forcing a new repo.

    With ``force_new_repo=True`` revision '1' must be in the target's own
    repository and must not be in the shared one.
    """
    referenced_tree = self.make_branch_and_tree('referenced')
    referenced_tree.commit('1', rev_id='1', allow_pointless=True)
    source_dir = self.make_bzrdir('source')
    try:
        bzrlib.branch.BranchReferenceFormat().initialize(
            source_dir, target_branch=referenced_tree.branch)
    except errors.IncompatibleFormat:
        # this is ok too, not all formats have to support references.
        return
    self.assertRaises(errors.NoRepositoryPresent,
                      source_dir.open_repository)
    try:
        shared_repo = self.make_repository('target', shared=True)
    except errors.IncompatibleFormat:
        # This format cannot create shared repositories; nothing to check.
        return
    target = source_dir.sprout(self.get_url('target/child'),
                               force_new_repo=True)
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    # we want target to have a branch that is in-place.
    self.assertEqual(target, target.open_branch().bzrdir)
    # and we want revision '1' in the new repo
    self.assertTrue(target.open_repository().has_revision('1'))
    # but not the shared one
    self.assertFalse(shared_repo.has_revision('1'))
1032
def test_sprout_bzrdir_branch_revision(self):
    """Smoke-test sprouting a branch with a revision limit.

    Makes a branch with two revisions and sprouts it limited to revision
    '1'; the target branch's last revision must then be '1'.  This is not
    a corner-case check.
    """
    tree = self.make_branch_and_tree('commit_tree')
    self.build_tree(['commit_tree/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    tree.commit('revision 2', rev_id='2', allow_pointless=True)
    source = self.make_branch('source')
    tree.branch.repository.copy_content_into(source.repository)
    tree.bzrdir.open_branch().copy_content_into(source)
    # NOTE(review): restored line; the sprout source is presumably the
    # bzrdir of the 'source' branch -- confirm vs. upstream.
    source_dir = source.bzrdir
    target = source_dir.sprout(self.get_url('target'), revision_id='1')
    self.assertEqual('1', target.open_branch().last_revision())
1049
def test_sprout_bzrdir_tree_branch_repo(self):
    """Sprout a bzrdir holding a working tree, a branch and a repository.

    The directories must be equal on disk apart from an ignore list, and
    the target repository must hold the same items as the source.
    """
    tree = self.make_branch_and_tree('source')
    self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1')
    # NOTE(review): restored line; the sprout source is presumably the
    # tree's own bzrdir -- confirm vs. upstream.
    source_dir = tree.bzrdir
    target = self.sproutOrSkip(source_dir, self.get_url('target'))
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    # NOTE(review): the entries marked "restored" were missing from the
    # damaged source; confirm the ignore list against upstream.
    ignored_paths = [
        './.bzr/branch/branch.conf',
        './.bzr/branch/parent',
        './.bzr/checkout/dirstate',
        './.bzr/checkout/stat-cache',
        './.bzr/checkout/inventory',
        './.bzr/inventory',  # restored
        './.bzr/parent',  # restored
        './.bzr/repository',
        './.bzr/stat-cache',
        ]
    self.assertDirectoriesEqual(source_dir.root_transport,
                                target.root_transport,
                                ignored_paths)
    self.assertRepositoryHasSameItems(
        tree.branch.repository, target.open_repository())
1072
def test_sprout_bzrdir_tree_branch_reference(self):
    """Sprout a bzrdir with a working tree and a branch reference.

    Sprouting should create a repository if needed and a sprouted branch;
    the tree state should not be copied, so 'subdir' must be absent from
    the resulting tree.
    """
    referenced_branch = self.make_branch('referencced')
    source_dir = self.make_bzrdir('source')
    try:
        bzrlib.branch.BranchReferenceFormat().initialize(
            source_dir, target_branch=referenced_branch)
    except errors.IncompatibleFormat:
        # this is ok too, not all formats have to support references.
        return
    self.assertRaises(errors.NoRepositoryPresent,
                      source_dir.open_repository)
    tree = self.createWorkingTreeOrSkip(source_dir)
    self.build_tree(['source/subdir/'])
    tree.add('subdir')  # NOTE(review): restored line; confirm vs. upstream
    target = source_dir.sprout(self.get_url('target'))
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    # we want target to have a branch that is in-place.
    self.assertEqual(target, target.open_branch().bzrdir)
    # and as we don't support repositories being detached yet, a repository
    # must be openable at the target as well.
    target.open_repository()
    result_tree = target.open_workingtree()
    self.assertFalse(result_tree.has_filename('subdir'))
1097
def test_sprout_bzrdir_tree_branch_reference_revision(self):
    """Sprout a tree plus branch reference with a revision limit.

    Sprouting should create a repository if needed and a sprouted branch.
    The tree state should not be copied but its revision changed, and the
    new branch should likewise be truncated to revision '1'.
    """
    referenced_branch = self.make_branch('referencced')
    source_dir = self.make_bzrdir('source')
    try:
        bzrlib.branch.BranchReferenceFormat().initialize(
            source_dir, target_branch=referenced_branch)
    except errors.IncompatibleFormat:
        # this is ok too, not all formats have to support references.
        return
    self.assertRaises(errors.NoRepositoryPresent,
                      source_dir.open_repository)
    tree = self.createWorkingTreeOrSkip(source_dir)
    self.build_tree(['source/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    tree.commit('revision 2', rev_id='2', allow_pointless=True)
    target = source_dir.sprout(self.get_url('target'), revision_id='1')
    self.skipIfNoWorkingTree(target)
    self.assertNotEqual(source_dir.transport.base, target.transport.base)
    # we want target to have a branch that is in-place.
    self.assertEqual(target, target.open_branch().bzrdir)
    # and as we don't support repositories being detached yet, a repository
    # must be openable at the target as well.
    target.open_repository()
    # we trust that the working tree sprouting works via the other tests.
    self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
    self.assertEqual('1', target.open_branch().last_revision())
1127
def test_sprout_bzrdir_tree_revision(self):
    """Smoke-test sprouting a working tree with a revision limit.

    Makes a tree with two revisions and sprouts it limited to revision
    '1'.  Only the resulting parent revision id is checked; tree-specific
    tests cover the corner cases.
    """
    tree = self.make_branch_and_tree('source')
    self.build_tree(['source/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    tree.commit('revision 2', rev_id='2', allow_pointless=True)
    # NOTE(review): restored line; the sprout source is presumably the
    # tree's own bzrdir -- confirm vs. upstream.
    source_dir = tree.bzrdir
    target = self.sproutOrSkip(source_dir, self.get_url('target'),
                               revision_id='1')
    self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
1142
def test_sprout_takes_accelerator(self):
    """sprout accepts an ``accelerator_tree`` argument.

    The sprouted working tree must have revision '2' as its only parent.
    """
    tree = self.make_branch_and_tree('source')
    self.build_tree(['source/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    tree.commit('revision 2', rev_id='2', allow_pointless=True)
    # NOTE(review): restored line; the sprout source is presumably the
    # tree's own bzrdir -- confirm vs. upstream.
    source_dir = tree.bzrdir
    target = self.sproutOrSkip(source_dir, self.get_url('target'),
                               accelerator_tree=tree)
    self.assertEqual(['2'], target.open_workingtree().get_parent_ids())
1153
def test_sprout_branch_no_tree(self):
    """Sprout with ``create_tree_if_local=False`` creates no working files.

    Pre-split-out bzrdirs must raise MustHaveWorkingTree instead; for all
    others 'target/foo' must not exist and the branch tips must match.
    """
    tree = self.make_branch_and_tree('source')
    self.build_tree(['source/foo'])
    tree.add('foo')  # NOTE(review): restored line; confirm vs. upstream
    tree.commit('revision 1', rev_id='1')
    tree.commit('revision 2', rev_id='2', allow_pointless=True)
    # NOTE(review): restored line; the sprout source is presumably the
    # tree's own bzrdir -- confirm vs. upstream.
    source_dir = tree.bzrdir
    if isinstance(source_dir, (bzrdir.BzrDirPreSplitOut,)):
        self.assertRaises(errors.MustHaveWorkingTree, source_dir.sprout,
                          self.get_url('target'),
                          create_tree_if_local=False)
        return
    target = source_dir.sprout(self.get_url('target'),
                               create_tree_if_local=False)
    self.failIfExists('target/foo')
    self.assertEqual(tree.branch.last_revision(),
                     target.open_branch().last_revision())
1170
def test_sprout_with_revision_id_uses_default_stack_on(self):
    """Sprouting with a revision id honours a default stacking policy.

    The sprouted repository must not itself hold 'rev-1'.

    :raises TestNotApplicable: if a default stack-on location cannot be
        configured for this format.
    """
    # Make a branch with three commits to stack on.
    builder = self.make_branch_builder('stack-on')
    builder.start_series()
    builder.build_commit(message='Rev 1.', rev_id='rev-1')
    builder.build_commit(message='Rev 2.', rev_id='rev-2')
    builder.build_commit(message='Rev 3.', rev_id='rev-3')
    builder.finish_series()
    stack_on = builder.get_branch()
    # Make a bzrdir with a default stacking policy to stack on that branch.
    config = self.make_bzrdir('policy-dir').get_config()
    try:
        config.set_default_stack_on(self.get_url('stack-on'))
    except errors.BzrError:
        raise TestNotApplicable('Only relevant for stackable formats.')
    # Sprout the stacked-on branch into the bzrdir.
    sprouted = stack_on.bzrdir.sprout(
        self.get_url('policy-dir/sprouted'), revision_id='rev-3')
    # Not all revisions are copied into the sprouted repository.
    repo = sprouted.open_repository()
    self.addCleanup(repo.lock_read().unlock)
    self.assertEqual(None, repo.get_parent_map(['rev-1']).get('rev-1'))
1193
def test_format_initialize_find_open(self):
    """Loopback test: the current format initializes to, and opens as, itself."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    # for remote formats, there must be no prior assumption about the
    # network name to use - it's possible that this may somehow have got
    # in through an unisolated test though - see
    # <https://bugs.launchpad.net/bzr/+bug/504102>
    # NOTE(review): the expected value ``None`` was missing from the
    # damaged source and is restored from the comment above.
    self.assertEqual(
        getattr(self.bzrdir_format, '_network_name', None), None)
    # supported formats must be able to init and open
    t = transport.get_transport(self.get_url())
    readonly_t = transport.get_transport(self.get_readonly_url())
    made_control = self.bzrdir_format.initialize(t.base)
    self.assertIsInstance(made_control, bzrdir.BzrDir)
    self.assertEqual(self.bzrdir_format,
                     bzrdir.BzrDirFormat.find_format(readonly_t))
    direct_opened_dir = self.bzrdir_format.open(readonly_t)
    opened_dir = bzrdir.BzrDir.open(t.base)
    # NOTE(review): the second argument of the next two assertions was
    # missing; ``opened_dir._format`` is assumed -- confirm vs. upstream.
    self.assertEqual(made_control._format, opened_dir._format)
    self.assertEqual(direct_opened_dir._format, opened_dir._format)
    self.assertIsInstance(opened_dir, bzrdir.BzrDir)
1222
def test_format_initialize_on_transport_ex(self):
    """initialize_on_transport_ex works on 'dir' with default arguments."""
    self.assertInitializeEx(self.get_transport('dir'))
1226
def test_format_initialize_on_transport_ex_use_existing_dir_True(self):
    """initialize_on_transport_ex succeeds with ``use_existing_dir=True``."""
    t = self.get_transport('dir')
    # NOTE(review): a line was missing here; creating the directory first is
    # assumed, since the test is about an existing dir -- confirm upstream.
    t.ensure_base()
    self.assertInitializeEx(t, use_existing_dir=True)
1231
def test_format_initialize_on_transport_ex_use_existing_dir_False(self):
    """initialize_on_transport_ex raises FileExists when the dir exists."""
    if not self.bzrdir_format.is_supported():
        # Not initializable - not a failure either.
        return
    t = self.get_transport('dir')
    # NOTE(review): a line was missing here; creating the directory first is
    # assumed, since FileExists is expected below -- confirm upstream.
    t.ensure_base()
    self.assertRaises(errors.FileExists,
                      self.bzrdir_format.initialize_on_transport_ex, t,
                      use_existing_dir=False)
1241
def test_format_initialize_on_transport_ex_create_prefix_True(self):
    """initialize_on_transport_ex succeeds on 'missing/dir' with create_prefix."""
    self.assertInitializeEx(self.get_transport('missing/dir'),
                            create_prefix=True)
1245
def test_format_initialize_on_transport_ex_create_prefix_False(self):
    """initialize_on_transport_ex raises NoSuchFile without create_prefix."""
    if not self.bzrdir_format.is_supported():
        # Not initializable - not a failure either.
        return
    t = self.get_transport('missing/dir')
    self.assertRaises(errors.NoSuchFile, self.assertInitializeEx, t,
                      create_prefix=False)
1253
def test_format_initialize_on_transport_ex_force_new_repo_True(self):
    """``force_new_repo=True`` creates a repo distinct from the shared one."""
    t = self.get_transport('repo')
    repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
    repo_name = repo_fmt.repository_format.network_name()
    # First element of the result is the (shared) repository.
    repo = repo_fmt.initialize_on_transport_ex(
        t, repo_format_name=repo_name, shared_repo=True)[0]
    made_repo, control = self.assertInitializeEx(
        t.clone('branch'), force_new_repo=True,
        repo_format_name=repo_name)
    if control is None:
        # uninitialisable format
        return
    self.assertNotEqual(repo.bzrdir.root_transport.base,
                        made_repo.bzrdir.root_transport.base)
1267
def test_format_initialize_on_transport_ex_force_new_repo_False(self):
    """``force_new_repo=False`` reuses the containing shared repository.

    The check is not made for BzrDirFormat5 / BzrDirFormat6 control dirs.
    """
    t = self.get_transport('repo')
    repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
    repo_name = repo_fmt.repository_format.network_name()
    # First element of the result is the (shared) repository.
    repo = repo_fmt.initialize_on_transport_ex(
        t, repo_format_name=repo_name, shared_repo=True)[0]
    made_repo, control = self.assertInitializeEx(
        t.clone('branch'), force_new_repo=False,
        repo_format_name=repo_name)
    if control is None:
        # uninitialisable format
        return
    all_in_one_formats = (bzrdir.BzrDirFormat5, bzrdir.BzrDirFormat6,)
    if not isinstance(control._format, all_in_one_formats):
        self.assertEqual(repo.bzrdir.root_transport.base,
                         made_repo.bzrdir.root_transport.base)
1283
def test_format_initialize_on_transport_ex_stacked_on(self):
    """``stacked_on`` gives the new repository exactly one fallback repo."""
    # trunk is a stackable format.  Note that it's in the same server area,
    # which is what launchpad does, but that is not sufficient to exercise
    # every case.
    trunk = self.make_branch('trunk', format='1.9')
    t = self.get_transport('stacked')
    old_fmt = bzrdir.format_registry.make_bzrdir('pack-0.92')
    repo_name = old_fmt.repository_format.network_name()
    # Should end up with a 1.9 format (stackable)
    repo, control = self.assertInitializeEx(
        t, need_meta=True, repo_format_name=repo_name,
        stacked_on='../trunk', stack_on_pwd=t.base)
    if control is None:
        # uninitialisable format
        return
    self.assertLength(1, repo._fallback_repositories)
1299
def test_format_initialize_on_transport_ex_default_stack_on(self):
    """A default stacking policy is applied by initialize_on_transport_ex.

    When initialize_on_transport_ex uses a stacked-on branch because of a
    stacking policy on the target, the location of the fallback repository
    is the same as the external location of the stacked-on branch.

    :raises TestNotApplicable: if a default stack-on location cannot be
        configured for this format.
    """
    balloon = self.make_bzrdir('balloon')
    if isinstance(balloon._format, bzrdir.BzrDirMetaFormat1):
        stack_on = self.make_branch('stack-on', format='1.9')
    else:
        stack_on = self.make_branch('stack-on')
    config = self.make_bzrdir('.').get_config()
    try:
        config.set_default_stack_on('stack-on')
    except errors.BzrError:
        raise TestNotApplicable('Only relevant for stackable formats.')
    # Initialize a bzrdir subject to the policy.
    t = self.get_transport('stacked')
    repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
    repo_name = repo_fmt.repository_format.network_name()
    repo, control = self.assertInitializeEx(
        t, need_meta=True, repo_format_name=repo_name, stacked_on=None)
    if control is None:
        # uninitialisable format
        return
    # There's one fallback repo, with a public location.
    self.assertLength(1, repo._fallback_repositories)
    fallback_repo = repo._fallback_repositories[0]
    self.assertEqual(
        stack_on.base, fallback_repo.bzrdir.root_transport.base)
    # The bzrdir creates a branch in stacking-capable format.
    new_branch = control.create_branch()
    self.assertTrue(new_branch._format.supports_stacking())
1333
def test_format_initialize_on_transport_ex_repo_fmt_name_None(self):
    """Without a repo_format_name no repository is returned."""
    t = self.get_transport('dir')
    repo, _control = self.assertInitializeEx(t)
    self.assertEqual(None, repo)
1338
def test_format_initialize_on_transport_ex_repo_fmt_name_followed(self):
    """The requested repository format is the one that gets created.

    BzrDirFormat5 / BzrDirFormat6 are the exception: there the repository
    network name is expected to equal the bzrdir format's own.
    """
    t = self.get_transport('dir')
    # 1.6 is likely to never be default
    fmt = bzrdir.format_registry.make_bzrdir('1.6')
    repo_name = fmt.repository_format.network_name()
    repo, control = self.assertInitializeEx(t, repo_format_name=repo_name)
    if control is None:
        # uninitialisable format
        return
    all_in_one_formats = (bzrdir.BzrDirFormat5, bzrdir.BzrDirFormat6)
    if isinstance(self.bzrdir_format, all_in_one_formats):
        # must stay with the all-in-one-format.
        repo_name = self.bzrdir_format.network_name()
    self.assertEqual(repo_name, repo._format.network_name())
1353
def assertInitializeEx(self, t, need_meta=False, **kwargs):
    """Execute initialize_on_transport_ex and check it succeeded correctly.

    This involves checking that the disk objects were created, open with
    the same format returned, and had the expected disk format.

    :param t: The transport to initialize on.
    :param need_meta: If True, BzrDirFormat5 / BzrDirFormat6 test formats
        are expected to produce a BzrDirMetaFormat1 control dir instead.
    :param **kwargs: Additional arguments to pass to
        initialize_on_transport_ex.
    :return: the resulting repo, control dir tuple; ``(None, None)`` when
        the format under test is not supported.
    """
    if not self.bzrdir_format.is_supported():
        # Not initializable - not a failure either.  Callers unpack two
        # values and test ``control is None``.
        return None, None
    repo, control, require_stacking, repo_policy = \
        self.bzrdir_format.initialize_on_transport_ex(t, **kwargs)
    if repo is not None:
        # Repositories are open write-locked
        self.assertTrue(repo.is_write_locked())
        self.addCleanup(repo.unlock)
    self.assertIsInstance(control, bzrdir.BzrDir)
    opened = bzrdir.BzrDir.open(t.base)
    expected_format = self.bzrdir_format
    if isinstance(expected_format, bzrdir.RemoteBzrDirFormat):
        # Current RemoteBzrDirFormat's do not reliably get network_name
        # set, so we skip a number of tests for RemoteBzrDirFormat's.
        self.assertIsInstance(control, RemoteBzrDir)
    else:
        pre_meta_formats = (bzrdir.BzrDirFormat5, bzrdir.BzrDirFormat6)
        if need_meta and isinstance(expected_format, pre_meta_formats):
            # Pre-metadir formats change when we are making something that
            # needs a metaformat, because clone is used for push.
            expected_format = bzrdir.BzrDirMetaFormat1()
        self.assertEqual(control._format.network_name(),
                         expected_format.network_name())
        self.assertEqual(control._format.network_name(),
                         opened._format.network_name())
    self.assertEqual(control.__class__, opened.__class__)
    return repo, control
1393
def test_format_network_name(self):
    """All control formats must have a network name (a byte string)."""
    control_dir = self.make_bzrdir('.')
    control_format = control_dir._format
    # We want to test that the network_name matches the actual format on
    # disk.  For local control dirs that means that using network_name as a
    # key in the registry gives back the same format.  For remote objects
    # we check that the network_name of the RemoteBzrDirFormat we have
    # locally matches the actual format present on disk.
    if isinstance(control_format, bzrdir.RemoteBzrDirFormat):
        # NOTE(review): restored line; confirm vs. upstream.
        control_dir._ensure_real()
        real_dir = control_dir._real_bzrdir
        network_name = control_format.network_name()
        self.assertEqual(real_dir._format.network_name(), network_name)
    else:
        registry = bzrdir.network_format_registry
        network_name = control_format.network_name()
        looked_up_format = registry.get(network_name)
        self.assertEqual(control_format.__class__,
                         looked_up_format.__class__)
    # The network name must be a byte string.
    self.assertIsInstance(network_name, str)
1415
def test_open_not_bzrdir(self):
    """Opening a location holding no bzrdir raises NotBranchError.

    Tests the format's specific behaviour for no-content or similar dirs.
    """
    readonly_t = transport.get_transport(self.get_readonly_url())
    self.assertRaises(NotBranchError, self.bzrdir_format.open, readonly_t)
1421
def test_create_branch(self):
    """A bzrdir can construct a branch and repository for itself."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    made_control.create_repository()
    made_branch = made_control.create_branch()
    self.assertIsInstance(made_branch, bzrlib.branch.Branch)
    self.assertEqual(made_control, made_branch.bzrdir)
1435
def test_open_branch(self):
    """An opened branch matches the created one in class and format class."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    made_control.create_repository()
    made_branch = made_control.create_branch()
    opened_branch = made_control.open_branch()
    self.assertEqual(made_control, opened_branch.bzrdir)
    self.assertIsInstance(opened_branch, made_branch.__class__)
    self.assertIsInstance(opened_branch._format,
                          made_branch._format.__class__)
1450
def test_list_branches(self):
    """list_branches reports the created branch, and none once destroyed."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    made_control.create_repository()
    made_branch = made_control.create_branch()
    branches = made_control.list_branches()
    self.assertEqual(1, len(branches))
    self.assertEqual(made_branch.base, branches[0].base)
    try:
        made_control.destroy_branch()
    except errors.UnsupportedOperation:
        pass # Not all bzrdirs support destroying directories
    else:
        self.assertEqual([], made_control.list_branches())
1470
def test_create_repository(self):
    """A bzrdir can construct a repository for itself."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    made_repo = made_control.create_repository()
    # Check that we have a repository object: the call must not raise,
    # its result is irrelevant.
    made_repo.has_revision('foo')
    self.assertEqual(made_control, made_repo.bzrdir)
1484
def test_create_repository_shared(self):
    """A bzrdir can create a shared repository or fail appropriately."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    try:
        made_repo = made_control.create_repository(shared=True)
    except errors.IncompatibleFormat:
        # Old bzrdir formats don't support shared repositories
        # and should raise IncompatibleFormat
        return
    self.assertTrue(made_repo.is_shared())
1502
def test_create_repository_nonshared(self):
    """A bzrdir can create a non-shared repository."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    made_repo = made_control.create_repository(shared=False)
    self.assertFalse(made_repo.is_shared())
1514
def test_open_repository(self):
    """An opened repository matches the created one in class and format class."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = transport.get_transport(self.get_url())
    made_control = self.bzrdir_format.initialize(t.base)
    made_repo = made_control.create_repository()
    opened_repo = made_control.open_repository()
    self.assertEqual(made_control, opened_repo.bzrdir)
    self.assertIsInstance(opened_repo, made_repo.__class__)
    self.assertIsInstance(opened_repo._format, made_repo._format.__class__)
1528
def test_create_workingtree(self):
    """A bzrdir can construct a working tree for itself."""
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    t = self.get_transport()
    made_control = self.bzrdir_format.initialize(t.base)
    made_control.create_repository()
    made_control.create_branch()
    made_tree = self.createWorkingTreeOrSkip(made_control)
    self.assertIsInstance(made_tree, workingtree.WorkingTree)
    self.assertEqual(made_control, made_tree.bzrdir)
1543
def test_create_workingtree_revision(self):
    """A bzrdir can construct a working tree at a specific revision.

    :raises TestSkipped: if the transport cannot host a working tree
        (create_workingtree raises NotLocalUrl).
    """
    t = self.get_transport()
    source = self.make_branch_and_tree('source')
    source.commit('a', rev_id='a', allow_pointless=True)
    source.commit('b', rev_id='b', allow_pointless=True)
    # NOTE(review): a line was missing here; creating the 'new' directory
    # before cloning the transport is assumed -- confirm vs. upstream.
    t.mkdir('new')
    t_new = t.clone('new')
    made_control = self.bzrdir_format.initialize_on_transport(t_new)
    source.branch.repository.clone(made_control)
    source.branch.clone(made_control)
    try:
        made_tree = made_control.create_workingtree(revision_id='a')
    except errors.NotLocalUrl:
        raise TestSkipped("Can't make working tree on transport %r" % t)
    self.assertEqual(['a'], made_tree.get_parent_ids())
1560
def test_open_workingtree(self):
    """An opened working tree matches the created one in class and format class.

    :raises TestSkipped: if setting up the control dir, repository, branch
        or tree raises NotLocalUrl.
    """
    if not self.bzrdir_format.is_supported():
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        return
    # This has to be tested with local access: on a transport that is not
    # local the set-up below raises NotLocalUrl and the test is skipped.
    t = self.get_transport()
    try:
        made_control = self.bzrdir_format.initialize(t.base)
        made_control.create_repository()
        made_control.create_branch()
        made_tree = made_control.create_workingtree()
    except errors.NotLocalUrl:
        raise TestSkipped("Can't initialize %r on transport %r"
                          % (self.bzrdir_format, t))
    opened_tree = made_control.open_workingtree()
    self.assertEqual(made_control, opened_tree.bzrdir)
    self.assertIsInstance(opened_tree, made_tree.__class__)
    self.assertIsInstance(opened_tree._format, made_tree._format.__class__)
1582
def test_get_branch_transport(self):
    """get_branch_transport returns a Transport for a compatible format."""
    control_dir = self.make_bzrdir('.')
    # without a format, get_branch_transport gives us a transport
    # which -may- point to an existing dir.
    self.assertIsInstance(control_dir.get_branch_transport(None),
                          transport.Transport)
    # with a given format, either the bzr dir supports identifiable
    # branches, or it supports anonymous branch formats, but not both.
    anonymous_format = bzrlib.branch.BzrBranchFormat4()
    identifiable_format = bzrlib.branch.BzrBranchFormat5()
    try:
        found_transport = control_dir.get_branch_transport(anonymous_format)
        self.assertRaises(errors.IncompatibleFormat,
                          control_dir.get_branch_transport,
                          identifiable_format)
    except errors.IncompatibleFormat:
        found_transport = control_dir.get_branch_transport(
            identifiable_format)
    self.assertIsInstance(found_transport, transport.Transport)
    # and the dir which has been initialized for us must exist.
    found_transport.list_dir('.')
1603
def test_get_repository_transport(self):
    """get_repository_transport returns a Transport for a compatible format."""
    control_dir = self.make_bzrdir('.')
    # without a format, get_repository_transport gives us a transport
    # which -may- point to an existing dir.
    self.assertIsInstance(control_dir.get_repository_transport(None),
                          transport.Transport)
    # with a given format, either the bzr dir supports identifiable
    # repositories, or it supports anonymous repository formats, but not
    # both.
    anonymous_format = weaverepo.RepositoryFormat6()
    identifiable_format = weaverepo.RepositoryFormat7()
    try:
        found_transport = control_dir.get_repository_transport(
            anonymous_format)
        self.assertRaises(errors.IncompatibleFormat,
                          control_dir.get_repository_transport,
                          identifiable_format)
    except errors.IncompatibleFormat:
        found_transport = control_dir.get_repository_transport(
            identifiable_format)
    self.assertIsInstance(found_transport, transport.Transport)
    # and the dir which has been initialized for us must exist.
    found_transport.list_dir('.')
1624
def test_get_workingtree_transport(self):
    """get_workingtree_transport returns a Transport for a compatible format."""
    control_dir = self.make_bzrdir('.')
    # without a format, get_workingtree_transport gives us a transport
    # which -may- point to an existing dir.
    self.assertIsInstance(control_dir.get_workingtree_transport(None),
                          transport.Transport)
    # with a given format, either the bzr dir supports identifiable
    # trees, or it supports anonymous tree formats, but not both.
    anonymous_format = workingtree.WorkingTreeFormat2()
    identifiable_format = workingtree.WorkingTreeFormat3()
    try:
        found_transport = control_dir.get_workingtree_transport(
            anonymous_format)
        self.assertRaises(errors.IncompatibleFormat,
                          control_dir.get_workingtree_transport,
                          identifiable_format)
    except errors.IncompatibleFormat:
        found_transport = control_dir.get_workingtree_transport(
            identifiable_format)
    self.assertIsInstance(found_transport, transport.Transport)
    # and the dir which has been initialized for us must exist.
    found_transport.list_dir('.')
1645
def test_root_transport(self):
    # root_transport.base of a bzrdir made at '.' must equal the base of
    # a transport opened on self.get_url('.').
    control_dir = self.make_bzrdir('.')
    self.assertEqual(control_dir.root_transport.base,
                     transport.get_transport(self.get_url('.')).base)
1650
def test_find_repository_no_repo_under_standalone_branch(self):
# Scenario: a shared repository at '.', a control dir with its own
# repository at 'intermediate', and a second control dir at
# 'intermediate/child'.  The final assertion expects find_repository() on
# the innermost control dir to raise NoRepositoryPresent.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1653, 1657, 1665, 1668-1669 and 1671.
# The `try:` lines for both `except` clauses and the bodies of those
# handlers are therefore not visible here -- restore them from the
# upstream file before relying on this test.
1651
# finding a repo stops at standalone branches even if there is a
1652
# higher repository available.
1654
repo = self.make_repository('.', shared=True)
1655
except errors.IncompatibleFormat:
1656
# need a shared repository to test this.
1658
url = self.get_url('intermediate')
1659
transport.get_transport(self.get_url()).mkdir('intermediate')
1660
transport.get_transport(self.get_url()).mkdir('intermediate/child')
1661
made_control = self.bzrdir_format.initialize(url)
1662
made_control.create_repository()
1663
innermost_control = self.bzrdir_format.initialize(
1664
self.get_url('intermediate/child'))
1666
child_repo = innermost_control.open_repository()
1667
# if there is a repository, then the format cannot ever hit this
1670
except errors.NoRepositoryPresent:
1672
self.assertRaises(errors.NoRepositoryPresent,
1673
innermost_control.find_repository)
1675
def test_find_repository_containing_shared_repository(self):
# Scenario: a shared repository at '.', plus a control dir initialized at
# 'childbzrdir'.  The final assertion expects find_repository() on that
# control dir to return a repository whose bzrdir has the same root
# transport base as the shared repository's bzrdir.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1678, 1682, 1686, 1689-1690 and 1692.
# The `try:` lines for both `except` clauses and the bodies of those
# handlers are therefore not visible here -- restore them from the
# upstream file before relying on this test.
1676
# find repo inside a shared repo with an empty control dir
1677
# returns the shared repo.
1679
repo = self.make_repository('.', shared=True)
1680
except errors.IncompatibleFormat:
1681
# need a shared repository to test this.
1683
url = self.get_url('childbzrdir')
1684
transport.get_transport(self.get_url()).mkdir('childbzrdir')
1685
made_control = self.bzrdir_format.initialize(url)
1687
child_repo = made_control.open_repository()
1688
# if there is a repository, then the format cannot ever hit this
1691
except errors.NoRepositoryPresent:
1693
found_repo = made_control.find_repository()
1694
self.assertEqual(repo.bzrdir.root_transport.base,
1695
found_repo.bzrdir.root_transport.base)
1697
def test_find_repository_standalone_with_containing_shared_repository(self):
# Scenario: a shared repository at '.', plus a repository made at
# 'childrepo' with make_repository() (no shared=True).  The control dir
# opened at 'childrepo' must find the child repository: the root transport
# bases of child_repo's and found_repo's bzrdirs are compared for equality.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1699 and 1703.  The `try:` for the
# `except` clause and the body of that handler are therefore not visible
# here -- restore them from the upstream file before relying on this test.
1698
# find repo inside a standalone repo inside a shared repo finds the standalone repo
1700
containing_repo = self.make_repository('.', shared=True)
1701
except errors.IncompatibleFormat:
1702
# need a shared repository to test this.
1704
child_repo = self.make_repository('childrepo')
1705
opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
1706
found_repo = opened_control.find_repository()
1707
self.assertEqual(child_repo.bzrdir.root_transport.base,
1708
found_repo.bzrdir.root_transport.base)
1710
def test_find_repository_shared_within_shared_repository(self):
# Scenario: a shared repository at '.', plus a control dir at 'childrepo'
# in which a second shared repository is created.  find_repository() on
# the control dir opened at 'childrepo' must return the inner repository
# (same root transport base as child_repo), and that base must differ from
# the containing repository's.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1712 and 1716.  The `try:` for the
# `except` clause and the body of that handler are therefore not visible
# here -- restore them from the upstream file before relying on this test.
1711
# find repo at a shared repo inside a shared repo finds the inner repo
1713
containing_repo = self.make_repository('.', shared=True)
1714
except errors.IncompatibleFormat:
1715
# need a shared repository to test this.
1717
url = self.get_url('childrepo')
1718
transport.get_transport(self.get_url()).mkdir('childrepo')
1719
child_control = self.bzrdir_format.initialize(url)
1720
child_repo = child_control.create_repository(shared=True)
1721
opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
1722
found_repo = opened_control.find_repository()
1723
self.assertEqual(child_repo.bzrdir.root_transport.base,
1724
found_repo.bzrdir.root_transport.base)
1725
self.assertNotEqual(child_repo.bzrdir.root_transport.base,
1726
containing_repo.bzrdir.root_transport.base)
1728
def test_find_repository_with_nested_dirs_works(self):
# Scenario: a shared repository at '.', a control dir at 'intermediate'
# and another at 'intermediate/child'.  The final assertion expects
# find_repository() on the innermost control dir to return a repository
# whose bzrdir has the same root transport base as the shared repository's.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1731, 1735, 1740, 1743-1744, 1746, 1749,
# 1752-1753 and 1755.  The `try:` lines for all three `except` clauses and
# the bodies of those handlers are therefore not visible here -- restore
# them from the upstream file before relying on this test.
1729
# find repo inside a bzrdir inside a bzrdir inside a shared repo
1730
# finds the outer shared repo.
1732
repo = self.make_repository('.', shared=True)
1733
except errors.IncompatibleFormat:
1734
# need a shared repository to test this.
1736
url = self.get_url('intermediate')
1737
transport.get_transport(self.get_url()).mkdir('intermediate')
1738
transport.get_transport(self.get_url()).mkdir('intermediate/child')
1739
made_control = self.bzrdir_format.initialize(url)
1741
child_repo = made_control.open_repository()
1742
# if there is a repository, then the format cannot ever hit this
1745
except errors.NoRepositoryPresent:
1747
innermost_control = self.bzrdir_format.initialize(
1748
self.get_url('intermediate/child'))
1750
child_repo = innermost_control.open_repository()
1751
# if there is a repository, then the format cannot ever hit this
1754
except errors.NoRepositoryPresent:
1756
found_repo = innermost_control.find_repository()
1757
self.assertEqual(repo.bzrdir.root_transport.base,
1758
found_repo.bzrdir.root_transport.base)
1760
def test_can_and_needs_format_conversion(self):
    # check that we can ask an instance if it's upgradable
    control_dir = self.make_bzrdir('.')
    if control_dir.can_convert_format():
        # if it's default updatable there must be an updater
        # (the target is forced to the dir's own format, since downgrade
        # paths may not be available)
        self.assertIsInstance(
            control_dir._format.get_converter(format=control_dir._format),
            bzrdir.Converter)
    # NOTE(review): indentation was lost in the text this was restored
    # from; running this call unconditionally (outside the `if`) is
    # inferred from the test name -- confirm against upstream.
    # The result is deliberately unused: only that the query can be made
    # against the default format is exercised.
    control_dir.needs_format_conversion(
        bzrdir.BzrDirFormat.get_default_format())
1772
def test_backup_copies_existing(self):
# Commits one file in a tree at 'test', calls backup_bzrdir() on its
# bzrdir, checks that both returned locations exist on disk, then walks
# the two directory trees in lockstep comparing each directory's relpath
# and, per entry, fields [0] and [2]; field [4] of each entry is opened and
# handed to osutils.compare_files.
# NOTE(review): fields [0]/[2]/[4] are presumably relpath / kind / abspath
# as yielded by osutils.walkdirs -- confirm.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing; 1790 is skipped immediately before the compare_files
# call, so a statement (possibly a guard on the entry) is not visible here.
# NOTE(review): the compare_files result is not asserted and the two
# open() handles are never closed; if compare_files reports equality via
# its return value, that line checks nothing -- confirm.
1773
tree = self.make_branch_and_tree('test')
1774
self.build_tree(['test/a'])
1775
tree.add(['a'], ['a-id'])
1776
tree.commit('some data to be copied.')
1777
old_url, new_url = tree.bzrdir.backup_bzrdir()
1778
old_path = urlutils.local_path_from_url(old_url)
1779
new_path = urlutils.local_path_from_url(new_url)
1780
self.failUnlessExists(old_path)
1781
self.failUnlessExists(new_path)
1782
for (((dir_relpath1, _), entries1),
1783
((dir_relpath2, _), entries2)) in izip(
1784
osutils.walkdirs(old_path),
1785
osutils.walkdirs(new_path)):
1786
self.assertEquals(dir_relpath1, dir_relpath2)
1787
for f1, f2 in zip(entries1, entries2):
1788
self.assertEquals(f1[0], f2[0])
1789
self.assertEquals(f1[2], f2[2])
1791
osutils.compare_files(open(f1[4]), open(f2[4]))
1793
def test_upgrade_new_instance(self):
# Creates a repository in a fresh bzrdir and calls
# createWorkingTreeOrSkip(dir).  When the dir reports can_convert_format(),
# a converter targeting the dir's own format is run with a nested progress
# bar; check.check_dwim is then invoked on the same location.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1798, 1803, 1805 and 1807-1808.  At least
# one statement after create_repository(), the tail of the parenthesised
# comment, and the lines around the convert() call are not visible here,
# and the nesting of check_dwim relative to the `if` cannot be told from
# this text -- restore from the upstream file.
1794
"""Does an available updater work?"""
1795
dir = self.make_bzrdir('.')
1796
# for now, upgrade is not ready for partial bzrdirs.
1797
dir.create_repository()
1799
self.createWorkingTreeOrSkip(dir)
1800
if dir.can_convert_format():
1801
# if its default updatable there must be an updater
1802
# (we force the latest known format as downgrades may not be
1804
pb = ui.ui_factory.nested_progress_bar()
1806
dir._format.get_converter(format=dir._format).convert(dir, pb)
1809
# and it should pass 'check' now.
1810
check.check_dwim(self.get_url('.'), False, True, True)
1812
def test_format_description(self):
    # the format of a freshly made bzrdir must report a non-empty
    # description.
    control_dir = self.make_bzrdir('.')
    description = control_dir._format.get_format_description()
    self.assertTrue(len(description))
1817
def test_retire_bzrdir(self):
# Pre-creates '.bzr.retired.0/' (containing 'junk') on the root transport
# of a fresh bzrdir, checks that '.bzr' exists, and finally expects '.bzr'
# to be gone and '.bzr.retired.1' to exist.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing; 1824 is skipped between the two '.bzr' checks, so the
# statement that performs the retirement is not visible here.  Presumably
# bd.retire_bzrdir() -- confirm against the upstream file.
1818
bd = self.make_bzrdir('.')
1819
transport = bd.root_transport
1820
# must not overwrite existing directories
1821
self.build_tree(['.bzr.retired.0/', '.bzr.retired.0/junk',],
1822
transport=transport)
1823
self.failUnless(transport.has('.bzr'))
1825
self.failIf(transport.has('.bzr'))
1826
self.failUnless(transport.has('.bzr.retired.1'))
1828
def test_retire_bzrdir_limited(self):
    # With '.bzr.retired.0' already present and limit=0, retire_bzrdir
    # must raise FileExists or DirectoryNotEmpty.
    bd = self.make_bzrdir('.')
    # named root_transport so the module-level `transport` name is not
    # shadowed inside this test.
    root_transport = bd.root_transport
    # must not overwrite existing directories
    self.build_tree(['.bzr.retired.0/', '.bzr.retired.0/junk'],
                    transport=root_transport)
    self.assertTrue(root_transport.has('.bzr'))
    self.assertRaises((errors.FileExists, errors.DirectoryNotEmpty),
                      bd.retire_bzrdir, limit=0)
1839
class TestBreakLock(TestCaseWithBzrDir):
# Tests for break_lock() on a bzrdir: with nothing in it, with only a
# repository, with a branch reference pointing into another bzrdir, and
# with a working tree.  Answers to the confirmation prompts are fed through
# CannedInputUIFactory; the assertions on ui_factory.responses compare
# against the answers left unread.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and many values are skipped throughout this class (for
# instance 1844-1845, 1847-1848 and 1857-1859).  Several `except` clauses
# have no visible `try:`, and handler bodies, lock acquisition calls and
# branch openers are not visible either -- restore the class from the
# upstream file before relying on it.
1841
def test_break_lock_empty(self):
1842
# break lock on an empty bzrdir should work silently.
1843
dir = self.make_bzrdir('.')
1846
except NotImplementedError:
1849
def test_break_lock_repository(self):
1850
# break lock with just a repo should unlock the repo.
1851
repo = self.make_repository('.')
1853
lock_repo = repo.bzrdir.open_repository()
1854
if not lock_repo.get_physical_lock_status():
1855
# This bzrdir's default repository does not physically lock things
1856
# and thus this interaction cannot be tested at the interface
1860
# only one yes needed here: it should only be unlocking
1862
bzrlib.ui.ui_factory = CannedInputUIFactory([True])
1864
repo.bzrdir.break_lock()
1865
except NotImplementedError:
1866
# this bzrdir does not implement break_lock - so we can't test it.
1869
lock_repo.lock_write()
1871
self.assertRaises(errors.LockBroken, repo.unlock)
1873
def test_break_lock_branch(self):
1874
# break lock with just a repo should unlock the branch.
1875
# and not directly try the repository.
1876
# we test this by making a branch reference to a branch
1877
# and repository in another bzrdir
1878
# for pre-metadir formats this will fail, that's ok.
1879
master = self.make_branch('branch')
1880
thisdir = self.make_bzrdir('this')
1882
bzrlib.branch.BranchReferenceFormat().initialize(
1883
thisdir, target_branch=master)
1884
except errors.IncompatibleFormat:
1886
unused_repo = thisdir.create_repository()
1888
unused_repo.lock_write()
1890
# two yes's : branch and repository. If the repo in this
1891
# dir is inappropriately accessed, 3 will be needed, and
1892
# we'll see that because the stream will be fully consumed
1893
bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
1894
# determine if the repository will have been locked;
1895
this_repo_locked = \
1896
thisdir.open_repository().get_physical_lock_status()
1897
master.bzrdir.break_lock()
1898
if this_repo_locked:
1899
# only two ys should have been read
1900
self.assertEqual([True],
1901
bzrlib.ui.ui_factory.responses)
1903
# only one y should have been read
1904
self.assertEqual([True, True],
1905
bzrlib.ui.ui_factory.responses)
1906
# we should be able to lock a newly opened branch now
1907
branch = master.bzrdir.open_branch()
1910
if this_repo_locked:
1911
# we should not be able to lock the repository in thisdir as
1912
# its still held by the explicit lock we took, and the break
1913
# lock should not have touched it.
1914
repo = thisdir.open_repository()
1915
self.assertRaises(errors.LockContention, repo.lock_write)
1917
unused_repo.unlock()
1918
self.assertRaises(errors.LockBroken, master.unlock)
1920
def test_break_lock_tree(self):
1921
# break lock with a tree should unlock the tree but not try the
1922
# branch explicitly. However this is very hard to test for as we
1923
# don't have a tree reference class, nor is one needed;
1924
# the worst case if this code unlocks twice is an extra question
1926
tree = self.make_branch_and_tree('.')
1928
# three yes's : tree, branch and repository.
1929
bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
1931
tree.bzrdir.break_lock()
1932
except (NotImplementedError, errors.LockActive):
1933
# bzrdir does not support break_lock
1934
# or one of the locked objects (currently only tree does this)
1935
# raised a LockActive because we do still have a live locked
1939
# NOTE(review): one answer left of the three supplied, i.e. two were
# consumed -- check this against the "three yes's" comment above.
self.assertEqual([True],
1940
bzrlib.ui.ui_factory.responses)
1941
lock_tree = tree.bzrdir.open_workingtree()
1942
lock_tree.lock_write()
1944
self.assertRaises(errors.LockBroken, tree.unlock)
1947
class TestTransportConfig(TestCaseWithBzrDir):
# Checks that a default stack-on URL set through a bzrdir's get_config()
# object can be read back from that object and from the config of a BzrDir
# freshly opened at the same URL.  A BzrError whose text contains
# 'Cannot set config' leads to TestNotApplicable being raised.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1952, 1956 and 1961-1962.  The `try:` for
# the `except` clause, the call that takes the `isinstance(...)` /
# "%r should support configs" arguments, and whatever sits between the
# `raise` and the final assertions are not visible here -- restore from the
# upstream file.
1949
def test_get_config(self):
1950
my_dir = self.make_bzrdir('.')
1951
config = my_dir.get_config()
1953
config.set_default_stack_on('http://example.com')
1954
except errors.BzrError, e:
1955
if 'Cannot set config' in str(e):
1957
isinstance(my_dir, (bzrdir.BzrDirMeta1, RemoteBzrDir)),
1958
"%r should support configs" % my_dir)
1959
raise TestNotApplicable(
1960
'This BzrDir format does not support configs.')
1963
self.assertEqual('http://example.com', config.get_default_stack_on())
1964
my_dir2 = bzrdir.BzrDir.open(self.get_url('.'))
1965
config2 = my_dir2.get_config()
1966
self.assertEqual('http://example.com', config2.get_default_stack_on())
1969
class ChrootedBzrDirTests(ChrootedTestCase):
# Initializes a control dir at 'subdir' (the directory itself is made over
# the vfs-only URL), reopens it through the read-only URL and expects
# find_repository() to raise NoRepositoryPresent.
# NOTE(review): the bare integers look like line numbers carried over from
# a source listing, and they skip 1978, 1985, 1988-1989 and 1991.  The body
# of the `is_supported()` check, the `try:` for the `except` clause and the
# body of that handler are not visible here -- restore from the upstream
# file.
1971
def test_find_repository_no_repository(self):
1972
# loopback test to check the current format fails to find a
1973
# share repository correctly.
1974
if not self.bzrdir_format.is_supported():
1975
# unsupported formats are not loopback testable
1976
# because the default open will not open them and
1977
# they may not be initializable.
1979
# supported formats must be able to init and open
1980
# - do the vfs initialisation over the basic vfs transport
1981
# XXX: TODO this should become a 'bzrdirlocation' api call.
1982
url = self.get_vfs_only_url('subdir')
1983
transport.get_transport(self.get_vfs_only_url()).mkdir('subdir')
1984
made_control = self.bzrdir_format.initialize(self.get_url('subdir'))
1986
repo = made_control.open_repository()
1987
# if there is a repository, then the format cannot ever hit this
1990
except errors.NoRepositoryPresent:
1992
made_control = bzrdir.BzrDir.open(self.get_readonly_url('subdir'))
1993
self.assertRaises(errors.NoRepositoryPresent,
1994
made_control.find_repository)
1997
class TestBzrDirControlComponent(TestCaseWithBzrDir):
    """BzrDir implementations adequately implement ControlComponent."""

    def test_urls(self):
        # user_url must be a str equal to the base of user_transport, and
        # control_url must equal the base of control_transport.
        bd = self.make_bzrdir('bd')
        self.assertIsInstance(bd.user_url, str)
        self.assertEqual(bd.user_url, bd.user_transport.base)
        # for all current bzrdir implementations the user dir must be
        # above the control dir but we might need to relax that?
        # (find() == 0 means control_url starts with user_url)
        self.assertEqual(bd.control_url.find(bd.user_url), 0)
        self.assertEqual(bd.control_url, bd.control_transport.base)