78
78
from bzrlib import (
81
from bzrlib.trace import mutter
82
81
from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
83
82
RevisionAlreadyPresent,
84
83
RevisionNotPresent,
84
UnavailableRepresentation,
85
85
WeaveRevisionAlreadyPresent,
86
86
WeaveRevisionNotPresent,
88
88
import bzrlib.errors as errors
89
from bzrlib.osutils import sha_strings
89
from bzrlib.osutils import sha_strings, split_lines
90
90
import bzrlib.patiencediff
91
from bzrlib.symbol_versioning import *
92
from bzrlib.trace import mutter
91
93
from bzrlib.tsort import topo_sort
92
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
94
from bzrlib.versionedfile import (
93
101
from bzrlib.weavefile import _read_weave_v5, write_weave_v5
104
class WeaveContentFactory(ContentFactory):
    """Content factory for streaming from weaves.

    Each instance describes a single version held in a weave and can hand
    back that version's text on request.

    :seealso ContentFactory:
    """

    def __init__(self, version, weave):
        """Create a WeaveContentFactory for version from weave.

        :param version: The version id to describe; it is wrapped into the
            1-tuple ``self.key``.
        :param weave: The weave holding ``version``. It is asked for the
            sha1 and the parents up front and is kept so that the text can
            be fetched later by get_bytes_as.
        """
        ContentFactory.__init__(self)
        self.sha1 = weave.get_sha1s([version])[0]
        self.key = (version,)
        parents = weave.get_parent_map([version])[version]
        # Parents are exposed in the same 1-tuple form as self.key.
        self.parents = tuple((parent,) for parent in parents)
        self.storage_kind = 'fulltext'
        # get_bytes_as reads the text back out of the weave on demand, so
        # the weave has to be remembered on the instance.
        self._weave = weave

    def get_bytes_as(self, storage_kind):
        """Return the content of this record in the requested form.

        :param storage_kind: The representation wanted; only 'fulltext' is
            available.
        :return: The result of ``get_text`` on the weave for this version.
        :raises UnavailableRepresentation: If any other storage kind is
            requested.
        """
        if storage_kind == 'fulltext':
            return self._weave.get_text(self.key[0])
        raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
96
127
class Weave(VersionedFile):
97
128
"""weave - versioned text file storage.
185
216
__slots__ = ['_weave', '_parents', '_sha1s', '_names', '_name_map',
186
217
'_weave_name', '_matcher']
188
def __init__(self, weave_name=None, access_mode='w', matcher=None):
219
def __init__(self, weave_name=None, access_mode='w', matcher=None, get_scope=None):
222
:param get_scope: A callable that returns an opaque object to be used
223
for detecting when this weave goes out of scope (should stop
224
answering requests or allowing mutation).
189
226
super(Weave, self).__init__(access_mode)
191
228
self._parents = []
197
234
self._matcher = bzrlib.patiencediff.PatienceSequenceMatcher
199
236
self._matcher = matcher
237
if get_scope is None:
238
get_scope = lambda:None
239
self._get_scope = get_scope
240
self._scope = get_scope()
241
self._access_mode = access_mode
201
243
def __repr__(self):
    """Return a debugging representation: ``Weave(<repr of the name>)``.

    The name is wrapped in a 1-tuple before formatting so that a name which
    is itself a tuple is rendered with %r instead of being unpacked as the
    argument list of the % operator (which would raise TypeError or drop
    the tuple parentheses).
    """
    return "Weave(%r)" % (self._weave_name,)
246
def _check_write_ok(self):
    """Raise unless this object may still be modified.

    Two conditions are checked, in this order:

    * the value returned by ``self._get_scope()`` must still equal the
      scope stored in ``self._scope``; otherwise
      errors.OutSideTransaction is raised.
    * ``self._access_mode`` must be 'w'; otherwise
      errors.ReadOnlyObjectDirtiedError is raised with this object.

    Returns None when both checks pass.
    """
    current_scope = self._get_scope()
    if current_scope != self._scope:
        # The scope has moved on since self._scope was recorded.
        raise errors.OutSideTransaction()
    mode = self._access_mode
    if mode != 'w':
        raise errors.ReadOnlyObjectDirtiedError(self)
205
254
"""Return a deep copy of self.
246
295
__contains__ = has_version
248
def get_parents(self, version_id):
    """Return the parent names recorded for version_id.

    See VersionedFile.get_parent.

    The version is resolved with ``self._lookup``, its entry in
    ``self._parents`` is read, and every parent index is translated back
    to a name with ``self._idx_to_name``.  The result is whatever ``map``
    returns (a list on Python 2).
    """
    version_index = self._lookup(version_id)
    parent_indexes = self._parents[version_index]
    return map(self._idx_to_name, parent_indexes)
297
def get_record_stream(self, versions, ordering, include_delta_closure):
298
"""Get a stream of records for versions.
300
:param versions: The versions to include. Each version is a tuple
302
:param ordering: Either 'unordered' or 'topological'. A topologically
303
sorted stream has compression parents strictly before their
305
:param include_delta_closure: If True then the closure across any
306
compression parents will be included (in the opaque data).
307
:return: An iterator of ContentFactory objects, each of which is only
308
valid until the iterator is advanced.
310
if ordering == 'topological':
311
parents = self.get_parent_map(versions)
312
new_versions = topo_sort(parents)
313
new_versions.extend(set(versions).difference(set(parents)))
314
versions = new_versions
315
for version in versions:
317
yield WeaveContentFactory(version, self)
319
yield AbsentContentFactory((version,))
321
def get_parent_map(self, version_ids):
322
"""See VersionedFile.get_parent_map."""
324
for version_id in version_ids:
326
result[version_id] = tuple(
327
map(self._idx_to_name, self._parents[self._lookup(version_id)]))
328
except RevisionNotPresent:
332
def get_parents_with_ghosts(self, version_id):
    """Always raise NotImplementedError; this query is not provided here.

    :param version_id: Ignored.
    :raises NotImplementedError: Always, carrying this bound method as
        its argument.
    """
    unsupported = self.get_parents_with_ghosts
    raise NotImplementedError(unsupported)
335
def insert_record_stream(self, stream):
336
"""Insert a record stream into this versioned file.
338
:param stream: A stream of records to insert.
340
:seealso VersionedFile.get_record_stream:
343
for record in stream:
344
# Raise an error when a record is missing.
345
if record.storage_kind == 'absent':
346
raise RevisionNotPresent([record.key[0]], self)
347
# adapt to non-tuple interface
348
parents = [parent[0] for parent in record.parents]
349
if record.storage_kind == 'fulltext':
350
self.add_lines(record.key[0], parents,
351
split_lines(record.get_bytes_as('fulltext')))
353
adapter_key = record.storage_kind, 'fulltext'
355
adapter = adapters[adapter_key]
357
adapter_factory = adapter_registry.get(adapter_key)
358
adapter = adapter_factory(self)
359
adapters[adapter_key] = adapter
360
lines = split_lines(adapter.get_bytes(
361
record, record.get_bytes_as(record.storage_kind)))
363
self.add_lines(record.key[0], parents, lines)
364
except RevisionAlreadyPresent:
252
367
def _check_repeated_add(self, name, parents, text, sha1):
253
368
"""Check that a duplicated add is OK.
444
547
return len(other_parents.difference(my_parents)) == 0
446
def annotate_iter(self, version_id):
    """Yield (version-id, line) pairs for the specified version.

    The version is resolved with ``self._lookup`` and passed, as a
    one-element list, to ``self._extract``.  For every
    (origin, lineno, text) triple that comes back, the origin is
    translated with ``self._idx_to_name`` and paired with the text.
    """
    wanted = [self._lookup(version_id)]
    for origin_index, _lineno, line in self._extract(wanted):
        yield self._idx_to_name(origin_index), line


def annotate(self, version_id):
    """Return a list of (version-id, line) tuples for version_id.

    Built from the (origin, lineno, text) triples produced by
    ``self._extract`` for this version; each origin is translated with
    ``self._idx_to_name`` and the line number is discarded.
    """
    wanted = [self._lookup(version_id)]
    annotated = []
    for origin_index, _lineno, line in self._extract(wanted):
        annotated.append((self._idx_to_name(origin_index), line))
    return annotated
454
557
def iter_lines_added_or_present_in_versions(self, version_ids=None,
703
795
for p in self._parents[i]:
704
796
new_inc.update(inclusions[self._idx_to_name(p)])
706
assert set(new_inc) == set(self.get_ancestry(name)), \
707
'failed %s != %s' % (set(new_inc), set(self.get_ancestry(name)))
798
if set(new_inc) != set(self.get_ancestry(name)):
799
raise AssertionError(
801
% (set(new_inc), set(self.get_ancestry(name))))
708
802
inclusions[name] = new_inc
710
804
nlines = len(self._weave)
758
852
version_ids = set(other.versions()).intersection(set(version_ids))
759
853
# pull in the referenced graph.
760
854
version_ids = other.get_ancestry(version_ids)
761
pending_graph = [(version, other.get_parents(version)) for
762
version in version_ids]
855
pending_parents = other.get_parent_map(version_ids)
856
pending_graph = pending_parents.items()
857
if len(pending_graph) != len(version_ids):
858
raise RevisionNotPresent(
859
set(version_ids) - set(pending_parents.keys()), self)
763
860
for name in topo_sort(pending_graph):
764
861
other_idx = other._name_map[name]
765
862
# returns True if we have it, False if we need it.
854
950
WEAVE_SUFFIX = '.weave'
856
def __init__(self, name, transport, filemode=None, create=False, access_mode='w'):
952
def __init__(self, name, transport, filemode=None, create=False, access_mode='w', get_scope=None):
857
953
"""Create a WeaveFile.
859
955
:param create: If not True, only open an existing knit.
861
super(WeaveFile, self).__init__(name, access_mode)
957
super(WeaveFile, self).__init__(name, access_mode, get_scope=get_scope)
862
958
self._transport = transport
863
959
self._filemode = filemode
882
def _clone_text(self, new_version_id, old_version_id, parents):
883
"""See VersionedFile.clone_text."""
884
super(WeaveFile, self)._clone_text(new_version_id, old_version_id, parents)
887
978
def copy_to(self, name, transport):
888
979
"""See VersionedFile.copy_to()."""
889
980
# as we are all in memory always, just serialise to the new place.
910
998
"""See VersionedFile.get_suffixes()."""
911
999
return [WeaveFile.WEAVE_SUFFIX]
1001
def insert_record_stream(self, stream):
1002
super(WeaveFile, self).insert_record_stream(stream)
1005
@deprecated_method(one_five)
913
1006
def join(self, other, pb=None, msg=None, version_ids=None,
914
1007
ignore_missing=False):
915
1008
"""Join other into self and save."""