~greatmay12/+junk/test1

« back to all changes in this revision

Viewing changes to build/lib.linux-x86_64-2.6/bzrlib/tests/per_bzrdir/test_bzrdir.py

  • Committer: thitipong at ndrsolution
  • Date: 2011-11-14 06:31:02 UTC
  • Revision ID: thitipong@ndrsolution.com-20111114063102-9obte3yfi2azku7d
ndr redirect version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for bzrdir implementations - tests a bzrdir format."""
 
18
 
 
19
from cStringIO import StringIO
 
20
import errno
 
21
from itertools import izip
 
22
import os
 
23
from stat import S_ISDIR
 
24
 
 
25
import bzrlib.branch
 
26
from bzrlib import (
 
27
    bzrdir,
 
28
    check,
 
29
    errors,
 
30
    gpg,
 
31
    lockdir,
 
32
    osutils,
 
33
    repository,
 
34
    revision as _mod_revision,
 
35
    transactions,
 
36
    transport,
 
37
    ui,
 
38
    urlutils,
 
39
    workingtree,
 
40
    )
 
41
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
42
from bzrlib.errors import (FileExists,
 
43
                           NoSuchRevision,
 
44
                           NoSuchFile,
 
45
                           UninitializableFormat,
 
46
                           NotBranchError,
 
47
                           )
 
48
import bzrlib.revision
 
49
from bzrlib.tests import (
 
50
                          ChrootedTestCase,
 
51
                          TestCase,
 
52
                          TestCaseWithTransport,
 
53
                          TestNotApplicable,
 
54
                          TestSkipped,
 
55
                          )
 
56
from bzrlib.tests.per_bzrdir import TestCaseWithBzrDir
 
57
from bzrlib.trace import mutter
 
58
from bzrlib.transport.local import LocalTransport
 
59
from bzrlib.ui import (
 
60
    CannedInputUIFactory,
 
61
    )
 
62
from bzrlib.upgrade import upgrade
 
63
from bzrlib.remote import RemoteBzrDir, RemoteRepository
 
64
from bzrlib.repofmt import weaverepo
 
65
 
 
66
 
 
67
class TestBzrDir(TestCaseWithBzrDir):
 
68
    # Many of these tests test for disk equality rather than checking
 
69
    # for semantic equivalence. This works well for some tests but
 
70
    # is not good at handling changes in representation or the addition
 
71
    # or removal of control data. It would be nice to for instance:
 
72
    # sprout a new branch, check that the nickname has been reset by hand
 
73
    # and then set the nickname to match the source branch, at which point
 
74
    # a semantic equivalence should pass
 
75
 
 
76
    def assertDirectoriesEqual(self, source, target, ignore_list=[]):
 
77
        """Assert that the content of source and target are identical.
 
78
 
 
79
        paths in ignore list will be completely ignored.
 
80
 
 
81
        We ignore paths that represent data which is allowed to change during
 
82
        a clone or sprout: for instance, inventory.knit contains gzip fragements
 
83
        which have timestamps in them, and as we have read the inventory from
 
84
        the source knit, the already-read data is recompressed rather than
 
85
        reading it again, which leads to changed timestamps. This is ok though,
 
86
        because the inventory.kndx file is not ignored, and the integrity of
 
87
        knit joins is tested by test_knit and test_versionedfile.
 
88
 
 
89
        :seealso: Additionally, assertRepositoryHasSameItems provides value
 
90
            rather than representation checking of repositories for
 
91
            equivalence.
 
92
        """
 
93
        files = []
 
94
        directories = ['.']
 
95
        while directories:
 
96
            dir = directories.pop()
 
97
            for path in set(source.list_dir(dir) + target.list_dir(dir)):
 
98
                path = dir + '/' + path
 
99
                if path in ignore_list:
 
100
                    continue
 
101
                try:
 
102
                    stat = source.stat(path)
 
103
                except errors.NoSuchFile:
 
104
                    self.fail('%s not in source' % path)
 
105
                if S_ISDIR(stat.st_mode):
 
106
                    self.assertTrue(S_ISDIR(target.stat(path).st_mode))
 
107
                    directories.append(path)
 
108
                else:
 
109
                    self.assertEqualDiff(source.get(path).read(),
 
110
                                         target.get(path).read(),
 
111
                                         "text for file %r differs:\n" % path)
 
112
 
 
113
    def assertRepositoryHasSameItems(self, left_repo, right_repo):
 
114
        """require left_repo and right_repo to contain the same data."""
 
115
        # XXX: TODO: Doesn't work yet, because we need to be able to compare
 
116
        # local repositories to remote ones...  but this is an as-yet unsolved
 
117
        # aspect of format management and the Remote protocols...
 
118
        # self.assertEqual(left_repo._format.__class__,
 
119
        #     right_repo._format.__class__)
 
120
        left_repo.lock_read()
 
121
        try:
 
122
            right_repo.lock_read()
 
123
            try:
 
124
                # revs
 
125
                all_revs = left_repo.all_revision_ids()
 
126
                self.assertEqual(left_repo.all_revision_ids(),
 
127
                    right_repo.all_revision_ids())
 
128
                for rev_id in left_repo.all_revision_ids():
 
129
                    self.assertEqual(left_repo.get_revision(rev_id),
 
130
                        right_repo.get_revision(rev_id))
 
131
                # Assert the revision trees (and thus the inventories) are equal
 
132
                sort_key = lambda rev_tree: rev_tree.get_revision_id()
 
133
                rev_trees_a = sorted(
 
134
                    left_repo.revision_trees(all_revs), key=sort_key)
 
135
                rev_trees_b = sorted(
 
136
                    right_repo.revision_trees(all_revs), key=sort_key)
 
137
                for tree_a, tree_b in zip(rev_trees_a, rev_trees_b):
 
138
                    self.assertEqual([], list(tree_a.iter_changes(tree_b)))
 
139
                # texts
 
140
                text_index = left_repo._generate_text_key_index()
 
141
                self.assertEqual(text_index,
 
142
                    right_repo._generate_text_key_index())
 
143
                desired_files = []
 
144
                for file_id, revision_id in text_index.iterkeys():
 
145
                    desired_files.append(
 
146
                        (file_id, revision_id, (file_id, revision_id)))
 
147
                left_texts = list(left_repo.iter_files_bytes(desired_files))
 
148
                right_texts = list(right_repo.iter_files_bytes(desired_files))
 
149
                left_texts.sort()
 
150
                right_texts.sort()
 
151
                self.assertEqual(left_texts, right_texts)
 
152
                # signatures
 
153
                for rev_id in all_revs:
 
154
                    try:
 
155
                        left_text = left_repo.get_signature_text(rev_id)
 
156
                    except NoSuchRevision:
 
157
                        continue
 
158
                    right_text = right_repo.get_signature_text(rev_id)
 
159
                    self.assertEqual(left_text, right_text)
 
160
            finally:
 
161
                right_repo.unlock()
 
162
        finally:
 
163
            left_repo.unlock()
 
164
 
 
165
    def skipIfNoWorkingTree(self, a_bzrdir):
 
166
        """Raises TestSkipped if a_bzrdir doesn't have a working tree.
 
167
 
 
168
        If the bzrdir does have a workingtree, this is a no-op.
 
169
        """
 
170
        try:
 
171
            a_bzrdir.open_workingtree()
 
172
        except (errors.NotLocalUrl, errors.NoWorkingTree):
 
173
            raise TestSkipped("bzrdir on transport %r has no working tree"
 
174
                              % a_bzrdir.transport)
 
175
 
 
176
    def openWorkingTreeIfLocal(self, a_bzrdir):
 
177
        """If a_bzrdir is on a local transport, call open_workingtree() on it.
 
178
        """
 
179
        if not isinstance(a_bzrdir.root_transport, LocalTransport):
 
180
            # it's not local, but that's ok
 
181
            return
 
182
        a_bzrdir.open_workingtree()
 
183
 
 
184
    def createWorkingTreeOrSkip(self, a_bzrdir):
 
185
        """Create a working tree on a_bzrdir, or raise TestSkipped.
 
186
 
 
187
        A simple wrapper for create_workingtree that translates NotLocalUrl into
 
188
        TestSkipped.  Returns the newly created working tree.
 
189
        """
 
190
        try:
 
191
            return a_bzrdir.create_workingtree()
 
192
        except errors.NotLocalUrl:
 
193
            raise TestSkipped("cannot make working tree with transport %r"
 
194
                              % a_bzrdir.transport)
 
195
 
 
196
    def sproutOrSkip(self, from_bzrdir, to_url, revision_id=None,
 
197
                     force_new_repo=False, accelerator_tree=None,
 
198
                     create_tree_if_local=True):
 
199
        """Sprout from_bzrdir into to_url, or raise TestSkipped.
 
200
 
 
201
        A simple wrapper for from_bzrdir.sprout that translates NotLocalUrl into
 
202
        TestSkipped.  Returns the newly sprouted bzrdir.
 
203
        """
 
204
        to_transport = transport.get_transport(to_url)
 
205
        if not isinstance(to_transport, LocalTransport):
 
206
            raise TestSkipped('Cannot sprout to remote bzrdirs.')
 
207
        target = from_bzrdir.sprout(to_url, revision_id=revision_id,
 
208
                                    force_new_repo=force_new_repo,
 
209
                                    possible_transports=[to_transport],
 
210
                                    accelerator_tree=accelerator_tree,
 
211
                                    create_tree_if_local=create_tree_if_local)
 
212
        return target
 
213
 
 
214
    def test_create_null_workingtree(self):
 
215
        dir = self.make_bzrdir('dir1')
 
216
        dir.create_repository()
 
217
        dir.create_branch()
 
218
        try:
 
219
            wt = dir.create_workingtree(revision_id=bzrlib.revision.NULL_REVISION)
 
220
        except errors.NotLocalUrl:
 
221
            raise TestSkipped("cannot make working tree with transport %r"
 
222
                              % dir.transport)
 
223
        self.assertEqual([], wt.get_parent_ids())
 
224
 
 
225
    def test_destroy_workingtree(self):
 
226
        tree = self.make_branch_and_tree('tree')
 
227
        self.build_tree(['tree/file'])
 
228
        tree.add('file')
 
229
        tree.commit('first commit')
 
230
        bzrdir = tree.bzrdir
 
231
        try:
 
232
            bzrdir.destroy_workingtree()
 
233
        except errors.UnsupportedOperation:
 
234
            raise TestSkipped('Format does not support destroying tree')
 
235
        self.failIfExists('tree/file')
 
236
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
 
237
        bzrdir.create_workingtree()
 
238
        self.failUnlessExists('tree/file')
 
239
        bzrdir.destroy_workingtree_metadata()
 
240
        self.failUnlessExists('tree/file')
 
241
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
 
242
 
 
243
    def test_destroy_branch(self):
 
244
        branch = self.make_branch('branch')
 
245
        bzrdir = branch.bzrdir
 
246
        try:
 
247
            bzrdir.destroy_branch()
 
248
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
 
249
            raise TestNotApplicable('Format does not support destroying branch')
 
250
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
 
251
        bzrdir.create_branch()
 
252
        bzrdir.open_branch()
 
253
 
 
254
    def test_destroy_repository(self):
 
255
        repo = self.make_repository('repository')
 
256
        bzrdir = repo.bzrdir
 
257
        try:
 
258
            bzrdir.destroy_repository()
 
259
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
 
260
            raise TestNotApplicable('Format does not support destroying'
 
261
                                    ' repository')
 
262
        self.assertRaises(errors.NoRepositoryPresent, bzrdir.open_repository)
 
263
        bzrdir.create_repository()
 
264
        bzrdir.open_repository()
 
265
 
 
266
    def test_open_workingtree_raises_no_working_tree(self):
 
267
        """BzrDir.open_workingtree() should raise NoWorkingTree (rather than
 
268
        e.g. NotLocalUrl) if there is no working tree.
 
