~ubuntu-branches/ubuntu/lucid/bzr/lucid-proposed

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

  • Committer: Bazaar Package Importer
  • Author(s): Jeff Bailey
  • Date: 2006-03-20 08:31:00 UTC
  • mfrom: (1.1.2 upstream)
  • mto: This revision was merged to the branch mainline in revision 4.
  • Revision ID: james.westby@ubuntu.com-20060320083100-ovdi2ssuw0epcx8s
Tags: 0.8~200603200831-0ubuntu1
* Snapshot uploaded to Dapper at Martin Pool's request.

* Disable testsuite for upload.  Fakeroot and the testsuite don't
  play along.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# (C) 2005, 2006 Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for branch implementations - tests a branch format."""
 
18
 
 
19
import os
 
20
import sys
 
21
 
 
22
import bzrlib.branch
 
23
import bzrlib.bzrdir as bzrdir
 
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
25
from bzrlib.commit import commit
 
26
import bzrlib.errors as errors
 
27
from bzrlib.errors import (FileExists,
 
28
                           NoSuchRevision,
 
29
                           NoSuchFile,
 
30
                           UninitializableFormat,
 
31
                           NotBranchError,
 
32
                           )
 
33
import bzrlib.gpg
 
34
from bzrlib.osutils import getcwd
 
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
 
36
from bzrlib.trace import mutter
 
37
import bzrlib.transactions as transactions
 
38
from bzrlib.transport import get_transport
 
39
from bzrlib.transport.http import HttpServer
 
40
from bzrlib.transport.memory import MemoryServer
 
41
from bzrlib.upgrade import upgrade
 
42
from bzrlib.workingtree import WorkingTree
 
43
 
 
44
 
 
45
# TODO: Make a branch using basis branch, and check that it 
 
46
# doesn't request any files that could have been avoided, by 
 
47
# hooking into the Transport.
 
48
 
 
49
 
 
50
class TestCaseWithBranch(TestCaseWithTransport):
    """Base class for tests that run against a particular branch format.

    The methods below read ``self.branch_format`` and ``self.bzrdir_format``.
    Neither attribute is assigned in this class, so they are presumably
    injected by the test adapter/loader -- TODO confirm.
    """

    def setUp(self):
        super(TestCaseWithBranch, self).setUp()
        # Filled in lazily by get_branch().
        self.branch = None

    def get_branch(self):
        """Return the branch for this test, creating it on first use."""
        if self.branch is None:
            self.branch = self.make_branch(None)
        return self.branch

    def make_branch(self, relpath):
        """Create a repository at relpath and initialize a branch in it.

        :raises TestSkipped: if the branch format cannot be initialized.
        """
        repo = self.make_repository(relpath)
        # fixme RBC 20060210 this isn't necessarily a fixable thing,
        # Skipped is the wrong exception to raise.
        try:
            return self.branch_format.initialize(repo.bzrdir)
        except errors.UninitializableFormat:
            raise TestSkipped('Uninitializable branch format')

    def make_repository(self, relpath, shared=False):
        """Initialize a control directory at relpath and create a repository.

        The last path segment of the url is created first (an existing
        directory is tolerated) unless it is empty or '.'.

        :param shared: passed through to create_repository.
        :raises TestSkipped: if the bzrdir format cannot be initialized.
        """
        try:
            url = self.get_url(relpath)
            segments = url.split('/')
            if segments and segments[-1] not in ('', '.'):
                parent = '/'.join(segments[:-1])
                t = get_transport(parent)
                try:
                    t.mkdir(segments[-1])
                except FileExists:
                    # The directory already being there is fine.
                    pass
            made_control = self.bzrdir_format.initialize(url)
            return made_control.create_repository(shared=shared)
        except UninitializableFormat:
            # Interpolate the format so the skip message names it instead of
            # showing a literal '%s'.
            raise TestSkipped("Format %s is not initializable."
                              % (self.bzrdir_format,))
 
