~ubuntu-branches/ubuntu/karmic/bzr/karmic-proposed

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

  • Committer: Bazaar Package Importer
  • Author(s): Jelmer Vernooij
  • Date: 2009-04-11 14:25:12 UTC
  • mfrom: (1.1.51 upstream) (3.1.5 sid)
  • Revision ID: james.westby@ubuntu.com-20090411142512-jomeween5l8etlrm
Add missing dependency on zlib development library. (Closes:
#523595)

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
# TODO: At some point, handle upgrades by just passing the whole request
18
18
# across to run on the server.
22
22
from bzrlib import (
23
23
    branch,
24
24
    bzrdir,
 
25
    config,
25
26
    debug,
26
27
    errors,
27
28
    graph,
153
154
        except errors.UnknownSmartMethod:
154
155
            medium._remember_remote_is_before((1, 13))
155
156
            return self._vfs_cloning_metadir(require_stacking=require_stacking)
 
157
        except errors.UnknownErrorFromSmartServer, err:
 
158
            if err.error_tuple != ('BranchReference',):
 
159
                raise
 
160
            # We need to resolve the branch reference to determine the
 
161
            # cloning_metadir.  This causes unnecessary RPCs to open the
 
162
            # referenced branch (and bzrdir, etc) but only when the caller
 
163
            # didn't already resolve the branch reference.
 
164
            referenced_branch = self.open_branch()
 
165
            return referenced_branch.bzrdir.cloning_metadir()
156
166
        if len(response) != 3:
157
167
            raise errors.UnexpectedSmartServerResponse(response)
158
168
        control_name, repo_name, branch_info = response
256
266
        """See BzrDir._get_tree_branch()."""
257
267
        return None, self.open_branch()
258
268
 
259
 
    def open_branch(self, _unsupported=False):
 
269
    def open_branch(self, _unsupported=False, ignore_fallbacks=False):
260
270
        if _unsupported:
261
271
            raise NotImplementedError('unsupported flag support not implemented yet.')
262
272
        if self._next_open_branch_result is not None:
268
278
        if response[0] == 'ref':
269
279
            # a branch reference, use the existing BranchReference logic.
270
280
            format = BranchReferenceFormat()
271
 
            return format.open(self, _found=True, location=response[1])
 
281
            return format.open(self, _found=True, location=response[1],
 
282
                ignore_fallbacks=ignore_fallbacks)
272
283
        branch_format_name = response[1]
273
284
        if not branch_format_name:
274
285
            branch_format_name = None
275
286
        format = RemoteBranchFormat(network_name=branch_format_name)
276
 
        return RemoteBranch(self, self.find_repository(), format=format)
 
287
        return RemoteBranch(self, self.find_repository(), format=format,
 
288
            setup_stacking=not ignore_fallbacks)
277
289
 
278
290
    def _open_repo_v1(self, path):
279
291
        verb = 'BzrDir.find_repository'
417
429
        self._rich_root_data = None
418
430
 
419
431
    @property
 
432
    def fast_deltas(self):
 
433
        self._ensure_real()
 
434
        return self._custom_format.fast_deltas
 
435
 
 
436
    @property
420
437
    def rich_root_data(self):
421
438
        if self._rich_root_data is None:
422
439
            self._ensure_real()
620
637
        return self._real_repository.abort_write_group(
621
638
            suppress_errors=suppress_errors)
622
639
 
 
640
    @property
 
641
    def chk_bytes(self):
 
642
        """Decorate the real repository for now.
 
643
 
 
644
        In the long term a full blown network facility is needed to avoid
 
645
        creating a real repository object locally.
 
646
        """
 
647
        self._ensure_real()
 
648
        return self._real_repository.chk_bytes
 
649
 
623
650
    def commit_write_group(self):
624
651
        """Complete a write group on the decorated repository.
625
652
 
643
670
        """Ensure that there is a _real_repository set.
644
671
 
645
672
        Used before calls to self._real_repository.
 
673
 
 
674
        Note that _ensure_real causes many roundtrips to the server which are
 
675
        not desirable, and prevents the use of smart one-roundtrip RPC's to
 
676
        perform complex operations (such as accessing parent data, streaming
 
677
        revisions etc). Adding calls to _ensure_real should only be done when
 
678
        bringing up new functionality, adding fallbacks for smart methods that
 
679
        require a fallback path, and never to replace an existing smart method
 
680
        invocation. If in doubt chat to the bzr network team.
646
681
        """
647
682
        if self._real_repository is None:
648
683
            self.bzrdir._ensure_real()
677
712
        self._ensure_real()
678
713
        return self._real_repository._generate_text_key_index()
679
714
 
680
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_four)
681
 
    def get_revision_graph(self, revision_id=None):
682
 
        """See Repository.get_revision_graph()."""
683
 
        return self._get_revision_graph(revision_id)
684
 
 
685
715
    def _get_revision_graph(self, revision_id):
686
716
        """Private method for using with old (< 1.2) servers to fallback."""
687
717
        if revision_id is None:
820
850
        if not self._lock_mode:
821
851
            self._lock_mode = 'r'
822
852
            self._lock_count = 1
823
 
            self._unstacked_provider.enable_cache(cache_misses=False)
 
853
            self._unstacked_provider.enable_cache(cache_misses=True)
824
854
            if self._real_repository is not None:
825
855
                self._real_repository.lock_read()
826
856
        else:
827
857
            self._lock_count += 1
 
858
        for repo in self._fallback_repositories:
 
859
            repo.lock_read()
828
860
 
829
861
    def _remote_lock_write(self, token):
830
862
        path = self.bzrdir._path_for_remote_call(self._client)
865
897
            raise errors.ReadOnlyError(self)
866
898
        else:
867
899
            self._lock_count += 1
 
900
        for repo in self._fallback_repositories:
 
901
            # Writes don't affect fallback repos
 
902
            repo.lock_read()
868
903
        return self._lock_token or None
869
904
 
870
905
    def leave_lock_in_place(self):
892
927
        if isinstance(repository, RemoteRepository):
893
928
            raise AssertionError()
894
929
        self._real_repository = repository
895
 
        # If the _real_repository has _fallback_repositories, clear them out,
896
 
        # because we want it to have the same set as this repository.  This is
897
 
        # reasonable to do because the fallbacks we clear here are from a
898
 
        # "real" branch, and we're about to replace them with the equivalents
899
 
        # from a RemoteBranch.
900
 
        self._real_repository._fallback_repositories = []
 
930
        # three code paths happen here:
 
931
        # 1) old servers, RemoteBranch.open() calls _ensure_real before setting
 
932
        # up stacking. In this case self._fallback_repositories is [], and the
 
933
        # real repo is already setup. Preserve the real repo and
 
934
        # RemoteRepository.add_fallback_repository will avoid adding
 
935
        # duplicates.
 
936
        # 2) new servers, RemoteBranch.open() sets up stacking, and when
 
937
        # ensure_real is triggered from a branch, the real repository to
 
938
        # set already has a matching list with separate instances, but
 
939
        # as they are also RemoteRepositories we don't worry about making the
 
940
        # lists be identical.
 
941
        # 3) new servers, RemoteRepository.ensure_real is triggered before
 
942
        # RemoteBranch.ensure real, in this case we get a repo with no fallbacks
 
943
        # and need to populate it.
 
944
        if (self._fallback_repositories and
 
945
            len(self._real_repository._fallback_repositories) !=
 
946
            len(self._fallback_repositories)):
 
947
            if len(self._real_repository._fallback_repositories):
 
948
                raise AssertionError(
 
949
                    "cannot cleanly remove existing _fallback_repositories")
901
950
        for fb in self._fallback_repositories:
902
951
            self._real_repository.add_fallback_repository(fb)
903
952
        if self._lock_mode == 'w':
1032
1081
        # _real_branch had its get_stacked_on_url method called), then the
1033
1082
        # repository to be added may already be in the _real_repositories list.
1034
1083
        if self._real_repository is not None:
1035
 
            if repository not in self._real_repository._fallback_repositories:
 
1084
            fallback_locations = [repo.bzrdir.root_transport.base for repo in
 
1085
                self._real_repository._fallback_repositories]
 
1086
            if repository.bzrdir.root_transport.base not in fallback_locations:
1036
1087
                self._real_repository.add_fallback_repository(repository)
1037
 
        else:
1038
 
            # They are also seen by the fallback repository.  If it doesn't
1039
 
            # exist yet they'll be added then.  This implicitly copies them.
1040
 
            self._ensure_real()
1041
1088
 
1042
1089
    def add_inventory(self, revid, inv, parents):
1043
1090
        self._ensure_real()
1082
1129
        self._ensure_real()
1083
1130
        return self._real_repository.make_working_trees()
1084
1131
 
 
1132
    def refresh_data(self):
 
1133
        """Re-read any data needed to to synchronise with disk.
 
1134
 
 
1135
        This method is intended to be called after another repository instance
 
1136
        (such as one used by a smart server) has inserted data into the
 
1137
        repository. It may not be called during a write group, but may be
 
1138
        called at any other time.
 
1139
        """
 
1140
        if self.is_in_write_group():
 
1141
            raise errors.InternalBzrError(
 
1142
                "May not refresh_data while in a write group.")
 
1143
        if self._real_repository is not None:
 
1144
            self._real_repository.refresh_data()
 
1145
 
1085
1146
    def revision_ids_to_search_result(self, result_set):
1086
1147
        """Convert a set of revision ids to a graph SearchResult."""
1087
1148
        result_parents = set()
1108
1169
 
1109
1170
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1110
1171
            fetch_spec=None):
 
1172
        # No base implementation to use as RemoteRepository is not a subclass
 
1173
        # of Repository; so this is a copy of Repository.fetch().
1111
1174
        if fetch_spec is not None and revision_id is not None:
1112
1175
            raise AssertionError(
1113
1176
                "fetch_spec and revision_id are mutually exclusive.")
1114
 
        # Not delegated to _real_repository so that InterRepository.get has a
1115
 
        # chance to find an InterRepository specialised for RemoteRepository.
 
1177
        if self.is_in_write_group():
 
1178
            raise errors.InternalBzrError(
 
1179
                "May not fetch while in a write group.")
 
1180
        # fast path same-url fetch operations
1116
1181
        if self.has_same_location(source) and fetch_spec is None:
1117
1182
            # check that last_revision is in 'from' and then return a
1118
1183
            # no-operation.
1120
1185
                not revision.is_null(revision_id)):
1121
1186
                self.get_revision(revision_id)
1122
1187
            return 0, []
 
1188
        # if there is no specific appropriate InterRepository, this will get
 
1189
        # the InterRepository base class, which raises an
 
1190
        # IncompatibleRepositories when asked to fetch.
1123
1191
        inter = repository.InterRepository.get(source, self)
1124
 
        try:
1125
 
            return inter.fetch(revision_id=revision_id, pb=pb,
1126
 
                    find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1127
 
        except NotImplementedError:
1128
 
            raise errors.IncompatibleRepositories(source, self)
 
1192
        return inter.fetch(revision_id=revision_id, pb=pb,
 
1193
            find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1129
1194
 
1130
1195
    def create_bundle(self, target, base, fileobj, format=None):
1131
1196
        self._ensure_real()
1162
1227
            # We already found out that the server can't understand
1163
1228
            # Repository.get_parent_map requests, so just fetch the whole
1164
1229
            # graph.
1165
 
            # XXX: Note that this will issue a deprecation warning. This is ok
1166
 
            # :- its because we're working with a deprecated server anyway, and
1167
 
            # the user will almost certainly have seen a warning about the
1168
 
            # server version already.
1169
 
            rg = self.get_revision_graph()
 
1230
            #
 
1231
            # Note that this reads the whole graph, when only some keys are
 
1232
            # wanted.  On this old server there's no way (?) to get them all
 
1233
            # in one go, and the user probably will have seen a warning about
 
1234
            # the server being old anyhow.
 
1235
            rg = self._get_revision_graph(None)
1170
1236
            # There is an api discrepency between get_parent_map and
1171
1237
            # get_revision_graph. Specifically, a "key:()" pair in
1172
1238
            # get_revision_graph just means a node has no parents. For
1203
1269
        # TODO: Manage this incrementally to avoid covering the same path
1204
1270
        # repeatedly. (The server will have to on each request, but the less
1205
1271
        # work done the better).
 
1272
        #
 
1273
        # Negative caching notes:
 
1274
        # new server sends missing when a request including the revid
 
1275
        # 'include-missing:' is present in the request.
 
1276
        # missing keys are serialised as missing:X, and we then call
 
1277
        # provider.note_missing(X) for-all X
1206
1278
        parents_map = self._unstacked_provider.get_cached_map()
1207
1279
        if parents_map is None:
1208
1280
            # Repository is not locked, so there's no cache.
1209
1281
            parents_map = {}
 
1282
        # start_set is all the keys in the cache
1210
1283
        start_set = set(parents_map)
 
1284
        # result set is all the references to keys in the cache
1211
1285
        result_parents = set()
1212
1286
        for parents in parents_map.itervalues():
1213
1287
            result_parents.update(parents)
1214
1288
        stop_keys = result_parents.difference(start_set)
 
1289
        # We don't need to send ghosts back to the server as a position to
 
1290
        # stop either.
 
1291
        stop_keys.difference_update(self._unstacked_provider.missing_keys)
 
1292
        key_count = len(parents_map)
 
1293
        if (NULL_REVISION in result_parents
 
1294
            and NULL_REVISION in self._unstacked_provider.missing_keys):
 
1295
            # If we pruned NULL_REVISION from the stop_keys because it's also
 
1296
            # in our cache of "missing" keys we need to increment our key count
 
1297
            # by 1, because the reconsitituted SearchResult on the server will
 
1298
            # still consider NULL_REVISION to be an included key.
 
1299
            key_count += 1
1215
1300
        included_keys = start_set.intersection(result_parents)
1216
1301
        start_set.difference_update(included_keys)
1217
 
        recipe = (start_set, stop_keys, len(parents_map))
 
1302
        recipe = ('manual', start_set, stop_keys, key_count)
1218
1303
        body = self._serialise_search_recipe(recipe)
1219
1304
        path = self.bzrdir._path_for_remote_call(self._client)
1220
1305
        for key in keys:
1222
1307
                raise ValueError(
1223
1308
                    "key %r not a plain string" % (key,))
1224
1309
        verb = 'Repository.get_parent_map'
1225
 
        args = (path,) + tuple(keys)
 
1310
        args = (path, 'include-missing:') + tuple(keys)
1226
1311
        try:
1227
1312
            response = self._call_with_body_bytes_expecting_body(
1228
1313
                verb, args, body)
1238
1323
            # To avoid having to disconnect repeatedly, we keep track of the
1239
1324
            # fact the server doesn't understand remote methods added in 1.2.
1240
1325
            medium._remember_remote_is_before((1, 2))
1241
 
            return self.get_revision_graph(None)
 
1326
            # Recurse just once and we should use the fallback code.
 
1327
            return self._get_parent_map_rpc(keys)
1242
1328
        response_tuple, response_handler = response
1243
1329
        if response_tuple[0] not in ['ok']:
1244
1330
            response_handler.cancel_read_body()
1255
1341
                if len(d) > 1:
1256
1342
                    revision_graph[d[0]] = d[1:]
1257
1343
                else:
1258
 
                    # No parents - so give the Graph result (NULL_REVISION,).
1259
 
                    revision_graph[d[0]] = (NULL_REVISION,)
 
1344
                    # No parents:
 
1345
                    if d[0].startswith('missing:'):
 
1346
                        revid = d[0][8:]
 
1347
                        self._unstacked_provider.note_missing_key(revid)
 
1348
                    else:
 
1349
                        # no parents - so give the Graph result
 
1350
                        # (NULL_REVISION,).
 
1351
                        revision_graph[d[0]] = (NULL_REVISION,)
1260
1352
            return revision_graph
1261
1353
 
1262
1354
    @needs_read_lock
1265
1357
        return self._real_repository.get_signature_text(revision_id)
1266
1358
 
1267
1359
    @needs_read_lock
1268
 
    @symbol_versioning.deprecated_method(symbol_versioning.one_three)
1269
 
    def get_revision_graph_with_ghosts(self, revision_ids=None):
1270
 
        self._ensure_real()
1271
 
        return self._real_repository.get_revision_graph_with_ghosts(
1272
 
            revision_ids=revision_ids)
1273
 
 
1274
 
    @needs_read_lock
1275
1360
    def get_inventory_xml(self, revision_id):
1276
1361
        self._ensure_real()
1277
1362
        return self._real_repository.get_inventory_xml(revision_id)
1289
1374
        return self._real_repository.all_revision_ids()
1290
1375
 
1291
1376
    @needs_read_lock
1292
 
    def get_deltas_for_revisions(self, revisions):
 
1377
    def get_deltas_for_revisions(self, revisions, specific_fileids=None):
1293
1378
        self._ensure_real()
1294
 
        return self._real_repository.get_deltas_for_revisions(revisions)
 
1379
        return self._real_repository.get_deltas_for_revisions(revisions,
 
1380
            specific_fileids=specific_fileids)
1295
1381
 
1296
1382
    @needs_read_lock
1297
 
    def get_revision_delta(self, revision_id):
 
1383
    def get_revision_delta(self, revision_id, specific_fileids=None):
1298
1384
        self._ensure_real()
1299
 
        return self._real_repository.get_revision_delta(revision_id)
 
1385
        return self._real_repository.get_revision_delta(revision_id,
 
1386
            specific_fileids=specific_fileids)
1300
1387
 
1301
1388
    @needs_read_lock
1302
1389
    def revision_trees(self, revision_ids):
1479
1566
        :param recipe: A search recipe (start, stop, count).
1480
1567
        :return: Serialised bytes.
1481
1568
        """
1482
 
        start_keys = ' '.join(recipe[0])
1483
 
        stop_keys = ' '.join(recipe[1])
1484
 
        count = str(recipe[2])
 
1569
        start_keys = ' '.join(recipe[1])
 
1570
        stop_keys = ' '.join(recipe[2])
 
1571
        count = str(recipe[3])
1485
1572
        return '\n'.join((start_keys, stop_keys, count))
1486
1573
 
1487
1574
    def _serialise_search_result(self, search_result):
1490
1577
            parts.extend(search_result.heads)
1491
1578
        else:
1492
1579
            recipe = search_result.get_recipe()
1493
 
            parts = ['search', self._serialise_search_recipe(recipe)]
 
1580
            parts = [recipe[0], self._serialise_search_recipe(recipe)]
1494
1581
        return '\n'.join(parts)
1495
1582
 
1496
1583
    def autopack(self):
1501
1588
            self._ensure_real()
1502
1589
            self._real_repository._pack_collection.autopack()
1503
1590
            return
1504
 
        if self._real_repository is not None:
1505
 
            # Reset the real repository's cache of pack names.
1506
 
            # XXX: At some point we may be able to skip this and just rely on
1507
 
            # the automatic retry logic to do the right thing, but for now we
1508
 
            # err on the side of being correct rather than being optimal.
1509
 
            self._real_repository._pack_collection.reload_pack_names()
 
1591
        self.refresh_data()
1510
1592
        if response[0] != 'ok':
1511
1593
            raise errors.UnexpectedSmartServerResponse(response)
1512
1594
 
1522
1604
        return result
1523
1605
 
1524
1606
    def insert_stream(self, stream, src_format, resume_tokens):
1525
 
        repo = self.target_repo
1526
 
        client = repo._client
 
1607
        target = self.target_repo
 
1608
        if target._lock_token:
 
1609
            verb = 'Repository.insert_stream_locked'
 
1610
            extra_args = (target._lock_token or '',)
 
1611
            required_version = (1, 14)
 
1612
        else:
 
1613
            verb = 'Repository.insert_stream'
 
1614
            extra_args = ()
 
1615
            required_version = (1, 13)
 
1616
        client = target._client
1527
1617
        medium = client._medium
1528
 
        if medium._is_remote_before((1, 13)):
 
1618
        if medium._is_remote_before(required_version):
1529
1619
            # No possible way this can work.
1530
1620
            return self._insert_real(stream, src_format, resume_tokens)
1531
 
        path = repo.bzrdir._path_for_remote_call(client)
 
1621
        path = target.bzrdir._path_for_remote_call(client)
1532
1622
        if not resume_tokens:
1533
1623
            # XXX: Ugly but important for correctness, *will* be fixed during
1534
1624
            # 1.13 cycle. Pushing a stream that is interrupted results in a
1541
1631
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1542
1632
            try:
1543
1633
                response = client.call_with_body_stream(
1544
 
                    ('Repository.insert_stream', path, ''), byte_stream)
 
1634
                    (verb, path, '') + extra_args, byte_stream)
1545
1635
            except errors.UnknownSmartMethod:
1546
 
                medium._remember_remote_is_before((1,13))
 
1636
                medium._remember_remote_is_before(required_version)
1547
1637
                return self._insert_real(stream, src_format, resume_tokens)
1548
1638
        byte_stream = smart_repo._stream_to_byte_stream(
1549
1639
            stream, src_format)
1550
1640
        resume_tokens = ' '.join(resume_tokens)
1551
1641
        response = client.call_with_body_stream(
1552
 
            ('Repository.insert_stream', path, resume_tokens), byte_stream)
 
1642
            (verb, path, resume_tokens) + extra_args, byte_stream)