269
        """
 
270
        dir = self.make_bzrdir('source')
 
271
        vfs_dir = bzrdir.BzrDir.open(self.get_vfs_only_url('source'))
 
272
        if vfs_dir.has_workingtree():
 
273
            # This BzrDir format doesn't support BzrDirs without working trees,
 
274
            # so this test is irrelevant.
 
275
            return
 
276
        self.assertRaises(errors.NoWorkingTree, dir.open_workingtree)
 
277
 
 
278
    def test_clone_on_transport(self):
 
279
        a_dir = self.make_bzrdir('source')
 
280
        target_transport = a_dir.root_transport.clone('..').clone('target')
 
281
        target = a_dir.clone_on_transport(target_transport)
 
282
        self.assertNotEqual(a_dir.transport.base, target.transport.base)
 
283
        self.assertDirectoriesEqual(a_dir.root_transport, target.root_transport,
 
284
                                    ['./.bzr/merge-hashes'])
 
285
 
 
286
    def test_clone_bzrdir_empty(self):
 
287
        dir = self.make_bzrdir('source')
 
288
        target = dir.clone(self.get_url('target'))
 
289
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
290
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
291
                                    ['./.bzr/merge-hashes'])
 
292
 
 
293
    def test_clone_bzrdir_empty_force_new_ignored(self):
 
294
        # the force_new_repo parameter should have no effect on an empty
 
295
        # bzrdir's clone logic
 
296
        dir = self.make_bzrdir('source')
 
297
        target = dir.clone(self.get_url('target'), force_new_repo=True)
 
298
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
299
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
300
                                    ['./.bzr/merge-hashes'])
 
301
 
 
302
    def test_clone_bzrdir_repository(self):
 
303
        tree = self.make_branch_and_tree('commit_tree')
 
304
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
305
        tree.add('foo')
 
306
        tree.commit('revision 1', rev_id='1')
 
307
        dir = self.make_bzrdir('source')
 
308
        repo = dir.create_repository()
 
309
        repo.fetch(tree.branch.repository)
 
310
        self.assertTrue(repo.has_revision('1'))
 
311
        target = dir.clone(self.get_url('target'))
 
312
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
313
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
314
                                    [
 
315
                                     './.bzr/merge-hashes',
 
316
                                     './.bzr/repository',
 
317
                                     ])
 
318
        self.assertRepositoryHasSameItems(tree.branch.repository,
 
319
            target.open_repository())
 
320
 
 
321
    def test_clone_bzrdir_repository_under_shared(self):
 
322
        tree = self.make_branch_and_tree('commit_tree')
 
323
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
324
        tree.add('foo')
 
325
        tree.commit('revision 1', rev_id='1')
 
326
        dir = self.make_bzrdir('source')
 
327
        repo = dir.create_repository()
 
328
        repo.fetch(tree.branch.repository)
 
329
        self.assertTrue(repo.has_revision('1'))
 
330
        try:
 
331
            self.make_repository('target', shared=True)
 
332
        except errors.IncompatibleFormat:
 
333
            return
 
334
        target = dir.clone(self.get_url('target/child'))
 
335
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
336
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
337
 
 
338
    def test_clone_bzrdir_repository_branch_both_under_shared(self):
 
339
        # Create a shared repository
 
340
        try:
 
341
            shared_repo = self.make_repository('shared', shared=True)
 
342
        except errors.IncompatibleFormat:
 
343
            return
 
344
        # Make a branch, 'commit_tree', and working tree outside of the shared
 
345
        # repository, and commit some revisions to it.
 
346
        tree = self.make_branch_and_tree('commit_tree')
 
347
        self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
 
348
        tree.add('foo')
 
349
        tree.commit('revision 1', rev_id='1')
 
350
        tree.bzrdir.open_branch().set_revision_history([])
 
351
        tree.set_parent_trees([])
 
352
        tree.commit('revision 2', rev_id='2')
 
353
        # Copy the content (i.e. revisions) from the 'commit_tree' branch's
 
354
        # repository into the shared repository.
 
355
        tree.branch.repository.copy_content_into(shared_repo)
 
356
        # Make a branch 'source' inside the shared repository.
 
357
        dir = self.make_bzrdir('shared/source')
 
358
        dir.create_branch()
 
359
        # Clone 'source' to 'target', also inside the shared repository.
 
360
        target = dir.clone(self.get_url('shared/target'))
 
361
        # 'source', 'target', and the shared repo all have distinct bzrdirs.
 
362
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
363
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
364
        # The shared repository will contain revisions from the 'commit_tree'
 
365
        # repository, even revisions that are not part of the history of the
 
366
        # 'commit_tree' branch.
 
367
        self.assertTrue(shared_repo.has_revision('1'))
 
368
 
 
369
    def test_clone_bzrdir_repository_branch_only_source_under_shared(self):
 
370
        try:
 
371
            shared_repo = self.make_repository('shared', shared=True)
 
372
        except errors.IncompatibleFormat:
 
373
            return
 
374
        tree = self.make_branch_and_tree('commit_tree')
 
375
        self.build_tree(['commit_tree/foo'])
 
376
        tree.add('foo')
 
377
        tree.commit('revision 1', rev_id='1')
 
378
        tree.branch.bzrdir.open_branch().set_revision_history([])
 
379
        tree.set_parent_trees([])
 
380
        tree.commit('revision 2', rev_id='2')
 
381
        tree.branch.repository.copy_content_into(shared_repo)
 
382
        if shared_repo.make_working_trees():
 
383
            shared_repo.set_make_working_trees(False)
 
384
            self.assertFalse(shared_repo.make_working_trees())
 
385
        self.assertTrue(shared_repo.has_revision('1'))
 
386
        dir = self.make_bzrdir('shared/source')
 
387
        dir.create_branch()
 
388
        target = dir.clone(self.get_url('target'))
 
389
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
390
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
391
        branch = target.open_branch()
 
392
        self.assertTrue(branch.repository.has_revision('1'))
 
393
        self.assertFalse(branch.repository.make_working_trees())
 
394
        self.assertTrue(branch.repository.is_shared())
 
395
 
 
396
    def test_clone_bzrdir_repository_under_shared_force_new_repo(self):
 
397
        tree = self.make_branch_and_tree('commit_tree')
 
398
        self.build_tree(['commit_tree/foo'])
 
399
        tree.add('foo')
 
400
        tree.commit('revision 1', rev_id='1')
 
401
        dir = self.make_bzrdir('source')
 
402
        repo = dir.create_repository()
 
403
        repo.fetch(tree.branch.repository)
 
404
        self.assertTrue(repo.has_revision('1'))
 
405
        try:
 
406
            self.make_repository('target', shared=True)
 
407
        except errors.IncompatibleFormat:
 
408
            return
 
409
        target = dir.clone(self.get_url('target/child'), force_new_repo=True)
 
410
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
411
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
412
                                    ['./.bzr/repository',
 
413
                                     ])
 
414
        self.assertRepositoryHasSameItems(tree.branch.repository, repo)
 
415
 
 
416
    def test_clone_bzrdir_repository_revision(self):
 
417
        # test for revision limiting, [smoke test, not corner case checks].
 
418
        # make a repository with some revisions,
 
419
        # and clone it with a revision limit.
 
420
        #
 
421
        tree = self.make_branch_and_tree('commit_tree')
 
422
        self.build_tree(['commit_tree/foo'])
 
423
        tree.add('foo')
 
424
        tree.commit('revision 1', rev_id='1')
 
425
        tree.branch.bzrdir.open_branch().set_revision_history([])
 
426
        tree.set_parent_trees([])
 
427
        tree.commit('revision 2', rev_id='2')
 
428
        source = self.make_repository('source')
 
429
        tree.branch.repository.copy_content_into(source)
 
430
        dir = source.bzrdir
 
431
        target = dir.clone(self.get_url('target'), revision_id='2')
 
432
        raise TestSkipped('revision limiting not strict yet')
 
433
 
 
434
    def test_clone_bzrdir_branch_and_repo_fixed_user_id(self):
 
435
        # Bug #430868 is about an email containing '.sig'
 
436
        os.environ['BZR_EMAIL'] = 'murphy@host.sighup.org'
 
437
        tree = self.make_branch_and_tree('commit_tree')
 
438
        self.build_tree(['commit_tree/foo'])
 
439
        tree.add('foo')
 
440
        rev1 = tree.commit('revision 1')
 
441
        tree_repo = tree.branch.repository
 
442
        tree_repo.lock_write()
 
443
        tree_repo.start_write_group()
 
444
        tree_repo.sign_revision(rev1, gpg.LoopbackGPGStrategy(None))
 
445
        tree_repo.commit_write_group()
 
446
        tree_repo.unlock()
 
447
        target = self.make_branch('target')
 
448
        tree.branch.repository.copy_content_into(target.repository)
 
449
        tree.branch.copy_content_into(target)
 
450
        self.assertTrue(target.repository.has_revision(rev1))
 
451
        self.assertEqual(
 
452
            tree_repo.get_signature_text(rev1),
 
453
            target.repository.get_signature_text(rev1))
 
454
 
 
455
    def test_clone_bzrdir_branch_and_repo(self):
 
456
        tree = self.make_branch_and_tree('commit_tree')
 
457
        self.build_tree(['commit_tree/foo'])
 
458
        tree.add('foo')
 
459
        tree.commit('revision 1')
 
460
        source = self.make_branch('source')
 
461
        tree.branch.repository.copy_content_into(source.repository)
 
462
        tree.branch.copy_content_into(source)
 
463
        dir = source.bzrdir
 
464
        target = dir.clone(self.get_url('target'))
 
465
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
466
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
467
                                    [
 
468
                                     './.bzr/basis-inventory-cache',
 
469
                                     './.bzr/checkout/stat-cache',
 
470
                                     './.bzr/merge-hashes',
 
471
                                     './.bzr/repository',
 
472
                                     './.bzr/stat-cache',
 
473
                                    ])
 
474
        self.assertRepositoryHasSameItems(
 
475
            tree.branch.repository, target.open_repository())
 
476
 
 
477
    def test_clone_bzrdir_branch_and_repo_into_shared_repo(self):
 
478
        # by default cloning into a shared repo uses the shared repo.
 
479
        tree = self.make_branch_and_tree('commit_tree')
 
480
        self.build_tree(['commit_tree/foo'])
 
481
        tree.add('foo')
 
482
        tree.commit('revision 1')
 
483
        source = self.make_branch('source')
 
484
        tree.branch.repository.copy_content_into(source.repository)
 
485
        tree.branch.copy_content_into(source)
 
486
        try:
 
487
            self.make_repository('target', shared=True)
 
488
        except errors.IncompatibleFormat:
 
489
            return
 
490
        dir = source.bzrdir
 
491
        target = dir.clone(self.get_url('target/child'))
 
492
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
493
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
494
        self.assertEqual(source.revision_history(),
 
495
                         target.open_branch().revision_history())
 
496
 
 
497
    def test_clone_bzrdir_branch_and_repo_into_shared_repo_force_new_repo(self):
 
498
        # by default cloning into a shared repo uses the shared repo.
 
499
        tree = self.make_branch_and_tree('commit_tree')
 
500
        self.build_tree(['commit_tree/foo'])
 
501
        tree.add('foo')
 
502
        tree.commit('revision 1')
 
503
        source = self.make_branch('source')
 
504
        tree.branch.repository.copy_content_into(source.repository)
 
505
        tree.branch.copy_content_into(source)
 
506
        try:
 
507
            self.make_repository('target', shared=True)
 
508
        except errors.IncompatibleFormat:
 
509
            return
 
510
        dir = source.bzrdir
 
511
        target = dir.clone(self.get_url('target/child'), force_new_repo=True)
 
512
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
513
        repo = target.open_repository()
 
514
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
515
                                    ['./.bzr/repository',
 
516
                                     ])
 
517
        self.assertRepositoryHasSameItems(tree.branch.repository, repo)
 
518
 
 
519
    def test_clone_bzrdir_branch_reference(self):
 
520
        # cloning should preserve the reference status of the branch in a bzrdir
 
521
        referenced_branch = self.make_branch('referencced')
 
522
        dir = self.make_bzrdir('source')
 
523
        try:
 
524
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
525
                target_branch=referenced_branch)
 
526
        except errors.IncompatibleFormat:
 
527
            # this is ok too, not all formats have to support references.
 
528
            return
 
529
        target = dir.clone(self.get_url('target'))
 
530
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
531
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport)
 
532
 
 
533
    def test_clone_bzrdir_branch_revision(self):
 
534
        # test for revision limiting, [smoke test, not corner case checks].
 
535
        # make a branch with some revisions,
 
536
        # and clone it with a revision limit.
 
537
        #
 
538
        tree = self.make_branch_and_tree('commit_tree')
 
539
        self.build_tree(['commit_tree/foo'])
 
540
        tree.add('foo')
 
541
        tree.commit('revision 1', rev_id='1')
 
542
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
543
        source = self.make_branch('source')
 
544
        tree.branch.repository.copy_content_into(source.repository)
 
545
        tree.branch.copy_content_into(source)
 
546
        dir = source.bzrdir
 
547
        target = dir.clone(self.get_url('target'), revision_id='1')
 
548
        self.assertEqual('1', target.open_branch().last_revision())
 
549
 
 
550
    def test_clone_bzrdir_tree_branch_repo(self):
 
551
        tree = self.make_branch_and_tree('source')
 
552
        self.build_tree(['source/foo'])
 
553
        tree.add('foo')
 
554
        tree.commit('revision 1')
 
555
        dir = tree.bzrdir
 
556
        target = dir.clone(self.get_url('target'))
 
557
        self.skipIfNoWorkingTree(target)
 
558
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
559
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
560
                                    ['./.bzr/stat-cache',
 
561
                                     './.bzr/checkout/dirstate',
 
562
                                     './.bzr/checkout/stat-cache',
 
563
                                     './.bzr/checkout/merge-hashes',
 
564
                                     './.bzr/merge-hashes',
 
565
                                     './.bzr/repository',
 
566
                                     ])
 
567
        self.assertRepositoryHasSameItems(tree.branch.repository,
 
568
            target.open_repository())
 
569
        target.open_workingtree().revert()
 
570
 
 
571
    def test_clone_on_transport_preserves_repo_format(self):
 
572
        if self.bzrdir_format == bzrdir.format_registry.make_bzrdir('default'):
 
573
            format = 'knit'
 
574
        else:
 
575
            format = None
 
576
        source_branch = self.make_branch('source', format=format)
 
577
        # Ensure no format data is cached
 
578
        a_dir = bzrlib.branch.Branch.open_from_transport(
 
579
            self.get_transport('source')).bzrdir
 
580
        target_transport = self.get_transport('target')
 
581
        target_bzrdir = a_dir.clone_on_transport(target_transport)
 
582
        target_repo = target_bzrdir.open_repository()
 
583
        source_branch = bzrlib.branch.Branch.open(
 
584
            self.get_vfs_only_url('source'))
 
585
        if isinstance(target_repo, RemoteRepository):
 
586
            target_repo._ensure_real()
 
587
            target_repo = target_repo._real_repository
 
588
        self.assertEqual(target_repo._format, source_branch.repository._format)
 
589
 
 
590
    def test_revert_inventory(self):
 
591
        tree = self.make_branch_and_tree('source')
 
592
        self.build_tree(['source/foo'])
 
593
        tree.add('foo')
 
594
        tree.commit('revision 1')
 
595
        dir = tree.bzrdir
 
596
        target = dir.clone(self.get_url('target'))
 
597
        self.skipIfNoWorkingTree(target)
 
598
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
599
                                    ['./.bzr/stat-cache',
 
600
                                     './.bzr/checkout/dirstate',
 
601
                                     './.bzr/checkout/stat-cache',
 
602
                                     './.bzr/checkout/merge-hashes',
 
603
                                     './.bzr/merge-hashes',
 
604
                                     './.bzr/repository',
 
605
                                     ])
 
606
        self.assertRepositoryHasSameItems(tree.branch.repository,
 
607
            target.open_repository())
 
608
 
 
609
        target.open_workingtree().revert()
 
610
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
611
                                    ['./.bzr/stat-cache',
 
612
                                     './.bzr/checkout/dirstate',
 
613
                                     './.bzr/checkout/stat-cache',
 
614
                                     './.bzr/checkout/merge-hashes',
 
615
                                     './.bzr/merge-hashes',
 
616
                                     './.bzr/repository',
 
617
                                     ])
 
618
        self.assertRepositoryHasSameItems(tree.branch.repository,
 
619
            target.open_repository())
 
620
 
 
621
    def test_clone_bzrdir_tree_branch_reference(self):
 
622
        # a tree with a branch reference (aka a checkout)
 
623
        # should stay a checkout on clone.
 
624
        referenced_branch = self.make_branch('referencced')
 
625
        dir = self.make_bzrdir('source')
 
626
        try:
 
627
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
628
                target_branch=referenced_branch)
 
629
        except errors.IncompatibleFormat:
 
630
            # this is ok too, not all formats have to support references.
 
631
            return
 
632
        self.createWorkingTreeOrSkip(dir)
 
633
        target = dir.clone(self.get_url('target'))
 
634
        self.skipIfNoWorkingTree(target)
 
635
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
636
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
637
                                    ['./.bzr/stat-cache',
 
638
                                     './.bzr/checkout/stat-cache',
 
639
                                     './.bzr/checkout/merge-hashes',
 
640
                                     './.bzr/merge-hashes',
 
641
                                     './.bzr/repository/inventory.knit',
 
642
                                     ])
 
643
 
 
644
    def test_clone_bzrdir_tree_revision(self):
 
645
        # test for revision limiting, [smoke test, not corner case checks].
 
646
        # make a tree with a revision with a last-revision
 
647
        # and clone it with a revision limit.
 
648
        # This smoke test just checks the revision-id is right. Tree specific
 
649
        # tests will check corner cases.
 
650
        tree = self.make_branch_and_tree('source')
 
651
        self.build_tree(['source/foo'])
 
652
        tree.add('foo')
 
653
        tree.commit('revision 1', rev_id='1')
 
654
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
655
        dir = tree.bzrdir
 
656
        target = dir.clone(self.get_url('target'), revision_id='1')
 
657
        self.skipIfNoWorkingTree(target)
 
658
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
659
 
 
660
    def test_clone_bzrdir_into_notrees_repo(self):
 
661
        """Cloning into a no-trees repo should not create a working tree"""
 
662
        tree = self.make_branch_and_tree('source')
 
663
        self.build_tree(['source/foo'])
 
664
        tree.add('foo')
 
665
        tree.commit('revision 1')
 
666
 
 
667
        try:
 
668
            repo = self.make_repository('repo', shared=True)
 
669
        except errors.IncompatibleFormat:
 
670
            raise TestNotApplicable('must support shared repositories')
 
671
        if repo.make_working_trees():
 
672
            repo.set_make_working_trees(False)
 
673
            self.assertFalse(repo.make_working_trees())
 
674
 
 
675
        dir = tree.bzrdir
 
676
        a_dir = dir.clone(self.get_url('repo/a'))
 
677
        a_dir.open_branch()
 
678
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
 
679
 
 
680
    def test_clone_respects_stacked(self):
 
681
        branch = self.make_branch('parent')
 
682
        child_transport = self.get_transport('child')
 
683
        child = branch.bzrdir.clone_on_transport(child_transport,
 
684
                                                 stacked_on=branch.base)
 
685
        self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base)
 
686
 
 
687
    def test_get_branch_reference_on_reference(self):
 
688
        """get_branch_reference should return the right url."""
 
689
        referenced_branch = self.make_branch('referenced')
 
690
        dir = self.make_bzrdir('source')
 
691
        try:
 
692
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
693
                target_branch=referenced_branch)
 
694
        except errors.IncompatibleFormat:
 
695
            # this is ok too, not all formats have to support references.
 
696
            return
 
697
        self.assertEqual(referenced_branch.bzrdir.root_transport.abspath('') + '/',
 
698
            dir.get_branch_reference())
 
699
 
 
700
    def test_get_branch_reference_on_non_reference(self):
 
701
        """get_branch_reference should return None for non-reference branches."""
 
702
        branch = self.make_branch('referenced')
 
703
        self.assertEqual(None, branch.bzrdir.get_branch_reference())
 
704
 
 
705
    def test_get_branch_reference_no_branch(self):
 
706
        """get_branch_reference should not mask NotBranchErrors."""
 
707
        dir = self.make_bzrdir('source')
 
708
        if dir.has_branch():
 
709
            # this format does not support branchless bzrdirs.
 
710
            return
 
711
        self.assertRaises(errors.NotBranchError, dir.get_branch_reference)
 
712
 
 
713
    def test_sprout_bzrdir_empty(self):
 
714
        dir = self.make_bzrdir('source')
 
715
        target = dir.sprout(self.get_url('target'))
 
716
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
717
        # creates a new repository branch and tree
 
718
        target.open_repository()
 
719
        target.open_branch()
 
720
        self.openWorkingTreeIfLocal(target)
 
721
 
 
722
    def test_sprout_bzrdir_empty_under_shared_repo(self):
 
723
        # sprouting an empty dir into a repo uses the repo
 
724
        dir = self.make_bzrdir('source')
 
725
        try:
 
726
            self.make_repository('target', shared=True)
 
727
        except errors.IncompatibleFormat:
 
728
            return
 
729
        target = dir.sprout(self.get_url('target/child'))
 
730
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
731
        target.open_branch()
 
732
        try:
 
733
            target.open_workingtree()
 
734
        except errors.NoWorkingTree:
 
735
            # bzrdir's that never have working trees are allowed to pass;
 
736
            # whitelist them for now.
 
737
            self.assertIsInstance(target, RemoteBzrDir)
 
738
 
 
739
    def test_sprout_bzrdir_empty_under_shared_repo_force_new(self):
 
740
        # the force_new_repo parameter should force use of a new repo in an empty
 
741
        # bzrdir's sprout logic
 
742
        dir = self.make_bzrdir('source')
 
743
        try:
 
744
            self.make_repository('target', shared=True)
 
745
        except errors.IncompatibleFormat:
 
746
            return
 
747
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
748
        target.open_repository()
 
749
        target.open_branch()
 
750
        self.openWorkingTreeIfLocal(target)
 
751
 
 
752
    def test_sprout_bzrdir_repository(self):
 
753
        tree = self.make_branch_and_tree('commit_tree')
 
754
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
755
        tree.add('foo')
 
756
        tree.commit('revision 1', rev_id='1')
 
757
        dir = self.make_bzrdir('source')
 
758
        repo = dir.create_repository()
 
759
        repo.fetch(tree.branch.repository)
 
760
        self.assertTrue(repo.has_revision('1'))
 
761
        try:
 
762
            self.assertTrue(
 
763
                _mod_revision.is_null(_mod_revision.ensure_null(
 
764
                dir.open_branch().last_revision())))
 
765
        except errors.NotBranchError:
 
766
            pass
 
767
        target = dir.sprout(self.get_url('target'))
 
768
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
769
        # testing inventory isn't reasonable for repositories
 
770
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
771
                                    [
 
772
                                     './.bzr/branch',
 
773
                                     './.bzr/checkout',
 
774
                                     './.bzr/inventory',
 
775
                                     './.bzr/parent',
 
776
                                     './.bzr/repository/inventory.knit',
 
777
                                     ])
 
778
        try:
 
779
            local_inventory = dir.transport.local_abspath('inventory')
 
780
        except errors.NotLocalUrl:
 
781
            return
 
782
        try:
 
783
            # If we happen to have a tree, we'll guarantee everything
 
784
            # except for the tree root is the same.
 
785
            inventory_f = file(local_inventory, 'rb')
 
786
            self.addCleanup(inventory_f.close)
 
787
            self.assertContainsRe(inventory_f.read(),
 
788
                                  '<inventory format="5">\n</inventory>\n')
 
789
        except IOError, e:
 
790
            if e.errno != errno.ENOENT:
 
791
                raise
 
792
 
 
793
    def test_sprout_bzrdir_with_repository_to_shared(self):
 
794
        tree = self.make_branch_and_tree('commit_tree')
 
795
        self.build_tree(['commit_tree/foo'])
 
796
        tree.add('foo')
 
797
        tree.commit('revision 1', rev_id='1')
 
798
        tree.bzrdir.open_branch().set_revision_history([])
 
799
        tree.set_parent_trees([])
 
800
        tree.commit('revision 2', rev_id='2')
 
801
        source = self.make_repository('source')
 
802
        tree.branch.repository.copy_content_into(source)
 
803
        dir = source.bzrdir
 
804
        try:
 
805
            shared_repo = self.make_repository('target', shared=True)
 
806
        except errors.IncompatibleFormat:
 
807
            return
 
808
        target = dir.sprout(self.get_url('target/child'))
 
809
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
810
        self.assertTrue(shared_repo.has_revision('1'))
 
811
 
 
812
    def test_sprout_bzrdir_repository_branch_both_under_shared(self):
 
813
        try:
 
814
            shared_repo = self.make_repository('shared', shared=True)
 
815
        except errors.IncompatibleFormat:
 
816
            return
 
817
        tree = self.make_branch_and_tree('commit_tree')
 
818
        self.build_tree(['commit_tree/foo'])
 
819
        tree.add('foo')
 
820
        tree.commit('revision 1', rev_id='1')
 
821
        tree.bzrdir.open_branch().set_revision_history([])
 
822
        tree.set_parent_trees([])
 
823
        tree.commit('revision 2', rev_id='2')
 
824
        tree.branch.repository.copy_content_into(shared_repo)
 
825
        dir = self.make_bzrdir('shared/source')
 
826
        dir.create_branch()
 
827
        target = dir.sprout(self.get_url('shared/target'))
 
828
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
829
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
830
        self.assertTrue(shared_repo.has_revision('1'))
 
831
 
 
832
    def test_sprout_bzrdir_repository_branch_only_source_under_shared(self):
 
833
        try:
 
834
            shared_repo = self.make_repository('shared', shared=True)
 
835
        except errors.IncompatibleFormat:
 
836
            return
 
837
        tree = self.make_branch_and_tree('commit_tree')
 
838
        self.build_tree(['commit_tree/foo'])
 
839
        tree.add('foo')
 
840
        tree.commit('revision 1', rev_id='1')
 
841
        tree.bzrdir.open_branch().set_revision_history([])
 
842
        tree.set_parent_trees([])
 
843
        tree.commit('revision 2', rev_id='2')
 
844
        tree.branch.repository.copy_content_into(shared_repo)
 
845
        if shared_repo.make_working_trees():
 
846
            shared_repo.set_make_working_trees(False)
 
847
            self.assertFalse(shared_repo.make_working_trees())
 
848
        self.assertTrue(shared_repo.has_revision('1'))
 
849
        dir = self.make_bzrdir('shared/source')
 
850
        dir.create_branch()
 
851
        target = dir.sprout(self.get_url('target'))
 
852
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
853
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
854
        branch = target.open_branch()
 
855
        self.assertTrue(branch.repository.has_revision('1'))
 
856
        if not isinstance(branch.bzrdir, RemoteBzrDir):
 
857
            self.assertTrue(branch.repository.make_working_trees())
 
858
        self.assertFalse(branch.repository.is_shared())
 
859
 
 
860
    def test_sprout_bzrdir_repository_under_shared_force_new_repo(self):
 
861
        tree = self.make_branch_and_tree('commit_tree')
 
862
        self.build_tree(['commit_tree/foo'])
 
863
        tree.add('foo')
 
864
        tree.commit('revision 1', rev_id='1')
 
865
        tree.bzrdir.open_branch().set_revision_history([])
 
866
        tree.set_parent_trees([])
 
867
        tree.commit('revision 2', rev_id='2')
 
868
        source = self.make_repository('source')
 
869
        tree.branch.repository.copy_content_into(source)
 
870
        dir = source.bzrdir
 
871
        try:
 
872
            shared_repo = self.make_repository('target', shared=True)
 
873
        except errors.IncompatibleFormat:
 
874
            return
 
875
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
876
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
877
        self.assertFalse(shared_repo.has_revision('1'))
 
878
 
 
879
    def test_sprout_bzrdir_repository_revision(self):
 
880
        # test for revision limiting, [smoke test, not corner case checks].
 
881
        # make a repository with some revisions,
 
882
        # and sprout it with a revision limit.
 
883
        #
 
884
        tree = self.make_branch_and_tree('commit_tree')
 
885
        self.build_tree(['commit_tree/foo'])
 
886
        tree.add('foo')
 
887
        tree.commit('revision 1', rev_id='1')
 
888
        tree.bzrdir.open_branch().set_revision_history([])
 
889
        tree.set_parent_trees([])
 
890
        tree.commit('revision 2', rev_id='2')
 
891
        source = self.make_repository('source')
 
892
        tree.branch.repository.copy_content_into(source)
 
893
        dir = source.bzrdir
 
894
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='2')
 
895
        raise TestSkipped('revision limiting not strict yet')
 
896
 
 
897
    def test_sprout_bzrdir_branch_and_repo(self):
 
898
        tree = self.make_branch_and_tree('commit_tree')
 
899
        self.build_tree(['commit_tree/foo'])
 
900
        tree.add('foo')
 
901
        tree.commit('revision 1')
 
902
        source = self.make_branch('source')
 
903
        tree.branch.repository.copy_content_into(source.repository)
 
904
        tree.bzrdir.open_branch().copy_content_into(source)
 
905
        dir = source.bzrdir
 
906
        target = dir.sprout(self.get_url('target'))
 
907
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
908
        target_repo = target.open_repository()
 
909
        self.assertRepositoryHasSameItems(source.repository, target_repo)
 
910
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
911
                                    [
 
912
                                     './.bzr/basis-inventory-cache',
 
913
                                     './.bzr/branch/branch.conf',
 
914
                                     './.bzr/branch/parent',
 
915
                                     './.bzr/checkout',
 
916
                                     './.bzr/checkout/inventory',
 
917
                                     './.bzr/checkout/stat-cache',
 
918
                                     './.bzr/inventory',
 
919
                                     './.bzr/parent',
 
920
                                     './.bzr/repository',
 
921
                                     './.bzr/stat-cache',
 
922
                                     './foo',
 
923
                                     ])
 
924
 
 
925
    def test_sprout_bzrdir_branch_and_repo_shared(self):
 
926
        # sprouting a branch with a repo into a shared repo uses the shared
 
927
        # repo
 
928
        tree = self.make_branch_and_tree('commit_tree')
 
929
        self.build_tree(['commit_tree/foo'])
 
930
        tree.add('foo')
 
931
        tree.commit('revision 1', rev_id='1')
 
932
        source = self.make_branch('source')
 
933
        tree.branch.repository.copy_content_into(source.repository)
 
934
        tree.bzrdir.open_branch().copy_content_into(source)
 
935
        dir = source.bzrdir
 
936
        try:
 
937
            shared_repo = self.make_repository('target', shared=True)
 
938
        except errors.IncompatibleFormat:
 
939
            return
 
940
        target = dir.sprout(self.get_url('target/child'))
 
941
        self.assertTrue(shared_repo.has_revision('1'))
 
942
 
 
943
    def test_sprout_bzrdir_branch_and_repo_shared_force_new_repo(self):
 
944
        # sprouting a branch with a repo into a shared repo uses the shared
 
945
        # repo
 
946
        tree = self.make_branch_and_tree('commit_tree')
 
947
        self.build_tree(['commit_tree/foo'])
 
948
        tree.add('foo')
 
949
        tree.commit('revision 1', rev_id='1')
 
950
        source = self.make_branch('source')
 
951
        tree.branch.repository.copy_content_into(source.repository)
 
952
        tree.bzrdir.open_branch().copy_content_into(source)
 
953
        dir = source.bzrdir
 
954
        try:
 
955
            shared_repo = self.make_repository('target', shared=True)
 
956
        except errors.IncompatibleFormat:
 
957
            return
 
958
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
959
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
960
        self.assertFalse(shared_repo.has_revision('1'))
 
961
 
 
962
    def test_sprout_bzrdir_branch_reference(self):
 
963
        # sprouting should create a repository if needed and a sprouted branch.
 
964
        referenced_branch = self.make_branch('referenced')
 
965
        dir = self.make_bzrdir('source')
 
966
        try:
 
967
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
968
                target_branch=referenced_branch)
 
969
        except errors.IncompatibleFormat:
 
970
            # this is ok too, not all formats have to support references.
 
971
            return
 
972
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
973
        target = dir.sprout(self.get_url('target'))
 
974
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
975
        # we want target to have a branch that is in-place.
 
976
        self.assertEqual(target, target.open_branch().bzrdir)
 
977
        # and as we dont support repositories being detached yet, a repo in
 
978
        # place
 
979
        target.open_repository()
 
980
 
 
981
    def test_sprout_bzrdir_branch_reference_shared(self):
 
982
        # sprouting should create a repository if needed and a sprouted branch.
 
983
        referenced_tree = self.make_branch_and_tree('referenced')
 
984
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
 
985
        dir = self.make_bzrdir('source')
 
986
        try:
 
987
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
988
                target_branch=referenced_tree.branch)
 
989
        except errors.IncompatibleFormat:
 
990
            # this is ok too, not all formats have to support references.
 
991
            return
 
992
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
993
        try:
 
994
            shared_repo = self.make_repository('target', shared=True)
 
995
        except errors.IncompatibleFormat:
 
996
            return
 
997
        target = dir.sprout(self.get_url('target/child'))
 
998
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
999
        # we want target to have a branch that is in-place.
 
1000
        self.assertEqual(target, target.open_branch().bzrdir)
 
1001
        # and we want no repository as the target is shared
 
1002
        self.assertRaises(errors.NoRepositoryPresent,
 
1003
                          target.open_repository)
 
1004
        # and we want revision '1' in the shared repo
 
1005
        self.assertTrue(shared_repo.has_revision('1'))
 
1006
 
 
1007
    def test_sprout_bzrdir_branch_reference_shared_force_new_repo(self):
 
1008
        # sprouting should create a repository if needed and a sprouted branch.
 
1009
        referenced_tree = self.make_branch_and_tree('referenced')
 
1010
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
 
1011
        dir = self.make_bzrdir('source')
 
1012
        try:
 
1013
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
1014
                target_branch=referenced_tree.branch)
 
1015
        except errors.IncompatibleFormat:
 
1016
            # this is ok too, not all formats have to support references.
 
1017
            return
 
1018
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
1019
        try:
 
1020
            shared_repo = self.make_repository('target', shared=True)
 
1021
        except errors.IncompatibleFormat:
 
1022
            return
 
1023
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
1024
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1025
        # we want target to have a branch that is in-place.
 
1026
        self.assertEqual(target, target.open_branch().bzrdir)
 
1027
        # and we want revision '1' in the new repo
 
1028
        self.assertTrue(target.open_repository().has_revision('1'))
 
1029
        # but not the shared one
 
1030
        self.assertFalse(shared_repo.has_revision('1'))
 
1031
 
 
1032
    def test_sprout_bzrdir_branch_revision(self):
 
1033
        # test for revision limiting, [smoke test, not corner case checks].
 
1034
        # make a repository with some revisions,
 
1035
        # and sprout it with a revision limit.
 
1036
        #
 
1037
        tree = self.make_branch_and_tree('commit_tree')
 
1038
        self.build_tree(['commit_tree/foo'])
 
1039
        tree.add('foo')
 
1040
        tree.commit('revision 1', rev_id='1')
 
1041
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
1042
        source = self.make_branch('source')
 
1043
        tree.branch.repository.copy_content_into(source.repository)
 
1044
        tree.bzrdir.open_branch().copy_content_into(source)
 
1045
        dir = source.bzrdir
 
1046
        target = dir.sprout(self.get_url('target'), revision_id='1')
 
1047
        self.assertEqual('1', target.open_branch().last_revision())
 
1048
 
 
1049
    def test_sprout_bzrdir_tree_branch_repo(self):
 
1050
        tree = self.make_branch_and_tree('source')
 
1051
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
1052
        tree.add('foo')
 
1053
        tree.commit('revision 1')
 
1054
        dir = tree.bzrdir
 
1055
        target = self.sproutOrSkip(dir, self.get_url('target'))
 
1056
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1057
        self.assertDirectoriesEqual(dir.root_transport, target.root_transport,
 
1058
                                    [
 
1059
                                     './.bzr/branch/branch.conf',
 
1060
                                     './.bzr/branch/parent',
 
1061
                                     './.bzr/checkout/dirstate',
 
1062
                                     './.bzr/checkout/stat-cache',
 
1063
                                     './.bzr/checkout/inventory',
 
1064
                                     './.bzr/inventory',
 
1065
                                     './.bzr/parent',
 
1066
                                     './.bzr/repository',
 
1067
                                     './.bzr/stat-cache',
 
1068
                                     ])
 
1069
        self.assertRepositoryHasSameItems(
 
1070
            tree.branch.repository, target.open_repository())
 
1071
 
 
1072
    def test_sprout_bzrdir_tree_branch_reference(self):
 
1073
        # sprouting should create a repository if needed and a sprouted branch.
 
1074
        # the tree state should not be copied.
 
1075
        referenced_branch = self.make_branch('referencced')
 
1076
        dir = self.make_bzrdir('source')
 
1077
        try:
 
1078
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
1079
                target_branch=referenced_branch)
 
1080
        except errors.IncompatibleFormat:
 
1081
            # this is ok too, not all formats have to support references.
 
1082
            return
 
1083
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
1084
        tree = self.createWorkingTreeOrSkip(dir)
 
1085
        self.build_tree(['source/subdir/'])
 
1086
        tree.add('subdir')
 
1087
        target = dir.sprout(self.get_url('target'))
 
1088
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1089
        # we want target to have a branch that is in-place.
 
1090
        self.assertEqual(target, target.open_branch().bzrdir)
 
1091
        # and as we dont support repositories being detached yet, a repo in
 
1092
        # place
 
1093
        target.open_repository()
 
1094
        result_tree = target.open_workingtree()
 
1095
        self.assertFalse(result_tree.has_filename('subdir'))
 
1096
 
 
1097
    def test_sprout_bzrdir_tree_branch_reference_revision(self):
 
1098
        # sprouting should create a repository if needed and a sprouted branch.
 
1099
        # the tree state should not be copied but the revision changed,
 
1100
        # and the likewise the new branch should be truncated too
 
1101
        referenced_branch = self.make_branch('referencced')
 
1102
        dir = self.make_bzrdir('source')
 
1103
        try:
 
1104
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
1105
                target_branch=referenced_branch)
 
1106
        except errors.IncompatibleFormat:
 
1107
            # this is ok too, not all formats have to support references.
 
1108
            return
 
1109
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
1110
        tree = self.createWorkingTreeOrSkip(dir)
 
1111
        self.build_tree(['source/foo'])
 
1112
        tree.add('foo')
 
1113
        tree.commit('revision 1', rev_id='1')
 
1114
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
1115
        target = dir.sprout(self.get_url('target'), revision_id='1')
 
1116
        self.skipIfNoWorkingTree(target)
 
1117
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1118
        # we want target to have a branch that is in-place.
 
1119
        self.assertEqual(target, target.open_branch().bzrdir)
 
1120
        # and as we dont support repositories being detached yet, a repo in
 
1121
        # place
 
1122
        target.open_repository()
 
1123
        # we trust that the working tree sprouting works via the other tests.
 
1124
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
1125
        self.assertEqual('1', target.open_branch().last_revision())
 
1126
 
 
1127
    def test_sprout_bzrdir_tree_revision(self):
 
1128
        # test for revision limiting, [smoke test, not corner case checks].
 
1129
        # make a tree with a revision with a last-revision
 
1130
        # and sprout it with a revision limit.
 
1131
        # This smoke test just checks the revision-id is right. Tree specific
 
1132
        # tests will check corner cases.
 
1133
        tree = self.make_branch_and_tree('source')
 
1134
        self.build_tree(['source/foo'])
 
1135
        tree.add('foo')
 
1136
        tree.commit('revision 1', rev_id='1')
 
1137
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
1138
        dir = tree.bzrdir
 
1139
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='1')
 
1140
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
1141
 
 
1142
    def test_sprout_takes_accelerator(self):
 
1143
        tree = self.make_branch_and_tree('source')
 
1144
        self.build_tree(['source/foo'])
 
1145
        tree.add('foo')
 
1146
        tree.commit('revision 1', rev_id='1')
 
1147
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
1148
        dir = tree.bzrdir
 
1149
        target = self.sproutOrSkip(dir, self.get_url('target'),
 
1150
                                   accelerator_tree=tree)
 
1151
        self.assertEqual(['2'], target.open_workingtree().get_parent_ids())
 
1152
 
 
1153
    def test_sprout_branch_no_tree(self):
 
1154
        tree = self.make_branch_and_tree('source')
 
1155
        self.build_tree(['source/foo'])
 
1156
        tree.add('foo')
 
1157
        tree.commit('revision 1', rev_id='1')
 
1158
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
1159
        dir = tree.bzrdir
 
1160
        if isinstance(dir, (bzrdir.BzrDirPreSplitOut,)):
 
1161
            self.assertRaises(errors.MustHaveWorkingTree, dir.sprout,
 
1162
                              self.get_url('target'),
 
1163
                              create_tree_if_local=False)
 
1164
            return
 
1165
        target = dir.sprout(self.get_url('target'), create_tree_if_local=False)
 
1166
        self.failIfExists('target/foo')
 
1167
        self.assertEqual(tree.branch.last_revision(),
 
1168
                         target.open_branch().last_revision())
 
1169
 
 
1170
    def test_sprout_with_revision_id_uses_default_stack_on(self):
 
1171
        # Make a branch with three commits to stack on.
 
1172
        builder = self.make_branch_builder('stack-on')
 
1173
        builder.start_series()
 
1174
        builder.build_commit(message='Rev 1.', rev_id='rev-1')
 
1175
        builder.build_commit(message='Rev 2.', rev_id='rev-2')
 
1176
        builder.build_commit(message='Rev 3.', rev_id='rev-3')
 
1177
        builder.finish_series()
 
1178
        stack_on = builder.get_branch()
 
1179
        # Make a bzrdir with a default stacking policy to stack on that branch.
 
1180
        config = self.make_bzrdir('policy-dir').get_config()
 
1181
        try:
 
1182
            config.set_default_stack_on(self.get_url('stack-on'))
 
1183
        except errors.BzrError:
 
1184
            raise TestNotApplicable('Only relevant for stackable formats.')
 
1185
        # Sprout the stacked-on branch into the bzrdir.
 
1186
        sprouted = stack_on.bzrdir.sprout(
 
1187
            self.get_url('policy-dir/sprouted'), revision_id='rev-3')
 
1188
        # Not all revisions are copied into the sprouted repository.
 
1189
        repo = sprouted.open_repository()
 
1190
        self.addCleanup(repo.lock_read().unlock)
 
1191
        self.assertEqual(None, repo.get_parent_map(['rev-1']).get('rev-1'))
 
1192
 
 
1193
    def test_format_initialize_find_open(self):
 
1194
        # loopback test to check the current format initializes to itself.
 
1195
        if not self.bzrdir_format.is_supported():
 
1196
            # unsupported formats are not loopback testable
 
1197
            # because the default open will not open them and
 
1198
            # they may not be initializable.
 
1199
            return
 
1200
        # for remote formats, there must be no prior assumption about the
 
1201
        # network name to use - it's possible that this may somehow have got
 
1202
        # in through an unisolated test though - see
 
1203
        # <https://bugs.launchpad.net/bzr/+bug/504102>
 
1204
        self.assertEquals(getattr(self.bzrdir_format,
 
1205
            '_network_name', None),
 
1206
            None)
 
1207
        # supported formats must be able to init and open
 
1208
        t = transport.get_transport(self.get_url())
 
1209
        readonly_t = transport.get_transport(self.get_readonly_url())
 
1210
        made_control = self.bzrdir_format.initialize(t.base)
 
1211
        self.failUnless(isinstance(made_control, bzrdir.BzrDir))
 
1212
        self.assertEqual(self.bzrdir_format,
 
1213
                         bzrdir.BzrDirFormat.find_format(readonly_t))
 
1214
        direct_opened_dir = self.bzrdir_format.open(readonly_t)
 
1215
        opened_dir = bzrdir.BzrDir.open(t.base)
 
1216
        self.assertEqual(made_control._format,
 
1217
                         opened_dir._format)
 
1218
        self.assertEqual(direct_opened_dir._format,
 
1219
                         opened_dir._format)
 
1220
        self.failUnless(isinstance(opened_dir, bzrdir.BzrDir))
 
1221
 
 
1222
    def test_format_initialize_on_transport_ex(self):
 
1223
        t = self.get_transport('dir')
 
1224
        self.assertInitializeEx(t)
 
1225
 
 
1226
    def test_format_initialize_on_transport_ex_use_existing_dir_True(self):
 
1227
        t = self.get_transport('dir')
 
1228
        t.ensure_base()
 
1229
        self.assertInitializeEx(t, use_existing_dir=True)
 
1230
 
 
1231
    def test_format_initialize_on_transport_ex_use_existing_dir_False(self):
 
1232
        if not self.bzrdir_format.is_supported():
 
1233
            # Not initializable - not a failure either.
 
1234
            return
 
1235
        t = self.get_transport('dir')
 
1236
        t.ensure_base()
 
1237
        self.assertRaises(errors.FileExists,
 
1238
            self.bzrdir_format.initialize_on_transport_ex, t,
 
1239
            use_existing_dir=False)
 
1240
 
 
1241
    def test_format_initialize_on_transport_ex_create_prefix_True(self):
 
1242
        t = self.get_transport('missing/dir')
 
1243
        self.assertInitializeEx(t, create_prefix=True)
 
1244
 
 
1245
    def test_format_initialize_on_transport_ex_create_prefix_False(self):
 
1246
        if not self.bzrdir_format.is_supported():
 
1247
            # Not initializable - not a failure either.
 
1248
            return
 
1249
        t = self.get_transport('missing/dir')
 
1250
        self.assertRaises(errors.NoSuchFile, self.assertInitializeEx, t,
 
1251
            create_prefix=False)
 
1252
 
 
1253
    def test_format_initialize_on_transport_ex_force_new_repo_True(self):
 
1254
        t = self.get_transport('repo')
 
1255
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
1256
        repo_name = repo_fmt.repository_format.network_name()
 
1257
        repo = repo_fmt.initialize_on_transport_ex(t,
 
1258
            repo_format_name=repo_name, shared_repo=True)[0]
 
1259
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
 
1260
            force_new_repo=True, repo_format_name=repo_name)
 
1261
        if control is None:
 
1262
            # uninitialisable format
 
1263
            return
 
1264
        self.assertNotEqual(repo.bzrdir.root_transport.base,
 
1265
            made_repo.bzrdir.root_transport.base)
 
1266
 
 
1267
    def test_format_initialize_on_transport_ex_force_new_repo_False(self):
 
1268
        t = self.get_transport('repo')
 
1269
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
1270
        repo_name = repo_fmt.repository_format.network_name()
 
1271
        repo = repo_fmt.initialize_on_transport_ex(t,
 
1272
            repo_format_name=repo_name, shared_repo=True)[0]
 
1273
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
 
1274
            force_new_repo=False, repo_format_name=repo_name)
 
1275
        if control is None:
 
1276
            # uninitialisable format
 
1277
            return
 
1278
        if not isinstance(control._format, (bzrdir.BzrDirFormat5,
 
1279
            bzrdir.BzrDirFormat6,)):
 
1280
            self.assertEqual(repo.bzrdir.root_transport.base,
 
1281
                made_repo.bzrdir.root_transport.base)
 
1282
 
 
1283
    def test_format_initialize_on_transport_ex_stacked_on(self):
 
1284
        # trunk is a stackable format.  Note that its in the same server area
 
1285
        # which is what launchpad does, but not sufficient to exercise the
 
1286
        # general case.
 
1287
        trunk = self.make_branch('trunk', format='1.9')
 
1288
        t = self.get_transport('stacked')
 
1289
        old_fmt = bzrdir.format_registry.make_bzrdir('pack-0.92')
 
1290
        repo_name = old_fmt.repository_format.network_name()
 
1291
        # Should end up with a 1.9 format (stackable)
 
1292
        repo, control = self.assertInitializeEx(t, need_meta=True,
 
1293
            repo_format_name=repo_name, stacked_on='../trunk', stack_on_pwd=t.base)
 
1294
        if control is None:
 
1295
            # uninitialisable format
 
1296
            return
 
1297
        self.assertLength(1, repo._fallback_repositories)
 
1298
 
 
1299
    def test_format_initialize_on_transport_ex_default_stack_on(self):
 
1300
        # When initialize_on_transport_ex uses a stacked-on branch because of
 
1301
        # a stacking policy on the target, the location of the fallback
 
1302
        # repository is the same as the external location of the stacked-on
 
1303
        # branch.
 
1304
        balloon = self.make_bzrdir('balloon')
 
1305
        if isinstance(balloon._format, bzrdir.BzrDirMetaFormat1):
 
1306
            stack_on = self.make_branch('stack-on', format='1.9')
 
1307
        else:
 
1308
            stack_on = self.make_branch('stack-on')
 
1309
        config = self.make_bzrdir('.').get_config()
 
1310
        try:
 
1311
            config.set_default_stack_on('stack-on')
 
1312
        except errors.BzrError:
 
1313
            raise TestNotApplicable('Only relevant for stackable formats.')
 
1314
        # Initialize a bzrdir subject to the policy.
 
1315
        t = self.get_transport('stacked')
 
1316
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
1317
        repo_name = repo_fmt.repository_format.network_name()
 
1318
        repo, control = self.assertInitializeEx(
 
1319
            t, need_meta=True, repo_format_name=repo_name, stacked_on=None)
 
1320
        # self.addCleanup(repo.unlock)
 
1321
        if control is None:
 
1322
            # uninitialisable format
 
1323
            return
 
1324
        # There's one fallback repo, with a public location.
 
1325
        self.assertLength(1, repo._fallback_repositories)
 
1326
        fallback_repo = repo._fallback_repositories[0]
 
1327
        self.assertEqual(
 
1328
            stack_on.base, fallback_repo.bzrdir.root_transport.base)
 
1329
        # The bzrdir creates a branch in stacking-capable format.
 
1330
        new_branch = control.create_branch()
 
1331
        self.assertTrue(new_branch._format.supports_stacking())
 
1332
 
 
1333
    def test_format_initialize_on_transport_ex_repo_fmt_name_None(self):
 
1334
        t = self.get_transport('dir')
 
1335
        repo, control = self.assertInitializeEx(t)
 
1336
        self.assertEqual(None, repo)
 
1337
 
 
1338
    def test_format_initialize_on_transport_ex_repo_fmt_name_followed(self):
 
1339
        t = self.get_transport('dir')
 
1340
        # 1.6 is likely to never be default
 
1341
        fmt = bzrdir.format_registry.make_bzrdir('1.6')
 
1342
        repo_name = fmt.repository_format.network_name()
 
1343
        repo, control = self.assertInitializeEx(t, repo_format_name=repo_name)
 
1344
        if control is None:
 
1345
            # uninitialisable format
 
1346
            return
 
1347
        if isinstance(self.bzrdir_format, (bzrdir.BzrDirFormat5,
 
1348
            bzrdir.BzrDirFormat6)):
 
1349
            # must stay with the all-in-one-format.
 
1350
            repo_name = self.bzrdir_format.network_name()
 
1351
        self.assertEqual(repo_name, repo._format.network_name())
 
1352
 
 
1353
    def assertInitializeEx(self, t, need_meta=False, **kwargs):
 
1354
        """Execute initialize_on_transport_ex and check it succeeded correctly.
 
