13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
153
154
except errors.UnknownSmartMethod:
154
155
medium._remember_remote_is_before((1, 13))
155
156
return self._vfs_cloning_metadir(require_stacking=require_stacking)
157
except errors.UnknownErrorFromSmartServer, err:
158
if err.error_tuple != ('BranchReference',):
160
# We need to resolve the branch reference to determine the
161
# cloning_metadir. This causes unnecessary RPCs to open the
162
# referenced branch (and bzrdir, etc) but only when the caller
163
# didn't already resolve the branch reference.
164
referenced_branch = self.open_branch()
165
return referenced_branch.bzrdir.cloning_metadir()
156
166
if len(response) != 3:
157
167
raise errors.UnexpectedSmartServerResponse(response)
158
168
control_name, repo_name, branch_info = response
268
278
if response[0] == 'ref':
269
279
# a branch reference, use the existing BranchReference logic.
270
280
format = BranchReferenceFormat()
271
return format.open(self, _found=True, location=response[1])
281
return format.open(self, _found=True, location=response[1],
282
ignore_fallbacks=ignore_fallbacks)
272
283
branch_format_name = response[1]
273
284
if not branch_format_name:
274
285
branch_format_name = None
275
286
format = RemoteBranchFormat(network_name=branch_format_name)
276
return RemoteBranch(self, self.find_repository(), format=format)
287
return RemoteBranch(self, self.find_repository(), format=format,
288
setup_stacking=not ignore_fallbacks)
278
290
def _open_repo_v1(self, path):
279
291
verb = 'BzrDir.find_repository'
620
637
return self._real_repository.abort_write_group(
621
638
suppress_errors=suppress_errors)
642
"""Decorate the real repository for now.
644
In the long term a full blown network facility is needed to avoid
645
creating a real repository object locally.
648
return self._real_repository.chk_bytes
623
650
def commit_write_group(self):
624
651
"""Complete a write group on the decorated repository.
643
670
"""Ensure that there is a _real_repository set.
645
672
Used before calls to self._real_repository.
674
Note that _ensure_real causes many roundtrips to the server which are
675
not desirable, and prevents the use of smart one-roundtrip RPC's to
676
perform complex operations (such as accessing parent data, streaming
677
revisions etc). Adding calls to _ensure_real should only be done when
678
bringing up new functionality, adding fallbacks for smart methods that
679
require a fallback path, and never to replace an existing smart method
680
invocation. If in doubt chat to the bzr network team.
647
682
if self._real_repository is None:
648
683
self.bzrdir._ensure_real()
677
712
self._ensure_real()
678
713
return self._real_repository._generate_text_key_index()
680
@symbol_versioning.deprecated_method(symbol_versioning.one_four)
681
def get_revision_graph(self, revision_id=None):
682
"""See Repository.get_revision_graph()."""
683
return self._get_revision_graph(revision_id)
685
715
def _get_revision_graph(self, revision_id):
686
716
"""Private method for using with old (< 1.2) servers to fallback."""
687
717
if revision_id is None:
892
927
if isinstance(repository, RemoteRepository):
893
928
raise AssertionError()
894
929
self._real_repository = repository
895
# If the _real_repository has _fallback_repositories, clear them out,
896
# because we want it to have the same set as this repository. This is
897
# reasonable to do because the fallbacks we clear here are from a
898
# "real" branch, and we're about to replace them with the equivalents
899
# from a RemoteBranch.
900
self._real_repository._fallback_repositories = []
930
# three code paths happen here:
931
# 1) old servers, RemoteBranch.open() calls _ensure_real before setting
932
# up stacking. In this case self._fallback_repositories is [], and the
933
# real repo is already setup. Preserve the real repo and
934
# RemoteRepository.add_fallback_repository will avoid adding
936
# 2) new servers, RemoteBranch.open() sets up stacking, and when
937
# ensure_real is triggered from a branch, the real repository to
938
# set already has a matching list with separate instances, but
939
# as they are also RemoteRepositories we don't worry about making the
940
# lists be identical.
941
# 3) new servers, RemoteRepository.ensure_real is triggered before
942
# RemoteBranch.ensure real, in this case we get a repo with no fallbacks
943
# and need to populate it.
944
if (self._fallback_repositories and
945
len(self._real_repository._fallback_repositories) !=
946
len(self._fallback_repositories)):
947
if len(self._real_repository._fallback_repositories):
948
raise AssertionError(
949
"cannot cleanly remove existing _fallback_repositories")
901
950
for fb in self._fallback_repositories:
902
951
self._real_repository.add_fallback_repository(fb)
903
952
if self._lock_mode == 'w':
1032
1081
# _real_branch had its get_stacked_on_url method called), then the
1033
1082
# repository to be added may already be in the _real_repositories list.
1034
1083
if self._real_repository is not None:
1035
if repository not in self._real_repository._fallback_repositories:
1084
fallback_locations = [repo.bzrdir.root_transport.base for repo in
1085
self._real_repository._fallback_repositories]
1086
if repository.bzrdir.root_transport.base not in fallback_locations:
1036
1087
self._real_repository.add_fallback_repository(repository)
1038
# They are also seen by the fallback repository. If it doesn't
1039
# exist yet they'll be added then. This implicitly copies them.
1042
1089
def add_inventory(self, revid, inv, parents):
1043
1090
self._ensure_real()
1082
1129
self._ensure_real()
1083
1130
return self._real_repository.make_working_trees()
1132
def refresh_data(self):
1133
"""Re-read any data needed to to synchronise with disk.
1135
This method is intended to be called after another repository instance
1136
(such as one used by a smart server) has inserted data into the
1137
repository. It may not be called during a write group, but may be
1138
called at any other time.
1140
if self.is_in_write_group():
1141
raise errors.InternalBzrError(
1142
"May not refresh_data while in a write group.")
1143
if self._real_repository is not None:
1144
self._real_repository.refresh_data()
1085
1146
def revision_ids_to_search_result(self, result_set):
1086
1147
"""Convert a set of revision ids to a graph SearchResult."""
1087
1148
result_parents = set()
1109
1170
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1110
1171
fetch_spec=None):
1172
# No base implementation to use as RemoteRepository is not a subclass
1173
# of Repository; so this is a copy of Repository.fetch().
1111
1174
if fetch_spec is not None and revision_id is not None:
1112
1175
raise AssertionError(
1113
1176
"fetch_spec and revision_id are mutually exclusive.")
1114
# Not delegated to _real_repository so that InterRepository.get has a
1115
# chance to find an InterRepository specialised for RemoteRepository.
1177
if self.is_in_write_group():
1178
raise errors.InternalBzrError(
1179
"May not fetch while in a write group.")
1180
# fast path same-url fetch operations
1116
1181
if self.has_same_location(source) and fetch_spec is None:
1117
1182
# check that last_revision is in 'from' and then return a
1118
1183
# no-operation.
1120
1185
not revision.is_null(revision_id)):
1121
1186
self.get_revision(revision_id)
1188
# if there is no specific appropriate InterRepository, this will get
1189
# the InterRepository base class, which raises an
1190
# IncompatibleRepositories when asked to fetch.
1123
1191
inter = repository.InterRepository.get(source, self)
1125
return inter.fetch(revision_id=revision_id, pb=pb,
1126
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1127
except NotImplementedError:
1128
raise errors.IncompatibleRepositories(source, self)
1192
return inter.fetch(revision_id=revision_id, pb=pb,
1193
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1130
1195
def create_bundle(self, target, base, fileobj, format=None):
1131
1196
self._ensure_real()
1162
1227
# We already found out that the server can't understand
1163
1228
# Repository.get_parent_map requests, so just fetch the whole
1165
# XXX: Note that this will issue a deprecation warning. This is ok
1166
# :- its because we're working with a deprecated server anyway, and
1167
# the user will almost certainly have seen a warning about the
1168
# server version already.
1169
rg = self.get_revision_graph()
1231
# Note that this reads the whole graph, when only some keys are
1232
# wanted. On this old server there's no way (?) to get them all
1233
# in one go, and the user probably will have seen a warning about
1234
# the server being old anyhow.
1235
rg = self._get_revision_graph(None)
1170
1236
# There is an api discrepancy between get_parent_map and
1171
1237
# get_revision_graph. Specifically, a "key:()" pair in
1172
1238
# get_revision_graph just means a node has no parents. For
1203
1269
# TODO: Manage this incrementally to avoid covering the same path
1204
1270
# repeatedly. (The server will have to on each request, but the less
1205
1271
# work done the better).
1273
# Negative caching notes:
1274
# new server sends missing when a request including the revid
1275
# 'include-missing:' is present in the request.
1276
# missing keys are serialised as missing:X, and we then call
1277
# provider.note_missing_key(X) for all X
1206
1278
parents_map = self._unstacked_provider.get_cached_map()
1207
1279
if parents_map is None:
1208
1280
# Repository is not locked, so there's no cache.
1209
1281
parents_map = {}
1282
# start_set is all the keys in the cache
1210
1283
start_set = set(parents_map)
1284
# result set is all the references to keys in the cache
1211
1285
result_parents = set()
1212
1286
for parents in parents_map.itervalues():
1213
1287
result_parents.update(parents)
1214
1288
stop_keys = result_parents.difference(start_set)
1289
# We don't need to send ghosts back to the server as a position to
1291
stop_keys.difference_update(self._unstacked_provider.missing_keys)
1292
key_count = len(parents_map)
1293
if (NULL_REVISION in result_parents
1294
and NULL_REVISION in self._unstacked_provider.missing_keys):
1295
# If we pruned NULL_REVISION from the stop_keys because it's also
1296
# in our cache of "missing" keys we need to increment our key count
1297
# by 1, because the reconstituted SearchResult on the server will
1298
# still consider NULL_REVISION to be an included key.
1215
1300
included_keys = start_set.intersection(result_parents)
1216
1301
start_set.difference_update(included_keys)
1217
recipe = (start_set, stop_keys, len(parents_map))
1302
recipe = ('manual', start_set, stop_keys, key_count)
1218
1303
body = self._serialise_search_recipe(recipe)
1219
1304
path = self.bzrdir._path_for_remote_call(self._client)
1220
1305
for key in keys:
1238
1323
# To avoid having to disconnect repeatedly, we keep track of the
1239
1324
# fact the server doesn't understand remote methods added in 1.2.
1240
1325
medium._remember_remote_is_before((1, 2))
1241
return self.get_revision_graph(None)
1326
# Recurse just once and we should use the fallback code.
1327
return self._get_parent_map_rpc(keys)
1242
1328
response_tuple, response_handler = response
1243
1329
if response_tuple[0] not in ['ok']:
1244
1330
response_handler.cancel_read_body()
1256
1342
revision_graph[d[0]] = d[1:]
1258
# No parents - so give the Graph result (NULL_REVISION,).
1259
revision_graph[d[0]] = (NULL_REVISION,)
1345
if d[0].startswith('missing:'):
1347
self._unstacked_provider.note_missing_key(revid)
1349
# no parents - so give the Graph result
1351
revision_graph[d[0]] = (NULL_REVISION,)
1260
1352
return revision_graph
1262
1354
@needs_read_lock
1265
1357
return self._real_repository.get_signature_text(revision_id)
1267
1359
@needs_read_lock
1268
@symbol_versioning.deprecated_method(symbol_versioning.one_three)
1269
def get_revision_graph_with_ghosts(self, revision_ids=None):
1271
return self._real_repository.get_revision_graph_with_ghosts(
1272
revision_ids=revision_ids)
1275
1360
def get_inventory_xml(self, revision_id):
1276
1361
self._ensure_real()
1277
1362
return self._real_repository.get_inventory_xml(revision_id)
1289
1374
return self._real_repository.all_revision_ids()
1291
1376
@needs_read_lock
1292
def get_deltas_for_revisions(self, revisions):
1377
def get_deltas_for_revisions(self, revisions, specific_fileids=None):
1293
1378
self._ensure_real()
1294
return self._real_repository.get_deltas_for_revisions(revisions)
1379
return self._real_repository.get_deltas_for_revisions(revisions,
1380
specific_fileids=specific_fileids)
1296
1382
@needs_read_lock
1297
def get_revision_delta(self, revision_id):
1383
def get_revision_delta(self, revision_id, specific_fileids=None):
1298
1384
self._ensure_real()
1299
return self._real_repository.get_revision_delta(revision_id)
1385
return self._real_repository.get_revision_delta(revision_id,
1386
specific_fileids=specific_fileids)
1301
1388
@needs_read_lock
1302
1389
def revision_trees(self, revision_ids):
1479
1566
:param recipe: A search recipe (kind, start, stop, count).
1480
1567
:return: Serialised bytes.
1482
start_keys = ' '.join(recipe[0])
1483
stop_keys = ' '.join(recipe[1])
1484
count = str(recipe[2])
1569
start_keys = ' '.join(recipe[1])
1570
stop_keys = ' '.join(recipe[2])
1571
count = str(recipe[3])
1485
1572
return '\n'.join((start_keys, stop_keys, count))
1487
1574
def _serialise_search_result(self, search_result):
1501
1588
self._ensure_real()
1502
1589
self._real_repository._pack_collection.autopack()
1504
if self._real_repository is not None:
1505
# Reset the real repository's cache of pack names.
1506
# XXX: At some point we may be able to skip this and just rely on
1507
# the automatic retry logic to do the right thing, but for now we
1508
# err on the side of being correct rather than being optimal.
1509
self._real_repository._pack_collection.reload_pack_names()
1510
1592
if response[0] != 'ok':
1511
1593
raise errors.UnexpectedSmartServerResponse(response)
1524
1606
def insert_stream(self, stream, src_format, resume_tokens):
1525
repo = self.target_repo
1526
client = repo._client
1607
target = self.target_repo
1608
if target._lock_token:
1609
verb = 'Repository.insert_stream_locked'
1610
extra_args = (target._lock_token or '',)
1611
required_version = (1, 14)
1613
verb = 'Repository.insert_stream'
1615
required_version = (1, 13)
1616
client = target._client
1527
1617
medium = client._medium
1528
if medium._is_remote_before((1, 13)):
1618
if medium._is_remote_before(required_version):
1529
1619
# No possible way this can work.
1530
1620
return self._insert_real(stream, src_format, resume_tokens)
1531
path = repo.bzrdir._path_for_remote_call(client)
1621
path = target.bzrdir._path_for_remote_call(client)
1532
1622
if not resume_tokens:
1533
1623
# XXX: Ugly but important for correctness, *will* be fixed during
1534
1624
# 1.13 cycle. Pushing a stream that is interrupted results in a
1541
1631
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1543
1633
response = client.call_with_body_stream(
1544
('Repository.insert_stream', path, ''), byte_stream)
1634
(verb, path, '') + extra_args, byte_stream)
1545
1635
except errors.UnknownSmartMethod:
1546
medium._remember_remote_is_before((1,13))
1636
medium._remember_remote_is_before(required_version)
1547
1637
return self._insert_real(stream, src_format, resume_tokens)
1548
1638
byte_stream = smart_repo._stream_to_byte_stream(
1549
1639
stream, src_format)
1550
1640
resume_tokens = ' '.join(resume_tokens)
1551
1641
response = client.call_with_body_stream(
1552
('Repository.insert_stream', path, resume_tokens), byte_stream)
1642
(verb, path, resume_tokens) + extra_args, byte_stream)
1553
1643
if response[0][0] not in ('ok', 'missing-basis'):
1554
1644
raise errors.UnexpectedSmartServerResponse(response)
1555
1645
if response[0][0] == 'missing-basis':
1557
1647
resume_tokens = tokens
1558
1648
return resume_tokens, missing_keys
1560
if self.target_repo._real_repository is not None:
1561
collection = getattr(self.target_repo._real_repository,
1562
'_pack_collection', None)
1563
if collection is not None:
1564
collection.reload_pack_names()
1650
self.target_repo.refresh_data()
1565
1651
return [], set()
1569
1655
"""Stream data from a remote server."""
1571
1657
def get_stream(self, search):
1572
# streaming with fallback repositories is not well defined yet: The
1573
# remote repository cannot see the fallback repositories, and thus
1574
# cannot satisfy the entire search in the general case. Likewise the
1575
# fallback repositories cannot reify the search to determine what they
1576
# should send. It likely needs a return value in the stream listing the
1577
# edge of the search to resume from in fallback repositories.
1578
if self.from_repository._fallback_repositories:
1579
return repository.StreamSource.get_stream(self, search)
1580
repo = self.from_repository
1658
if (self.from_repository._fallback_repositories and
1659
self.to_format._fetch_order == 'topological'):
1660
return self._real_stream(self.from_repository, search)
1661
return self.missing_parents_chain(search, [self.from_repository] +
1662
self.from_repository._fallback_repositories)
1664
def _real_stream(self, repo, search):
1665
"""Get a stream for search from repo.
1667
This never calls RemoteStreamSource.get_stream, and is a helper
1668
for RemoteStreamSource._get_stream to allow getting a stream
1669
reliably whether falling back because of old servers or trying
1670
to stream from a non-RemoteRepository (which the stacked support
1673
source = repo._get_source(self.to_format)
1674
if isinstance(source, RemoteStreamSource):
1675
return repository.StreamSource.get_stream(source, search)
1676
return source.get_stream(search)
1678
def _get_stream(self, repo, search):
1679
"""Core worker to get a stream from repo for search.
1681
This is used by both get_stream and the stacking support logic. It
1682
deliberately gets a stream for repo which does not need to be
1683
self.from_repository. In the event that repo is not Remote, or
1684
cannot do a smart stream, a fallback is made to the generic
1685
repository._get_stream() interface, via self._real_stream.
1687
In the event of stacking, streams from _get_stream will not
1688
contain all the data for search - this is normal (see get_stream).
1690
:param repo: A repository.
1691
:param search: A search.
1693
# Fallbacks may be non-smart
1694
if not isinstance(repo, RemoteRepository):
1695
return self._real_stream(repo, search)
1581
1696
client = repo._client
1582
1697
medium = client._medium
1583
1698
if medium._is_remote_before((1, 13)):
1584
# No possible way this can work.
1585
return repository.StreamSource.get_stream(self, search)
1699
# streaming was added in 1.13
1700
return self._real_stream(repo, search)
1586
1701
path = repo.bzrdir._path_for_remote_call(client)
1588
1703
search_bytes = repo._serialise_search_result(search)
1592
1707
response_tuple, response_handler = response
1593
1708
except errors.UnknownSmartMethod:
1594
1709
medium._remember_remote_is_before((1,13))
1595
return repository.StreamSource.get_stream(self, search)
1710
return self._real_stream(repo, search)
1596
1711
if response_tuple[0] != 'ok':
1597
1712
raise errors.UnexpectedSmartServerResponse(response_tuple)
1598
1713
byte_stream = response_handler.read_streamed_body()
1603
1718
src_format.network_name(), repo._format.network_name()))
1721
def missing_parents_chain(self, search, sources):
1722
"""Chain multiple streams together to handle stacking.
1724
:param search: The overall search to satisfy with streams.
1725
:param sources: A list of Repository objects to query.
1727
self.serialiser = self.to_format._serializer
1728
self.seen_revs = set()
1729
self.referenced_revs = set()
1730
# If there are heads in the search, or the key count is > 0, we are not
1732
while not search.is_empty() and len(sources) > 1:
1733
source = sources.pop(0)
1734
stream = self._get_stream(source, search)
1735
for kind, substream in stream:
1736
if kind != 'revisions':
1737
yield kind, substream
1739
yield kind, self.missing_parents_rev_handler(substream)
1740
search = search.refine(self.seen_revs, self.referenced_revs)
1741
self.seen_revs = set()
1742
self.referenced_revs = set()
1743
if not search.is_empty():
1744
for kind, stream in self._get_stream(sources[0], search):
1747
def missing_parents_rev_handler(self, substream):
1748
for content in substream:
1749
revision_bytes = content.get_bytes_as('fulltext')
1750
revision = self.serialiser.read_revision_from_string(revision_bytes)
1751
self.seen_revs.add(content.key[-1])
1752
self.referenced_revs.update(revision.parent_ids)
1607
1756
class RemoteBranchLockableFiles(LockableFiles):
1608
1757
"""A 'LockableFiles' implementation that talks to a smart server.
1648
1797
def network_name(self):
1649
1798
return self._network_name
1651
def open(self, a_bzrdir):
1652
return a_bzrdir.open_branch()
1800
def open(self, a_bzrdir, ignore_fallbacks=False):
1801
return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
1654
1803
def _vfs_initialize(self, a_bzrdir):
1655
1804
# Initialisation when using a local bzrdir object, or a non-vfs init
1813
1962
except (errors.NotStacked, errors.UnstackableBranchFormat,
1814
1963
errors.UnstackableRepositoryFormat), e:
1816
# it's relative to this branch...
1817
fallback_url = urlutils.join(self.base, fallback_url)
1818
transports = [self.bzrdir.root_transport]
1819
stacked_on = branch.Branch.open(fallback_url,
1820
possible_transports=transports)
1821
self.repository.add_fallback_repository(stacked_on.repository)
1965
self._activate_fallback_location(fallback_url)
1967
def _get_config(self):
1968
return RemoteBranchConfig(self)
1823
1970
def _get_real_transport(self):
1824
1971
# if we try vfs access, return the real branch's vfs transport
2166
2313
self._ensure_real()
2167
2314
return self._real_branch._set_parent_location(url)
2169
def set_stacked_on_url(self, stacked_location):
2170
"""Set the URL this branch is stacked against.
2172
:raises UnstackableBranchFormat: If the branch does not support
2174
:raises UnstackableRepositoryFormat: If the repository does not support
2178
return self._real_branch.set_stacked_on_url(stacked_location)
2180
2316
@needs_write_lock
2181
2317
def pull(self, source, overwrite=False, stop_revision=None,
2249
2385
return self._real_branch.set_push_location(location)
2388
class RemoteBranchConfig(object):
2389
"""A Config that reads from a smart branch and writes via smart methods.
2391
It is a low-level object that considers config data to be name/value pairs
2392
that may be associated with a section. Assigning meaning to these
2393
values is done at higher levels like bzrlib.config.TreeConfig.
2396
def __init__(self, branch):
2397
self._branch = branch
2399
def get_option(self, name, section=None, default=None):
2400
"""Return the value associated with a named option.
2402
:param name: The name of the value
2403
:param section: The section the option is in (if any)
2404
:param default: The value to return if the value is not set
2405
:return: The value or default value
2407
configobj = self._get_configobj()
2409
section_obj = configobj
2412
section_obj = configobj[section]
2415
return section_obj.get(name, default)
2417
def _get_configobj(self):
    """Fetch the branch's config file from the smart server and parse it.

    Issues the 'Branch.get_config_file' request for this branch's remote
    path and builds a ConfigObj from the lines of the response body.

    :return: A config.ConfigObj constructed from the body lines with
        encoding='utf-8'.
    :raises errors.UnexpectedSmartServerResponse: If the first element of
        the response tuple is not 'ok'.
    """
    path = self._branch._remote_path()
    response = self._branch._client.call_expecting_body(
        'Branch.get_config_file', path)
    # response is indexed as (response_tuple, body_handler).
    if response[0][0] != 'ok':
        # Raise via the errors module, matching the other raise sites in
        # this file; the unqualified name is not visibly in scope and would
        # surface as a NameError instead of reporting the bad response.
        raise errors.UnexpectedSmartServerResponse(response)
    lines = response[1].read_body_bytes().splitlines()
    return config.ConfigObj(lines, encoding='utf-8')
2426
def set_option(self, value, name, section=None):
2427
"""Set the value associated with a named option.
2429
:param value: The value to set
2430
:param name: The name of the value to set
2431
:param section: The section the option is in (if any)
2433
medium = self._branch._client._medium
2434
if medium._is_remote_before((1, 14)):
2435
return self._vfs_set_option(value, name, section)
2437
path = self._branch._remote_path()
2438
response = self._branch._client.call('Branch.set_config_option',
2439
path, self._branch._lock_token, self._branch._repo_lock_token,
2440
value.encode('utf8'), name, section or '')
2441
except errors.UnknownSmartMethod:
2442
medium._remember_remote_is_before((1, 14))
2443
return self._vfs_set_option(value, name, section)
2445
raise errors.UnexpectedSmartServerResponse(response)
2447
def _vfs_set_option(self, value, name, section=None):
2448
self._branch._ensure_real()
2449
return self._branch._real_branch._get_config().set_option(
2450
value, name, section)
2252
2453
def _extract_tar(tar, to_dir):
2253
2454
"""Extract all the contents of a tarfile object.