1553
1643
        if response[0][0] not in ('ok', 'missing-basis'):
1554
1644
            raise errors.UnexpectedSmartServerResponse(response)
1555
1645
        if response[0][0] == 'missing-basis':
1557
1647
            resume_tokens = tokens
1558
1648
            return resume_tokens, missing_keys
1559
1649
        else:
1560
 
            if self.target_repo._real_repository is not None:
1561
 
                collection = getattr(self.target_repo._real_repository,
1562
 
                    '_pack_collection', None)
1563
 
                if collection is not None:
1564
 
                    collection.reload_pack_names()
 
1650
            self.target_repo.refresh_data()
1565
1651
            return [], set()
1566
1652
 
1567
1653
 
1569
1655
    """Stream data from a remote server."""
1570
1656
 
1571
1657
    def get_stream(self, search):
1572
 
        # streaming with fallback repositories is not well defined yet: The
1573
 
        # remote repository cannot see the fallback repositories, and thus
1574
 
        # cannot satisfy the entire search in the general case. Likewise the
1575
 
        # fallback repositories cannot reify the search to determine what they
1576
 
        # should send. It likely needs a return value in the stream listing the
1577
 
        # edge of the search to resume from in fallback repositories.
1578
 
        if self.from_repository._fallback_repositories:
1579
 
            return repository.StreamSource.get_stream(self, search)
1580
 
        repo = self.from_repository
 
1658
        if (self.from_repository._fallback_repositories and
 
1659
            self.to_format._fetch_order == 'topological'):
 
1660
            return self._real_stream(self.from_repository, search)
 
1661
        return self.missing_parents_chain(search, [self.from_repository] +
 
1662
            self.from_repository._fallback_repositories)
 
1663
 
 
1664
    def _real_stream(self, repo, search):
 
1665
        """Get a stream for search from repo.
 
1666
        
 
1667
        This never called RemoteStreamSource.get_stream, and is a heler
 
1668
        for RemoteStreamSource._get_stream to allow getting a stream 
 
1669
        reliably whether fallback back because of old servers or trying
 
1670
        to stream from a non-RemoteRepository (which the stacked support
 
1671
        code will do).
 
1672
        """
 
1673
        source = repo._get_source(self.to_format)
 
1674
        if isinstance(source, RemoteStreamSource):
 
1675
            return repository.StreamSource.get_stream(source, search)
 
1676
        return source.get_stream(search)
 
1677
 
 
1678
    def _get_stream(self, repo, search):
 
1679
        """Core worker to get a stream from repo for search.
 
1680
 
 
1681
        This is used by both get_stream and the stacking support logic. It
 
1682
        deliberately gets a stream for repo which does not need to be
 
1683
        self.from_repository. In the event that repo is not Remote, or
 
1684
        cannot do a smart stream, a fallback is made to the generic
 
1685
        repository._get_stream() interface, via self._real_stream.
 
1686
 
 
1687
        In the event of stacking, streams from _get_stream will not
 
1688
        contain all the data for search - this is normal (see get_stream).
 
1689
 
 
1690
        :param repo: A repository.
 
1691
        :param search: A search.
 
1692
        """
 