1355
 
 
1356
        This involves checking that the disk objects were created, open with
 
1357
        the same format returned, and had the expected disk format.
 
1358
 
 
1359
        :param t: The transport to initialize on.
 
1360
        :param **kwargs: Additional arguments to pass to
 
1361
            initialize_on_transport_ex.
 
1362
        :return: the resulting repo, control dir tuple.
 
1363
        """
 
1364
        if not self.bzrdir_format.is_supported():
 
1365
            # Not initializable - not a failure either.
 
1366
            return None, None
 
1367
        repo, control, require_stacking, repo_policy = \
 
1368
            self.bzrdir_format.initialize_on_transport_ex(t, **kwargs)
 
1369
        if repo is not None:
 
1370
            # Repositories are open write-locked
 
1371
            self.assertTrue(repo.is_write_locked())
 
1372
            self.addCleanup(repo.unlock)
 
1373
        self.assertIsInstance(control, bzrdir.BzrDir)
 
1374
        opened = bzrdir.BzrDir.open(t.base)
 
1375
        expected_format = self.bzrdir_format
 
1376
        if isinstance(expected_format, bzrdir.RemoteBzrDirFormat):
 
1377
            # Current RemoteBzrDirFormat's do not reliably get network_name
 
1378
            # set, so we skip a number of tests for RemoteBzrDirFormat's.
 
1379
            self.assertIsInstance(control, RemoteBzrDir)
 
1380
        else:
 
1381
            if need_meta and isinstance(expected_format, (bzrdir.BzrDirFormat5,
 
1382
                bzrdir.BzrDirFormat6)):
 
1383
                # Pre-metadir formats change when we are making something that
 
1384
                # needs a metaformat, because clone is used for push.
 
1385
                expected_format = bzrdir.BzrDirMetaFormat1()
 
1386
            self.assertEqual(control._format.network_name(),
 
1387
                expected_format.network_name())
 
1388
            self.assertEqual(control._format.network_name(),
 
1389
                opened._format.network_name())
 
1390
        self.assertEqual(control.__class__, opened.__class__)
 
1391
        return repo, control
 
1392
 
 
1393
    def test_format_network_name(self):
 
1394
        # All control formats must have a network name.
 
1395
        dir = self.make_bzrdir('.')
 
1396
        format = dir._format
 
1397
        # We want to test that the network_name matches the actual format on
 
1398
        # disk. For local control dirsthat means that using network_name as a
 
1399
        # key in the registry gives back the same format. For remote obects
 
1400
        # we check that the network_name of the RemoteBzrDirFormat we have
 
1401
        # locally matches the actual format present on disk.
 
1402
        if isinstance(format, bzrdir.RemoteBzrDirFormat):
 
1403
            dir._ensure_real()
 
1404
            real_dir = dir._real_bzrdir
 
1405
            network_name = format.network_name()
 
1406
            self.assertEqual(real_dir._format.network_name(), network_name)
 
1407
        else:
 
1408
            registry = bzrdir.network_format_registry
 
1409
            network_name = format.network_name()
 
1410
            looked_up_format = registry.get(network_name)
 
1411
            self.assertEqual(format.__class__, looked_up_format.__class__)
 
1412
        # The network name must be a byte string.
 
1413
        self.assertIsInstance(network_name, str)
 
1414
 
 
1415
    def test_open_not_bzrdir(self):
 
1416
        # test the formats specific behaviour for no-content or similar dirs.
 
1417
        self.assertRaises(NotBranchError,
 
1418
                          self.bzrdir_format.open,
 
1419
                          transport.get_transport(self.get_readonly_url()))
 
1420
 
 
1421
    def test_create_branch(self):
 
1422
        # a bzrdir can construct a branch and repository for itself.
 
1423
        if not self.bzrdir_format.is_supported():
 
1424
            # unsupported formats are not loopback testable
 
1425
            # because the default open will not open them and
 
1426
            # they may not be initializable.
 
1427
            return
 
1428
        t = transport.get_transport(self.get_url())
 
1429
        made_control = self.bzrdir_format.initialize(t.base)
 
1430
        made_repo = made_control.create_repository()
 
1431
        made_branch = made_control.create_branch()
 
1432
        self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
 
1433
        self.assertEqual(made_control, made_branch.bzrdir)
 
1434
 
 
1435
    def test_open_branch(self):
 
1436
        if not self.bzrdir_format.is_supported():
 
1437
            # unsupported formats are not loopback testable
 
1438
            # because the default open will not open them and
 
1439
            # they may not be initializable.
 
1440
            return
 
1441
        t = transport.get_transport(self.get_url())
 
1442
        made_control = self.bzrdir_format.initialize(t.base)
 
1443
        made_repo = made_control.create_repository()
 
1444
        made_branch = made_control.create_branch()
 
1445
        opened_branch = made_control.open_branch()
 
1446
        self.assertEqual(made_control, opened_branch.bzrdir)
 
1447
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
 
1448
        self.failUnless(isinstance(opened_branch._format, made_branch._format.__class__))
 
1449
 
 
1450
    def test_list_branches(self):
 
1451
        if not self.bzrdir_format.is_supported():
 
1452
            # unsupported formats are not loopback testable
 
1453
            # because the default open will not open them and
 
1454
            # they may not be initializable.
 
1455
            return
 
1456
        t = transport.get_transport(self.get_url())
 
1457
        made_control = self.bzrdir_format.initialize(t.base)
 
1458
        made_repo = made_control.create_repository()
 
1459
        made_branch = made_control.create_branch()
 
1460
        branches = made_control.list_branches()
 
1461
        self.assertEquals(1, len(branches))
 
1462
        self.assertEquals(made_branch.base, branches[0].base)
 
1463
        try:
 
1464
            made_control.destroy_branch()
 
1465
        except errors.UnsupportedOperation:
 
1466
            pass # Not all bzrdirs support destroying directories
 
1467
        else:
 
1468
            self.assertEquals([], made_control.list_branches())
 
1469
 
 
1470
    def test_create_repository(self):
 
1471
        # a bzrdir can construct a repository for itself.
 
1472
        if not self.bzrdir_format.is_supported():
 
1473
            # unsupported formats are not loopback testable
 
1474
            # because the default open will not open them and
 
1475
            # they may not be initializable.
 
1476
            return
 
1477
        t = transport.get_transport(self.get_url())
 
1478
        made_control = self.bzrdir_format.initialize(t.base)
 
1479
        made_repo = made_control.create_repository()
 
1480
        # Check that we have a repository object.
 
1481
        made_repo.has_revision('foo')
 
1482
        self.assertEqual(made_control, made_repo.bzrdir)
 
1483
 
 
1484
    def test_create_repository_shared(self):
 
1485
        # a bzrdir can create a shared repository or
 
1486
        # fail appropriately
 
1487
        if not self.bzrdir_format.is_supported():
 
1488
            # unsupported formats are not loopback testable
 
1489
            # because the default open will not open them and
 
1490
            # they may not be initializable.
 
1491
            return
 
1492
        t = transport.get_transport(self.get_url())
 
1493
        made_control = self.bzrdir_format.initialize(t.base)
 
1494
        try:
 
1495
            made_repo = made_control.create_repository(shared=True)
 
1496
        except errors.IncompatibleFormat:
 
1497
            # Old bzrdir formats don't support shared repositories
 
1498
            # and should raise IncompatibleFormat
 
1499
            return
 
1500
        self.assertTrue(made_repo.is_shared())
 
1501
 
 
1502
    def test_create_repository_nonshared(self):
 
1503
        # a bzrdir can create a non-shared repository
 
1504
        if not self.bzrdir_format.is_supported():
 
1505
            # unsupported formats are not loopback testable
 
1506
            # because the default open will not open them and
 
1507
            # they may not be initializable.
 
1508
            return
 
1509
        t = transport.get_transport(self.get_url())
 
1510
        made_control = self.bzrdir_format.initialize(t.base)
 
1511
        made_repo = made_control.create_repository(shared=False)
 
1512
        self.assertFalse(made_repo.is_shared())
 
1513
 
 
1514
    def test_open_repository(self):
 
1515
        if not self.bzrdir_format.is_supported():
 
1516
            # unsupported formats are not loopback testable
 
1517
            # because the default open will not open them and
 
1518
            # they may not be initializable.
 
1519
            return
 
1520
        t = transport.get_transport(self.get_url())
 
1521
        made_control = self.bzrdir_format.initialize(t.base)
 
1522
        made_repo = made_control.create_repository()
 
1523
        opened_repo = made_control.open_repository()
 
1524
        self.assertEqual(made_control, opened_repo.bzrdir)
 
1525
        self.failUnless(isinstance(opened_repo, made_repo.__class__))
 
1526
        self.failUnless(isinstance(opened_repo._format, made_repo._format.__class__))
 
1527
 
 
1528
    def test_create_workingtree(self):
 
1529
        # a bzrdir can construct a working tree for itself.
 
1530
        if not self.bzrdir_format.is_supported():
 
1531
            # unsupported formats are not loopback testable
 
1532
            # because the default open will not open them and
 
1533
            # they may not be initializable.
 
1534
            return
 
1535
        t = self.get_transport()
 
1536
        made_control = self.bzrdir_format.initialize(t.base)
 
1537
        made_repo = made_control.create_repository()
 
1538
        made_branch = made_control.create_branch()
 
1539
        made_tree = self.createWorkingTreeOrSkip(made_control)
 
1540
        self.failUnless(isinstance(made_tree, workingtree.WorkingTree))
 
1541
        self.assertEqual(made_control, made_tree.bzrdir)
 
1542
 
 
1543
    def test_create_workingtree_revision(self):
 
1544
        # a bzrdir can construct a working tree for itself @ a specific revision.
 
1545
        t = self.get_transport()
 
1546
        source = self.make_branch_and_tree('source')
 
1547
        source.commit('a', rev_id='a', allow_pointless=True)
 
1548
        source.commit('b', rev_id='b', allow_pointless=True)
 
1549
        t.mkdir('new')
 
1550
        t_new = t.clone('new')
 
1551
        made_control = self.bzrdir_format.initialize_on_transport(t_new)
 
1552
        source.branch.repository.clone(made_control)
 
1553
        source.branch.clone(made_control)
 
1554
        try:
 
1555
            made_tree = made_control.create_workingtree(revision_id='a')
 
1556
        except errors.NotLocalUrl:
 
1557
            raise TestSkipped("Can't make working tree on transport %r" % t)
 
1558
        self.assertEqual(['a'], made_tree.get_parent_ids())
 
1559
 
 
1560
    def test_open_workingtree(self):
 
1561
        if not self.bzrdir_format.is_supported():
 
1562
            # unsupported formats are not loopback testable
 
1563
            # because the default open will not open them and
 
1564
            # they may not be initializable.
 
1565
            return
 
1566
        # this has to be tested with local access as we still support creating
 
1567
        # format 6 bzrdirs
 
1568
        t = self.get_transport()
 
1569
        try:
 
1570
            made_control = self.bzrdir_format.initialize(t.base)
 
1571
            made_repo = made_control.create_repository()
 
1572
            made_branch = made_control.create_branch()
 
1573
            made_tree = made_control.create_workingtree()
 
1574
        except errors.NotLocalUrl:
 
1575
            raise TestSkipped("Can't initialize %r on transport %r"
 
1576
                              % (self.bzrdir_format, t))
 
1577
        opened_tree = made_control.open_workingtree()
 
1578
        self.assertEqual(made_control, opened_tree.bzrdir)
 
1579
        self.failUnless(isinstance(opened_tree, made_tree.__class__))
 
1580
        self.failUnless(isinstance(opened_tree._format, made_tree._format.__class__))
 
1581
 
 
1582
    def test_get_branch_transport(self):
 
1583
        dir = self.make_bzrdir('.')
 
1584
        # without a format, get_branch_transport gives use a transport
 
1585
        # which -may- point to an existing dir.
 
1586
        self.assertTrue(isinstance(dir.get_branch_transport(None),
 
1587
                                   transport.Transport))
 
1588
        # with a given format, either the bzr dir supports identifiable
 
1589
        # branches, or it supports anonymous  branch formats, but not both.
 
1590
        anonymous_format = bzrlib.branch.BzrBranchFormat4()
 
1591
        identifiable_format = bzrlib.branch.BzrBranchFormat5()
 
1592
        try:
 
1593
            found_transport = dir.get_branch_transport(anonymous_format)
 
1594
            self.assertRaises(errors.IncompatibleFormat,
 
1595
                              dir.get_branch_transport,
 
1596
                              identifiable_format)
 
1597
        except errors.IncompatibleFormat:
 
1598
            found_transport = dir.get_branch_transport(identifiable_format)
 
1599
        self.assertTrue(isinstance(found_transport, transport.Transport))
 
1600
        # and the dir which has been initialized for us must exist.
 
1601
        found_transport.list_dir('.')
 
1602
 
 
1603
    def test_get_repository_transport(self):
 
1604
        dir = self.make_bzrdir('.')
 
1605
        # without a format, get_repository_transport gives use a transport
 
1606
        # which -may- point to an existing dir.
 
1607
        self.assertTrue(isinstance(dir.get_repository_transport(None),
 
1608
                                   transport.Transport))
 
1609
        # with a given format, either the bzr dir supports identifiable
 
1610
        # repositories, or it supports anonymous  repository formats, but not both.
 
1611
        anonymous_format = weaverepo.RepositoryFormat6()
 
1612
        identifiable_format = weaverepo.RepositoryFormat7()
 
1613
        try:
 
1614
            found_transport = dir.get_repository_transport(anonymous_format)
 
1615
            self.assertRaises(errors.IncompatibleFormat,
 
1616
                              dir.get_repository_transport,
 
1617
                              identifiable_format)
 
1618
        except errors.IncompatibleFormat:
 
1619
            found_transport = dir.get_repository_transport(identifiable_format)
 
1620
        self.assertTrue(isinstance(found_transport, transport.Transport))
 
1621
        # and the dir which has been initialized for us must exist.
 
1622
        found_transport.list_dir('.')
 
1623
 
 
1624
    def test_get_workingtree_transport(self):
 
1625
        dir = self.make_bzrdir('.')
 
1626
        # without a format, get_workingtree_transport gives use a transport
 
1627
        # which -may- point to an existing dir.
 
1628
        self.assertTrue(isinstance(dir.get_workingtree_transport(None),
 
1629
                                   transport.Transport))
 
1630
        # with a given format, either the bzr dir supports identifiable
 
1631
        # trees, or it supports anonymous tree formats, but not both.
 
1632
        anonymous_format = workingtree.WorkingTreeFormat2()
 
1633
        identifiable_format = workingtree.WorkingTreeFormat3()
 
1634
        try:
 
1635
            found_transport = dir.get_workingtree_transport(anonymous_format)
 
1636
            self.assertRaises(errors.IncompatibleFormat,
 
1637
                              dir.get_workingtree_transport,
 
1638
                              identifiable_format)
 
1639
        except errors.IncompatibleFormat:
 
1640
            found_transport = dir.get_workingtree_transport(identifiable_format)
 
1641
        self.assertTrue(isinstance(found_transport, transport.Transport))
 
1642
        # and the dir which has been initialized for us must exist.
 
1643
        found_transport.list_dir('.')
 
1644
 
 
1645
    def test_root_transport(self):
 
1646
        dir = self.make_bzrdir('.')
 
1647
        self.assertEqual(dir.root_transport.base,
 
1648
                         transport.get_transport(self.get_url('.')).base)
 
1649
 
 
1650
    def test_find_repository_no_repo_under_standalone_branch(self):
 
1651
        # finding a repo stops at standalone branches even if there is a
 
1652
        # higher repository available.
 
1653
        try:
 
1654
            repo = self.make_repository('.', shared=True)
 
1655
        except errors.IncompatibleFormat:
 
1656
            # need a shared repository to test this.
 
1657
            return
 
1658
        url = self.get_url('intermediate')
 
1659
        transport.get_transport(self.get_url()).mkdir('intermediate')
 
1660
        transport.get_transport(self.get_url()).mkdir('intermediate/child')
 
1661
        made_control = self.bzrdir_format.initialize(url)
 
1662
        made_control.create_repository()
 
1663
        innermost_control = self.bzrdir_format.initialize(
 
1664
            self.get_url('intermediate/child'))
 
1665
        try:
 
1666
            child_repo = innermost_control.open_repository()
 
1667
            # if there is a repository, then the format cannot ever hit this
 
1668
            # code path.
 
1669
            return
 
1670
        except errors.NoRepositoryPresent:
 
1671
            pass
 
1672
        self.assertRaises(errors.NoRepositoryPresent,
 
1673
                          innermost_control.find_repository)
 
1674
 
 
1675
    def test_find_repository_containing_shared_repository(self):
 
1676
        # find repo inside a shared repo with an empty control dir
 
1677
        # returns the shared repo.
 
1678
        try:
 
1679
            repo = self.make_repository('.', shared=True)
 
1680
        except errors.IncompatibleFormat:
 
1681
            # need a shared repository to test this.
 
1682
            return
 
1683
        url = self.get_url('childbzrdir')
 
1684
        transport.get_transport(self.get_url()).mkdir('childbzrdir')
 
1685
        made_control = self.bzrdir_format.initialize(url)
 
1686
        try:
 
1687
            child_repo = made_control.open_repository()
 
1688
            # if there is a repository, then the format cannot ever hit this
 
1689
            # code path.
 
1690
            return
 
1691
        except errors.NoRepositoryPresent:
 
1692
            pass
 
1693
        found_repo = made_control.find_repository()
 
1694
        self.assertEqual(repo.bzrdir.root_transport.base,
 
1695
                         found_repo.bzrdir.root_transport.base)
 
1696
 
 
1697
    def test_find_repository_standalone_with_containing_shared_repository(self):
 
1698
        # find repo inside a standalone repo inside a shared repo finds the standalone repo
 
1699
        try:
 
1700
            containing_repo = self.make_repository('.', shared=True)
 
1701
        except errors.IncompatibleFormat:
 
1702
            # need a shared repository to test this.
 
1703
            return
 
1704
        child_repo = self.make_repository('childrepo')
 
1705
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
 
1706
        found_repo = opened_control.find_repository()
 
1707
        self.assertEqual(child_repo.bzrdir.root_transport.base,
 
1708
                         found_repo.bzrdir.root_transport.base)
 
1709
 
 
1710
    def test_find_repository_shared_within_shared_repository(self):
 
1711
        # find repo at a shared repo inside a shared repo finds the inner repo
 
1712
        try:
 
1713
            containing_repo = self.make_repository('.', shared=True)
 
1714
        except errors.IncompatibleFormat:
 
1715
            # need a shared repository to test this.
 
1716
            return
 
1717
        url = self.get_url('childrepo')
 
1718
        transport.get_transport(self.get_url()).mkdir('childrepo')
 
1719
        child_control = self.bzrdir_format.initialize(url)
 
1720
        child_repo = child_control.create_repository(shared=True)
 
1721
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
 
1722
        found_repo = opened_control.find_repository()
 
1723
        self.assertEqual(child_repo.bzrdir.root_transport.base,
 
1724
                         found_repo.bzrdir.root_transport.base)
 
1725
        self.assertNotEqual(child_repo.bzrdir.root_transport.base,
 
1726
                            containing_repo.bzrdir.root_transport.base)
 
1727
 
 
1728
    def test_find_repository_with_nested_dirs_works(self):
 
1729
        # find repo inside a bzrdir inside a bzrdir inside a shared repo
 
1730
        # finds the outer shared repo.
 
1731
        try:
 
1732
            repo = self.make_repository('.', shared=True)
 
1733
        except errors.IncompatibleFormat:
 
1734
            # need a shared repository to test this.
 
1735
            return
 
1736
        url = self.get_url('intermediate')
 
1737
        transport.get_transport(self.get_url()).mkdir('intermediate')
 
1738
        transport.get_transport(self.get_url()).mkdir('intermediate/child')
 
1739
        made_control = self.bzrdir_format.initialize(url)
 
1740
        try:
 
1741
            child_repo = made_control.open_repository()
 
1742
            # if there is a repository, then the format cannot ever hit this
 
1743
            # code path.
 
1744
            return
 
1745
        except errors.NoRepositoryPresent:
 
1746
            pass
 
1747
        innermost_control = self.bzrdir_format.initialize(
 
1748
            self.get_url('intermediate/child'))
 
1749
        try:
 
1750
            child_repo = innermost_control.open_repository()
 
1751
            # if there is a repository, then the format cannot ever hit this
 
1752
            # code path.
 
1753
            return
 
1754
        except errors.NoRepositoryPresent:
 
1755
            pass
 
1756
        found_repo = innermost_control.find_repository()
 
1757
        self.assertEqual(repo.bzrdir.root_transport.base,
 
1758
                         found_repo.bzrdir.root_transport.base)
 
1759
 
 
1760
    def test_can_and_needs_format_conversion(self):
 
1761
        # check that we can ask an instance if its upgradable
 
1762
        dir = self.make_bzrdir('.')
 
1763
        if dir.can_convert_format():
 
1764
            # if its default updatable there must be an updater
 
1765
            # (we force the latest known format as downgrades may not be
 
1766
            # available
 
1767
            self.assertTrue(isinstance(dir._format.get_converter(
 
1768
                format=dir._format), bzrdir.Converter))
 
1769
        dir.needs_format_conversion(
 
1770
            bzrdir.BzrDirFormat.get_default_format())
 
1771
 
 
1772
    def test_backup_copies_existing(self):
 
1773
        tree = self.make_branch_and_tree('test')
 
1774
        self.build_tree(['test/a'])
 
1775
        tree.add(['a'], ['a-id'])
 
1776
        tree.commit('some data to be copied.')
 
1777
        old_url, new_url = tree.bzrdir.backup_bzrdir()
 
1778
        old_path = urlutils.local_path_from_url(old_url)
 
1779
        new_path = urlutils.local_path_from_url(new_url)
 
1780
        self.failUnlessExists(old_path)
 
1781
        self.failUnlessExists(new_path)
 
1782
        for (((dir_relpath1, _), entries1),
 
1783
             ((dir_relpath2, _), entries2)) in izip(
 
1784
                osutils.walkdirs(old_path),
 
1785
                osutils.walkdirs(new_path)):
 
1786
            self.assertEquals(dir_relpath1, dir_relpath2)
 
1787
            for f1, f2 in zip(entries1, entries2):
 
1788
                self.assertEquals(f1[0], f2[0])
 
1789
                self.assertEquals(f1[2], f2[2])
 
1790
                if f1[2] == "file":
 
1791
                    osutils.compare_files(open(f1[4]), open(f2[4]))
 
1792
 
 
1793
    def test_upgrade_new_instance(self):
 
1794
        """Does an available updater work?"""
 
1795
        dir = self.make_bzrdir('.')
 
1796
        # for now, upgrade is not ready for partial bzrdirs.
 
1797
        dir.create_repository()
 
1798
        dir.create_branch()
 
1799
        self.createWorkingTreeOrSkip(dir)
 
1800
        if dir.can_convert_format():
 
1801
            # if its default updatable there must be an updater
 
1802
            # (we force the latest known format as downgrades may not be
 
1803
            # available
 
1804
            pb = ui.ui_factory.nested_progress_bar()
 
1805
            try:
 
1806
                dir._format.get_converter(format=dir._format).convert(dir, pb)
 
1807
            finally:
 
1808
                pb.finished()
 
1809
            # and it should pass 'check' now.
 
1810
            check.check_dwim(self.get_url('.'), False, True, True)
 
1811
 
 
1812
    def test_format_description(self):
 
1813
        dir = self.make_bzrdir('.')
 
1814
        text = dir._format.get_format_description()
 
1815
        self.failUnless(len(text))
 
1816
 
 
1817
    def test_retire_bzrdir(self):
 
1818
        bd = self.make_bzrdir('.')
 
1819
        transport = bd.root_transport
 
1820
        # must not overwrite existing directories
 
1821
        self.build_tree(['.bzr.retired.0/', '.bzr.retired.0/junk',],
 
1822
            transport=transport)
 
1823
        self.failUnless(transport.has('.bzr'))
 
1824
        bd.retire_bzrdir()
 
1825
        self.failIf(transport.has('.bzr'))
 
1826
        self.failUnless(transport.has('.bzr.retired.1'))
 
1827
 
 
1828
    def test_retire_bzrdir_limited(self):
 
1829
        bd = self.make_bzrdir('.')
 
1830
        transport = bd.root_transport
 
1831
        # must not overwrite existing directories
 
1832
        self.build_tree(['.bzr.retired.0/', '.bzr.retired.0/junk',],
 
1833
            transport=transport)
 
1834
        self.failUnless(transport.has('.bzr'))
 
1835
        self.assertRaises((errors.FileExists, errors.DirectoryNotEmpty),
 
1836
            bd.retire_bzrdir, limit=0)
 
1837
 
 
1838
 
 
1839
class TestBreakLock(TestCaseWithBzrDir):
 
1840
 
 
1841
    def test_break_lock_empty(self):
 
1842
        # break lock on an empty bzrdir should work silently.
 
1843
        dir = self.make_bzrdir('.')
 
1844
        try:
 
1845
            dir.break_lock()
 
1846
        except NotImplementedError:
 
1847
            pass
 
1848
 
 
1849
    def test_break_lock_repository(self):
 
1850
        # break lock with just a repo should unlock the repo.
 
1851
        repo = self.make_repository('.')
 
1852
        repo.lock_write()
 
1853
        lock_repo = repo.bzrdir.open_repository()
 
1854
        if not lock_repo.get_physical_lock_status():
 
1855
            # This bzrdir's default repository does not physically lock things
 
1856
            # and thus this interaction cannot be tested at the interface
 
1857
            # level.
 
1858
            repo.unlock()
 
1859
            return
 
1860
        # only one yes needed here: it should only be unlocking
 
1861
        # the repo
 
1862
        bzrlib.ui.ui_factory = CannedInputUIFactory([True])
 
1863
        try:
 
1864
            repo.bzrdir.break_lock()
 
1865
        except NotImplementedError:
 
1866
            # this bzrdir does not implement break_lock - so we cant test it.
 
1867
            repo.unlock()
 
1868
            return
 
1869
        lock_repo.lock_write()
 
1870
        lock_repo.unlock()
 
1871
        self.assertRaises(errors.LockBroken, repo.unlock)
 
1872
 
 
1873
    def test_break_lock_branch(self):
 
1874
        # break lock with just a repo should unlock the branch.
 
1875
        # and not directly try the repository.
 
1876
        # we test this by making a branch reference to a branch
 
1877
        # and repository in another bzrdir
 
1878
        # for pre-metadir formats this will fail, thats ok.
 
1879
        master = self.make_branch('branch')
 
1880
        thisdir = self.make_bzrdir('this')
 
1881
        try:
 
1882
            bzrlib.branch.BranchReferenceFormat().initialize(
 
1883
                thisdir, target_branch=master)
 
1884
        except errors.IncompatibleFormat:
 
1885
            return
 
1886
        unused_repo = thisdir.create_repository()
 
1887
        master.lock_write()
 
1888
        unused_repo.lock_write()
 
1889
        try:
 
1890
            # two yes's : branch and repository. If the repo in this
 
1891
            # dir is inappropriately accessed, 3 will be needed, and
 
1892
            # we'll see that because the stream will be fully consumed
 
1893
            bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
 
1894
            # determine if the repository will have been locked;
 
1895
            this_repo_locked = \
 
1896
                thisdir.open_repository().get_physical_lock_status()
 
1897
            master.bzrdir.break_lock()
 
1898
            if this_repo_locked:
 
1899
                # only two ys should have been read
 
1900
                self.assertEqual([True],
 
1901
                    bzrlib.ui.ui_factory.responses)
 
1902
            else:
 
1903
                # only one y should have been read
 
1904
                self.assertEqual([True, True],
 
1905
                    bzrlib.ui.ui_factory.responses)
 
1906
            # we should be able to lock a newly opened branch now
 
1907
            branch = master.bzrdir.open_branch()
 
1908
            branch.lock_write()
 
1909
            branch.unlock()
 
1910
            if this_repo_locked:
 
1911
                # we should not be able to lock the repository in thisdir as
 
1912
                # its still held by the explicit lock we took, and the break
 
1913
                # lock should not have touched it.
 
1914
                repo = thisdir.open_repository()
 
1915
                self.assertRaises(errors.LockContention, repo.lock_write)
 
1916
        finally:
 
1917
            unused_repo.unlock()
 
1918
        self.assertRaises(errors.LockBroken, master.unlock)
 
1919
 
 
1920
    def test_break_lock_tree(self):
 
1921
        # break lock with a tree should unlock the tree but not try the
 
1922
        # branch explicitly. However this is very hard to test for as we
 
1923
        # dont have a tree reference class, nor is one needed;
 
1924
        # the worst case if this code unlocks twice is an extra question
 
1925
        # being asked.
 
1926
        tree = self.make_branch_and_tree('.')
 
1927
        tree.lock_write()
 
1928
        # three yes's : tree, branch and repository.
 
1929
        bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
 
1930
        try:
 
1931
            tree.bzrdir.break_lock()
 
1932
        except (NotImplementedError, errors.LockActive):
 
1933
            # bzrdir does not support break_lock
 
1934
            # or one of the locked objects (currently only tree does this)
 
1935
            # raised a LockActive because we do still have a live locked
 
1936
            # object.
 
1937
            tree.unlock()
 
1938
            return
 
1939
        self.assertEqual([True],
 
1940
                bzrlib.ui.ui_factory.responses)
 
1941
        lock_tree = tree.bzrdir.open_workingtree()
 
1942
        lock_tree.lock_write()
 
1943
        lock_tree.unlock()
 
1944
        self.assertRaises(errors.LockBroken, tree.unlock)
 
1945
 
 
1946
 
 
1947
class TestTransportConfig(TestCaseWithBzrDir):
 
1948
 
 
1949
    def test_get_config(self):
 
1950
        my_dir = self.make_bzrdir('.')
 
1951
        config = my_dir.get_config()
 
1952
        try:
 
1953
            config.set_default_stack_on('http://example.com')
 
1954
        except errors.BzrError, e:
 
1955
            if 'Cannot set config' in str(e):
 
1956
                self.assertFalse(
 
1957
                    isinstance(my_dir, (bzrdir.BzrDirMeta1, RemoteBzrDir)),
 
1958
                    "%r should support configs" % my_dir)
 
1959
                raise TestNotApplicable(
 
1960
                    'This BzrDir format does not support configs.')
 
1961
            else:
 
1962
                raise
 
1963
        self.assertEqual('http://example.com', config.get_default_stack_on())
 
1964
        my_dir2 = bzrdir.BzrDir.open(self.get_url('.'))
 
1965
        config2 = my_dir2.get_config()
 
1966
        self.assertEqual('http://example.com', config2.get_default_stack_on())
 
1967
 
 
1968
 
 
1969
class ChrootedBzrDirTests(ChrootedTestCase):
 
1970
 
 
1971
    def test_find_repository_no_repository(self):
 
1972
        # loopback test to check the current format fails to find a
 
1973
        # share repository correctly.
 
1974
        if not self.bzrdir_format.is_supported():
 
1975
            # unsupported formats are not loopback testable
 
1976
            # because the default open will not open them and
 
1977
            # they may not be initializable.
 
1978
            return
 
1979
        # supported formats must be able to init and open
 
1980
        # - do the vfs initialisation over the basic vfs transport
 
1981
        # XXX: TODO this should become a 'bzrdirlocation' api call.
 
1982
        url = self.get_vfs_only_url('subdir')
 
1983
        transport.get_transport(self.get_vfs_only_url()).mkdir('subdir')
 
1984
        made_control = self.bzrdir_format.initialize(self.get_url('subdir'))
 
1985
        try:
 
1986
            repo = made_control.open_repository()
 
1987
            # if there is a repository, then the format cannot ever hit this
 
1988
            # code path.
 
1989
            return
 
1990
        except errors.NoRepositoryPresent:
 
1991
            pass
 
1992
        made_control = bzrdir.BzrDir.open(self.get_readonly_url('subdir'))
 
1993
        self.assertRaises(errors.NoRepositoryPresent,
 
1994
                          made_control.find_repository)
 
1995
 
 
1996
 
 
1997
class TestBzrDirControlComponent(TestCaseWithBzrDir):
 
1998
    """BzrDir implementations adequately implement ControlComponent."""
 
1999
 
 
2000
    def test_urls(self):
 
2001
        bd = self.make_bzrdir('bd')
 
2002
        self.assertIsInstance(bd.user_url, str)
 
2003
        self.assertEqual(bd.user_url, bd.user_transport.base)
 
2004
        # for all current bzrdir implementations the user dir must be 
 
2005
        # above the control dir but we might need to relax that?
 
2006
        self.assertEqual(bd.control_url.find(bd.user_url), 0)
 
2007
        self.assertEqual(bd.control_url, bd.control_transport.base)