85
 
 
86
 
 
87
class TestBranch(TestCaseWithBranch):
    """Behavioural tests run against the branch format under test."""

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = self.get_branch()
        br.append_revision("rev1")
        self.assertEquals(br.revision_history(), ["rev1",])
        br.append_revision("rev2", "rev3")
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation."""
        get_transport(self.get_url()).mkdir('b1')
        get_transport(self.get_url()).mkdir('b2')
        wt = self.make_branch_and_tree('b1')
        b1 = wt.branch
        b2 = self.make_branch('b2')
        # Close the file explicitly so the content is flushed to disk before
        # the working tree reads it, rather than relying on garbage
        # collection of the file object.
        f = open('b1/foo', 'w')
        try:
            f.write('hello')
        finally:
            f.close()
        wt.add(['foo'], ['foo-id'])
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)

        mutter('start fetch')
        self.assertEqual((1, []), b2.fetch(b1))

        # The result is unused; the lookup itself is the check.
        rev = b2.repository.get_revision('revision-1')
        tree = b2.repository.revision_tree('revision-1')
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')

    def get_unbalanced_tree_pair(self):
        """Return two trees, a and b, with one committed file in a."""
        get_transport(self.get_url()).mkdir('a')
        tree_a = self.make_branch_and_tree('a')
        # Explicit close: see test_fetch_revisions.
        f = open('a/b', 'wb')
        try:
            f.write('b')
        finally:
            f.close()
        tree_a.add('b')
        tree_a.commit("silly commit", rev_id='A')

        get_transport(self.get_url()).mkdir('b')
        tree_b = self.make_branch_and_tree('b')
        return tree_a, tree_b

    def get_balanced_branch_pair(self):
        """Return tree_a, tree_b: one commit in a, and b has fetched a's repository."""
        tree_a, tree_b = self.get_unbalanced_tree_pair()
        tree_b.branch.repository.fetch(tree_a.branch.repository)
        return tree_a, tree_b

    def test_clone_branch(self):
        """Copy the stores from one branch to another"""
        tree_a, tree_b = self.get_balanced_branch_pair()
        tree_b.commit("silly commit")
        os.mkdir('c')
        # this fails to test that the history from a was not used.
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
        self.assertEqual(tree_a.branch.revision_history(),
                         dir_c.open_branch().revision_history())

    def test_clone_partial(self):
        """Copy only part of the history of a branch."""
        # TODO: RBC 20060208 test with a revision not on revision-history.
        #       what should that behaviour be ? Emailed the list.
        wt_a = self.make_branch_and_tree('a')
        self.build_tree(['a/one'])
        wt_a.add(['one'])
        wt_a.commit('commit one', rev_id='1')
        self.build_tree(['a/two'])
        wt_a.add(['two'])
        wt_a.commit('commit two', rev_id='2')
        repo_b = self.make_repository('b')
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
        self.assertEqual(br_b.last_revision(), '1')

    def test_sprout_partial(self):
        # test sprouting with a prefix of the revision-history.
        # also needs not-on-revision-history behaviour defined.
        wt_a = self.make_branch_and_tree('a')
        self.build_tree(['a/one'])
        wt_a.add(['one'])
        wt_a.commit('commit one', rev_id='1')
        self.build_tree(['a/two'])
        wt_a.add(['two'])
        wt_a.commit('commit two', rev_id='2')
        repo_b = self.make_repository('b')
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
        self.assertEqual(br_b.last_revision(), '1')

    def test_clone_branch_nickname(self):
        # test the nick name is preserved always
        raise TestSkipped('XXX branch cloning is not yet tested..')

    def test_clone_branch_parent(self):
        # test the parent is preserved always
        raise TestSkipped('XXX branch cloning is not yet tested..')

    def test_sprout_branch_nickname(self):
        # test the nick name is reset always
        raise TestSkipped('XXX branch sprouting is not yet tested..')

    def test_sprout_branch_parent(self):
        # A sprouted branch should report the source location as its parent.
        source = self.make_branch('source')
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        wt = self.make_branch_and_tree('.')
        branch = wt.branch
        wt.add_pending_merge('non:existent@rev--ision--0--2')
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
        rev = branch.repository.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is not populated now, WTF. rbc 20051003
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

    def test_bad_revision(self):
        # Asking the repository for revision id None must be rejected.
        self.assertRaises(errors.InvalidRevisionId,
                          self.get_branch().repository.get_revision,
                          None)

    # TODO 20051003 RBC:
    # compare the gpg-to-sign info for a commit with a ghost and
    #     an identical tree without a ghost
    # fetch missing should rewrite the TOC of weaves to list newly available
    # parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        wt = self.make_branch_and_tree('.')
        b = wt.branch
        self.assertEquals(wt.pending_merges(), [])
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision again must not duplicate the entry.
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEquals(wt.pending_merges(),
                          ['foo@azkhazan-123123-abcabc',
                           'wibble@fofof--20050401--1928390812'])
        wt.commit("commit from base with two merges")
        rev = b.repository.get_revision(b.revision_history()[0])
        self.assertEquals(len(rev.parent_ids), 2)
        self.assertEquals(rev.parent_ids[0],
                          'foo@azkhazan-123123-abcabc')
        self.assertEquals(rev.parent_ids[1],
                           'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEquals(wt.pending_merges(), [])

    def test_sign_existing_revision(self):
        # Signing revision 'A' should store the testament short text as the
        # revision's 'sig' entry.
        wt = self.make_branch_and_tree('.')
        branch = wt.branch
        wt.commit("base", allow_pointless=True, rev_id='A')
        from bzrlib.testament import Testament
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
        branch.repository.sign_revision('A', strategy)
        self.assertEqual(Testament.from_revision(branch.repository,
                         'A').as_short_text(),
                         branch.repository.revision_store.get('A',
                         'sig').read())

    def test_store_signature(self):
        # Text stored via store_revision_signature should be readable back
        # from the revision store.
        branch = self.get_branch()
        branch.repository.store_revision_signature(
            bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
        self.assertEqual('FOO',
                         branch.repository.revision_store.get('A',
                         'sig').read())

    def test_branch_keeps_signatures(self):
        # Cloning the bzrdir should carry the revision signature across.
        wt = self.make_branch_and_tree('source')
        wt.commit('A', allow_pointless=True, rev_id='A')
        wt.branch.repository.sign_revision('A',
            bzrlib.gpg.LoopbackGPGStrategy(None))
        #FIXME: clone should work to urls,
        # wt.clone should work to disks.
        self.build_tree(['target/'])
        d2 = wt.bzrdir.clone('target')
        self.assertEqual(wt.branch.repository.revision_store.get('A',
                            'sig').read(),
                         d2.open_repository().revision_store.get('A',
                            'sig').read())

    def test_nicks(self):
        """Branch nicknames"""
        t = get_transport(self.get_url())
        t.mkdir('bzr.dev')
        branch = self.make_branch('bzr.dev')
        self.assertEqual(branch.nick, 'bzr.dev')
        t.move('bzr.dev', 'bzr.ab')
        branch = Branch.open(self.get_url('bzr.ab'))
        self.assertEqual(branch.nick, 'bzr.ab')
        # NOTE(review): the nick is assigned twice; presumably deliberate, to
        # check that re-setting an explicit nick works -- confirm.
        branch.nick = "Aaron's branch"
        branch.nick = "Aaron's branch"
        self.failUnless(
            t.has(
                t.relpath(
                    branch.control_files.controlfilename("branch.conf")
                    )
                )
            )
        self.assertEqual(branch.nick, "Aaron's branch")
        t.move('bzr.ab', 'integration')
        branch = Branch.open(self.get_url('integration'))
        # An explicitly set nick must survive moving the branch directory.
        self.assertEqual(branch.nick, "Aaron's branch")
        branch.nick = u"\u1234"
        self.assertEqual(branch.nick, u"\u1234")

    def test_commit_nicks(self):
        """Nicknames are committed to the revision"""
        get_transport(self.get_url()).mkdir('bzr.dev')
        wt = self.make_branch_and_tree('bzr.dev')
        branch = wt.branch
        branch.nick = "My happy branch"
        wt.commit('My commit respect da nick.')
        committed = branch.repository.get_revision(branch.last_revision())
        self.assertEqual(committed.properties["branch-nick"],
                         "My happy branch")

    def test_create_open_branch_uses_repository(self):
        # A branch created (and later re-opened) below a shared repository
        # should use that repository.
        try:
            repo = self.make_repository('.', shared=True)
        except errors.IncompatibleFormat:
            return
        repo.bzrdir.root_transport.mkdir('child')
        child_dir = self.bzrdir_format.initialize('child')
        try:
            child_branch = self.branch_format.initialize(child_dir)
        except errors.UninitializableFormat:
            # branch references are not default init'able.
            return
        self.assertEqual(repo.bzrdir.root_transport.base,
                         child_branch.repository.bzrdir.root_transport.base)
        child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
        self.assertEqual(repo.bzrdir.root_transport.base,
                         child_branch.repository.bzrdir.root_transport.base)
 
323
 
 
324
 
 
325
class ChrootedTests(TestCaseWithBranch):
    """Support class offering readonly urls outside the local namespace.

    If self.transport_server is a MemoryServer the test is already chrooted
    and nothing is changed; otherwise an HttpServer is selected as the
    readonly server.
    """

    def setUp(self):
        super(ChrootedTests, self).setUp()
        if self.transport_server == MemoryServer:
            # Already chrooted; keep the default readonly server.
            return
        self.transport_readonly_server = HttpServer

    def test_open_containing(self):
        # Paths probed both before and after the branch exists.
        probe_paths = ('', 'g/p/q')

        # With no branch present, every lookup must fail.
        for path in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_readonly_url(path))

        self.make_branch('.')

        # Once the branch exists at the root, the remainder of the url is
        # handed back as the relative path.
        for path in probe_paths:
            found, relpath = Branch.open_containing(
                self.get_readonly_url(path))
            self.assertEqual(path, relpath)
 
348
        
 
349
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
350
#         >>> from bzrlib.commit import commit
 
351
#         >>> bzrlib.trace.silent = True
 
352
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
353
#         >>> br1.working_tree().add('foo')
 
354
#         >>> br1.working_tree().add('bar')
 
355
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
356
#         >>> br2 = ScratchBranch()
 
357
#         >>> br2.update_revisions(br1)
 
358
#         Added 2 texts.
 
359
#         Added 1 inventories.
 
360
#         Added 1 revisions.
 
361
#         >>> br2.revision_history()
 
362
#         [u'REVISION-ID-1']
 
363
#         >>> br2.update_revisions(br1)
 
364
#         Added 0 revisions.
 
365
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
366
#         True
 
367
 
 
368
class InstrumentedTransaction(object):
    """Transaction stand-in that records the calls made on it."""

    def __init__(self):
        # Names of the methods invoked so far, in call order.
        self.calls = list()

    def finish(self):
        """Record that finish() was invoked."""
        self.calls.append('finish')
 
375
 
 
376
 
 
377
class TestDecorator(object):
    """Fake lockable object used to exercise the locking decorators.

    Each lock_read / lock_write / unlock call appends 'lr' / 'lw' / 'ul'
    to ``_calls`` so a test can inspect the sequence afterwards.
    """

    def __init__(self):
        # Log of lock operations, in the order they happened.
        self._calls = list()

    def lock_read(self):
        """Log a read-lock request."""
        self._calls.append('lr')

    def lock_write(self):
        """Log a write-lock request."""
        self._calls.append('lw')

    def unlock(self):
        """Log an unlock request."""
        self._calls.append('ul')

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside a needs_read_lock-decorated method."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside a needs_read_lock-decorated method."""
        raise RuntimeError()

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside a needs_write_lock-decorated method."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside a needs_write_lock-decorated method."""
        raise RuntimeError()
 
406
 
 
407
 
 
408
class TestDecorators(TestCase):
    """Check the lock/unlock sequence produced by the locking decorators."""

    def test_needs_read_lock(self):
        # Normal return: read lock taken, value passed through, lock dropped.
        lockable = TestDecorator()
        outcome = lockable.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], lockable._calls)

    def test_excepts_in_read_lock(self):
        # The exception propagates and the unlock is still recorded.
        lockable = TestDecorator()
        self.assertRaises(RuntimeError, lockable.except_with_read)
        self.assertEqual(['lr', 'ul'], lockable._calls)

    def test_needs_write_lock(self):
        # Normal return: write lock taken, value passed through, lock dropped.
        lockable = TestDecorator()
        outcome = lockable.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], lockable._calls)

    def test_excepts_in_write_lock(self):
        # The exception propagates and the unlock is still recorded.
        lockable = TestDecorator()
        self.assertRaises(RuntimeError, lockable.except_with_write)
        self.assertEqual(['lw', 'ul'], lockable._calls)
 
