1
# Copyright (C) 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22
from bzrlib.smart import (
25
from bzrlib.tests.per_repository import TestCaseWithRepository
28
class TestFetch(TestCaseWithRepository):
    """Pull and sprout scenarios where a stacked branch is involved.

    Each test builds a small 'source' branch with a branch builder, splits
    its history between a stacked-on branch and a branch stacked on top of
    it, and then pulls or sprouts across that boundary, in some cases through
    a smart server.
    """

    def make_source_branch(self):
        """Build a 'source' branch holding revisions A-id, B-id and C-id.

        Every revision stores the file 'a' (file id 'a-id'). A-id holds the
        initial text; B-id and C-id each append one further chunk to it.

        :return: A ``(content, source_b)`` tuple. ``content`` is the list of
            text chunks which, joined, form the C-id text of the file (drop
            the last chunk for B-id, the last two for A-id). ``source_b`` is
            the built branch, whose ``unlock`` is registered as a cleanup.
        """
        # It would be nice if there was a way to force this to be memory-only
        builder = self.make_branch_builder('source')
        # Adjacent literals concatenate, so this list starts with ONE chunk.
        content = ['content lines\n'
                   'for the first revision\n'
                   'which is a marginal amount of content\n'
                   ]
        builder.start_series()
        builder.build_snapshot('A-id', None, [
            ('add', ('', 'root-id', 'directory', None)),
            ('add', ('a', 'a-id', 'file', ''.join(content))),
            ])
        content.append('and some more lines for B\n')
        builder.build_snapshot('B-id', ['A-id'], [
            ('modify', ('a-id', ''.join(content)))])
        content.append('and yet even more content for C\n')
        builder.build_snapshot('C-id', ['B-id'], [
            ('modify', ('a-id', ''.join(content)))])
        builder.finish_series()
        source_b = builder.get_branch()
        # NOTE(review): lock call restored to pair with the unlock cleanup
        # registered on the next line -- confirm a read lock is intended.
        source_b.lock_read()
        self.addCleanup(source_b.unlock)
        return content, source_b

    def test_sprout_from_stacked_with_short_history(self):
        """Pull from a stacked branch and check all three file texts arrive.

        'stack-on' receives history up to B-id, 'target' is stacked on it and
        receives C-id, and 'final' pulls from 'target'. The fulltexts for
        A-id, B-id and C-id read back from 'final' must match the generated
        content.
        """
        content, source_b = self.make_source_branch()
        # Split the generated content into a base branch, and a stacked branch
        # Use 'make_branch' which gives us a bzr:// branch when appropriate,
        # rather than creating a branch-on-disk
        stack_b = self.make_branch('stack-on')
        stack_b.pull(source_b, stop_revision='B-id')
        target_b = self.make_branch('target')
        target_b.set_stacked_on_url('../stack-on')
        target_b.pull(source_b, stop_revision='C-id')
        # At this point, we should have a target branch, with 1 revision, on
        # top of the stacked-on branch.
        final_b = self.make_branch('final')
        final_b.pull(target_b)
        # NOTE(review): lock call restored to pair with the unlock cleanup
        # registered on the next line -- confirm a read lock is intended.
        final_b.lock_read()
        self.addCleanup(final_b.unlock)
        self.assertEqual('C-id', final_b.last_revision())
        text_keys = [('a-id', 'A-id'), ('a-id', 'B-id'), ('a-id', 'C-id')]
        # NOTE(review): the ordering and include_delta_closure arguments were
        # reconstructed; fulltexts are requested below, which presumably needs
        # the delta closure to be included -- confirm against the API.
        stream = final_b.repository.texts.get_record_stream(
            text_keys, 'unordered', True)
        # Sort so the comparison does not depend on the stream's order.
        records = sorted([(r.key, r.get_bytes_as('fulltext')) for r in stream])
        self.assertEqual([
            (('a-id', 'A-id'), ''.join(content[:-2])),
            (('a-id', 'B-id'), ''.join(content[:-1])),
            (('a-id', 'C-id'), ''.join(content)),
            ], records)

    def test_sprout_from_smart_stacked_with_short_history(self):
        """Sprout from a stacked branch that lives behind a smart server.

        Both sprout variants (with and without an explicit ``revision_id``)
        must yield a branch whose last revision is C-id.
        """
        content, source_b = self.make_source_branch()
        transport = self.make_smart_server('server')
        transport.ensure_base()
        url = transport.abspath('')
        # Only the side effect (creating 'stack-on' up to B-id) is needed.
        source_b.bzrdir.sprout(url + '/stack-on', revision_id='B-id')
        # self.make_branch only takes relative paths, so we do it the 'hard'
        # way
        target_transport = transport.clone('target')
        target_transport.ensure_base()
        target_bzrdir = self.bzrdir_format.initialize_on_transport(
            target_transport)
        target_bzrdir.create_repository()
        target_b = target_bzrdir.create_branch()
        target_b.set_stacked_on_url('../stack-on')
        target_b.pull(source_b, stop_revision='C-id')
        # Now we should be able to branch from the remote location to a local
        # location
        final_b = target_b.bzrdir.sprout('final').open_branch()
        self.assertEqual('C-id', final_b.last_revision())

        # bzrdir.sprout() has slightly different code paths if you supply a
        # revision_id versus not. If you supply revision_id, then you get a
        # PendingAncestryResult for the search, versus a SearchResult...
        final2_b = target_b.bzrdir.sprout(
            'final2', revision_id='C-id').open_branch()
        # Check the branch produced by the revision_id code path; checking
        # final_b again would leave this second sprout unverified.
        self.assertEqual('C-id', final2_b.last_revision())

    def make_source_with_ghost_and_stacked_target(self):
        """Build a source whose tip names a ghost parent, plus stacked target.

        'source' gets revision A-id and then B-id, whose parents are A-id and
        'ghost-id' (a revision that is never built). 'base' receives A-id and
        'stacked' is created stacked on 'base' without any revisions pulled.

        :return: A ``(source_b, base, stacked)`` tuple of branches;
            ``source_b`` has its ``unlock`` registered as a cleanup.
        """
        builder = self.make_branch_builder('source')
        builder.start_series()
        builder.build_snapshot('A-id', None, [
            ('add', ('', 'root-id', 'directory', None)),
            ('add', ('file', 'file-id', 'file', 'content\n'))])
        builder.build_snapshot('B-id', ['A-id', 'ghost-id'], [])
        builder.finish_series()
        source_b = builder.get_branch()
        # NOTE(review): lock call restored to pair with the unlock cleanup
        # registered on the next line -- confirm a read lock is intended.
        source_b.lock_read()
        self.addCleanup(source_b.unlock)
        base = self.make_branch('base')
        base.pull(source_b, stop_revision='A-id')
        stacked = self.make_branch('stacked')
        stacked.set_stacked_on_url('../base')
        return source_b, base, stacked

    def test_fetch_with_ghost_stacked(self):
        """Pull B-id (which has a ghost parent) into the stacked branch."""
        source_b, _base, stacked = (
            self.make_source_with_ghost_and_stacked_target())
        stacked.pull(source_b, stop_revision='B-id')

    def test_fetch_into_smart_stacked_with_ghost(self):
        """Pull B-id into the stacked branch re-opened via a smart server."""
        source_b, _base, stacked = (
            self.make_source_with_ghost_and_stacked_target())
        # Now, create a smart server on 'stacked' and re-open to force the
        # target to be a smart target
        trans = self.make_smart_server('stacked')
        stacked = branch.Branch.open(trans.base)
        # NOTE(review): lock call restored to pair with the unlock cleanup
        # below; presumably a write lock because this branch is the pull
        # target -- confirm.
        stacked.lock_write()
        self.addCleanup(stacked.unlock)
        stacked.pull(source_b, stop_revision='B-id')

    def test_fetch_to_stacked_from_smart_with_ghost(self):
        """Pull B-id into the stacked branch from a smart-server source."""
        source_b, _base, stacked = (
            self.make_source_with_ghost_and_stacked_target())
        # Now, create a smart server on 'source' and re-open to force the
        # source to be a smart source
        trans = self.make_smart_server('source')
        source_b = branch.Branch.open(trans.base)
        # NOTE(review): lock call restored to pair with the unlock cleanup
        # registered on the next line -- confirm a read lock is intended.
        source_b.lock_read()
        self.addCleanup(source_b.unlock)
        stacked.pull(source_b, stop_revision='B-id')