1693
        # Fallbacks may be non-smart
 
1694
        if not isinstance(repo, RemoteRepository):
 
1695
            return self._real_stream(repo, search)
1581
1696
        client = repo._client
1582
1697
        medium = client._medium
1583
1698
        if medium._is_remote_before((1, 13)):
1584
 
            # No possible way this can work.
1585
 
            return repository.StreamSource.get_stream(self, search)
 
1699
            # streaming was added in 1.13
 
1700
            return self._real_stream(repo, search)
1586
1701
        path = repo.bzrdir._path_for_remote_call(client)
1587
1702
        try:
1588
1703
            search_bytes = repo._serialise_search_result(search)
1592
1707
            response_tuple, response_handler = response
1593
1708
        except errors.UnknownSmartMethod:
1594
1709
            medium._remember_remote_is_before((1,13))
1595
 
            return repository.StreamSource.get_stream(self, search)
 
1710
            return self._real_stream(repo, search)
1596
1711
        if response_tuple[0] != 'ok':
1597
1712
            raise errors.UnexpectedSmartServerResponse(response_tuple)
1598
1713
        byte_stream = response_handler.read_streamed_body()
1603
1718
                src_format.network_name(), repo._format.network_name()))
1604
1719
        return stream
1605
1720
 
 
1721
    def missing_parents_chain(self, search, sources):
 