429
 
 
430
 
 
431
class TestBranchTransaction(TestCaseWithBranch):
    """Tests for how a branch sets, exposes and finishes transactions."""

    def setUp(self):
        super(TestBranchTransaction, self).setUp()
        self.branch = None

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        # Setting a transaction on a fresh branch must simply succeed.
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
        # A second transaction may not replace one that is still active.
        self.assertRaises(errors.LockError,
                          self.get_branch()._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
        self.get_branch()._finish_transaction()
        self.assertEqual(None, self.get_branch().control_files._transaction)

    def test_unlock_calls_finish(self):
        self.get_branch().lock_read()
        transaction = InstrumentedTransaction()
        # Swap in the instrumented transaction so that unlock() reports to it.
        self.get_branch().control_files._transaction = transaction
        self.get_branch().unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        self.get_branch().lock_read()
        # Release the lock even when the assertion fails, so a failing test
        # does not leave the branch locked.
        try:
            self.failUnless(isinstance(self.get_branch().get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            self.get_branch().unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        self.get_branch().lock_write()
        # Release the lock even when the assertion fails, so a failing test
        # does not leave the branch locked.
        try:
            # cannot use get_transaction as its magic
            self.failUnless(isinstance(self.get_branch().control_files._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            self.get_branch().unlock()
 
478
 
 
479
 
 
480
class TestBranchPushLocations(TestCaseWithBranch):
    """Tests for reading and writing a branch's push location."""

    def test_get_push_location_unset(self):
        self.assertEqual(None, self.get_branch().get_push_location())

    def test_get_push_location_exact(self):
        from bzrlib.config import (branches_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = branches_config_filename()
        # Write the config section by hand and close the file explicitly, so
        # the content is on disk before get_push_location() reads it.
        config_file = open(fn, 'wt')
        try:
            # The trailing newline matches what the print statement emitted.
            config_file.write(("[%s]\n"
                               "push_location=foo" %
                               self.get_branch().base[:-1]) + "\n")
        finally:
            config_file.close()
        self.assertEqual("foo", self.get_branch().get_push_location())

    def test_set_push_location(self):
        from bzrlib.config import (branches_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = branches_config_filename()
        self.get_branch().set_push_location('foo')
        # The section header is the branch base without its final character.
        self.assertFileEqual("[%s]\n"
                             "push_location = foo" % self.get_branch().base[:-1],
                             fn)

    # TODO RBC 20051029 test getting a push location from a branch in a
    # recursive section - that is, it appends the branch name.
 
507
 
 
508
 
 
509
class TestFormat(TestCaseWithBranch):
    """Tests for the format itself."""

    def test_format_initialize_find_open(self):
        # Loopback check: a branch initialized with the current format must
        # be found again as that same format when opened.
        fmt = self.branch_format
        if not fmt.is_supported():
            # Unsupported formats are not loopback testable: the default
            # open will not open them and they may not be initializable.
            return

        # Supported formats must be able to init and open.
        get_transport(self.get_url())
        ro_transport = get_transport(self.get_readonly_url())
        created = self.make_branch('.')
        self.failUnless(isinstance(created, bzrlib.branch.Branch))

        # Locate the branch by opening the control directory first.
        control = bzrdir.BzrDir.open(ro_transport.base)
        via_control = control.open_branch()
        self.assertEqual(via_control.__class__, created.__class__)
        self.assertEqual(control, via_control.bzrdir)
        self.failUnless(isinstance(via_control._format, fmt.__class__))

        # Locate the branch directly through Branch.open.
        via_open = bzrlib.branch.Branch.open(ro_transport.base)
        self.failUnless(isinstance(via_open, created.__class__))
        self.assertEqual(created._format.__class__,
                         via_open._format.__class__)

        # If the format has a unique id string, check it can be probed for.
        try:
            fmt.get_format_string()
        except NotImplementedError:
            return
        self.assertEqual(fmt,
                         bzrlib.branch.BranchFormat.find_format(control))