~lifeless/ubuntu/lucid/bzr/2.1.2-sru

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_interrepository/test_interrepository.py

  • Committer: Bazaar Package Importer
  • Author(s): Jelmer Vernooij
  • Date: 2009-07-21 11:25:12 UTC
  • mfrom: (1.4.1 upstream) (8.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20090721112512-dzfg8hfhqddf1dwj
* New upstream release.
 + Fixes compatibility with Python 2.4. Closes: #537708

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for InterRepository implementastions."""
 
18
 
 
19
import sys
 
20
 
 
21
import bzrlib
 
22
import bzrlib.bzrdir as bzrdir
 
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
24
import bzrlib.errors as errors
 
25
import bzrlib.gpg
 
26
from bzrlib.inventory import Inventory
 
27
import bzrlib.repofmt.weaverepo as weaverepo
 
28
import bzrlib.repository as repository
 
29
from bzrlib.revision import NULL_REVISION, Revision
 
30
from bzrlib.tests import (
 
31
    TestCase,
 
32
    TestCaseWithTransport,
 
33
    TestNotApplicable,
 
34
    TestSkipped,
 
35
    )
 
36
from bzrlib.tests.per_interrepository import (
 
37
    TestCaseWithInterRepository,
 
38
    )
 
39
 
 
40
 
 
41
def check_old_format_lock_error(repository_format):
 
42
    """Potentially ignore LockError on old formats.
 
43
 
 
44
    On win32, with the old OS locks, we get a failure of double-lock when
 
45
    we open a object in 2 objects and try to lock both.
 
46
 
 
47
    On new formats, LockError would be invalid, but for old formats
 
48
    this was not supported on Win32.
 
49
    """
 
50
    if sys.platform != 'win32':
 
51
        raise
 
52
 
 
53
    description = repository_format.get_format_description()
 
54
    if description in ("Repository format 4",
 
55
                       "Weave repository format 5",
 
56
                       "Weave repository format 6"):
 
57
        # jam 20060701
 
58
        # win32 OS locks are not re-entrant. So one process cannot
 
59
        # open the same repository twice and lock them both.
 
60
        raise TestSkipped('%s on win32 cannot open the same'
 
61
                          ' repository twice in different objects'
 
62
                          % description)
 
63
    raise
 
64
 
 
65
 
 
66
def check_repo_format_for_funky_id_on_win32(repo):
 
67
    if (isinstance(repo, (weaverepo.AllInOneRepository,
 
68
                          weaverepo.WeaveMetaDirRepository))
 
69
        and sys.platform == 'win32'):
 
70
            raise TestSkipped("funky chars does not permitted"
 
71
                              " on this platform in repository"
 
72
                              " %s" % repo.__class__.__name__)
 
73
 
 
74
 
 
75
class TestInterRepository(TestCaseWithInterRepository):
 
76
 
 
77
    def test_interrepository_get_returns_correct_optimiser(self):
 
78
        # we assume the optimising code paths are triggered
 
79
        # by the type of the repo not the transport - at this point.
 
80
        # we may need to update this test if this changes.
 
81
        #
 
82
        # XXX: This code tests that we get an InterRepository when we try to
 
83
        # convert between the two repositories that it wants to be tested with
 
84
        # -- but that's not necessarily correct.  So for now this is disabled.
 
85
        # mbp 20070206
 
86
        ## source_repo = self.make_repository("source")
 
87
        ## target_repo = self.make_to_repository("target")
 
88
        ## interrepo = repository.InterRepository.get(source_repo, target_repo)
 
89
        ## self.assertEqual(self.interrepo_class, interrepo.__class__)
 
90
        pass
 
91
 
 
92
 
 
93
class TestCaseWithComplexRepository(TestCaseWithInterRepository):
 
94
 
 
95
    def setUp(self):
 
96
        super(TestCaseWithComplexRepository, self).setUp()
 
97
        tree_a = self.make_branch_and_tree('a')
 
98
        self.bzrdir = tree_a.branch.bzrdir
 
99
        # add a corrupt inventory 'orphan'
 
100
        tree_a.branch.repository.lock_write()
 
101
        tree_a.branch.repository.start_write_group()
 
102
        inv_file = tree_a.branch.repository.inventories
 
103
        inv_file.add_lines(('orphan',), [], [])
 
104
        tree_a.branch.repository.commit_write_group()
 
105
        tree_a.branch.repository.unlock()
 
106
        # add a real revision 'rev1'
 
107
        tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
 
108
        # add a real revision 'rev2' based on rev1
 
109
        tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
 
110
        # and sign 'rev2'
 
111
        tree_a.branch.repository.lock_write()
 
112
        tree_a.branch.repository.start_write_group()
 
113
        tree_a.branch.repository.sign_revision('rev2', bzrlib.gpg.LoopbackGPGStrategy(None))
 
114
        tree_a.branch.repository.commit_write_group()
 
115
        tree_a.branch.repository.unlock()
 
116
 
 
117
    def test_search_missing_revision_ids(self):
 
118
        # revision ids in repository A but not B are returned, fake ones
 
119
        # are stripped. (fake meaning no revision object, but an inventory
 
120
        # as some formats keyed off inventory data in the past.)
 
121
        # make a repository to compare against that claims to have rev1
 
122
        repo_b = self.make_to_repository('rev1_only')
 
123
        repo_a = self.bzrdir.open_repository()
 
124
        repo_b.fetch(repo_a, 'rev1')
 
125
        # check the test will be valid
 
126
        self.assertFalse(repo_b.has_revision('rev2'))
 
127
        result = repo_b.search_missing_revision_ids(repo_a)
 
128
        self.assertEqual(set(['rev2']), result.get_keys())
 
129
        self.assertEqual(('search', set(['rev2']), set(['rev1']), 1),
 
130
            result.get_recipe())
 
131
 
 
132
    def test_search_missing_revision_ids_absent_requested_raises(self):
 
133
        # Asking for missing revisions with a tip that is itself absent in the
 
134
        # source raises NoSuchRevision.
 
135
        repo_b = self.make_to_repository('target')
 
136
        repo_a = self.bzrdir.open_repository()
 
137
        # No pizza revisions anywhere
 
138
        self.assertFalse(repo_a.has_revision('pizza'))
 
139
        self.assertFalse(repo_b.has_revision('pizza'))
 
140
        # Asking specifically for an absent revision errors.
 
141
        self.assertRaises(errors.NoSuchRevision,
 
142
            repo_b.search_missing_revision_ids, repo_a, revision_id='pizza',
 
143
            find_ghosts=True)
 
144
        self.assertRaises(errors.NoSuchRevision,
 
145
            repo_b.search_missing_revision_ids, repo_a, revision_id='pizza',
 
146
            find_ghosts=False)
 
147
 
 
148
    def test_search_missing_revision_ids_revision_limited(self):
 
149
        # revision ids in repository A that are not referenced by the
 
150
        # requested revision are not returned.
 
151
        # make a repository to compare against that is empty
 
152
        repo_b = self.make_to_repository('empty')
 
153
        repo_a = self.bzrdir.open_repository()
 
154
        result = repo_b.search_missing_revision_ids(repo_a, revision_id='rev1')
 
155
        self.assertEqual(set(['rev1']), result.get_keys())
 
156
        self.assertEqual(('search', set(['rev1']), set([NULL_REVISION]), 1),
 
157
            result.get_recipe())
 
158
 
 
159
    def test_fetch_fetches_signatures_too(self):
 
160
        from_repo = self.bzrdir.open_repository()
 
161
        from_signature = from_repo.get_signature_text('rev2')
 
162
        to_repo = self.make_to_repository('target')
 
163
        to_repo.fetch(from_repo)
 
164
        to_signature = to_repo.get_signature_text('rev2')
 
165
        self.assertEqual(from_signature, to_signature)
 
166
 
 
167
 
 
168
class TestCaseWithGhosts(TestCaseWithInterRepository):
 
169
 
 
170
    def test_fetch_all_fixes_up_ghost(self):
 
171
        # we want two repositories at this point:
 
172
        # one with a revision that is a ghost in the other
 
173
        # repository.
 
174
        # 'ghost' is present in has_ghost, 'ghost' is absent in 'missing_ghost'.
 
175
        # 'references' is present in both repositories, and 'tip' is present
 
176
        # just in has_ghost.
 
177
        # has_ghost       missing_ghost
 
178
        #------------------------------
 
179
        # 'ghost'             -
 
180
        # 'references'    'references'
 
181
        # 'tip'               -
 
182
        # In this test we fetch 'tip' which should not fetch 'ghost'
 
183
        has_ghost = self.make_repository('has_ghost')
 
184
        missing_ghost = self.make_repository('missing_ghost')
 
185
        if [True, True] != [repo._format.supports_ghosts for repo in
 
186
            (has_ghost, missing_ghost)]:
 
187
            raise TestNotApplicable("Need ghost support.")
 
188
 
 
189
        def add_commit(repo, revision_id, parent_ids):
 
190
            repo.lock_write()
 
191
            repo.start_write_group()
 
192
            inv = Inventory(revision_id=revision_id)
 
193
            inv.root.revision = revision_id
 
194
            root_id = inv.root.file_id
 
195
            sha1 = repo.add_inventory(revision_id, inv, parent_ids)
 
196
            repo.texts.add_lines((root_id, revision_id), [], [])
 
197
            rev = bzrlib.revision.Revision(timestamp=0,
 
198
                                           timezone=None,
 
199
                                           committer="Foo Bar <foo@example.com>",
 
200
                                           message="Message",
 
201
                                           inventory_sha1=sha1,
 
202
                                           revision_id=revision_id)
 
203
            rev.parent_ids = parent_ids
 
204
            repo.add_revision(revision_id, rev)
 
205
            repo.commit_write_group()
 
206
            repo.unlock()
 
207
        add_commit(has_ghost, 'ghost', [])
 
208
        add_commit(has_ghost, 'references', ['ghost'])
 
209
        add_commit(missing_ghost, 'references', ['ghost'])
 
210
        add_commit(has_ghost, 'tip', ['references'])
 
211
        missing_ghost.fetch(has_ghost, 'tip', find_ghosts=True)
 
212
        # missing ghost now has tip and ghost.
 
213
        rev = missing_ghost.get_revision('tip')
 
214
        inv = missing_ghost.get_inventory('tip')
 
215
        rev = missing_ghost.get_revision('ghost')
 
216
        inv = missing_ghost.get_inventory('ghost')
 
217
        # rev must not be corrupt now
 
218
        self.assertEqual([None, 'ghost', 'references', 'tip'],
 
219
            missing_ghost.get_ancestry('tip'))