1722
        """Chain multiple streams together to handle stacking.
 
1723
 
 
1724
        :param search: The overall search to satisfy with streams.
 
1725
        :param sources: A list of Repository objects to query.
 
1726
        """
 
1727
        self.serialiser = self.to_format._serializer
 
1728
        self.seen_revs = set()
 
1729
        self.referenced_revs = set()
 
1730
        # If there are heads in the search, or the key count is > 0, we are not
 
1731
        # done.
 
1732
        while not search.is_empty() and len(sources) > 1:
 
1733
            source = sources.pop(0)
 
1734
            stream = self._get_stream(source, search)
 
1735
            for kind, substream in stream:
 
1736
                if kind != 'revisions':
 
1737
                    yield kind, substream
 
1738
                else:
 
1739
                    yield kind, self.missing_parents_rev_handler(substream)
 
1740
            search = search.refine(self.seen_revs, self.referenced_revs)
 
1741
            self.seen_revs = set()
 
1742
            self.referenced_revs = set()
 
1743
        if not search.is_empty():
 
1744
            for kind, stream in self._get_stream(sources[0], search):
 
1745
                yield kind, stream
 
1746
 
 
1747
    def missing_parents_rev_handler(self, substream):
 
1748
        for content in substream:
 
1749
            revision_bytes = content.get_bytes_as('fulltext')
 
