1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for InterRepository implementastions."""
22
import bzrlib.bzrdir as bzrdir
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
import bzrlib.errors as errors
26
from bzrlib.inventory import Inventory
27
import bzrlib.repofmt.weaverepo as weaverepo
28
import bzrlib.repository as repository
29
from bzrlib.revision import NULL_REVISION, Revision
30
from bzrlib.tests import (
32
TestCaseWithTransport,
36
from bzrlib.tests.per_interrepository import (
37
TestCaseWithInterRepository,
41
def check_old_format_lock_error(repository_format):
42
"""Potentially ignore LockError on old formats.
44
On win32, with the old OS locks, we get a failure of double-lock when
45
we open a object in 2 objects and try to lock both.
47
On new formats, LockError would be invalid, but for old formats
48
this was not supported on Win32.
50
if sys.platform != 'win32':
53
description = repository_format.get_format_description()
54
if description in ("Repository format 4",
55
"Weave repository format 5",
56
"Weave repository format 6"):
58
# win32 OS locks are not re-entrant. So one process cannot
59
# open the same repository twice and lock them both.
60
raise TestSkipped('%s on win32 cannot open the same'
61
' repository twice in different objects'
66
def check_repo_format_for_funky_id_on_win32(repo):
67
if (isinstance(repo, (weaverepo.AllInOneRepository,
68
weaverepo.WeaveMetaDirRepository))
69
and sys.platform == 'win32'):
70
raise TestSkipped("funky chars does not permitted"
71
" on this platform in repository"
72
" %s" % repo.__class__.__name__)
75
class TestInterRepository(TestCaseWithInterRepository):
77
def test_interrepository_get_returns_correct_optimiser(self):
78
# we assume the optimising code paths are triggered
79
# by the type of the repo not the transport - at this point.
80
# we may need to update this test if this changes.
82
# XXX: This code tests that we get an InterRepository when we try to
83
# convert between the two repositories that it wants to be tested with
84
# -- but that's not necessarily correct. So for now this is disabled.
86
## source_repo = self.make_repository("source")
87
## target_repo = self.make_to_repository("target")
88
## interrepo = repository.InterRepository.get(source_repo, target_repo)
89
## self.assertEqual(self.interrepo_class, interrepo.__class__)
93
class TestCaseWithComplexRepository(TestCaseWithInterRepository):
96
super(TestCaseWithComplexRepository, self).setUp()
97
tree_a = self.make_branch_and_tree('a')
98
self.bzrdir = tree_a.branch.bzrdir
99
# add a corrupt inventory 'orphan'
100
tree_a.branch.repository.lock_write()
101
tree_a.branch.repository.start_write_group()
102
inv_file = tree_a.branch.repository.inventories
103
inv_file.add_lines(('orphan',), [], [])
104
tree_a.branch.repository.commit_write_group()
105
tree_a.branch.repository.unlock()
106
# add a real revision 'rev1'
107
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
108
# add a real revision 'rev2' based on rev1
109
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
111
tree_a.branch.repository.lock_write()
112
tree_a.branch.repository.start_write_group()
113
tree_a.branch.repository.sign_revision('rev2', bzrlib.gpg.LoopbackGPGStrategy(None))
114
tree_a.branch.repository.commit_write_group()
115
tree_a.branch.repository.unlock()
117
def test_search_missing_revision_ids(self):
118
# revision ids in repository A but not B are returned, fake ones
119
# are stripped. (fake meaning no revision object, but an inventory
120
# as some formats keyed off inventory data in the past.)
121
# make a repository to compare against that claims to have rev1
122
repo_b = self.make_to_repository('rev1_only')
123
repo_a = self.bzrdir.open_repository()
124
repo_b.fetch(repo_a, 'rev1')
125
# check the test will be valid
126
self.assertFalse(repo_b.has_revision('rev2'))
127
result = repo_b.search_missing_revision_ids(repo_a)
128
self.assertEqual(set(['rev2']), result.get_keys())
129
self.assertEqual(('search', set(['rev2']), set(['rev1']), 1),
132
def test_search_missing_revision_ids_absent_requested_raises(self):
133
# Asking for missing revisions with a tip that is itself absent in the
134
# source raises NoSuchRevision.
135
repo_b = self.make_to_repository('target')
136
repo_a = self.bzrdir.open_repository()
137
# No pizza revisions anywhere
138
self.assertFalse(repo_a.has_revision('pizza'))
139
self.assertFalse(repo_b.has_revision('pizza'))
140
# Asking specifically for an absent revision errors.
141
self.assertRaises(errors.NoSuchRevision,
142
repo_b.search_missing_revision_ids, repo_a, revision_id='pizza',
144
self.assertRaises(errors.NoSuchRevision,
145
repo_b.search_missing_revision_ids, repo_a, revision_id='pizza',
148
def test_search_missing_revision_ids_revision_limited(self):
149
# revision ids in repository A that are not referenced by the
150
# requested revision are not returned.
151
# make a repository to compare against that is empty
152
repo_b = self.make_to_repository('empty')
153
repo_a = self.bzrdir.open_repository()
154
result = repo_b.search_missing_revision_ids(repo_a, revision_id='rev1')
155
self.assertEqual(set(['rev1']), result.get_keys())
156
self.assertEqual(('search', set(['rev1']), set([NULL_REVISION]), 1),
159
def test_fetch_fetches_signatures_too(self):
160
from_repo = self.bzrdir.open_repository()
161
from_signature = from_repo.get_signature_text('rev2')
162
to_repo = self.make_to_repository('target')
163
to_repo.fetch(from_repo)
164
to_signature = to_repo.get_signature_text('rev2')
165
self.assertEqual(from_signature, to_signature)
168
class TestCaseWithGhosts(TestCaseWithInterRepository):
170
def test_fetch_all_fixes_up_ghost(self):
171
# we want two repositories at this point:
172
# one with a revision that is a ghost in the other
174
# 'ghost' is present in has_ghost, 'ghost' is absent in 'missing_ghost'.
175
# 'references' is present in both repositories, and 'tip' is present
177
# has_ghost missing_ghost
178
#------------------------------
180
# 'references' 'references'
182
# In this test we fetch 'tip' which should not fetch 'ghost'
183
has_ghost = self.make_repository('has_ghost')
184
missing_ghost = self.make_repository('missing_ghost')
185
if [True, True] != [repo._format.supports_ghosts for repo in
186
(has_ghost, missing_ghost)]:
187
raise TestNotApplicable("Need ghost support.")
189
def add_commit(repo, revision_id, parent_ids):
191
repo.start_write_group()
192
inv = Inventory(revision_id=revision_id)
193
inv.root.revision = revision_id
194
root_id = inv.root.file_id
195
sha1 = repo.add_inventory(revision_id, inv, parent_ids)
196
repo.texts.add_lines((root_id, revision_id), [], [])
197
rev = bzrlib.revision.Revision(timestamp=0,
199
committer="Foo Bar <foo@example.com>",
202
revision_id=revision_id)
203
rev.parent_ids = parent_ids
204
repo.add_revision(revision_id, rev)
205
repo.commit_write_group()
207
add_commit(has_ghost, 'ghost', [])
208
add_commit(has_ghost, 'references', ['ghost'])
209
add_commit(missing_ghost, 'references', ['ghost'])
210
add_commit(has_ghost, 'tip', ['references'])
211
missing_ghost.fetch(has_ghost, 'tip', find_ghosts=True)
212
# missing ghost now has tip and ghost.
213
rev = missing_ghost.get_revision('tip')
214
inv = missing_ghost.get_inventory('tip')
215
rev = missing_ghost.get_revision('ghost')
216
inv = missing_ghost.get_inventory('ghost')
217
# rev must not be corrupt now
218
self.assertEqual([None, 'ghost', 'references', 'tip'],
219
missing_ghost.get_ancestry('tip'))