1750
            revision = self.serialiser.read_revision_from_string(revision_bytes)
 
1751
            self.seen_revs.add(content.key[-1])
 
1752
            self.referenced_revs.update(revision.parent_ids)
 
1753
            yield content
 
1754
 
1606
1755
 
1607
1756
class RemoteBranchLockableFiles(LockableFiles):
1608
1757
    """A 'LockableFiles' implementation that talks to a smart server.
1648
1797
    def network_name(self):
1649
1798
        return self._network_name
1650
1799
 
1651
 
    def open(self, a_bzrdir):
1652
 
        return a_bzrdir.open_branch()
 
1800
    def open(self, a_bzrdir, ignore_fallbacks=False):
 
1801
        return a_bzrdir.open_branch(ignore_fallbacks=ignore_fallbacks)
1653
1802
 
1654
1803
    def _vfs_initialize(self, a_bzrdir):
1655
1804
        # Initialisation when using a local bzrdir object, or a non-vfs init
1813
1962
        except (errors.NotStacked, errors.UnstackableBranchFormat,
1814
1963
            errors.UnstackableRepositoryFormat), e:
1815
1964
            return
1816
 
        # it's relative to this branch...
1817
 
        fallback_url = urlutils.join(self.base, fallback_url)
1818
 
        transports = [self.bzrdir.root_transport]
1819
 
        stacked_on = branch.Branch.open(fallback_url,
1820
 
                                        possible_transports=transports)
1821
 
        self.repository.add_fallback_repository(stacked_on.repository)
 
1965
        self._activate_fallback_location(fallback_url)
 
1966
 
 
1967
    def _get_config(self):
 
1968
        return RemoteBranchConfig(self)
1822
1969
 
1823
1970
    def _get_real_transport(self):
1824
1971
        # if we try vfs access, return the real branch's vfs transport
2166
2313
            self._ensure_real()
2167
2314
            return self._real_branch._set_parent_location(url)
2168
2315
 
2169
 
    def set_stacked_on_url(self, stacked_location):
2170
 
        """Set the URL this branch is stacked against.
2171
 
 
2172
 
        :raises UnstackableBranchFormat: If the branch does not support
2173
 
            stacking.
2174
 
        :raises UnstackableRepositoryFormat: If the repository does not support
2175
 
            stacking.
2176
 
        """
2177
 
        self._ensure_real()
2178
 
        return self._real_branch.set_stacked_on_url(stacked_location)
2179
 
 
2180
2316
    @needs_write_lock
2181
2317
    def pull(self, source, overwrite=False, stop_revision=None,
2182
2318
             **kwargs):
2249
2385
        return self._real_branch.set_push_location(location)
2250
2386
 
2251
2387
 
 
2388
class RemoteBranchConfig(object):
 
2389
    """A Config that reads from a smart branch and writes via smart methods.
 
2390
 
 
2391
    It is a low-level object that considers config data to be name/value pairs
 
2392
    that may be associated with a section. Assigning meaning to the these
 
2393
    values is done at higher levels like bzrlib.config.TreeConfig.
 
2394
    """
 
2395
 
 
2396
    def __init__(self, branch):
 
2397
        self._branch = branch
 
2398
 
 
2399
    def get_option(self, name, section=None, default=None):
 
2400
        """Return the value associated with a named option.
 
2401
 
 
2402
        :param name: The name of the value
 
2403
        :param section: The section the option is in (if any)
 
2404
        :param default: The value to return if the value is not set
 
2405
        :return: The value or default value
 
2406
        """
 
2407
        configobj = self._get_configobj()
 
2408
        if section is None:
 
2409
            section_obj = configobj
 
2410
        else:
 
2411
            try:
 
2412
                section_obj = configobj[section]
 
2413
            except KeyError:
 
2414
                return default
 
2415
        return section_obj.get(name, default)
 
2416
 
 
2417
    def _get_configobj(self):
 
2418
        path = self._branch._remote_path()
 
2419
        response = self._branch._client.call_expecting_body(
 
2420
            'Branch.get_config_file', path)
 
2421
        if response[0][0] != 'ok':
 
2422
            raise UnexpectedSmartServerResponse(response)
 
2423
        lines = response[1].read_body_bytes().splitlines()
 
2424
        return config.ConfigObj(lines, encoding='utf-8')
 
2425
 
 
2426
    def set_option(self, value, name, section=None):
 
2427
        """Set the value associated with a named option.
 
2428
 
 
2429
        :param value: The value to set
 
2430
        :param name: The name of the value to set
 
2431
        :param section: The section the option is in (if any)
 
2432
        """
 
2433
        medium = self._branch._client._medium
 
2434
        if medium._is_remote_before((1, 14)):
 
2435
            return self._vfs_set_option(value, name, section)
 
2436
        try:
 
2437
            path = self._branch._remote_path()
 
2438
            response = self._branch._client.call('Branch.set_config_option',
 
2439
                path, self._branch._lock_token, self._branch._repo_lock_token,
 
2440
                value.encode('utf8'), name, section or '')
 
2441
        except errors.UnknownSmartMethod:
 
2442
            medium._remember_remote_is_before((1, 14))
 
2443
            return self._vfs_set_option(value, name, section)
 
2444
        if response != ():
 
2445
            raise errors.UnexpectedSmartServerResponse(response)
 
2446
 
 
2447
    def _vfs_set_option(self, value, name, section=None):
 
2448
        self._branch._ensure_real()
 
2449
        return self._branch._real_branch._get_config().set_option(
 
2450
            value, name, section)
 
2451
 
 
2452
 
2252
2453
def _extract_tar(tar, to_dir):
2253
2454
    """Extract all the contents of a tarfile object.
2254
2455