~ubuntu-branches/debian/wheezy/bzr/wheezy

« back to all changes in this revision

Viewing changes to .pc/07_lazy_import_scope/bzrlib/knit.py

  • Committer: Bazaar Package Importer
  • Author(s): Jelmer Vernooij, Jelmer Vernooij, John Ferlito
  • Date: 2011-04-05 14:38:50 UTC
  • Revision ID: james.westby@ubuntu.com-20110405143850-oi7cy895yzwwpeaz
Tags: 2.3.1-2
[ Jelmer Vernooij ]
* Add python-medusa, python-lzma and python-meliae as build
  dependencies (used by the test suite).

[ John Ferlito ]
* Remove John Ferlito from Uploaders.

[ Jelmer Vernooij ]
* Prefer cython as build dependency.
* Remove generated C files during clean.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Knit versionedfile implementation.
 
18
 
 
19
A knit is a versioned file implementation that supports efficient append only
 
20
updates.
 
21
 
 
22
Knit file layout:
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
 
26
end-marker; simply "end VERSION"
 
27
 
 
28
delta can be line or full contents.a
 
29
... the 8's there are the index number of the annotation.
 
30
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
 
31
59,59,3
 
32
8
 
33
8         if ie.executable:
 
34
8             e.set('executable', 'yes')
 
35
130,130,2
 
36
8         if elt.get('executable') == 'yes':
 
37
8             ie.executable = True
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
 
39
 
 
40
 
 
41
whats in an index:
 
42
09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
 
43
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
 
44
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
 
45
09:33 < lifeless> right
 
46
09:33 < jrydberg> lifeless: the position and size is the range in the data file
 
47
 
 
48
 
 
49
so the index sequence is the dictionary compressed sequence number used
 
50
in the deltas to provide line annotation
 
51
 
 
52
"""
 
53
 
 
54
 
 
55
from cStringIO import StringIO
 
56
from itertools import izip
 
57
import gzip
 
58
import operator
 
59
import os
 
60
import sys
 
61
 
 
62
from bzrlib.lazy_import import lazy_import
 
63
lazy_import(globals(), """
 
64
from bzrlib import (
 
65
    annotate,
 
66
    debug,
 
67
    diff,
 
68
    graph as _mod_graph,
 
69
    index as _mod_index,
 
70
    lru_cache,
 
71
    pack,
 
72
    patiencediff,
 
73
    progress,
 
74
    static_tuple,
 
75
    trace,
 
76
    tsort,
 
77
    tuned_gzip,
 
78
    ui,
 
79
    )
 
80
""")
 
81
from bzrlib import (
 
82
    errors,
 
83
    osutils,
 
84
    )
 
85
from bzrlib.errors import (
 
86
    FileExists,
 
87
    NoSuchFile,
 
88
    KnitError,
 
89
    InvalidRevisionId,
 
90
    KnitCorrupt,
 
91
    KnitHeaderError,
 
92
    RevisionNotPresent,
 
93
    RevisionAlreadyPresent,
 
94
    SHA1KnitCorrupt,
 
95
    )
 
96
from bzrlib.osutils import (
 
97
    contains_whitespace,
 
98
    contains_linebreaks,
 
99
    sha_string,
 
100
    sha_strings,
 
101
    split_lines,
 
102
    )
 
103
from bzrlib.versionedfile import (
 
104
    AbsentContentFactory,
 
105
    adapter_registry,
 
106
    ConstantMapper,
 
107
    ContentFactory,
 
108
    ChunkedContentFactory,
 
109
    sort_groupcompress,
 
110
    VersionedFile,
 
111
    VersionedFiles,
 
112
    )
 
113
 
 
114
 
 
115
# TODO: Split out code specific to this format into an associated object.
 
116
 
 
117
# TODO: Can we put in some kind of value to check that the index and data
 
118
# files belong together?
 
119
 
 
120
# TODO: accommodate binaries, perhaps by storing a byte count
 
121
 
 
122
# TODO: function to check whole file
 
123
 
 
124
# TODO: atomically append data, then measure backwards from the cursor
 
125
# position after writing to work out where it was located.  we may need to
 
126
# bypass python file buffering.
 
127
 
 
128
DATA_SUFFIX = '.knit'
 
129
INDEX_SUFFIX = '.kndx'
 
130
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
 
131
 
 
132
 
 
133
class KnitAdapter(object):
 
134
    """Base class for knit record adaption."""
 
135
 
 
136
    def __init__(self, basis_vf):
 
137
        """Create an adapter which accesses full texts from basis_vf.
 
138
 
 
139
        :param basis_vf: A versioned file to access basis texts of deltas from.
 
140
            May be None for adapters that do not need to access basis texts.
 
141
        """
 
142
        self._data = KnitVersionedFiles(None, None)
 
143
        self._annotate_factory = KnitAnnotateFactory()
 
144
        self._plain_factory = KnitPlainFactory()
 
145
        self._basis_vf = basis_vf
 
146
 
 
147
 
 
148
class FTAnnotatedToUnannotated(KnitAdapter):
 
149
    """An adapter from FT annotated knits to unannotated ones."""
 
150
 
 
151
    def get_bytes(self, factory):
 
152
        annotated_compressed_bytes = factory._raw_record
 
153
        rec, contents = \
 
154
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
155
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
 
156
        size, bytes = self._data._record_to_data((rec[1],), rec[3], content.text())
 
157
        return bytes
 
158
 
 
159
 
 
160
class DeltaAnnotatedToUnannotated(KnitAdapter):
 
161
    """An adapter for deltas from annotated to unannotated."""
 
162
 
 
163
    def get_bytes(self, factory):
 
164
        annotated_compressed_bytes = factory._raw_record
 
165
        rec, contents = \
 
166
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
167
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
168
            plain=True)
 
169
        contents = self._plain_factory.lower_line_delta(delta)
 
170
        size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)
 
171
        return bytes
 
172
 
 
173
 
 
174
class FTAnnotatedToFullText(KnitAdapter):
 
175
    """An adapter from FT annotated knits to unannotated ones."""
 
176
 
 
177
    def get_bytes(self, factory):
 
178
        annotated_compressed_bytes = factory._raw_record
 
179
        rec, contents = \
 
180
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
181
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
 
182
            contents, factory._build_details, None)
 
183
        return ''.join(content.text())
 
184
 
 
185
 
 
186
class DeltaAnnotatedToFullText(KnitAdapter):
 
187
    """An adapter for deltas from annotated to unannotated."""
 
188
 
 
189
    def get_bytes(self, factory):
 
190
        annotated_compressed_bytes = factory._raw_record
 
191
        rec, contents = \
 
192
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
193
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
194
            plain=True)
 
195
        compression_parent = factory.parents[0]
 
196
        basis_entry = self._basis_vf.get_record_stream(
 
197
            [compression_parent], 'unordered', True).next()
 
198
        if basis_entry.storage_kind == 'absent':
 
199
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
 
200
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
201
        basis_lines = osutils.chunks_to_lines(basis_chunks)
 
202
        # Manually apply the delta because we have one annotated content and
 
203
        # one plain.
 
204
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
205
        basis_content.apply_delta(delta, rec[1])
 
206
        basis_content._should_strip_eol = factory._build_details[1]
 
207
        return ''.join(basis_content.text())
 
208
 
 
209
 
 
210
class FTPlainToFullText(KnitAdapter):
 
211
    """An adapter from FT plain knits to unannotated ones."""
 
212
 
 
213
    def get_bytes(self, factory):
 
214
        compressed_bytes = factory._raw_record
 
215
        rec, contents = \
 
216
            self._data._parse_record_unchecked(compressed_bytes)
 
217
        content, delta = self._plain_factory.parse_record(factory.key[-1],
 
218
            contents, factory._build_details, None)
 
219
        return ''.join(content.text())
 
220
 
 
221
 
 
222
class DeltaPlainToFullText(KnitAdapter):
 
223
    """An adapter for deltas from annotated to unannotated."""
 
224
 
 
225
    def get_bytes(self, factory):
 
226
        compressed_bytes = factory._raw_record
 
227
        rec, contents = \
 
228
            self._data._parse_record_unchecked(compressed_bytes)
 
229
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
 
230
        compression_parent = factory.parents[0]
 
231
        # XXX: string splitting overhead.
 
232
        basis_entry = self._basis_vf.get_record_stream(
 
233
            [compression_parent], 'unordered', True).next()
 
234
        if basis_entry.storage_kind == 'absent':
 
235
            raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
 
236
        basis_chunks = basis_entry.get_bytes_as('chunked')
 
237
        basis_lines = osutils.chunks_to_lines(basis_chunks)
 
238
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
239
        # Manually apply the delta because we have one annotated content and
 
240
        # one plain.
 
241
        content, _ = self._plain_factory.parse_record(rec[1], contents,
 
242
            factory._build_details, basis_content)
 
243
        return ''.join(content.text())
 
244
 
 
245
 
 
246
class KnitContentFactory(ContentFactory):
 
247
    """Content factory for streaming from knits.
 
248
 
 
249
    :seealso ContentFactory:
 
250
    """
 
251
 
 
252
    def __init__(self, key, parents, build_details, sha1, raw_record,
 
253
        annotated, knit=None, network_bytes=None):
 
254
        """Create a KnitContentFactory for key.
 
255
 
 
256
        :param key: The key.
 
257
        :param parents: The parents.
 
258
        :param build_details: The build details as returned from
 
259
            get_build_details.
 
260
        :param sha1: The sha1 expected from the full text of this object.
 
261
        :param raw_record: The bytes of the knit data from disk.
 
262
        :param annotated: True if the raw data is annotated.
 
263
        :param network_bytes: None to calculate the network bytes on demand,
 
264
            not-none if they are already known.
 
265
        """
 
266
        ContentFactory.__init__(self)
 
267
        self.sha1 = sha1
 
268
        self.key = key
 
269
        self.parents = parents
 
270
        if build_details[0] == 'line-delta':
 
271
            kind = 'delta'
 
272
        else:
 
273
            kind = 'ft'
 
274
        if annotated:
 
275
            annotated_kind = 'annotated-'
 
276
        else:
 
277
            annotated_kind = ''
 
278
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
 
279
        self._raw_record = raw_record
 
280
        self._network_bytes = network_bytes
 
281
        self._build_details = build_details
 
282
        self._knit = knit
 
283
 
 
284
    def _create_network_bytes(self):
 
285
        """Create a fully serialised network version for transmission."""
 
286
        # storage_kind, key, parents, Noeol, raw_record
 
287
        key_bytes = '\x00'.join(self.key)
 
288
        if self.parents is None:
 
289
            parent_bytes = 'None:'
 
290
        else:
 
291
            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
 
292
        if self._build_details[1]:
 
293
            noeol = 'N'
 
294
        else:
 
295
            noeol = ' '
 
296
        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
 
297
            parent_bytes, noeol, self._raw_record)
 
298
        self._network_bytes = network_bytes
 
299
 
 
300
    def get_bytes_as(self, storage_kind):
 
301
        if storage_kind == self.storage_kind:
 
302
            if self._network_bytes is None:
 
303
                self._create_network_bytes()
 
304
            return self._network_bytes
 
305
        if ('-ft-' in self.storage_kind and
 
306
            storage_kind in ('chunked', 'fulltext')):
 
307
            adapter_key = (self.storage_kind, 'fulltext')
 
308
            adapter_factory = adapter_registry.get(adapter_key)
 
309
            adapter = adapter_factory(None)
 
310
            bytes = adapter.get_bytes(self)
 
311
            if storage_kind == 'chunked':
 
312
                return [bytes]
 
313
            else:
 
314
                return bytes
 
315
        if self._knit is not None:
 
316
            # Not redundant with direct conversion above - that only handles
 
317
            # fulltext cases.
 
318
            if storage_kind == 'chunked':
 
319
                return self._knit.get_lines(self.key[0])
 
320
            elif storage_kind == 'fulltext':
 
321
                return self._knit.get_text(self.key[0])
 
322
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
323
            self.storage_kind)
 
324
 
 
325
 
 
326
class LazyKnitContentFactory(ContentFactory):
 
327
    """A ContentFactory which can either generate full text or a wire form.
 
328
 
 
329
    :seealso ContentFactory:
 
330
    """
 
331
 
 
332
    def __init__(self, key, parents, generator, first):
 
333
        """Create a LazyKnitContentFactory.
 
334
 
 
335
        :param key: The key of the record.
 
336
        :param parents: The parents of the record.
 
337
        :param generator: A _ContentMapGenerator containing the record for this
 
338
            key.
 
339
        :param first: Is this the first content object returned from generator?
 
340
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
341
            knit-delta-closure-ref
 
342
        """
 
343
        self.key = key
 
344
        self.parents = parents
 
345
        self.sha1 = None
 
346
        self._generator = generator
 
347
        self.storage_kind = "knit-delta-closure"
 
348
        if not first:
 
349
            self.storage_kind = self.storage_kind + "-ref"
 
350
        self._first = first
 
351
 
 
352
    def get_bytes_as(self, storage_kind):
 
353
        if storage_kind == self.storage_kind:
 
354
            if self._first:
 
355
                return self._generator._wire_bytes()
 
356
            else:
 
357
                # all the keys etc are contained in the bytes returned in the
 
358
                # first record.
 
359
                return ''
 
360
        if storage_kind in ('chunked', 'fulltext'):
 
361
            chunks = self._generator._get_one_work(self.key).text()
 
362
            if storage_kind == 'chunked':
 
363
                return chunks
 
364
            else:
 
365
                return ''.join(chunks)
 
366
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
367
            self.storage_kind)
 
368
 
 
369
 
 
370
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
371
    """Convert a network record to a iterator over stream records.
 
372
 
 
373
    :param storage_kind: The storage kind of the record.
 
374
        Must be 'knit-delta-closure'.
 
375
    :param bytes: The bytes of the record on the network.
 
376
    """
 
377
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
378
    return generator.get_record_stream()
 
379
 
 
380
 
 
381
def knit_network_to_record(storage_kind, bytes, line_end):
 
382
    """Convert a network record to a record object.
 
383
 
 
384
    :param storage_kind: The storage kind of the record.
 
385
    :param bytes: The bytes of the record on the network.
 
386
    """
 
387
    start = line_end
 
388
    line_end = bytes.find('\n', start)
 
389
    key = tuple(bytes[start:line_end].split('\x00'))
 
390
    start = line_end + 1
 
391
    line_end = bytes.find('\n', start)
 
392
    parent_line = bytes[start:line_end]
 
393
    if parent_line == 'None:':
 
394
        parents = None
 
395
    else:
 
396
        parents = tuple(
 
397
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
 
398
             if segment])
 
399
    start = line_end + 1
 
400
    noeol = bytes[start] == 'N'
 
401
    if 'ft' in storage_kind:
 
402
        method = 'fulltext'
 
403
    else:
 
404
        method = 'line-delta'
 
405
    build_details = (method, noeol)
 
406
    start = start + 1
 
407
    raw_record = bytes[start:]
 
408
    annotated = 'annotated' in storage_kind
 
409
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
410
        annotated, network_bytes=bytes)]
 
411
 
 
412
 
 
413
class KnitContent(object):
 
414
    """Content of a knit version to which deltas can be applied.
 
415
 
 
416
    This is always stored in memory as a list of lines with \n at the end,
 
417
    plus a flag saying if the final ending is really there or not, because that
 
418
    corresponds to the on-disk knit representation.
 
419
    """
 
420
 
 
421
    def __init__(self):
 
422
        self._should_strip_eol = False
 
423
 
 
424
    def apply_delta(self, delta, new_version_id):
 
425
        """Apply delta to this object to become new_version_id."""
 
426
        raise NotImplementedError(self.apply_delta)
 
427
 
 
428
    def line_delta_iter(self, new_lines):
 
429
        """Generate line-based delta from this content to new_lines."""
 
430
        new_texts = new_lines.text()
 
431
        old_texts = self.text()
 
432
        s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
 
433
        for tag, i1, i2, j1, j2 in s.get_opcodes():
 
434
            if tag == 'equal':
 
435
                continue
 
436
            # ofrom, oto, length, data
 
437
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
 
438
 
 
439
    def line_delta(self, new_lines):
 
440
        return list(self.line_delta_iter(new_lines))
 
441
 
 
442
    @staticmethod
 
443
    def get_line_delta_blocks(knit_delta, source, target):
 
444
        """Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
 
445
        target_len = len(target)
 
446
        s_pos = 0
 
447
        t_pos = 0
 
448
        for s_begin, s_end, t_len, new_text in knit_delta:
 
449
            true_n = s_begin - s_pos
 
450
            n = true_n
 
451
            if n > 0:
 
452
                # knit deltas do not provide reliable info about whether the
 
453
                # last line of a file matches, due to eol handling.
 
454
                if source[s_pos + n -1] != target[t_pos + n -1]:
 
455
                    n-=1
 
456
                if n > 0:
 
457
                    yield s_pos, t_pos, n
 
458
            t_pos += t_len + true_n
 
459
            s_pos = s_end
 
460
        n = target_len - t_pos
 
461
        if n > 0:
 
462
            if source[s_pos + n -1] != target[t_pos + n -1]:
 
463
                n-=1
 
464
            if n > 0:
 
465
                yield s_pos, t_pos, n
 
466
        yield s_pos + (target_len - t_pos), target_len, 0
 
467
 
 
468
 
 
469
class AnnotatedKnitContent(KnitContent):
 
470
    """Annotated content."""
 
471
 
 
472
    def __init__(self, lines):
 
473
        KnitContent.__init__(self)
 
474
        self._lines = lines
 
475
 
 
476
    def annotate(self):
 
477
        """Return a list of (origin, text) for each content line."""
 
478
        lines = self._lines[:]
 
479
        if self._should_strip_eol:
 
480
            origin, last_line = lines[-1]
 
481
            lines[-1] = (origin, last_line.rstrip('\n'))
 
482
        return lines
 
483
 
 
484
    def apply_delta(self, delta, new_version_id):
 
485
        """Apply delta to this object to become new_version_id."""
 
486
        offset = 0
 
487
        lines = self._lines
 
488
        for start, end, count, delta_lines in delta:
 
489
            lines[offset+start:offset+end] = delta_lines
 
490
            offset = offset + (start - end) + count
 
491
 
 
492
    def text(self):
 
493
        try:
 
494
            lines = [text for origin, text in self._lines]
 
495
        except ValueError, e:
 
496
            # most commonly (only?) caused by the internal form of the knit
 
497
            # missing annotation information because of a bug - see thread
 
498
            # around 20071015
 
499
            raise KnitCorrupt(self,
 
500
                "line in annotated knit missing annotation information: %s"
 
501
                % (e,))
 
502
        if self._should_strip_eol:
 
503
            lines[-1] = lines[-1].rstrip('\n')
 
504
        return lines
 
505
 
 
506
    def copy(self):
 
507
        return AnnotatedKnitContent(self._lines[:])
 
508
 
 
509
 
 
510
class PlainKnitContent(KnitContent):
 
511
    """Unannotated content.
 
512
 
 
513
    When annotate[_iter] is called on this content, the same version is reported
 
514
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
 
515
    objects.
 
516
    """
 
517
 
 
518
    def __init__(self, lines, version_id):
 
519
        KnitContent.__init__(self)
 
520
        self._lines = lines
 
521
        self._version_id = version_id
 
522
 
 
523
    def annotate(self):
 
524
        """Return a list of (origin, text) for each content line."""
 
525
        return [(self._version_id, line) for line in self._lines]
 
526
 
 
527
    def apply_delta(self, delta, new_version_id):
 
528
        """Apply delta to this object to become new_version_id."""
 
529
        offset = 0
 
530
        lines = self._lines
 
531
        for start, end, count, delta_lines in delta:
 
532
            lines[offset+start:offset+end] = delta_lines
 
533
            offset = offset + (start - end) + count
 
534
        self._version_id = new_version_id
 
535
 
 
536
    def copy(self):
 
537
        return PlainKnitContent(self._lines[:], self._version_id)
 
538
 
 
539
    def text(self):
 
540
        lines = self._lines
 
541
        if self._should_strip_eol:
 
542
            lines = lines[:]
 
543
            lines[-1] = lines[-1].rstrip('\n')
 
544
        return lines
 
545
 
 
546
 
 
547
class _KnitFactory(object):
 
548
    """Base class for common Factory functions."""
 
549
 
 
550
    def parse_record(self, version_id, record, record_details,
 
551
                     base_content, copy_base_content=True):
 
552
        """Parse a record into a full content object.
 
553
 
 
554
        :param version_id: The official version id for this content
 
555
        :param record: The data returned by read_records_iter()
 
556
        :param record_details: Details about the record returned by
 
557
            get_build_details
 
558
        :param base_content: If get_build_details returns a compression_parent,
 
559
            you must return a base_content here, else use None
 
560
        :param copy_base_content: When building from the base_content, decide
 
561
            you can either copy it and return a new object, or modify it in
 
562
            place.
 
563
        :return: (content, delta) A Content object and possibly a line-delta,
 
564
            delta may be None
 
565
        """
 
566
        method, noeol = record_details
 
567
        if method == 'line-delta':
 
568
            if copy_base_content:
 
569
                content = base_content.copy()
 
570
            else:
 
571
                content = base_content
 
572
            delta = self.parse_line_delta(record, version_id)
 
573
            content.apply_delta(delta, version_id)
 
574
        else:
 
575
            content = self.parse_fulltext(record, version_id)
 
576
            delta = None
 
577
        content._should_strip_eol = noeol
 
578
        return (content, delta)
 
579
 
 
580
 
 
581
class KnitAnnotateFactory(_KnitFactory):
 
582
    """Factory for creating annotated Content objects."""
 
583
 
 
584
    annotated = True
 
585
 
 
586
    def make(self, lines, version_id):
 
587
        num_lines = len(lines)
 
588
        return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
 
589
 
 
590
    def parse_fulltext(self, content, version_id):
 
591
        """Convert fulltext to internal representation
 
592
 
 
593
        fulltext content is of the format
 
594
        revid(utf8) plaintext\n
 
595
        internal representation is of the format:
 
596
        (revid, plaintext)
 
597
        """
 
598
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
 
599
        #       but the code itself doesn't really depend on that.
 
600
        #       Figure out a way to not require the overhead of turning the
 
601
        #       list back into tuples.
 
602
        lines = [tuple(line.split(' ', 1)) for line in content]
 
603
        return AnnotatedKnitContent(lines)
 
604
 
 
605
    def parse_line_delta_iter(self, lines):
 
606
        return iter(self.parse_line_delta(lines))
 
607
 
 
608
    def parse_line_delta(self, lines, version_id, plain=False):
 
609
        """Convert a line based delta into internal representation.
 
610
 
 
611
        line delta is in the form of:
 
612
        intstart intend intcount
 
613
        1..count lines:
 
614
        revid(utf8) newline\n
 
615
        internal representation is
 
616
        (start, end, count, [1..count tuples (revid, newline)])
 
617
 
 
618
        :param plain: If True, the lines are returned as a plain
 
619
            list without annotations, not as a list of (origin, content) tuples, i.e.
 
620
            (start, end, count, [1..count newline])
 
621
        """
 
622
        result = []
 
623
        lines = iter(lines)
 
624
        next = lines.next
 
625
 
 
626
        cache = {}
 
627
        def cache_and_return(line):
 
628
            origin, text = line.split(' ', 1)
 
629
            return cache.setdefault(origin, origin), text
 
630
 
 
631
        # walk through the lines parsing.
 
632
        # Note that the plain test is explicitly pulled out of the
 
633
        # loop to minimise any performance impact
 
634
        if plain:
 
635
            for header in lines:
 
636
                start, end, count = [int(n) for n in header.split(',')]
 
637
                contents = [next().split(' ', 1)[1] for i in xrange(count)]
 
638
                result.append((start, end, count, contents))
 
639
        else:
 
640
            for header in lines:
 
641
                start, end, count = [int(n) for n in header.split(',')]
 
642
                contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
 
643
                result.append((start, end, count, contents))
 
644
        return result
 
645
 
 
646
    def get_fulltext_content(self, lines):
 
647
        """Extract just the content lines from a fulltext."""
 
648
        return (line.split(' ', 1)[1] for line in lines)
 
649
 
 
650
    def get_linedelta_content(self, lines):
 
651
        """Extract just the content from a line delta.
 
652
 
 
653
        This doesn't return all of the extra information stored in a delta.
 
654
        Only the actual content lines.
 
655
        """
 
656
        lines = iter(lines)
 
657
        next = lines.next
 
658
        for header in lines:
 
659
            header = header.split(',')
 
660
            count = int(header[2])
 
661
            for i in xrange(count):
 
662
                origin, text = next().split(' ', 1)
 
663
                yield text
 
664
 
 
665
    def lower_fulltext(self, content):
 
666
        """convert a fulltext content record into a serializable form.
 
667
 
 
668
        see parse_fulltext which this inverts.
 
669
        """
 
670
        return ['%s %s' % (o, t) for o, t in content._lines]
 
671
 
 
672
    def lower_line_delta(self, delta):
 
673
        """convert a delta into a serializable form.
 
674
 
 
675
        See parse_line_delta which this inverts.
 
676
        """
 
677
        # TODO: jam 20070209 We only do the caching thing to make sure that
 
678
        #       the origin is a valid utf-8 line, eventually we could remove it
 
679
        out = []
 
680
        for start, end, c, lines in delta:
 
681
            out.append('%d,%d,%d\n' % (start, end, c))
 
682
            out.extend(origin + ' ' + text
 
683
                       for origin, text in lines)
 
684
        return out
 
685
 
 
686
    def annotate(self, knit, key):
 
687
        content = knit._get_content(key)
 
688
        # adjust for the fact that serialised annotations are only key suffixes
 
689
        # for this factory.
 
690
        if type(key) is tuple:
 
691
            prefix = key[:-1]
 
692
            origins = content.annotate()
 
693
            result = []
 
694
            for origin, line in origins:
 
695
                result.append((prefix + (origin,), line))
 
696
            return result
 
697
        else:
 
698
            # XXX: This smells a bit.  Why would key ever be a non-tuple here?
 
699
            # Aren't keys defined to be tuples?  -- spiv 20080618
 
700
            return content.annotate()
 
701
 
 
702
 
 
703
class KnitPlainFactory(_KnitFactory):
 
704
    """Factory for creating plain Content objects."""
 
705
 
 
706
    annotated = False
 
707
 
 
708
    def make(self, lines, version_id):
 
709
        return PlainKnitContent(lines, version_id)
 
710
 
 
711
    def parse_fulltext(self, content, version_id):
 
712
        """This parses an unannotated fulltext.
 
713
 
 
714
        Note that this is not a noop - the internal representation
 
715
        has (versionid, line) - its just a constant versionid.
 
716
        """
 
717
        return self.make(content, version_id)
 
718
 
 
719
    def parse_line_delta_iter(self, lines, version_id):
 
720
        cur = 0
 
721
        num_lines = len(lines)
 
722
        while cur < num_lines:
 
723
            header = lines[cur]
 
724
            cur += 1
 
725
            start, end, c = [int(n) for n in header.split(',')]
 
726
            yield start, end, c, lines[cur:cur+c]
 
727
            cur += c
 
728
 
 
729
    def parse_line_delta(self, lines, version_id):
 
730
        return list(self.parse_line_delta_iter(lines, version_id))
 
731
 
 
732
    def get_fulltext_content(self, lines):
 
733
        """Extract just the content lines from a fulltext."""
 
734
        return iter(lines)
 
735
 
 
736
    def get_linedelta_content(self, lines):
 
737
        """Extract just the content from a line delta.
 
738
 
 
739
        This doesn't return all of the extra information stored in a delta.
 
740
        Only the actual content lines.
 
741
        """
 
742
        lines = iter(lines)
 
743
        next = lines.next
 
744
        for header in lines:
 
745
            header = header.split(',')
 
746
            count = int(header[2])
 
747
            for i in xrange(count):
 
748
                yield next()
 
749
 
 
750
    def lower_fulltext(self, content):
 
751
        return content.text()
 
752
 
 
753
    def lower_line_delta(self, delta):
 
754
        out = []
 
755
        for start, end, c, lines in delta:
 
756
            out.append('%d,%d,%d\n' % (start, end, c))
 
757
            out.extend(lines)
 
758
        return out
 
759
 
 
760
    def annotate(self, knit, key):
 
761
        annotator = _KnitAnnotator(knit)
 
762
        return annotator.annotate_flat(key)
 
763
 
 
764
 
 
765
 
 
766
def make_file_factory(annotated, mapper):
 
767
    """Create a factory for creating a file based KnitVersionedFiles.
 
768
 
 
769
    This is only functional enough to run interface tests, it doesn't try to
 
770
    provide a full pack environment.
 
771
 
 
772
    :param annotated: knit annotations are wanted.
 
773
    :param mapper: The mapper from keys to paths.
 
774
    """
 
775
    def factory(transport):
 
776
        index = _KndxIndex(transport, mapper, lambda:None, lambda:True, lambda:True)
 
777
        access = _KnitKeyAccess(transport, mapper)
 
778
        return KnitVersionedFiles(index, access, annotated=annotated)
 
779
    return factory
 
780
 
 
781
 
 
782
def make_pack_factory(graph, delta, keylength):
 
783
    """Create a factory for creating a pack based VersionedFiles.
 
784
 
 
785
    This is only functional enough to run interface tests, it doesn't try to
 
786
    provide a full pack environment.
 
787
 
 
788
    :param graph: Store a graph.
 
789
    :param delta: Delta compress contents.
 
790
    :param keylength: How long should keys be.
 
791
    """
 
792
    def factory(transport):
 
793
        parents = graph or delta
 
794
        ref_length = 0
 
795
        if graph:
 
796
            ref_length += 1
 
797
        if delta:
 
798
            ref_length += 1
 
799
            max_delta_chain = 200
 
800
        else:
 
801
            max_delta_chain = 0
 
802
        graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
 
803
            key_elements=keylength)
 
804
        stream = transport.open_write_stream('newpack')
 
805
        writer = pack.ContainerWriter(stream.write)
 
806
        writer.begin()
 
807
        index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
 
808
            deltas=delta, add_callback=graph_index.add_nodes)
 
809
        access = _DirectPackAccess({})
 
810
        access.set_writer(writer, graph_index, (transport, 'newpack'))
 
811
        result = KnitVersionedFiles(index, access,
 
812
            max_delta_chain=max_delta_chain)
 
813
        result.stream = stream
 
814
        result.writer = writer
 
815
        return result
 
816
    return factory
 
817
 
 
818
 
 
819
def cleanup_pack_knit(versioned_files):
 
820
    versioned_files.stream.close()
 
821
    versioned_files.writer.end()
 
822
 
 
823
 
 
824
def _get_total_build_size(self, keys, positions):
 
825
    """Determine the total bytes to build these keys.
 
826
 
 
827
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
828
    don't inherit from a common base.)
 
829
 
 
830
    :param keys: Keys that we want to build
 
831
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
832
        as returned by _get_components_positions)
 
833
    :return: Number of bytes to build those keys
 
834
    """
 
835
    all_build_index_memos = {}
 
836
    build_keys = keys
 
837
    while build_keys:
 
838
        next_keys = set()
 
839
        for key in build_keys:
 
840
            # This is mostly for the 'stacked' case
 
841
            # Where we will be getting the data from a fallback
 
842
            if key not in positions:
 
843
                continue
 
844
            _, index_memo, compression_parent = positions[key]
 
845
            all_build_index_memos[key] = index_memo
 
846
            if compression_parent not in all_build_index_memos:
 
847
                next_keys.add(compression_parent)
 
848
        build_keys = next_keys
 
849
    return sum([index_memo[2] for index_memo
 
850
                in all_build_index_memos.itervalues()])
 
851
 
 
852
 
 
853
class KnitVersionedFiles(VersionedFiles):
 
854
    """Storage for many versioned files using knit compression.
 
855
 
 
856
    Backend storage is managed by indices and data objects.
 
857
 
 
858
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
859
        parents, graph, compression and data location of entries in this
 
860
        KnitVersionedFiles.  Note that this is only the index for
 
861
        *this* vfs; if there are fallbacks they must be queried separately.
 
862
    """
 
863
 
 
864
    def __init__(self, index, data_access, max_delta_chain=200,
 
865
                 annotated=False, reload_func=None):
 
866
        """Create a KnitVersionedFiles with index and data_access.
 
867
 
 
868
        :param index: The index for the knit data.
 
869
        :param data_access: The access object to store and retrieve knit
 
870
            records.
 
871
        :param max_delta_chain: The maximum number of deltas to permit during
 
872
            insertion. Set to 0 to prohibit the use of deltas.
 
873
        :param annotated: Set to True to cause annotations to be calculated and
 
874
            stored during insertion.
 
875
        :param reload_func: An function that can be called if we think we need
 
876
            to reload the pack listing and try again. See
 
877
            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
 
878
        """
 
879
        self._index = index
 
880
        self._access = data_access
 
881
        self._max_delta_chain = max_delta_chain
 
882
        if annotated:
 
883
            self._factory = KnitAnnotateFactory()
 
884
        else:
 
885
            self._factory = KnitPlainFactory()
 
886
        self._fallback_vfs = []
 
887
        self._reload_func = reload_func
 
888
 
 
889
    def __repr__(self):
 
890
        return "%s(%r, %r)" % (
 
891
            self.__class__.__name__,
 
892
            self._index,
 
893
            self._access)
 
894
 
 
895
    def add_fallback_versioned_files(self, a_versioned_files):
 
896
        """Add a source of texts for texts not present in this knit.
 
897
 
 
898
        :param a_versioned_files: A VersionedFiles object.
 
899
        """
 
900
        self._fallback_vfs.append(a_versioned_files)
 
901
 
 
902
    def add_lines(self, key, parents, lines, parent_texts=None,
 
903
        left_matching_blocks=None, nostore_sha=None, random_id=False,
 
904
        check_content=True):
 
905
        """See VersionedFiles.add_lines()."""
 
906
        self._index._check_write_ok()
 
907
        self._check_add(key, lines, random_id, check_content)
 
908
        if parents is None:
 
909
            # The caller might pass None if there is no graph data, but kndx
 
910
            # indexes can't directly store that, so we give them
 
911
            # an empty tuple instead.
 
912
            parents = ()
 
913
        line_bytes = ''.join(lines)
 
914
        return self._add(key, lines, parents,
 
915
            parent_texts, left_matching_blocks, nostore_sha, random_id,
 
916
            line_bytes=line_bytes)
 
917
 
 
918
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
919
        """See VersionedFiles._add_text()."""
 
920
        self._index._check_write_ok()
 
921
        self._check_add(key, None, random_id, check_content=False)
 
922
        if text.__class__ is not str:
 
923
            raise errors.BzrBadParameterUnicode("text")
 
924
        if parents is None:
 
925
            # The caller might pass None if there is no graph data, but kndx
 
926
            # indexes can't directly store that, so we give them
 
927
            # an empty tuple instead.
 
928
            parents = ()
 
929
        return self._add(key, None, parents,
 
930
            None, None, nostore_sha, random_id,
 
931
            line_bytes=text)
 
932
 
 
933
    def _add(self, key, lines, parents, parent_texts,
 
934
        left_matching_blocks, nostore_sha, random_id,
 
935
        line_bytes):
 
936
        """Add a set of lines on top of version specified by parents.
 
937
 
 
938
        Any versions not present will be converted into ghosts.
 
939
 
 
940
        :param lines: A list of strings where each one is a single line (has a
 
941
            single newline at the end of the string) This is now optional
 
942
            (callers can pass None). It is left in its location for backwards
 
943
            compatibility. It should ''.join(lines) must == line_bytes
 
944
        :param line_bytes: A single string containing the content
 
945
 
 
946
        We pass both lines and line_bytes because different routes bring the
 
947
        values to this function. And for memory efficiency, we don't want to
 
948
        have to split/join on-demand.
 
949
        """
 
950
        # first thing, if the content is something we don't need to store, find
 
951
        # that out.
 
952
        digest = sha_string(line_bytes)
 
953
        if nostore_sha == digest:
 
954
            raise errors.ExistingContent
 
955
 
 
956
        present_parents = []
 
957
        if parent_texts is None:
 
958
            parent_texts = {}
 
959
        # Do a single query to ascertain parent presence; we only compress
 
960
        # against parents in the same kvf.
 
961
        present_parent_map = self._index.get_parent_map(parents)
 
962
        for parent in parents:
 
963
            if parent in present_parent_map:
 
964
                present_parents.append(parent)
 
965
 
 
966
        # Currently we can only compress against the left most present parent.
 
967
        if (len(present_parents) == 0 or
 
968
            present_parents[0] != parents[0]):
 
969
            delta = False
 
970
        else:
 
971
            # To speed the extract of texts the delta chain is limited
 
972
            # to a fixed number of deltas.  This should minimize both
 
973
            # I/O and the time spend applying deltas.
 
974
            delta = self._check_should_delta(present_parents[0])
 
975
 
 
976
        text_length = len(line_bytes)
 
977
        options = []
 
978
        no_eol = False
 
979
        # Note: line_bytes is not modified to add a newline, that is tracked
 
980
        #       via the no_eol flag. 'lines' *is* modified, because that is the
 
981
        #       general values needed by the Content code.
 
982
        if line_bytes and line_bytes[-1] != '\n':
 
983
            options.append('no-eol')
 
984
            no_eol = True
 
985
            # Copy the existing list, or create a new one
 
986
            if lines is None:
 
987
                lines = osutils.split_lines(line_bytes)
 
988
            else:
 
989
                lines = lines[:]
 
990
            # Replace the last line with one that ends in a final newline
 
991
            lines[-1] = lines[-1] + '\n'
 
992
        if lines is None:
 
993
            lines = osutils.split_lines(line_bytes)
 
994
 
 
995
        for element in key[:-1]:
 
996
            if type(element) is not str:
 
997
                raise TypeError("key contains non-strings: %r" % (key,))
 
998
        if key[-1] is None:
 
999
            key = key[:-1] + ('sha1:' + digest,)
 
1000
        elif type(key[-1]) is not str:
 
1001
                raise TypeError("key contains non-strings: %r" % (key,))
 
1002
        # Knit hunks are still last-element only
 
1003
        version_id = key[-1]
 
1004
        content = self._factory.make(lines, version_id)
 
1005
        if no_eol:
 
1006
            # Hint to the content object that its text() call should strip the
 
1007
            # EOL.
 
1008
            content._should_strip_eol = True
 
1009
        if delta or (self._factory.annotated and len(present_parents) > 0):
 
1010
            # Merge annotations from parent texts if needed.
 
1011
            delta_hunks = self._merge_annotations(content, present_parents,
 
1012
                parent_texts, delta, self._factory.annotated,
 
1013
                left_matching_blocks)
 
1014
 
 
1015
        if delta:
 
1016
            options.append('line-delta')
 
1017
            store_lines = self._factory.lower_line_delta(delta_hunks)
 
1018
            size, bytes = self._record_to_data(key, digest,
 
1019
                store_lines)
 
1020
        else:
 
1021
            options.append('fulltext')
 
1022
            # isinstance is slower and we have no hierarchy.
 
1023
            if self._factory.__class__ is KnitPlainFactory:
 
1024
                # Use the already joined bytes saving iteration time in
 
1025
                # _record_to_data.
 
1026
                dense_lines = [line_bytes]
 
1027
                if no_eol:
 
1028
                    dense_lines.append('\n')
 
1029
                size, bytes = self._record_to_data(key, digest,
 
1030
                    lines, dense_lines)
 
1031
            else:
 
1032
                # get mixed annotation + content and feed it into the
 
1033
                # serialiser.
 
1034
                store_lines = self._factory.lower_fulltext(content)
 
1035
                size, bytes = self._record_to_data(key, digest,
 
1036
                    store_lines)
 
1037
 
 
1038
        access_memo = self._access.add_raw_records([(key, size)], bytes)[0]
 
1039
        self._index.add_records(
 
1040
            ((key, options, access_memo, parents),),
 
1041
            random_id=random_id)
 
1042
        return digest, text_length, content
 
1043
 
 
1044
    def annotate(self, key):
 
1045
        """See VersionedFiles.annotate."""
 
1046
        return self._factory.annotate(self, key)
 
1047
 
 
1048
    def get_annotator(self):
 
1049
        return _KnitAnnotator(self)
 
1050
 
 
1051
    def check(self, progress_bar=None, keys=None):
 
1052
        """See VersionedFiles.check()."""
 
1053
        if keys is None:
 
1054
            return self._logical_check()
 
1055
        else:
 
1056
            # At the moment, check does not extra work over get_record_stream
 
1057
            return self.get_record_stream(keys, 'unordered', True)
 
1058
 
 
1059
    def _logical_check(self):
 
1060
        # This doesn't actually test extraction of everything, but that will
 
1061
        # impact 'bzr check' substantially, and needs to be integrated with
 
1062
        # care. However, it does check for the obvious problem of a delta with
 
1063
        # no basis.
 
1064
        keys = self._index.keys()
 
1065
        parent_map = self.get_parent_map(keys)
 
1066
        for key in keys:
 
1067
            if self._index.get_method(key) != 'fulltext':
 
1068
                compression_parent = parent_map[key][0]
 
1069
                if compression_parent not in parent_map:
 
1070
                    raise errors.KnitCorrupt(self,
 
1071
                        "Missing basis parent %s for %s" % (
 
1072
                        compression_parent, key))
 
1073
        for fallback_vfs in self._fallback_vfs:
 
1074
            fallback_vfs.check()
 
1075
 
 
1076
    def _check_add(self, key, lines, random_id, check_content):
 
1077
        """check that version_id and lines are safe to add."""
 
1078
        version_id = key[-1]
 
1079
        if version_id is not None:
 
1080
            if contains_whitespace(version_id):
 
1081
                raise InvalidRevisionId(version_id, self)
 
1082
            self.check_not_reserved_id(version_id)
 
1083
        # TODO: If random_id==False and the key is already present, we should
 
1084
        # probably check that the existing content is identical to what is
 
1085
        # being inserted, and otherwise raise an exception.  This would make
 
1086
        # the bundle code simpler.
 
1087
        if check_content:
 
1088
            self._check_lines_not_unicode(lines)
 
1089
            self._check_lines_are_lines(lines)
 
1090
 
 
1091
    def _check_header(self, key, line):
 
1092
        rec = self._split_header(line)
 
1093
        self._check_header_version(rec, key[-1])
 
1094
        return rec
 
1095
 
 
1096
    def _check_header_version(self, rec, version_id):
 
1097
        """Checks the header version on original format knit records.
 
1098
 
 
1099
        These have the last component of the key embedded in the record.
 
1100
        """
 
1101
        if rec[1] != version_id:
 
1102
            raise KnitCorrupt(self,
 
1103
                'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
 
1104
 
 
1105
    def _check_should_delta(self, parent):
 
1106
        """Iterate back through the parent listing, looking for a fulltext.
 
1107
 
 
1108
        This is used when we want to decide whether to add a delta or a new
 
1109
        fulltext. It searches for _max_delta_chain parents. When it finds a
 
1110
        fulltext parent, it sees if the total size of the deltas leading up to
 
1111
        it is large enough to indicate that we want a new full text anyway.
 
1112
 
 
1113
        Return True if we should create a new delta, False if we should use a
 
1114
        full text.
 
1115
        """
 
1116
        delta_size = 0
 
1117
        fulltext_size = None
 
1118
        for count in xrange(self._max_delta_chain):
 
1119
            try:
 
1120
                # Note that this only looks in the index of this particular
 
1121
                # KnitVersionedFiles, not in the fallbacks.  This ensures that
 
1122
                # we won't store a delta spanning physical repository
 
1123
                # boundaries.
 
1124
                build_details = self._index.get_build_details([parent])
 
1125
                parent_details = build_details[parent]
 
1126
            except (RevisionNotPresent, KeyError), e:
 
1127
                # Some basis is not locally present: always fulltext
 
1128
                return False
 
1129
            index_memo, compression_parent, _, _ = parent_details
 
1130
            _, _, size = index_memo
 
1131
            if compression_parent is None:
 
1132
                fulltext_size = size
 
1133
                break
 
1134
            delta_size += size
 
1135
            # We don't explicitly check for presence because this is in an
 
1136
            # inner loop, and if it's missing it'll fail anyhow.
 
1137
            parent = compression_parent
 
1138
        else:
 
1139
            # We couldn't find a fulltext, so we must create a new one
 
1140
            return False
 
1141
        # Simple heuristic - if the total I/O wold be greater as a delta than
 
1142
        # the originally installed fulltext, we create a new fulltext.
 
1143
        return fulltext_size > delta_size
 
1144
 
 
1145
    def _build_details_to_components(self, build_details):
 
1146
        """Convert a build_details tuple to a position tuple."""
 
1147
        # record_details, access_memo, compression_parent
 
1148
        return build_details[3], build_details[0], build_details[1]
 
1149
 
 
1150
    def _get_components_positions(self, keys, allow_missing=False):
 
1151
        """Produce a map of position data for the components of keys.
 
1152
 
 
1153
        This data is intended to be used for retrieving the knit records.
 
1154
 
 
1155
        A dict of key to (record_details, index_memo, next, parents) is
 
1156
        returned.
 
1157
        method is the way referenced data should be applied.
 
1158
        index_memo is the handle to pass to the data access to actually get the
 
1159
            data
 
1160
        next is the build-parent of the version, or None for fulltexts.
 
1161
        parents is the version_ids of the parents of this version
 
1162
 
 
1163
        :param allow_missing: If True do not raise an error on a missing component,
 
1164
            just ignore it.
 
1165
        """
 
1166
        component_data = {}
 
1167
        pending_components = keys
 
1168
        while pending_components:
 
1169
            build_details = self._index.get_build_details(pending_components)
 
1170
            current_components = set(pending_components)
 
1171
            pending_components = set()
 
1172
            for key, details in build_details.iteritems():
 
1173
                (index_memo, compression_parent, parents,
 
1174
                 record_details) = details
 
1175
                method = record_details[0]
 
1176
                if compression_parent is not None:
 
1177
                    pending_components.add(compression_parent)
 
1178
                component_data[key] = self._build_details_to_components(details)
 
1179
            missing = current_components.difference(build_details)
 
1180
            if missing and not allow_missing:
 
1181
                raise errors.RevisionNotPresent(missing.pop(), self)
 
1182
        return component_data
 
1183
 
 
1184
    def _get_content(self, key, parent_texts={}):
 
1185
        """Returns a content object that makes up the specified
 
1186
        version."""
 
1187
        cached_version = parent_texts.get(key, None)
 
1188
        if cached_version is not None:
 
1189
            # Ensure the cache dict is valid.
 
1190
            if not self.get_parent_map([key]):
 
1191
                raise RevisionNotPresent(key, self)
 
1192
            return cached_version
 
1193
        generator = _VFContentMapGenerator(self, [key])
 
1194
        return generator._get_content(key)
 
1195
 
 
1196
    def get_known_graph_ancestry(self, keys):
 
1197
        """Get a KnownGraph instance with the ancestry of keys."""
 
1198
        parent_map, missing_keys = self._index.find_ancestry(keys)
 
1199
        for fallback in self._transitive_fallbacks():
 
1200
            if not missing_keys:
 
1201
                break
 
1202
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
1203
                                                missing_keys)
 
1204
            parent_map.update(f_parent_map)
 
1205
            missing_keys = f_missing_keys
 
1206
        kg = _mod_graph.KnownGraph(parent_map)
 
1207
        return kg
 
1208
 
 
1209
    def get_parent_map(self, keys):
 
1210
        """Get a map of the graph parents of keys.
 
1211
 
 
1212
        :param keys: The keys to look up parents for.
 
1213
        :return: A mapping from keys to parents. Absent keys are absent from
 
1214
            the mapping.
 
1215
        """
 
1216
        return self._get_parent_map_with_sources(keys)[0]
 
1217
 
 
1218
    def _get_parent_map_with_sources(self, keys):
 
1219
        """Get a map of the parents of keys.
 
1220
 
 
1221
        :param keys: The keys to look up parents for.
 
1222
        :return: A tuple. The first element is a mapping from keys to parents.
 
1223
            Absent keys are absent from the mapping. The second element is a
 
1224
            list with the locations each key was found in. The first element
 
1225
            is the in-this-knit parents, the second the first fallback source,
 
1226
            and so on.
 
1227
        """
 
1228
        result = {}
 
1229
        sources = [self._index] + self._fallback_vfs
 
1230
        source_results = []
 
1231
        missing = set(keys)
 
1232
        for source in sources:
 
1233
            if not missing:
 
1234
                break
 
1235
            new_result = source.get_parent_map(missing)
 
1236
            source_results.append(new_result)
 
1237
            result.update(new_result)
 
1238
            missing.difference_update(set(new_result))
 
1239
        return result, source_results
 
1240
 
 
1241
    def _get_record_map(self, keys, allow_missing=False):
 
1242
        """Produce a dictionary of knit records.
 
1243
 
 
1244
        :return: {key:(record, record_details, digest, next)}
 
1245
            record
 
1246
                data returned from read_records (a KnitContentobject)
 
1247
            record_details
 
1248
                opaque information to pass to parse_record
 
1249
            digest
 
1250
                SHA1 digest of the full text after all steps are done
 
1251
            next
 
1252
                build-parent of the version, i.e. the leftmost ancestor.
 
1253
                Will be None if the record is not a delta.
 
1254
        :param keys: The keys to build a map for
 
1255
        :param allow_missing: If some records are missing, rather than
 
1256
            error, just return the data that could be generated.
 
1257
        """
 
1258
        raw_map = self._get_record_map_unparsed(keys,
 
1259
            allow_missing=allow_missing)
 
1260
        return self._raw_map_to_record_map(raw_map)
 
1261
 
 
1262
    def _raw_map_to_record_map(self, raw_map):
 
1263
        """Parse the contents of _get_record_map_unparsed.
 
1264
 
 
1265
        :return: see _get_record_map.
 
1266
        """
 
1267
        result = {}
 
1268
        for key in raw_map:
 
1269
            data, record_details, next = raw_map[key]
 
1270
            content, digest = self._parse_record(key[-1], data)
 
1271
            result[key] = content, record_details, digest, next
 
1272
        return result
 
1273
 
 
1274
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1275
        """Get the raw data for reconstructing keys without parsing it.
 
1276
 
 
1277
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1278
            key-> raw_bytes, (method, noeol), compression_parent
 
1279
        """
 
1280
        # This retries the whole request if anything fails. Potentially we
 
1281
        # could be a bit more selective. We could track the keys whose records
 
1282
        # we have successfully found, and then only request the new records
 
1283
        # from there. However, _get_components_positions grabs the whole build
 
1284
        # chain, which means we'll likely try to grab the same records again
 
1285
        # anyway. Also, can the build chains change as part of a pack
 
1286
        # operation? We wouldn't want to end up with a broken chain.
 
1287
        while True:
 
1288
            try:
 
1289
                position_map = self._get_components_positions(keys,
 
1290
                    allow_missing=allow_missing)
 
1291
                # key = component_id, r = record_details, i_m = index_memo,
 
1292
                # n = next
 
1293
                records = [(key, i_m) for key, (r, i_m, n)
 
1294
                                       in position_map.iteritems()]
 
1295
                # Sort by the index memo, so that we request records from the
 
1296
                # same pack file together, and in forward-sorted order
 
1297
                records.sort(key=operator.itemgetter(1))
 
1298
                raw_record_map = {}
 
1299
                for key, data in self._read_records_iter_unchecked(records):
 
1300
                    (record_details, index_memo, next) = position_map[key]
 
1301
                    raw_record_map[key] = data, record_details, next
 
1302
                return raw_record_map
 
1303
            except errors.RetryWithNewPacks, e:
 
1304
                self._access.reload_or_raise(e)
 
1305
 
 
1306
    @classmethod
 
1307
    def _split_by_prefix(cls, keys):
 
1308
        """For the given keys, split them up based on their prefix.
 
1309
 
 
1310
        To keep memory pressure somewhat under control, split the
 
1311
        requests back into per-file-id requests, otherwise "bzr co"
 
1312
        extracts the full tree into memory before writing it to disk.
 
1313
        This should be revisited if _get_content_maps() can ever cross
 
1314
        file-id boundaries.
 
1315
 
 
1316
        The keys for a given file_id are kept in the same relative order.
 
1317
        Ordering between file_ids is not, though prefix_order will return the
 
1318
        order that the key was first seen.
 
1319
 
 
1320
        :param keys: An iterable of key tuples
 
1321
        :return: (split_map, prefix_order)
 
1322
            split_map       A dictionary mapping prefix => keys
 
1323
            prefix_order    The order that we saw the various prefixes
 
1324
        """
 
1325
        split_by_prefix = {}
 
1326
        prefix_order = []
 
1327
        for key in keys:
 
1328
            if len(key) == 1:
 
1329
                prefix = ''
 
1330
            else:
 
1331
                prefix = key[0]
 
1332
 
 
1333
            if prefix in split_by_prefix:
 
1334
                split_by_prefix[prefix].append(key)
 
1335
            else:
 
1336
                split_by_prefix[prefix] = [key]
 
1337
                prefix_order.append(prefix)
 
1338
        return split_by_prefix, prefix_order
 
1339
 
 
1340
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1341
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1342
        """For the given keys, group them into 'best-sized' requests.
 
1343
 
 
1344
        The idea is to avoid making 1 request per file, but to never try to
 
1345
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1346
        possible, we should try to group requests to the same pack file
 
1347
        together.
 
1348
 
 
1349
        :return: list of (keys, non_local) tuples that indicate what keys
 
1350
            should be fetched next.
 
1351
        """
 
1352
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1353
        #       from the same pack file together, and we want to extract all
 
1354
        #       the texts for a given build-chain together. Ultimately it
 
1355
        #       probably needs a better global view.
 
1356
        total_keys = len(keys)
 
1357
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1358
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1359
        cur_keys = []
 
1360
        cur_non_local = set()
 
1361
        cur_size = 0
 
1362
        result = []
 
1363
        sizes = []
 
1364
        for prefix in prefix_order:
 
1365
            keys = prefix_split_keys[prefix]
 
1366
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1367
 
 
1368
            this_size = self._index._get_total_build_size(keys, positions)
 
1369
            cur_size += this_size
 
1370
            cur_keys.extend(keys)
 
1371
            cur_non_local.update(non_local)
 
1372
            if cur_size > _min_buffer_size:
 
1373
                result.append((cur_keys, cur_non_local))
 
1374
                sizes.append(cur_size)
 
1375
                cur_keys = []
 
1376
                cur_non_local = set()
 
1377
                cur_size = 0
 
1378
        if cur_keys:
 
1379
            result.append((cur_keys, cur_non_local))
 
1380
            sizes.append(cur_size)
 
1381
        return result
 
1382
 
 
1383
    def get_record_stream(self, keys, ordering, include_delta_closure):
 
1384
        """Get a stream of records for keys.
 
1385
 
 
1386
        :param keys: The keys to include.
 
1387
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
1388
            sorted stream has compression parents strictly before their
 
1389
            children.
 
1390
        :param include_delta_closure: If True then the closure across any
 
1391
            compression parents will be included (in the opaque data).
 
1392
        :return: An iterator of ContentFactory objects, each of which is only
 
1393
            valid until the iterator is advanced.
 
1394
        """
 
1395
        # keys might be a generator
 
1396
        keys = set(keys)
 
1397
        if not keys:
 
1398
            return
 
1399
        if not self._index.has_graph:
 
1400
            # Cannot sort when no graph has been stored.
 
1401
            ordering = 'unordered'
 
1402
 
 
1403
        remaining_keys = keys
 
1404
        while True:
 
1405
            try:
 
1406
                keys = set(remaining_keys)
 
1407
                for content_factory in self._get_remaining_record_stream(keys,
 
1408
                                            ordering, include_delta_closure):
 
1409
                    remaining_keys.discard(content_factory.key)
 
1410
                    yield content_factory
 
1411
                return
 
1412
            except errors.RetryWithNewPacks, e:
 
1413
                self._access.reload_or_raise(e)
 
1414
 
 
1415
    def _get_remaining_record_stream(self, keys, ordering,
 
1416
                                     include_delta_closure):
 
1417
        """This function is the 'retry' portion for get_record_stream."""
 
1418
        if include_delta_closure:
 
1419
            positions = self._get_components_positions(keys, allow_missing=True)
 
1420
        else:
 
1421
            build_details = self._index.get_build_details(keys)
 
1422
            # map from key to
 
1423
            # (record_details, access_memo, compression_parent_key)
 
1424
            positions = dict((key, self._build_details_to_components(details))
 
1425
                for key, details in build_details.iteritems())
 
1426
        absent_keys = keys.difference(set(positions))
 
1427
        # There may be more absent keys : if we're missing the basis component
 
1428
        # and are trying to include the delta closure.
 
1429
        # XXX: We should not ever need to examine remote sources because we do
 
1430
        # not permit deltas across versioned files boundaries.
 
1431
        if include_delta_closure:
 
1432
            needed_from_fallback = set()
 
1433
            # Build up reconstructable_keys dict.  key:True in this dict means
 
1434
            # the key can be reconstructed.
 
1435
            reconstructable_keys = {}
 
1436
            for key in keys:
 
1437
                # the delta chain
 
1438
                try:
 
1439
                    chain = [key, positions[key][2]]
 
1440
                except KeyError:
 
1441
                    needed_from_fallback.add(key)
 
1442
                    continue
 
1443
                result = True
 
1444
                while chain[-1] is not None:
 
1445
                    if chain[-1] in reconstructable_keys:
 
1446
                        result = reconstructable_keys[chain[-1]]
 
1447
                        break
 
1448
                    else:
 
1449
                        try:
 
1450
                            chain.append(positions[chain[-1]][2])
 
1451
                        except KeyError:
 
1452
                            # missing basis component
 
1453
                            needed_from_fallback.add(chain[-1])
 
1454
                            result = True
 
1455
                            break
 
1456
                for chain_key in chain[:-1]:
 
1457
                    reconstructable_keys[chain_key] = result
 
1458
                if not result:
 
1459
                    needed_from_fallback.add(key)
 
1460
        # Double index lookups here : need a unified api ?
 
1461
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
 
1462
        if ordering in ('topological', 'groupcompress'):
 
1463
            if ordering == 'topological':
 
1464
                # Global topological sort
 
1465
                present_keys = tsort.topo_sort(global_map)
 
1466
            else:
 
1467
                present_keys = sort_groupcompress(global_map)
 
1468
            # Now group by source:
 
1469
            source_keys = []
 
1470
            current_source = None
 
1471
            for key in present_keys:
 
1472
                for parent_map in parent_maps:
 
1473
                    if key in parent_map:
 
1474
                        key_source = parent_map
 
1475
                        break
 
1476
                if current_source is not key_source:
 
1477
                    source_keys.append((key_source, []))
 
1478
                    current_source = key_source
 
1479
                source_keys[-1][1].append(key)
 
1480
        else:
 
1481
            if ordering != 'unordered':
 
1482
                raise AssertionError('valid values for ordering are:'
 
1483
                    ' "unordered", "groupcompress" or "topological" not: %r'
 
1484
                    % (ordering,))
 
1485
            # Just group by source; remote sources first.
 
1486
            present_keys = []
 
1487
            source_keys = []
 
1488
            for parent_map in reversed(parent_maps):
 
1489
                source_keys.append((parent_map, []))
 
1490
                for key in parent_map:
 
1491
                    present_keys.append(key)
 
1492
                    source_keys[-1][1].append(key)
 
1493
            # We have been requested to return these records in an order that
 
1494
            # suits us. So we ask the index to give us an optimally sorted
 
1495
            # order.
 
1496
            for source, sub_keys in source_keys:
 
1497
                if source is parent_maps[0]:
 
1498
                    # Only sort the keys for this VF
 
1499
                    self._index._sort_keys_by_io(sub_keys, positions)
 
1500
        absent_keys = keys - set(global_map)
 
1501
        for key in absent_keys:
 
1502
            yield AbsentContentFactory(key)
 
1503
        # restrict our view to the keys we can answer.
 
1504
        # XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
 
1505
        # XXX: At that point we need to consider the impact of double reads by
 
1506
        # utilising components multiple times.
 
1507
        if include_delta_closure:
 
1508
            # XXX: get_content_maps performs its own index queries; allow state
 
1509
            # to be passed in.
 
1510
            non_local_keys = needed_from_fallback - absent_keys
 
1511
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1512
                                                                non_local_keys,
 
1513
                                                                positions):
 
1514
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1515
                                                   global_map,
 
1516
                                                   ordering=ordering)
 
1517
                for record in generator.get_record_stream():
 
1518
                    yield record
 
1519
        else:
 
1520
            for source, keys in source_keys:
 
1521
                if source is parent_maps[0]:
 
1522
                    # this KnitVersionedFiles
 
1523
                    records = [(key, positions[key][1]) for key in keys]
 
1524
                    for key, raw_data in self._read_records_iter_unchecked(records):
 
1525
                        (record_details, index_memo, _) = positions[key]
 
1526
                        yield KnitContentFactory(key, global_map[key],
 
1527
                            record_details, None, raw_data, self._factory.annotated, None)
 
1528
                else:
 
1529
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
 
1530
                    for record in vf.get_record_stream(keys, ordering,
 
1531
                        include_delta_closure):
 
1532
                        yield record
 
1533
 
 
1534
    def get_sha1s(self, keys):
 
1535
        """See VersionedFiles.get_sha1s()."""
 
1536
        missing = set(keys)
 
1537
        record_map = self._get_record_map(missing, allow_missing=True)
 
1538
        result = {}
 
1539
        for key, details in record_map.iteritems():
 
1540
            if key not in missing:
 
1541
                continue
 
1542
            # record entry 2 is the 'digest'.
 
1543
            result[key] = details[2]
 
1544
        missing.difference_update(set(result))
 
1545
        for source in self._fallback_vfs:
 
1546
            if not missing:
 
1547
                break
 
1548
            new_result = source.get_sha1s(missing)
 
1549
            result.update(new_result)
 
1550
            missing.difference_update(set(new_result))
 
1551
        return result
 
1552
 
 
1553
    def insert_record_stream(self, stream):
 
1554
        """Insert a record stream into this container.
 
1555
 
 
1556
        :param stream: A stream of records to insert.
 
1557
        :return: None
 
1558
        :seealso VersionedFiles.get_record_stream:
 
1559
        """
 
1560
        def get_adapter(adapter_key):
 
1561
            try:
 
1562
                return adapters[adapter_key]
 
1563
            except KeyError:
 
1564
                adapter_factory = adapter_registry.get(adapter_key)
 
1565
                adapter = adapter_factory(self)
 
1566
                adapters[adapter_key] = adapter
 
1567
                return adapter
 
1568
        delta_types = set()
 
1569
        if self._factory.annotated:
 
1570
            # self is annotated, we need annotated knits to use directly.
 
1571
            annotated = "annotated-"
 
1572
            convertibles = []
 
1573
        else:
 
1574
            # self is not annotated, but we can strip annotations cheaply.
 
1575
            annotated = ""
 
1576
            convertibles = set(["knit-annotated-ft-gz"])
 
1577
            if self._max_delta_chain:
 
1578
                delta_types.add("knit-annotated-delta-gz")
 
1579
                convertibles.add("knit-annotated-delta-gz")
 
1580
        # The set of types we can cheaply adapt without needing basis texts.
 
1581
        native_types = set()
 
1582
        if self._max_delta_chain:
 
1583
            native_types.add("knit-%sdelta-gz" % annotated)
 
1584
            delta_types.add("knit-%sdelta-gz" % annotated)
 
1585
        native_types.add("knit-%sft-gz" % annotated)
 
1586
        knit_types = native_types.union(convertibles)
 
1587
        adapters = {}
 
1588
        # Buffer all index entries that we can't add immediately because their
 
1589
        # basis parent is missing. We don't buffer all because generating
 
1590
        # annotations may require access to some of the new records. However we
 
1591
        # can't generate annotations from new deltas until their basis parent
 
1592
        # is present anyway, so we get away with not needing an index that
 
1593
        # includes the new keys.
 
1594
        #
 
1595
        # See <http://launchpad.net/bugs/300177> about ordering of compression
 
1596
        # parents in the records - to be conservative, we insist that all
 
1597
        # parents must be present to avoid expanding to a fulltext.
 
1598
        #
 
1599
        # key = basis_parent, value = index entry to add
 
1600
        buffered_index_entries = {}
 
1601
        for record in stream:
 
1602
            kind = record.storage_kind
 
1603
            if kind.startswith('knit-') and kind.endswith('-gz'):
 
1604
                # Check that the ID in the header of the raw knit bytes matches
 
1605
                # the record metadata.
 
1606
                raw_data = record._raw_record
 
1607
                df, rec = self._parse_record_header(record.key, raw_data)
 
1608
                df.close()
 
1609
            buffered = False
 
1610
            parents = record.parents
 
1611
            if record.storage_kind in delta_types:
 
1612
                # TODO: eventually the record itself should track
 
1613
                #       compression_parent
 
1614
                compression_parent = parents[0]
 
1615
            else:
 
1616
                compression_parent = None
 
1617
            # Raise an error when a record is missing.
 
1618
            if record.storage_kind == 'absent':
 
1619
                raise RevisionNotPresent([record.key], self)
 
1620
            elif ((record.storage_kind in knit_types)
 
1621
                  and (compression_parent is None
 
1622
                       or not self._fallback_vfs
 
1623
                       or self._index.has_key(compression_parent)
 
1624
                       or not self.has_key(compression_parent))):
 
1625
                # we can insert the knit record literally if either it has no
 
1626
                # compression parent OR we already have its basis in this kvf
 
1627
                # OR the basis is not present even in the fallbacks.  In the
 
1628
                # last case it will either turn up later in the stream and all
 
1629
                # will be well, or it won't turn up at all and we'll raise an
 
1630
                # error at the end.
 
1631
                #
 
1632
                # TODO: self.has_key is somewhat redundant with
 
1633
                # self._index.has_key; we really want something that directly
 
1634
                # asks if it's only present in the fallbacks. -- mbp 20081119
 
1635
                if record.storage_kind not in native_types:
 
1636
                    try:
 
1637
                        adapter_key = (record.storage_kind, "knit-delta-gz")
 
1638
                        adapter = get_adapter(adapter_key)
 
1639
                    except KeyError:
 
1640
                        adapter_key = (record.storage_kind, "knit-ft-gz")
 
1641
                        adapter = get_adapter(adapter_key)
 
1642
                    bytes = adapter.get_bytes(record)
 
1643
                else:
 
1644
                    # It's a knit record, it has a _raw_record field (even if
 
1645
                    # it was reconstituted from a network stream).
 
1646
                    bytes = record._raw_record
 
1647
                options = [record._build_details[0]]
 
1648
                if record._build_details[1]:
 
1649
                    options.append('no-eol')
 
1650
                # Just blat it across.
 
1651
                # Note: This does end up adding data on duplicate keys. As
 
1652
                # modern repositories use atomic insertions this should not
 
1653
                # lead to excessive growth in the event of interrupted fetches.
 
1654
                # 'knit' repositories may suffer excessive growth, but as a
 
1655
                # deprecated format this is tolerable. It can be fixed if
 
1656
                # needed by in the kndx index support raising on a duplicate
 
1657
                # add with identical parents and options.
 
1658
                access_memo = self._access.add_raw_records(
 
1659
                    [(record.key, len(bytes))], bytes)[0]
 
1660
                index_entry = (record.key, options, access_memo, parents)
 
1661
                if 'fulltext' not in options:
 
1662
                    # Not a fulltext, so we need to make sure the compression
 
1663
                    # parent will also be present.
 
1664
                    # Note that pack backed knits don't need to buffer here
 
1665
                    # because they buffer all writes to the transaction level,
 
1666
                    # but we don't expose that difference at the index level. If
 
1667
                    # the query here has sufficient cost to show up in
 
1668
                    # profiling we should do that.
 
1669
                    #
 
1670
                    # They're required to be physically in this
 
1671
                    # KnitVersionedFiles, not in a fallback.
 
1672
                    if not self._index.has_key(compression_parent):
 
1673
                        pending = buffered_index_entries.setdefault(
 
1674
                            compression_parent, [])
 
1675
                        pending.append(index_entry)
 
1676
                        buffered = True
 
1677
                if not buffered:
 
1678
                    self._index.add_records([index_entry])
 
1679
            elif record.storage_kind == 'chunked':
 
1680
                self.add_lines(record.key, parents,
 
1681
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
 
1682
            else:
 
1683
                # Not suitable for direct insertion as a
 
1684
                # delta, either because it's not the right format, or this
 
1685
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
 
1686
                # 0) or because it depends on a base only present in the
 
1687
                # fallback kvfs.
 
1688
                self._access.flush()
 
1689
                try:
 
1690
                    # Try getting a fulltext directly from the record.
 
1691
                    bytes = record.get_bytes_as('fulltext')
 
1692
                except errors.UnavailableRepresentation:
 
1693
                    adapter_key = record.storage_kind, 'fulltext'
 
1694
                    adapter = get_adapter(adapter_key)
 
1695
                    bytes = adapter.get_bytes(record)
 
1696
                lines = split_lines(bytes)
 
1697
                try:
 
1698
                    self.add_lines(record.key, parents, lines)
 
1699
                except errors.RevisionAlreadyPresent:
 
1700
                    pass
 
1701
            # Add any records whose basis parent is now available.
 
1702
            if not buffered:
 
1703
                added_keys = [record.key]
 
1704
                while added_keys:
 
1705
                    key = added_keys.pop(0)
 
1706
                    if key in buffered_index_entries:
 
1707
                        index_entries = buffered_index_entries[key]
 
1708
                        self._index.add_records(index_entries)
 
1709
                        added_keys.extend(
 
1710
                            [index_entry[0] for index_entry in index_entries])
 
1711
                        del buffered_index_entries[key]
 
1712
        if buffered_index_entries:
 
1713
            # There were index entries buffered at the end of the stream,
 
1714
            # So these need to be added (if the index supports holding such
 
1715
            # entries for later insertion)
 
1716
            all_entries = []
 
1717
            for key in buffered_index_entries:
 
1718
                index_entries = buffered_index_entries[key]
 
1719
                all_entries.extend(index_entries)
 
1720
            self._index.add_records(
 
1721
                all_entries, missing_compression_parents=True)
 
1722
 
 
1723
    def get_missing_compression_parent_keys(self):
 
1724
        """Return an iterable of keys of missing compression parents.
 
1725
 
 
1726
        Check this after calling insert_record_stream to find out if there are
 
1727
        any missing compression parents.  If there are, the records that
 
1728
        depend on them are not able to be inserted safely. For atomic
 
1729
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1730
        suspended - commit will fail at this point. Nonatomic knits will error
 
1731
        earlier because they have no staging area to put pending entries into.
 
1732
        """
 
1733
        return self._index.get_missing_compression_parents()
 
1734
 
 
1735
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
 
1736
        """Iterate over the lines in the versioned files from keys.
 
1737
 
 
1738
        This may return lines from other keys. Each item the returned
 
1739
        iterator yields is a tuple of a line and a text version that that line
 
1740
        is present in (not introduced in).
 
1741
 
 
1742
        Ordering of results is in whatever order is most suitable for the
 
1743
        underlying storage format.
 
1744
 
 
1745
        If a progress bar is supplied, it may be used to indicate progress.
 
1746
        The caller is responsible for cleaning up progress bars (because this
 
1747
        is an iterator).
 
1748
 
 
1749
        NOTES:
 
1750
         * Lines are normalised by the underlying store: they will all have \\n
 
1751
           terminators.
 
1752
         * Lines are returned in arbitrary order.
 
1753
         * If a requested key did not change any lines (or didn't have any
 
1754
           lines), it may not be mentioned at all in the result.
 
1755
 
 
1756
        :param pb: Progress bar supplied by caller.
 
1757
        :return: An iterator over (line, key).
 
1758
        """
 
1759
        if pb is None:
 
1760
            pb = ui.ui_factory.nested_progress_bar()
 
1761
        keys = set(keys)
 
1762
        total = len(keys)
 
1763
        done = False
 
1764
        while not done:
 
1765
            try:
 
1766
                # we don't care about inclusions, the caller cares.
 
1767
                # but we need to setup a list of records to visit.
 
1768
                # we need key, position, length
 
1769
                key_records = []
 
1770
                build_details = self._index.get_build_details(keys)
 
1771
                for key, details in build_details.iteritems():
 
1772
                    if key in keys:
 
1773
                        key_records.append((key, details[0]))
 
1774
                records_iter = enumerate(self._read_records_iter(key_records))
 
1775
                for (key_idx, (key, data, sha_value)) in records_iter:
 
1776
                    pb.update('Walking content', key_idx, total)
 
1777
                    compression_parent = build_details[key][1]
 
1778
                    if compression_parent is None:
 
1779
                        # fulltext
 
1780
                        line_iterator = self._factory.get_fulltext_content(data)
 
1781
                    else:
 
1782
                        # Delta
 
1783
                        line_iterator = self._factory.get_linedelta_content(data)
 
1784
                    # Now that we are yielding the data for this key, remove it
 
1785
                    # from the list
 
1786
                    keys.remove(key)
 
1787
                    # XXX: It might be more efficient to yield (key,
 
1788
                    # line_iterator) in the future. However for now, this is a
 
1789
                    # simpler change to integrate into the rest of the
 
1790
                    # codebase. RBC 20071110
 
1791
                    for line in line_iterator:
 
1792
                        yield line, key
 
1793
                done = True
 
1794
            except errors.RetryWithNewPacks, e:
 
1795
                self._access.reload_or_raise(e)
 
1796
        # If there are still keys we've not yet found, we look in the fallback
 
1797
        # vfs, and hope to find them there.  Note that if the keys are found
 
1798
        # but had no changes or no content, the fallback may not return
 
1799
        # anything.
 
1800
        if keys and not self._fallback_vfs:
 
1801
            # XXX: strictly the second parameter is meant to be the file id
 
1802
            # but it's not easily accessible here.
 
1803
            raise RevisionNotPresent(keys, repr(self))
 
1804
        for source in self._fallback_vfs:
 
1805
            if not keys:
 
1806
                break
 
1807
            source_keys = set()
 
1808
            for line, key in source.iter_lines_added_or_present_in_keys(keys):
 
1809
                source_keys.add(key)
 
1810
                yield line, key
 
1811
            keys.difference_update(source_keys)
 
1812
        pb.update('Walking content', total, total)
 
1813
 
 
1814
    def _make_line_delta(self, delta_seq, new_content):
 
1815
        """Generate a line delta from delta_seq and new_content."""
 
1816
        diff_hunks = []
 
1817
        for op in delta_seq.get_opcodes():
 
1818
            if op[0] == 'equal':
 
1819
                continue
 
1820
            diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
 
1821
        return diff_hunks
 
1822
 
 
1823
    def _merge_annotations(self, content, parents, parent_texts={},
 
1824
                           delta=None, annotated=None,
 
1825
                           left_matching_blocks=None):
 
1826
        """Merge annotations for content and generate deltas.
 
1827
 
 
1828
        This is done by comparing the annotations based on changes to the text
 
1829
        and generating a delta on the resulting full texts. If annotations are
 
1830
        not being created then a simple delta is created.
 
1831
        """
 
1832
        if left_matching_blocks is not None:
 
1833
            delta_seq = diff._PrematchedMatcher(left_matching_blocks)
 
1834
        else:
 
1835
            delta_seq = None
 
1836
        if annotated:
 
1837
            for parent_key in parents:
 
1838
                merge_content = self._get_content(parent_key, parent_texts)
 
1839
                if (parent_key == parents[0] and delta_seq is not None):
 
1840
                    seq = delta_seq
 
1841
                else:
 
1842
                    seq = patiencediff.PatienceSequenceMatcher(
 
1843
                        None, merge_content.text(), content.text())
 
1844
                for i, j, n in seq.get_matching_blocks():
 
1845
                    if n == 0:
 
1846
                        continue
 
1847
                    # this copies (origin, text) pairs across to the new
 
1848
                    # content for any line that matches the last-checked
 
1849
                    # parent.
 
1850
                    content._lines[j:j+n] = merge_content._lines[i:i+n]
 
1851
            # XXX: Robert says the following block is a workaround for a
 
1852
            # now-fixed bug and it can probably be deleted. -- mbp 20080618
 
1853
            if content._lines and content._lines[-1][1][-1] != '\n':
 
1854
                # The copied annotation was from a line without a trailing EOL,
 
1855
                # reinstate one for the content object, to ensure correct
 
1856
                # serialization.
 
1857
                line = content._lines[-1][1] + '\n'
 
1858
                content._lines[-1] = (content._lines[-1][0], line)
 
1859
        if delta:
 
1860
            if delta_seq is None:
 
1861
                reference_content = self._get_content(parents[0], parent_texts)
 
1862
                new_texts = content.text()
 
1863
                old_texts = reference_content.text()
 
1864
                delta_seq = patiencediff.PatienceSequenceMatcher(
 
1865
                                                 None, old_texts, new_texts)
 
1866
            return self._make_line_delta(delta_seq, content)
 
1867
 
 
1868
    def _parse_record(self, version_id, data):
 
1869
        """Parse an original format knit record.
 
1870
 
 
1871
        These have the last element of the key only present in the stored data.
 
1872
        """
 
1873
        rec, record_contents = self._parse_record_unchecked(data)
 
1874
        self._check_header_version(rec, version_id)
 
1875
        return record_contents, rec[3]
 
1876
 
 
1877
    def _parse_record_header(self, key, raw_data):
 
1878
        """Parse a record header for consistency.
 
1879
 
 
1880
        :return: the header and the decompressor stream.
 
1881
                 as (stream, header_record)
 
1882
        """
 
1883
        df = gzip.GzipFile(mode='rb', fileobj=StringIO(raw_data))
 
1884
        try:
 
1885
            # Current serialise
 
1886
            rec = self._check_header(key, df.readline())
 
1887
        except Exception, e:
 
1888
            raise KnitCorrupt(self,
 
1889
                              "While reading {%s} got %s(%s)"
 
1890
                              % (key, e.__class__.__name__, str(e)))
 
1891
        return df, rec
 
1892
 
 
1893
    def _parse_record_unchecked(self, data):
 
1894
        # profiling notes:
 
1895
        # 4168 calls in 2880 217 internal
 
1896
        # 4168 calls to _parse_record_header in 2121
 
1897
        # 4168 calls to readlines in 330
 
1898
        df = gzip.GzipFile(mode='rb', fileobj=StringIO(data))
 
1899
        try:
 
1900
            record_contents = df.readlines()
 
1901
        except Exception, e:
 
1902
            raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
 
1903
                (data, e.__class__.__name__, str(e)))
 
1904
        header = record_contents.pop(0)
 
1905
        rec = self._split_header(header)
 
1906
        last_line = record_contents.pop()
 
1907
        if len(record_contents) != int(rec[2]):
 
1908
            raise KnitCorrupt(self,
 
1909
                              'incorrect number of lines %s != %s'
 
1910
                              ' for version {%s} %s'
 
1911
                              % (len(record_contents), int(rec[2]),
 
1912
                                 rec[1], record_contents))
 
1913
        if last_line != 'end %s\n' % rec[1]:
 
1914
            raise KnitCorrupt(self,
 
1915
                              'unexpected version end line %r, wanted %r'
 
1916
                              % (last_line, rec[1]))
 
1917
        df.close()
 
1918
        return rec, record_contents
 
1919
 
 
1920
    def _read_records_iter(self, records):
 
1921
        """Read text records from data file and yield result.
 
1922
 
 
1923
        The result will be returned in whatever is the fastest to read.
 
1924
        Not by the order requested. Also, multiple requests for the same
 
1925
        record will only yield 1 response.
 
1926
        :param records: A list of (key, access_memo) entries
 
1927
        :return: Yields (key, contents, digest) in the order
 
1928
                 read, not the order requested
 
1929
        """
 
1930
        if not records:
 
1931
            return
 
1932
 
 
1933
        # XXX: This smells wrong, IO may not be getting ordered right.
 
1934
        needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1935
        if not needed_records:
 
1936
            return
 
1937
 
 
1938
        # The transport optimizes the fetching as well
 
1939
        # (ie, reads continuous ranges.)
 
1940
        raw_data = self._access.get_raw_records(
 
1941
            [index_memo for key, index_memo in needed_records])
 
1942
 
 
1943
        for (key, index_memo), data in \
 
1944
                izip(iter(needed_records), raw_data):
 
1945
            content, digest = self._parse_record(key[-1], data)
 
1946
            yield key, content, digest
 
1947
 
 
1948
    def _read_records_iter_raw(self, records):
 
1949
        """Read text records from data file and yield raw data.
 
1950
 
 
1951
        This unpacks enough of the text record to validate the id is
 
1952
        as expected but thats all.
 
1953
 
 
1954
        Each item the iterator yields is (key, bytes,
 
1955
            expected_sha1_of_full_text).
 
1956
        """
 
1957
        for key, data in self._read_records_iter_unchecked(records):
 
1958
            # validate the header (note that we can only use the suffix in
 
1959
            # current knit records).
 
1960
            df, rec = self._parse_record_header(key, data)
 
1961
            df.close()
 
1962
            yield key, data, rec[3]
 
1963
 
 
1964
    def _read_records_iter_unchecked(self, records):
 
1965
        """Read text records from data file and yield raw data.
 
1966
 
 
1967
        No validation is done.
 
1968
 
 
1969
        Yields tuples of (key, data).
 
1970
        """
 
1971
        # setup an iterator of the external records:
 
1972
        # uses readv so nice and fast we hope.
 
1973
        if len(records):
 
1974
            # grab the disk data needed.
 
1975
            needed_offsets = [index_memo for key, index_memo
 
1976
                                           in records]
 
1977
            raw_records = self._access.get_raw_records(needed_offsets)
 
1978
 
 
1979
        for key, index_memo in records:
 
1980
            data = raw_records.next()
 
1981
            yield key, data
 
1982
 
 
1983
    def _record_to_data(self, key, digest, lines, dense_lines=None):
 
1984
        """Convert key, digest, lines into a raw data block.
 
1985
 
 
1986
        :param key: The key of the record. Currently keys are always serialised
 
1987
            using just the trailing component.
 
1988
        :param dense_lines: The bytes of lines but in a denser form. For
 
1989
            instance, if lines is a list of 1000 bytestrings each ending in \n,
 
1990
            dense_lines may be a list with one line in it, containing all the
 
1991
            1000's lines and their \n's. Using dense_lines if it is already
 
1992
            known is a win because the string join to create bytes in this
 
1993
            function spends less time resizing the final string.
 
1994
        :return: (len, a StringIO instance with the raw data ready to read.)
 
1995
        """
 
1996
        chunks = ["version %s %d %s\n" % (key[-1], len(lines), digest)]
 
1997
        chunks.extend(dense_lines or lines)
 
1998
        chunks.append("end %s\n" % key[-1])
 
1999
        for chunk in chunks:
 
2000
            if type(chunk) is not str:
 
2001
                raise AssertionError(
 
2002
                    'data must be plain bytes was %s' % type(chunk))
 
2003
        if lines and lines[-1][-1] != '\n':
 
2004
            raise ValueError('corrupt lines value %r' % lines)
 
2005
        compressed_bytes = tuned_gzip.chunks_to_gzip(chunks)
 
2006
        return len(compressed_bytes), compressed_bytes
 
2007
 
 
2008
    def _split_header(self, line):
 
2009
        rec = line.split()
 
2010
        if len(rec) != 4:
 
2011
            raise KnitCorrupt(self,
 
2012
                              'unexpected number of elements in record header')
 
2013
        return rec
 
2014
 
 
2015
    def keys(self):
 
2016
        """See VersionedFiles.keys."""
 
2017
        if 'evil' in debug.debug_flags:
 
2018
            trace.mutter_callsite(2, "keys scales with size of history")
 
2019
        sources = [self._index] + self._fallback_vfs
 
2020
        result = set()
 
2021
        for source in sources:
 
2022
            result.update(source.keys())
 
2023
        return result
 
2024
 
 
2025
 
 
2026
class _ContentMapGenerator(object):
 
2027
    """Generate texts or expose raw deltas for a set of texts."""
 
2028
 
 
2029
    def __init__(self, ordering='unordered'):
 
2030
        self._ordering = ordering
 
2031
 
 
2032
    def _get_content(self, key):
 
2033
        """Get the content object for key."""
 
2034
        # Note that _get_content is only called when the _ContentMapGenerator
 
2035
        # has been constructed with just one key requested for reconstruction.
 
2036
        if key in self.nonlocal_keys:
 
2037
            record = self.get_record_stream().next()
 
2038
            # Create a content object on the fly
 
2039
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
2040
            return PlainKnitContent(lines, record.key)
 
2041
        else:
 
2042
            # local keys we can ask for directly
 
2043
            return self._get_one_work(key)
 
2044
 
 
2045
    def get_record_stream(self):
 
2046
        """Get a record stream for the keys requested during __init__."""
 
2047
        for record in self._work():
 
2048
            yield record
 
2049
 
 
2050
    def _work(self):
 
2051
        """Produce maps of text and KnitContents as dicts.
 
2052
 
 
2053
        :return: (text_map, content_map) where text_map contains the texts for
 
2054
            the requested versions and content_map contains the KnitContents.
 
2055
        """
 
2056
        # NB: By definition we never need to read remote sources unless texts
 
2057
        # are requested from them: we don't delta across stores - and we
 
2058
        # explicitly do not want to to prevent data loss situations.
 
2059
        if self.global_map is None:
 
2060
            self.global_map = self.vf.get_parent_map(self.keys)
 
2061
        nonlocal_keys = self.nonlocal_keys
 
2062
 
 
2063
        missing_keys = set(nonlocal_keys)
 
2064
        # Read from remote versioned file instances and provide to our caller.
 
2065
        for source in self.vf._fallback_vfs:
 
2066
            if not missing_keys:
 
2067
                break
 
2068
            # Loop over fallback repositories asking them for texts - ignore
 
2069
            # any missing from a particular fallback.
 
2070
            for record in source.get_record_stream(missing_keys,
 
2071
                self._ordering, True):
 
2072
                if record.storage_kind == 'absent':
 
2073
                    # Not in thie particular stream, may be in one of the
 
2074
                    # other fallback vfs objects.
 
2075
                    continue
 
2076
                missing_keys.remove(record.key)
 
2077
                yield record
 
2078
 
 
2079
        if self._raw_record_map is None:
 
2080
            raise AssertionError('_raw_record_map should have been filled')
 
2081
        first = True
 
2082
        for key in self.keys:
 
2083
            if key in self.nonlocal_keys:
 
2084
                continue
 
2085
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2086
            first = False
 
2087
 
 
2088
    def _get_one_work(self, requested_key):
 
2089
        # Now, if we have calculated everything already, just return the
 
2090
        # desired text.
 
2091
        if requested_key in self._contents_map:
 
2092
            return self._contents_map[requested_key]
 
2093
        # To simplify things, parse everything at once - code that wants one text
 
2094
        # probably wants them all.
 
2095
        # FUTURE: This function could be improved for the 'extract many' case
 
2096
        # by tracking each component and only doing the copy when the number of
 
2097
        # children than need to apply delta's to it is > 1 or it is part of the
 
2098
        # final output.
 
2099
        multiple_versions = len(self.keys) != 1
 
2100
        if self._record_map is None:
 
2101
            self._record_map = self.vf._raw_map_to_record_map(
 
2102
                self._raw_record_map)
 
2103
        record_map = self._record_map
 
2104
        # raw_record_map is key:
 
2105
        # Have read and parsed records at this point.
 
2106
        for key in self.keys:
 
2107
            if key in self.nonlocal_keys:
 
2108
                # already handled
 
2109
                continue
 
2110
            components = []
 
2111
            cursor = key
 
2112
            while cursor is not None:
 
2113
                try:
 
2114
                    record, record_details, digest, next = record_map[cursor]
 
2115
                except KeyError:
 
2116
                    raise RevisionNotPresent(cursor, self)
 
2117
                components.append((cursor, record, record_details, digest))
 
2118
                cursor = next
 
2119
                if cursor in self._contents_map:
 
2120
                    # no need to plan further back
 
2121
                    components.append((cursor, None, None, None))
 
2122
                    break
 
2123
 
 
2124
            content = None
 
2125
            for (component_id, record, record_details,
 
2126
                 digest) in reversed(components):
 
2127
                if component_id in self._contents_map:
 
2128
                    content = self._contents_map[component_id]
 
2129
                else:
 
2130
                    content, delta = self._factory.parse_record(key[-1],
 
2131
                        record, record_details, content,
 
2132
                        copy_base_content=multiple_versions)
 
2133
                    if multiple_versions:
 
2134
                        self._contents_map[component_id] = content
 
2135
 
 
2136
            # digest here is the digest from the last applied component.
 
2137
            text = content.text()
 
2138
            actual_sha = sha_strings(text)
 
2139
            if actual_sha != digest:
 
2140
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
2141
        if multiple_versions:
 
2142
            return self._contents_map[requested_key]
 
2143
        else:
 
2144
            return content
 
2145
 
 
2146
    def _wire_bytes(self):
 
2147
        """Get the bytes to put on the wire for 'key'.
 
2148
 
 
2149
        The first collection of bytes asked for returns the serialised
 
2150
        raw_record_map and the additional details (key, parent) for key.
 
2151
        Subsequent calls return just the additional details (key, parent).
 
2152
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
2153
        For subsequent keys it is 'knit-delta-closure-ref'.
 
2154
 
 
2155
        :param key: A key from the content generator.
 
2156
        :return: Bytes to put on the wire.
 
2157
        """
 
2158
        lines = []
 
2159
        # kind marker for dispatch on the far side,
 
2160
        lines.append('knit-delta-closure')
 
2161
        # Annotated or not
 
2162
        if self.vf._factory.annotated:
 
2163
            lines.append('annotated')
 
2164
        else:
 
2165
            lines.append('')
 
2166
        # then the list of keys
 
2167
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
2168
            if key not in self.nonlocal_keys]))
 
2169
        # then the _raw_record_map in serialised form:
 
2170
        map_byte_list = []
 
2171
        # for each item in the map:
 
2172
        # 1 line with key
 
2173
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
2174
        # one line with method
 
2175
        # one line with noeol
 
2176
        # one line with next ('' for None)
 
2177
        # one line with byte count of the record bytes
 
2178
        # the record bytes
 
2179
        for key, (record_bytes, (method, noeol), next) in \
 
2180
            self._raw_record_map.iteritems():
 
2181
            key_bytes = '\x00'.join(key)
 
2182
            parents = self.global_map.get(key, None)
 
2183
            if parents is None:
 
2184
                parent_bytes = 'None:'
 
2185
            else:
 
2186
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
2187
            method_bytes = method
 
2188
            if noeol:
 
2189
                noeol_bytes = "T"
 
2190
            else:
 
2191
                noeol_bytes = "F"
 
2192
            if next:
 
2193
                next_bytes = '\x00'.join(next)
 
2194
            else:
 
2195
                next_bytes = ''
 
2196
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2197
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2198
                len(record_bytes), record_bytes))
 
2199
        map_bytes = ''.join(map_byte_list)
 
2200
        lines.append(map_bytes)
 
2201
        bytes = '\n'.join(lines)
 
2202
        return bytes
 
2203
 
 
2204
 
 
2205
class _VFContentMapGenerator(_ContentMapGenerator):
 
2206
    """Content map generator reading from a VersionedFiles object."""
 
2207
 
 
2208
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2209
        global_map=None, raw_record_map=None, ordering='unordered'):
 
2210
        """Create a _ContentMapGenerator.
 
2211
 
 
2212
        :param versioned_files: The versioned files that the texts are being
 
2213
            extracted from.
 
2214
        :param keys: The keys to produce content maps for.
 
2215
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2216
            which are known to not be in this knit, but rather in one of the
 
2217
            fallback knits.
 
2218
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2219
            This is required if get_record_stream() is to be used.
 
2220
        :param raw_record_map: A unparsed raw record map to use for answering
 
2221
            contents.
 
2222
        """
 
2223
        _ContentMapGenerator.__init__(self, ordering=ordering)
 
2224
        # The vf to source data from
 
2225
        self.vf = versioned_files
 
2226
        # The keys desired
 
2227
        self.keys = list(keys)
 
2228
        # Keys known to be in fallback vfs objects
 
2229
        if nonlocal_keys is None:
 
2230
            self.nonlocal_keys = set()
 
2231
        else:
 
2232
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2233
        # Parents data for keys to be returned in get_record_stream
 
2234
        self.global_map = global_map
 
2235
        # The chunked lists for self.keys in text form
 
2236
        self._text_map = {}
 
2237
        # A cache of KnitContent objects used in extracting texts.
 
2238
        self._contents_map = {}
 
2239
        # All the knit records needed to assemble the requested keys as full
 
2240
        # texts.
 
2241
        self._record_map = None
 
2242
        if raw_record_map is None:
 
2243
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2244
                allow_missing=True)
 
2245
        else:
 
2246
            self._raw_record_map = raw_record_map
 
2247
        # the factory for parsing records
 
2248
        self._factory = self.vf._factory
 
2249
 
 
2250
 
 
2251
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2252
    """Content map generator sourced from a network stream."""
 
2253
 
 
2254
    def __init__(self, bytes, line_end):
 
2255
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2256
        self._bytes = bytes
 
2257
        self.global_map = {}
 
2258
        self._raw_record_map = {}
 
2259
        self._contents_map = {}
 
2260
        self._record_map = None
 
2261
        self.nonlocal_keys = []
 
2262
        # Get access to record parsing facilities
 
2263
        self.vf = KnitVersionedFiles(None, None)
 
2264
        start = line_end
 
2265
        # Annotated or not
 
2266
        line_end = bytes.find('\n', start)
 
2267
        line = bytes[start:line_end]
 
2268
        start = line_end + 1
 
2269
        if line == 'annotated':
 
2270
            self._factory = KnitAnnotateFactory()
 
2271
        else:
 
2272
            self._factory = KnitPlainFactory()
 
2273
        # list of keys to emit in get_record_stream
 
2274
        line_end = bytes.find('\n', start)
 
2275
        line = bytes[start:line_end]
 
2276
        start = line_end + 1
 
2277
        self.keys = [
 
2278
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2279
            if segment]
 
2280
        # now a loop until the end. XXX: It would be nice if this was just a
 
2281
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2282
        # there is a decent sized gap stopping that at the moment.
 
2283
        end = len(bytes)
 
2284
        while start < end:
 
2285
            # 1 line with key
 
2286
            line_end = bytes.find('\n', start)
 
2287
            key = tuple(bytes[start:line_end].split('\x00'))
 
2288
            start = line_end + 1
 
2289
            # 1 line with parents (None: for None, '' for ())
 
2290
            line_end = bytes.find('\n', start)
 
2291
            line = bytes[start:line_end]
 
2292
            if line == 'None:':
 
2293
                parents = None
 
2294
            else:
 
2295
                parents = tuple(
 
2296
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2297
                     if segment])
 
2298
            self.global_map[key] = parents
 
2299
            start = line_end + 1
 
2300
            # one line with method
 
2301
            line_end = bytes.find('\n', start)
 
2302
            line = bytes[start:line_end]
 
2303
            method = line
 
2304
            start = line_end + 1
 
2305
            # one line with noeol
 
2306
            line_end = bytes.find('\n', start)
 
2307
            line = bytes[start:line_end]
 
2308
            noeol = line == "T"
 
2309
            start = line_end + 1
 
2310
            # one line with next ('' for None)
 
2311
            line_end = bytes.find('\n', start)
 
2312
            line = bytes[start:line_end]
 
2313
            if not line:
 
2314
                next = None
 
2315
            else:
 
2316
                next = tuple(bytes[start:line_end].split('\x00'))
 
2317
            start = line_end + 1
 
2318
            # one line with byte count of the record bytes
 
2319
            line_end = bytes.find('\n', start)
 
2320
            line = bytes[start:line_end]
 
2321
            count = int(line)
 
2322
            start = line_end + 1
 
2323
            # the record bytes
 
2324
            record_bytes = bytes[start:start+count]
 
2325
            start = start + count
 
2326
            # put it in the map
 
2327
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2328
 
 
2329
    def get_record_stream(self):
 
2330
        """Get a record stream for for keys requested by the bytestream."""
 
2331
        first = True
 
2332
        for key in self.keys:
 
2333
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2334
            first = False
 
2335
 
 
2336
    def _wire_bytes(self):
 
2337
        return self._bytes
 
2338
 
 
2339
 
 
2340
class _KndxIndex(object):
 
2341
    """Manages knit index files
 
2342
 
 
2343
    The index is kept in memory and read on startup, to enable
 
2344
    fast lookups of revision information.  The cursor of the index
 
2345
    file is always pointing to the end, making it easy to append
 
2346
    entries.
 
2347
 
 
2348
    _cache is a cache for fast mapping from version id to a Index
 
2349
    object.
 
2350
 
 
2351
    _history is a cache for fast mapping from indexes to version ids.
 
2352
 
 
2353
    The index data format is dictionary compressed when it comes to
 
2354
    parent references; a index entry may only have parents that with a
 
2355
    lover index number.  As a result, the index is topological sorted.
 
2356
 
 
2357
    Duplicate entries may be written to the index for a single version id
 
2358
    if this is done then the latter one completely replaces the former:
 
2359
    this allows updates to correct version and parent information.
 
2360
    Note that the two entries may share the delta, and that successive
 
2361
    annotations and references MUST point to the first entry.
 
2362
 
 
2363
    The index file on disc contains a header, followed by one line per knit
 
2364
    record. The same revision can be present in an index file more than once.
 
2365
    The first occurrence gets assigned a sequence number starting from 0.
 
2366
 
 
2367
    The format of a single line is
 
2368
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
 
2369
    REVISION_ID is a utf8-encoded revision id
 
2370
    FLAGS is a comma separated list of flags about the record. Values include
 
2371
        no-eol, line-delta, fulltext.
 
2372
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
 
2373
        that the compressed data starts at.
 
2374
    LENGTH is the ascii representation of the length of the data file.
 
2375
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
 
2376
        REVISION_ID.
 
2377
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
 
2378
        revision id already in the knit that is a parent of REVISION_ID.
 
2379
    The ' :' marker is the end of record marker.
 
2380
 
 
2381
    partial writes:
 
2382
    when a write is interrupted to the index file, it will result in a line
 
2383
    that does not end in ' :'. If the ' :' is not present at the end of a line,
 
2384
    or at the end of the file, then the record that is missing it will be
 
2385
    ignored by the parser.
 
2386
 
 
2387
    When writing new records to the index file, the data is preceded by '\n'
 
2388
    to ensure that records always start on new lines even if the last write was
 
2389
    interrupted. As a result its normal for the last line in the index to be
 
2390
    missing a trailing newline. One can be added with no harmful effects.
 
2391
 
 
2392
    :ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
 
2393
        where prefix is e.g. the (fileid,) for .texts instances or () for
 
2394
        constant-mapped things like .revisions, and the old state is
 
2395
        tuple(history_vector, cache_dict).  This is used to prevent having an
 
2396
        ABI change with the C extension that reads .kndx files.
 
2397
    """
 
2398
 
 
2399
    HEADER = "# bzr knit index 8\n"
 
2400
 
 
2401
    def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
 
2402
        """Create a _KndxIndex on transport using mapper."""
 
2403
        self._transport = transport
 
2404
        self._mapper = mapper
 
2405
        self._get_scope = get_scope
 
2406
        self._allow_writes = allow_writes
 
2407
        self._is_locked = is_locked
 
2408
        self._reset_cache()
 
2409
        self.has_graph = True
 
2410
 
 
2411
    def add_records(self, records, random_id=False, missing_compression_parents=False):
 
2412
        """Add multiple records to the index.
 
2413
 
 
2414
        :param records: a list of tuples:
 
2415
                         (key, options, access_memo, parents).
 
2416
        :param random_id: If True the ids being added were randomly generated
 
2417
            and no check for existence will be performed.
 
2418
        :param missing_compression_parents: If True the records being added are
 
2419
            only compressed against texts already in the index (or inside
 
2420
            records). If False the records all refer to unavailable texts (or
 
2421
            texts inside records) as compression parents.
 
2422
        """
 
2423
        if missing_compression_parents:
 
2424
            # It might be nice to get the edge of the records. But keys isn't
 
2425
            # _wrong_.
 
2426
            keys = sorted(record[0] for record in records)
 
2427
            raise errors.RevisionNotPresent(keys, self)
 
2428
        paths = {}
 
2429
        for record in records:
 
2430
            key = record[0]
 
2431
            prefix = key[:-1]
 
2432
            path = self._mapper.map(key) + '.kndx'
 
2433
            path_keys = paths.setdefault(path, (prefix, []))
 
2434
            path_keys[1].append(record)
 
2435
        for path in sorted(paths):
 
2436
            prefix, path_keys = paths[path]
 
2437
            self._load_prefixes([prefix])
 
2438
            lines = []
 
2439
            orig_history = self._kndx_cache[prefix][1][:]
 
2440
            orig_cache = self._kndx_cache[prefix][0].copy()
 
2441
 
 
2442
            try:
 
2443
                for key, options, (_, pos, size), parents in path_keys:
 
2444
                    if parents is None:
 
2445
                        # kndx indices cannot be parentless.
 
2446
                        parents = ()
 
2447
                    line = "\n%s %s %s %s %s :" % (
 
2448
                        key[-1], ','.join(options), pos, size,
 
2449
                        self._dictionary_compress(parents))
 
2450
                    if type(line) is not str:
 
2451
                        raise AssertionError(
 
2452
                            'data must be utf8 was %s' % type(line))
 
2453
                    lines.append(line)
 
2454
                    self._cache_key(key, options, pos, size, parents)
 
2455
                if len(orig_history):
 
2456
                    self._transport.append_bytes(path, ''.join(lines))
 
2457
                else:
 
2458
                    self._init_index(path, lines)
 
2459
            except:
 
2460
                # If any problems happen, restore the original values and re-raise
 
2461
                self._kndx_cache[prefix] = (orig_cache, orig_history)
 
2462
                raise
 
2463
 
 
2464
    def scan_unvalidated_index(self, graph_index):
 
2465
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2466
        # Because kndx files do not support atomic insertion via separate index
 
2467
        # files, they do not support this method.
 
2468
        raise NotImplementedError(self.scan_unvalidated_index)
 
2469
 
 
2470
    def get_missing_compression_parents(self):
 
2471
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2472
        # Because kndx files do not support atomic insertion via separate index
 
2473
        # files, they do not support this method.
 
2474
        raise NotImplementedError(self.get_missing_compression_parents)
 
2475
 
 
2476
    def _cache_key(self, key, options, pos, size, parent_keys):
 
2477
        """Cache a version record in the history array and index cache.
 
2478
 
 
2479
        This is inlined into _load_data for performance. KEEP IN SYNC.
 
2480
        (It saves 60ms, 25% of the __init__ overhead on local 4000 record
 
2481
         indexes).
 
2482
        """
 
2483
        prefix = key[:-1]
 
2484
        version_id = key[-1]
 
2485
        # last-element only for compatibilty with the C load_data.
 
2486
        parents = tuple(parent[-1] for parent in parent_keys)
 
2487
        for parent in parent_keys:
 
2488
            if parent[:-1] != prefix:
 
2489
                raise ValueError("mismatched prefixes for %r, %r" % (
 
2490
                    key, parent_keys))
 
2491
        cache, history = self._kndx_cache[prefix]
 
2492
        # only want the _history index to reference the 1st index entry
 
2493
        # for version_id
 
2494
        if version_id not in cache:
 
2495
            index = len(history)
 
2496
            history.append(version_id)
 
2497
        else:
 
2498
            index = cache[version_id][5]
 
2499
        cache[version_id] = (version_id,
 
2500
                                   options,
 
2501
                                   pos,
 
2502
                                   size,
 
2503
                                   parents,
 
2504
                                   index)
 
2505
 
 
2506
    def check_header(self, fp):
 
2507
        line = fp.readline()
 
2508
        if line == '':
 
2509
            # An empty file can actually be treated as though the file doesn't
 
2510
            # exist yet.
 
2511
            raise errors.NoSuchFile(self)
 
2512
        if line != self.HEADER:
 
2513
            raise KnitHeaderError(badline=line, filename=self)
 
2514
 
 
2515
    def _check_read(self):
 
2516
        if not self._is_locked():
 
2517
            raise errors.ObjectNotLocked(self)
 
2518
        if self._get_scope() != self._scope:
 
2519
            self._reset_cache()
 
2520
 
 
2521
    def _check_write_ok(self):
 
2522
        """Assert if not writes are permitted."""
 
2523
        if not self._is_locked():
 
2524
            raise errors.ObjectNotLocked(self)
 
2525
        if self._get_scope() != self._scope:
 
2526
            self._reset_cache()
 
2527
        if self._mode != 'w':
 
2528
            raise errors.ReadOnlyObjectDirtiedError(self)
 
2529
 
 
2530
    def get_build_details(self, keys):
 
2531
        """Get the method, index_memo and compression parent for keys.
 
2532
 
 
2533
        Ghosts are omitted from the result.
 
2534
 
 
2535
        :param keys: An iterable of keys.
 
2536
        :return: A dict of key:(index_memo, compression_parent, parents,
 
2537
            record_details).
 
2538
            index_memo
 
2539
                opaque structure to pass to read_records to extract the raw
 
2540
                data
 
2541
            compression_parent
 
2542
                Content that this record is built upon, may be None
 
2543
            parents
 
2544
                Logical parents of this node
 
2545
            record_details
 
2546
                extra information about the content which needs to be passed to
 
2547
                Factory.parse_record
 
2548
        """
 
2549
        parent_map = self.get_parent_map(keys)
 
2550
        result = {}
 
2551
        for key in keys:
 
2552
            if key not in parent_map:
 
2553
                continue # Ghost
 
2554
            method = self.get_method(key)
 
2555
            parents = parent_map[key]
 
2556
            if method == 'fulltext':
 
2557
                compression_parent = None
 
2558
            else:
 
2559
                compression_parent = parents[0]
 
2560
            noeol = 'no-eol' in self.get_options(key)
 
2561
            index_memo = self.get_position(key)
 
2562
            result[key] = (index_memo, compression_parent,
 
2563
                                  parents, (method, noeol))
 
2564
        return result
 
2565
 
 
2566
    def get_method(self, key):
 
2567
        """Return compression method of specified key."""
 
2568
        options = self.get_options(key)
 
2569
        if 'fulltext' in options:
 
2570
            return 'fulltext'
 
2571
        elif 'line-delta' in options:
 
2572
            return 'line-delta'
 
2573
        else:
 
2574
            raise errors.KnitIndexUnknownMethod(self, options)
 
2575
 
 
2576
    def get_options(self, key):
 
2577
        """Return a list representing options.
 
2578
 
 
2579
        e.g. ['foo', 'bar']
 
2580
        """
 
2581
        prefix, suffix = self._split_key(key)
 
2582
        self._load_prefixes([prefix])
 
2583
        try:
 
2584
            return self._kndx_cache[prefix][0][suffix][1]
 
2585
        except KeyError:
 
2586
            raise RevisionNotPresent(key, self)
 
2587
 
 
2588
    def find_ancestry(self, keys):
 
2589
        """See CombinedGraphIndex.find_ancestry()"""
 
2590
        prefixes = set(key[:-1] for key in keys)
 
2591
        self._load_prefixes(prefixes)
 
2592
        result = {}
 
2593
        parent_map = {}
 
2594
        missing_keys = set()
 
2595
        pending_keys = list(keys)
 
2596
        # This assumes that keys will not reference parents in a different
 
2597
        # prefix, which is accurate so far.
 
2598
        while pending_keys:
 
2599
            key = pending_keys.pop()
 
2600
            if key in parent_map:
 
2601
                continue
 
2602
            prefix = key[:-1]
 
2603
            try:
 
2604
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2605
            except KeyError:
 
2606
                missing_keys.add(key)
 
2607
            else:
 
2608
                parent_keys = tuple([prefix + (suffix,)
 
2609
                                     for suffix in suffix_parents])
 
2610
                parent_map[key] = parent_keys
 
2611
                pending_keys.extend([p for p in parent_keys
 
2612
                                        if p not in parent_map])
 
2613
        return parent_map, missing_keys
 
2614
 
 
2615
    def get_parent_map(self, keys):
 
2616
        """Get a map of the parents of keys.
 
2617
 
 
2618
        :param keys: The keys to look up parents for.
 
2619
        :return: A mapping from keys to parents. Absent keys are absent from
 
2620
            the mapping.
 
2621
        """
 
2622
        # Parse what we need to up front, this potentially trades off I/O
 
2623
        # locality (.kndx and .knit in the same block group for the same file
 
2624
        # id) for less checking in inner loops.
 
2625
        prefixes = set(key[:-1] for key in keys)
 
2626
        self._load_prefixes(prefixes)
 
2627
        result = {}
 
2628
        for key in keys:
 
2629
            prefix = key[:-1]
 
2630
            try:
 
2631
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2632
            except KeyError:
 
2633
                pass
 
2634
            else:
 
2635
                result[key] = tuple(prefix + (suffix,) for
 
2636
                    suffix in suffix_parents)
 
2637
        return result
 
2638
 
 
2639
    def get_position(self, key):
 
2640
        """Return details needed to access the version.
 
2641
 
 
2642
        :return: a tuple (key, data position, size) to hand to the access
 
2643
            logic to get the record.
 
2644
        """
 
2645
        prefix, suffix = self._split_key(key)
 
2646
        self._load_prefixes([prefix])
 
2647
        entry = self._kndx_cache[prefix][0][suffix]
 
2648
        return key, entry[2], entry[3]
 
2649
 
 
2650
    has_key = _mod_index._has_key_from_parent_map
 
2651
 
 
2652
    def _init_index(self, path, extra_lines=[]):
 
2653
        """Initialize an index."""
 
2654
        sio = StringIO()
 
2655
        sio.write(self.HEADER)
 
2656
        sio.writelines(extra_lines)
 
2657
        sio.seek(0)
 
2658
        self._transport.put_file_non_atomic(path, sio,
 
2659
                            create_parent_dir=True)
 
2660
                           # self._create_parent_dir)
 
2661
                           # mode=self._file_mode,
 
2662
                           # dir_mode=self._dir_mode)
 
2663
 
 
2664
    def keys(self):
 
2665
        """Get all the keys in the collection.
 
2666
 
 
2667
        The keys are not ordered.
 
2668
        """
 
2669
        result = set()
 
2670
        # Identify all key prefixes.
 
2671
        # XXX: A bit hacky, needs polish.
 
2672
        if type(self._mapper) is ConstantMapper:
 
2673
            prefixes = [()]
 
2674
        else:
 
2675
            relpaths = set()
 
2676
            for quoted_relpath in self._transport.iter_files_recursive():
 
2677
                path, ext = os.path.splitext(quoted_relpath)
 
2678
                relpaths.add(path)
 
2679
            prefixes = [self._mapper.unmap(path) for path in relpaths]
 
2680
        self._load_prefixes(prefixes)
 
2681
        for prefix in prefixes:
 
2682
            for suffix in self._kndx_cache[prefix][1]:
 
2683
                result.add(prefix + (suffix,))
 
2684
        return result
 
2685
 
 
2686
    def _load_prefixes(self, prefixes):
 
2687
        """Load the indices for prefixes."""
 
2688
        self._check_read()
 
2689
        for prefix in prefixes:
 
2690
            if prefix not in self._kndx_cache:
 
2691
                # the load_data interface writes to these variables.
 
2692
                self._cache = {}
 
2693
                self._history = []
 
2694
                self._filename = prefix
 
2695
                try:
 
2696
                    path = self._mapper.map(prefix) + '.kndx'
 
2697
                    fp = self._transport.get(path)
 
2698
                    try:
 
2699
                        # _load_data may raise NoSuchFile if the target knit is
 
2700
                        # completely empty.
 
2701
                        _load_data(self, fp)
 
2702
                    finally:
 
2703
                        fp.close()
 
2704
                    self._kndx_cache[prefix] = (self._cache, self._history)
 
2705
                    del self._cache
 
2706
                    del self._filename
 
2707
                    del self._history
 
2708
                except NoSuchFile:
 
2709
                    self._kndx_cache[prefix] = ({}, [])
 
2710
                    if type(self._mapper) is ConstantMapper:
 
2711
                        # preserve behaviour for revisions.kndx etc.
 
2712
                        self._init_index(path)
 
2713
                    del self._cache
 
2714
                    del self._filename
 
2715
                    del self._history
 
2716
 
 
2717
    missing_keys = _mod_index._missing_keys_from_parent_map
 
2718
 
 
2719
    def _partition_keys(self, keys):
 
2720
        """Turn keys into a dict of prefix:suffix_list."""
 
2721
        result = {}
 
2722
        for key in keys:
 
2723
            prefix_keys = result.setdefault(key[:-1], [])
 
2724
            prefix_keys.append(key[-1])
 
2725
        return result
 
2726
 
 
2727
    def _dictionary_compress(self, keys):
 
2728
        """Dictionary compress keys.
 
2729
 
 
2730
        :param keys: The keys to generate references to.
 
2731
        :return: A string representation of keys. keys which are present are
 
2732
            dictionary compressed, and others are emitted as fulltext with a
 
2733
            '.' prefix.
 
2734
        """
 
2735
        if not keys:
 
2736
            return ''
 
2737
        result_list = []
 
2738
        prefix = keys[0][:-1]
 
2739
        cache = self._kndx_cache[prefix][0]
 
2740
        for key in keys:
 
2741
            if key[:-1] != prefix:
 
2742
                # kndx indices cannot refer across partitioned storage.
 
2743
                raise ValueError("mismatched prefixes for %r" % keys)
 
2744
            if key[-1] in cache:
 
2745
                # -- inlined lookup() --
 
2746
                result_list.append(str(cache[key[-1]][5]))
 
2747
                # -- end lookup () --
 
2748
            else:
 
2749
                result_list.append('.' + key[-1])
 
2750
        return ' '.join(result_list)
 
2751
 
 
2752
    def _reset_cache(self):
 
2753
        # Possibly this should be a LRU cache. A dictionary from key_prefix to
 
2754
        # (cache_dict, history_vector) for parsed kndx files.
 
2755
        self._kndx_cache = {}
 
2756
        self._scope = self._get_scope()
 
2757
        allow_writes = self._allow_writes()
 
2758
        if allow_writes:
 
2759
            self._mode = 'w'
 
2760
        else:
 
2761
            self._mode = 'r'
 
2762
 
 
2763
    def _sort_keys_by_io(self, keys, positions):
 
2764
        """Figure out an optimal order to read the records for the given keys.
 
2765
 
 
2766
        Sort keys, grouped by index and sorted by position.
 
2767
 
 
2768
        :param keys: A list of keys whose records we want to read. This will be
 
2769
            sorted 'in-place'.
 
2770
        :param positions: A dict, such as the one returned by
 
2771
            _get_components_positions()
 
2772
        :return: None
 
2773
        """
 
2774
        def get_sort_key(key):
 
2775
            index_memo = positions[key][1]
 
2776
            # Group by prefix and position. index_memo[0] is the key, so it is
 
2777
            # (file_id, revision_id) and we don't want to sort on revision_id,
 
2778
            # index_memo[1] is the position, and index_memo[2] is the size,
 
2779
            # which doesn't matter for the sort
 
2780
            return index_memo[0][:-1], index_memo[1]
 
2781
        return keys.sort(key=get_sort_key)
 
2782
 
 
2783
    _get_total_build_size = _get_total_build_size
 
2784
 
 
2785
    def _split_key(self, key):
 
2786
        """Split key into a prefix and suffix."""
 
2787
        return key[:-1], key[-1]
 
2788
 
 
2789
 
 
2790
class _KeyRefs(object):
 
2791
 
 
2792
    def __init__(self, track_new_keys=False):
 
2793
        # dict mapping 'key' to 'set of keys referring to that key'
 
2794
        self.refs = {}
 
2795
        if track_new_keys:
 
2796
            # set remembering all new keys
 
2797
            self.new_keys = set()
 
2798
        else:
 
2799
            self.new_keys = None
 
2800
 
 
2801
    def clear(self):
 
2802
        if self.refs:
 
2803
            self.refs.clear()
 
2804
        if self.new_keys:
 
2805
            self.new_keys.clear()
 
2806
 
 
2807
    def add_references(self, key, refs):
 
2808
        # Record the new references
 
2809
        for referenced in refs:
 
2810
            try:
 
2811
                needed_by = self.refs[referenced]
 
2812
            except KeyError:
 
2813
                needed_by = self.refs[referenced] = set()
 
2814
            needed_by.add(key)
 
2815
        # Discard references satisfied by the new key
 
2816
        self.add_key(key)
 
2817
 
 
2818
    def get_new_keys(self):
 
2819
        return self.new_keys
 
2820
    
 
2821
    def get_unsatisfied_refs(self):
 
2822
        return self.refs.iterkeys()
 
2823
 
 
2824
    def _satisfy_refs_for_key(self, key):
 
2825
        try:
 
2826
            del self.refs[key]
 
2827
        except KeyError:
 
2828
            # No keys depended on this key.  That's ok.
 
2829
            pass
 
2830
 
 
2831
    def add_key(self, key):
 
2832
        # satisfy refs for key, and remember that we've seen this key.
 
2833
        self._satisfy_refs_for_key(key)
 
2834
        if self.new_keys is not None:
 
2835
            self.new_keys.add(key)
 
2836
 
 
2837
    def satisfy_refs_for_keys(self, keys):
 
2838
        for key in keys:
 
2839
            self._satisfy_refs_for_key(key)
 
2840
 
 
2841
    def get_referrers(self):
 
2842
        result = set()
 
2843
        for referrers in self.refs.itervalues():
 
2844
            result.update(referrers)
 
2845
        return result
 
2846
 
 
2847
 
 
2848
class _KnitGraphIndex(object):
 
2849
    """A KnitVersionedFiles index layered on GraphIndex."""
 
2850
 
 
2851
    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
 
2852
        add_callback=None, track_external_parent_refs=False):
 
2853
        """Construct a KnitGraphIndex on a graph_index.
 
2854
 
 
2855
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
 
2856
        :param is_locked: A callback to check whether the object should answer
 
2857
            queries.
 
2858
        :param deltas: Allow delta-compressed records.
 
2859
        :param parents: If True, record knits parents, if not do not record
 
2860
            parents.
 
2861
        :param add_callback: If not None, allow additions to the index and call
 
2862
            this callback with a list of added GraphIndex nodes:
 
2863
            [(node, value, node_refs), ...]
 
2864
        :param is_locked: A callback, returns True if the index is locked and
 
2865
            thus usable.
 
2866
        :param track_external_parent_refs: If True, record all external parent
 
2867
            references parents from added records.  These can be retrieved
 
2868
            later by calling get_missing_parents().
 
2869
        """
 
2870
        self._add_callback = add_callback
 
2871
        self._graph_index = graph_index
 
2872
        self._deltas = deltas
 
2873
        self._parents = parents
 
2874
        if deltas and not parents:
 
2875
            # XXX: TODO: Delta tree and parent graph should be conceptually
 
2876
            # separate.
 
2877
            raise KnitCorrupt(self, "Cannot do delta compression without "
 
2878
                "parent tracking.")
 
2879
        self.has_graph = parents
 
2880
        self._is_locked = is_locked
 
2881
        self._missing_compression_parents = set()
 
2882
        if track_external_parent_refs:
 
2883
            self._key_dependencies = _KeyRefs()
 
2884
        else:
 
2885
            self._key_dependencies = None
 
2886
 
 
2887
    def __repr__(self):
 
2888
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
 
2889
 
 
2890
    def add_records(self, records, random_id=False,
 
2891
        missing_compression_parents=False):
 
2892
        """Add multiple records to the index.
 
2893
 
 
2894
        This function does not insert data into the Immutable GraphIndex
 
2895
        backing the KnitGraphIndex, instead it prepares data for insertion by
 
2896
        the caller and checks that it is safe to insert then calls
 
2897
        self._add_callback with the prepared GraphIndex nodes.
 
2898
 
 
2899
        :param records: a list of tuples:
 
2900
                         (key, options, access_memo, parents).
 
2901
        :param random_id: If True the ids being added were randomly generated
 
2902
            and no check for existence will be performed.
 
2903
        :param missing_compression_parents: If True the records being added are
 
2904
            only compressed against texts already in the index (or inside
 
2905
            records). If False the records all refer to unavailable texts (or
 
2906
            texts inside records) as compression parents.
 
2907
        """
 
2908
        if not self._add_callback:
 
2909
            raise errors.ReadOnlyError(self)
 
2910
        # we hope there are no repositories with inconsistent parentage
 
2911
        # anymore.
 
2912
 
 
2913
        keys = {}
 
2914
        compression_parents = set()
 
2915
        key_dependencies = self._key_dependencies
 
2916
        for (key, options, access_memo, parents) in records:
 
2917
            if self._parents:
 
2918
                parents = tuple(parents)
 
2919
                if key_dependencies is not None:
 
2920
                    key_dependencies.add_references(key, parents)
 
2921
            index, pos, size = access_memo
 
2922
            if 'no-eol' in options:
 
2923
                value = 'N'
 
2924
            else:
 
2925
                value = ' '
 
2926
            value += "%d %d" % (pos, size)
 
2927
            if not self._deltas:
 
2928
                if 'line-delta' in options:
 
2929
                    raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
 
2930
            if self._parents:
 
2931
                if self._deltas:
 
2932
                    if 'line-delta' in options:
 
2933
                        node_refs = (parents, (parents[0],))
 
2934
                        if missing_compression_parents:
 
2935
                            compression_parents.add(parents[0])
 
2936
                    else:
 
2937
                        node_refs = (parents, ())
 
2938
                else:
 
2939
                    node_refs = (parents, )
 
2940
            else:
 
2941
                if parents:
 
2942
                    raise KnitCorrupt(self, "attempt to add node with parents "
 
2943
                        "in parentless index.")
 
2944
                node_refs = ()
 
2945
            keys[key] = (value, node_refs)
 
2946
        # check for dups
 
2947
        if not random_id:
 
2948
            present_nodes = self._get_entries(keys)
 
2949
            for (index, key, value, node_refs) in present_nodes:
 
2950
                parents = node_refs[:1]
 
2951
                # Sometimes these are passed as a list rather than a tuple
 
2952
                passed = static_tuple.as_tuples(keys[key])
 
2953
                passed_parents = passed[1][:1]
 
2954
                if (value[0] != keys[key][0][0] or
 
2955
                    parents != passed_parents):
 
2956
                    node_refs = static_tuple.as_tuples(node_refs)
 
2957
                    raise KnitCorrupt(self, "inconsistent details in add_records"
 
2958
                        ": %s %s" % ((value, node_refs), passed))
 
2959
                del keys[key]
 
2960
        result = []
 
2961
        if self._parents:
 
2962
            for key, (value, node_refs) in keys.iteritems():
 
2963
                result.append((key, value, node_refs))
 
2964
        else:
 
2965
            for key, (value, node_refs) in keys.iteritems():
 
2966
                result.append((key, value))
 
2967
        self._add_callback(result)
 
2968
        if missing_compression_parents:
 
2969
            # This may appear to be incorrect (it does not check for
 
2970
            # compression parents that are in the existing graph index),
 
2971
            # but such records won't have been buffered, so this is
 
2972
            # actually correct: every entry when
 
2973
            # missing_compression_parents==True either has a missing parent, or
 
2974
            # a parent that is one of the keys in records.
 
2975
            compression_parents.difference_update(keys)
 
2976
            self._missing_compression_parents.update(compression_parents)
 
2977
        # Adding records may have satisfied missing compression parents.
 
2978
        self._missing_compression_parents.difference_update(keys)
 
2979
 
 
2980
    def scan_unvalidated_index(self, graph_index):
 
2981
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2982
 
 
2983
        This allows this _KnitGraphIndex to keep track of any missing
 
2984
        compression parents we may want to have filled in to make those
 
2985
        indices valid.
 
2986
 
 
2987
        :param graph_index: A GraphIndex
 
2988
        """
 
2989
        if self._deltas:
 
2990
            new_missing = graph_index.external_references(ref_list_num=1)
 
2991
            new_missing.difference_update(self.get_parent_map(new_missing))
 
2992
            self._missing_compression_parents.update(new_missing)
 
2993
        if self._key_dependencies is not None:
 
2994
            # Add parent refs from graph_index (and discard parent refs that
 
2995
            # the graph_index has).
 
2996
            for node in graph_index.iter_all_entries():
 
2997
                self._key_dependencies.add_references(node[1], node[3][0])
 
2998
 
 
2999
    def get_missing_compression_parents(self):
 
3000
        """Return the keys of missing compression parents.
 
3001
 
 
3002
        Missing compression parents occur when a record stream was missing
 
3003
        basis texts, or a index was scanned that had missing basis texts.
 
3004
        """
 
3005
        return frozenset(self._missing_compression_parents)
 
3006
 
 
3007
    def get_missing_parents(self):
 
3008
        """Return the keys of missing parents."""
 
3009
        # If updating this, you should also update
 
3010
        # groupcompress._GCGraphIndex.get_missing_parents
 
3011
        # We may have false positives, so filter those out.
 
3012
        self._key_dependencies.satisfy_refs_for_keys(
 
3013
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
 
3014
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
 
3015
 
 
3016
    def _check_read(self):
 
3017
        """raise if reads are not permitted."""
 
3018
        if not self._is_locked():
 
3019
            raise errors.ObjectNotLocked(self)
 
3020
 
 
3021
    def _check_write_ok(self):
 
3022
        """Assert if writes are not permitted."""
 
3023
        if not self._is_locked():
 
3024
            raise errors.ObjectNotLocked(self)
 
3025
 
 
3026
    def _compression_parent(self, an_entry):
 
3027
        # return the key that an_entry is compressed against, or None
 
3028
        # Grab the second parent list (as deltas implies parents currently)
 
3029
        compression_parents = an_entry[3][1]
 
3030
        if not compression_parents:
 
3031
            return None
 
3032
        if len(compression_parents) != 1:
 
3033
            raise AssertionError(
 
3034
                "Too many compression parents: %r" % compression_parents)
 
3035
        return compression_parents[0]
 
3036
 
 
3037
    def get_build_details(self, keys):
 
3038
        """Get the method, index_memo and compression parent for version_ids.
 
3039
 
 
3040
        Ghosts are omitted from the result.
 
3041
 
 
3042
        :param keys: An iterable of keys.
 
3043
        :return: A dict of key:
 
3044
            (index_memo, compression_parent, parents, record_details).
 
3045
            index_memo
 
3046
                opaque structure to pass to read_records to extract the raw
 
3047
                data
 
3048
            compression_parent
 
3049
                Content that this record is built upon, may be None
 
3050
            parents
 
3051
                Logical parents of this node
 
3052
            record_details
 
3053
                extra information about the content which needs to be passed to
 
3054
                Factory.parse_record
 
3055
        """
 
3056
        self._check_read()
 
3057
        result = {}
 
3058
        entries = self._get_entries(keys, False)
 
3059
        for entry in entries:
 
3060
            key = entry[1]
 
3061
            if not self._parents:
 
3062
                parents = ()
 
3063
            else:
 
3064
                parents = entry[3][0]
 
3065
            if not self._deltas:
 
3066
                compression_parent_key = None
 
3067
            else:
 
3068
                compression_parent_key = self._compression_parent(entry)
 
3069
            noeol = (entry[2][0] == 'N')
 
3070
            if compression_parent_key:
 
3071
                method = 'line-delta'
 
3072
            else:
 
3073
                method = 'fulltext'
 
3074
            result[key] = (self._node_to_position(entry),
 
3075
                                  compression_parent_key, parents,
 
3076
                                  (method, noeol))
 
3077
        return result
 
3078
 
 
3079
    def _get_entries(self, keys, check_present=False):
 
3080
        """Get the entries for keys.
 
3081
 
 
3082
        :param keys: An iterable of index key tuples.
 
3083
        """
 
3084
        keys = set(keys)
 
3085
        found_keys = set()
 
3086
        if self._parents:
 
3087
            for node in self._graph_index.iter_entries(keys):
 
3088
                yield node
 
3089
                found_keys.add(node[1])
 
3090
        else:
 
3091
            # adapt parentless index to the rest of the code.
 
3092
            for node in self._graph_index.iter_entries(keys):
 
3093
                yield node[0], node[1], node[2], ()
 
3094
                found_keys.add(node[1])
 
3095
        if check_present:
 
3096
            missing_keys = keys.difference(found_keys)
 
3097
            if missing_keys:
 
3098
                raise RevisionNotPresent(missing_keys.pop(), self)
 
3099
 
 
3100
    def get_method(self, key):
 
3101
        """Return compression method of specified key."""
 
3102
        return self._get_method(self._get_node(key))
 
3103
 
 
3104
    def _get_method(self, node):
 
3105
        if not self._deltas:
 
3106
            return 'fulltext'
 
3107
        if self._compression_parent(node):
 
3108
            return 'line-delta'
 
3109
        else:
 
3110
            return 'fulltext'
 
3111
 
 
3112
    def _get_node(self, key):
 
3113
        try:
 
3114
            return list(self._get_entries([key]))[0]
 
3115
        except IndexError:
 
3116
            raise RevisionNotPresent(key, self)
 
3117
 
 
3118
    def get_options(self, key):
 
3119
        """Return a list representing options.
 
3120
 
 
3121
        e.g. ['foo', 'bar']
 
3122
        """
 
3123
        node = self._get_node(key)
 
3124
        options = [self._get_method(node)]
 
3125
        if node[2][0] == 'N':
 
3126
            options.append('no-eol')
 
3127
        return options
 
3128
 
 
3129
    def find_ancestry(self, keys):
 
3130
        """See CombinedGraphIndex.find_ancestry()"""
 
3131
        return self._graph_index.find_ancestry(keys, 0)
 
3132
 
 
3133
    def get_parent_map(self, keys):
 
3134
        """Get a map of the parents of keys.
 
3135
 
 
3136
        :param keys: The keys to look up parents for.
 
3137
        :return: A mapping from keys to parents. Absent keys are absent from
 
3138
            the mapping.
 
3139
        """
 
3140
        self._check_read()
 
3141
        nodes = self._get_entries(keys)
 
3142
        result = {}
 
3143
        if self._parents:
 
3144
            for node in nodes:
 
3145
                result[node[1]] = node[3][0]
 
3146
        else:
 
3147
            for node in nodes:
 
3148
                result[node[1]] = None
 
3149
        return result
 
3150
 
 
3151
    def get_position(self, key):
 
3152
        """Return details needed to access the version.
 
3153
 
 
3154
        :return: a tuple (index, data position, size) to hand to the access
 
3155
            logic to get the record.
 
3156
        """
 
3157
        node = self._get_node(key)
 
3158
        return self._node_to_position(node)
 
3159
 
 
3160
    has_key = _mod_index._has_key_from_parent_map
 
3161
 
 
3162
    def keys(self):
 
3163
        """Get all the keys in the collection.
 
3164
 
 
3165
        The keys are not ordered.
 
3166
        """
 
3167
        self._check_read()
 
3168
        return [node[1] for node in self._graph_index.iter_all_entries()]
 
3169
 
 
3170
    missing_keys = _mod_index._missing_keys_from_parent_map
 
3171
 
 
3172
    def _node_to_position(self, node):
 
3173
        """Convert an index value to position details."""
 
3174
        bits = node[2][1:].split(' ')
 
3175
        return node[0], int(bits[0]), int(bits[1])
 
3176
 
 
3177
    def _sort_keys_by_io(self, keys, positions):
 
3178
        """Figure out an optimal order to read the records for the given keys.
 
3179
 
 
3180
        Sort keys, grouped by index and sorted by position.
 
3181
 
 
3182
        :param keys: A list of keys whose records we want to read. This will be
 
3183
            sorted 'in-place'.
 
3184
        :param positions: A dict, such as the one returned by
 
3185
            _get_components_positions()
 
3186
        :return: None
 
3187
        """
 
3188
        def get_index_memo(key):
 
3189
            # index_memo is at offset [1]. It is made up of (GraphIndex,
 
3190
            # position, size). GI is an object, which will be unique for each
 
3191
            # pack file. This causes us to group by pack file, then sort by
 
3192
            # position. Size doesn't matter, but it isn't worth breaking up the
 
3193
            # tuple.
 
3194
            return positions[key][1]
 
3195
        return keys.sort(key=get_index_memo)
 
3196
 
 
3197
    _get_total_build_size = _get_total_build_size
 
3198
 
 
3199
 
 
3200
class _KnitKeyAccess(object):
 
3201
    """Access to records in .knit files."""
 
3202
 
 
3203
    def __init__(self, transport, mapper):
 
3204
        """Create a _KnitKeyAccess with transport and mapper.
 
3205
 
 
3206
        :param transport: The transport the access object is rooted at.
 
3207
        :param mapper: The mapper used to map keys to .knit files.
 
3208
        """
 
3209
        self._transport = transport
 
3210
        self._mapper = mapper
 
3211
 
 
3212
    def add_raw_records(self, key_sizes, raw_data):
 
3213
        """Add raw knit bytes to a storage area.
 
3214
 
 
3215
        The data is spooled to the container writer in one bytes-record per
 
3216
        raw data item.
 
3217
 
 
3218
        :param sizes: An iterable of tuples containing the key and size of each
 
3219
            raw data segment.
 
3220
        :param raw_data: A bytestring containing the data.
 
3221
        :return: A list of memos to retrieve the record later. Each memo is an
 
3222
            opaque index memo. For _KnitKeyAccess the memo is (key, pos,
 
3223
            length), where the key is the record key.
 
3224
        """
 
3225
        if type(raw_data) is not str:
 
3226
            raise AssertionError(
 
3227
                'data must be plain bytes was %s' % type(raw_data))
 
3228
        result = []
 
3229
        offset = 0
 
3230
        # TODO: This can be tuned for writing to sftp and other servers where
 
3231
        # append() is relatively expensive by grouping the writes to each key
 
3232
        # prefix.
 
3233
        for key, size in key_sizes:
 
3234
            path = self._mapper.map(key)
 
3235
            try:
 
3236
                base = self._transport.append_bytes(path + '.knit',
 
3237
                    raw_data[offset:offset+size])
 
3238
            except errors.NoSuchFile:
 
3239
                self._transport.mkdir(osutils.dirname(path))
 
3240
                base = self._transport.append_bytes(path + '.knit',
 
3241
                    raw_data[offset:offset+size])
 
3242
            # if base == 0:
 
3243
            # chmod.
 
3244
            offset += size
 
3245
            result.append((key, base, size))
 
3246
        return result
 
3247
 
 
3248
    def flush(self):
 
3249
        """Flush pending writes on this access object.
 
3250
        
 
3251
        For .knit files this is a no-op.
 
3252
        """
 
3253
        pass
 
3254
 
 
3255
    def get_raw_records(self, memos_for_retrieval):
 
3256
        """Get the raw bytes for a records.
 
3257
 
 
3258
        :param memos_for_retrieval: An iterable containing the access memo for
 
3259
            retrieving the bytes.
 
3260
        :return: An iterator over the bytes of the records.
 
3261
        """
 
3262
        # first pass, group into same-index request to minimise readv's issued.
 
3263
        request_lists = []
 
3264
        current_prefix = None
 
3265
        for (key, offset, length) in memos_for_retrieval:
 
3266
            if current_prefix == key[:-1]:
 
3267
                current_list.append((offset, length))
 
3268
            else:
 
3269
                if current_prefix is not None:
 
3270
                    request_lists.append((current_prefix, current_list))
 
3271
                current_prefix = key[:-1]
 
3272
                current_list = [(offset, length)]
 
3273
        # handle the last entry
 
3274
        if current_prefix is not None:
 
3275
            request_lists.append((current_prefix, current_list))
 
3276
        for prefix, read_vector in request_lists:
 
3277
            path = self._mapper.map(prefix) + '.knit'
 
3278
            for pos, data in self._transport.readv(path, read_vector):
 
3279
                yield data
 
3280
 
 
3281
 
 
3282
class _DirectPackAccess(object):
 
3283
    """Access to data in one or more packs with less translation."""
 
3284
 
 
3285
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
 
3286
        """Create a _DirectPackAccess object.
 
3287
 
 
3288
        :param index_to_packs: A dict mapping index objects to the transport
 
3289
            and file names for obtaining data.
 
3290
        :param reload_func: A function to call if we determine that the pack
 
3291
            files have moved and we need to reload our caches. See
 
3292
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
 
3293
        """
 
3294
        self._container_writer = None
 
3295
        self._write_index = None
 
3296
        self._indices = index_to_packs
 
3297
        self._reload_func = reload_func
 
3298
        self._flush_func = flush_func
 
3299
 
 
3300
    def add_raw_records(self, key_sizes, raw_data):
 
3301
        """Add raw knit bytes to a storage area.
 
3302
 
 
3303
        The data is spooled to the container writer in one bytes-record per
 
3304
        raw data item.
 
3305
 
 
3306
        :param sizes: An iterable of tuples containing the key and size of each
 
3307
            raw data segment.
 
3308
        :param raw_data: A bytestring containing the data.
 
3309
        :return: A list of memos to retrieve the record later. Each memo is an
 
3310
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
 
3311
            length), where the index field is the write_index object supplied
 
3312
            to the PackAccess object.
 
3313
        """
 
3314
        if type(raw_data) is not str:
 
3315
            raise AssertionError(
 
3316
                'data must be plain bytes was %s' % type(raw_data))
 
3317
        result = []
 
3318
        offset = 0
 
3319
        for key, size in key_sizes:
 
3320
            p_offset, p_length = self._container_writer.add_bytes_record(
 
3321
                raw_data[offset:offset+size], [])
 
3322
            offset += size
 
3323
            result.append((self._write_index, p_offset, p_length))
 
3324
        return result
 
3325
 
 
3326
    def flush(self):
 
3327
        """Flush pending writes on this access object.
 
3328
 
 
3329
        This will flush any buffered writes to a NewPack.
 
3330
        """
 
3331
        if self._flush_func is not None:
 
3332
            self._flush_func()
 
3333
            
 
3334
    def get_raw_records(self, memos_for_retrieval):
 
3335
        """Get the raw bytes for a records.
 
3336
 
 
3337
        :param memos_for_retrieval: An iterable containing the (index, pos,
 
3338
            length) memo for retrieving the bytes. The Pack access method
 
3339
            looks up the pack to use for a given record in its index_to_pack
 
3340
            map.
 
3341
        :return: An iterator over the bytes of the records.
 
3342
        """
 
3343
        # first pass, group into same-index requests
 
3344
        request_lists = []
 
3345
        current_index = None
 
3346
        for (index, offset, length) in memos_for_retrieval:
 
3347
            if current_index == index:
 
3348
                current_list.append((offset, length))
 
3349
            else:
 
3350
                if current_index is not None:
 
3351
                    request_lists.append((current_index, current_list))
 
3352
                current_index = index
 
3353
                current_list = [(offset, length)]
 
3354
        # handle the last entry
 
3355
        if current_index is not None:
 
3356
            request_lists.append((current_index, current_list))
 
3357
        for index, offsets in request_lists:
 
3358
            try:
 
3359
                transport, path = self._indices[index]
 
3360
            except KeyError:
 
3361
                # A KeyError here indicates that someone has triggered an index
 
3362
                # reload, and this index has gone missing, we need to start
 
3363
                # over.
 
3364
                if self._reload_func is None:
 
3365
                    # If we don't have a _reload_func there is nothing that can
 
3366
                    # be done
 
3367
                    raise
 
3368
                raise errors.RetryWithNewPacks(index,
 
3369
                                               reload_occurred=True,
 
3370
                                               exc_info=sys.exc_info())
 
3371
            try:
 
3372
                reader = pack.make_readv_reader(transport, path, offsets)
 
3373
                for names, read_func in reader.iter_records():
 
3374
                    yield read_func(None)
 
3375
            except errors.NoSuchFile:
 
3376
                # A NoSuchFile error indicates that a pack file has gone
 
3377
                # missing on disk, we need to trigger a reload, and start over.
 
3378
                if self._reload_func is None:
 
3379
                    raise
 
3380
                raise errors.RetryWithNewPacks(transport.abspath(path),
 
3381
                                               reload_occurred=False,
 
3382
                                               exc_info=sys.exc_info())
 
3383
 
 
3384
    def set_writer(self, writer, index, transport_packname):
 
3385
        """Set a writer to use for adding data."""
 
3386
        if index is not None:
 
3387
            self._indices[index] = transport_packname
 
3388
        self._container_writer = writer
 
3389
        self._write_index = index
 
3390
 
 
3391
    def reload_or_raise(self, retry_exc):
 
3392
        """Try calling the reload function, or re-raise the original exception.
 
3393
 
 
3394
        This should be called after _DirectPackAccess raises a
 
3395
        RetryWithNewPacks exception. This function will handle the common logic
 
3396
        of determining when the error is fatal versus being temporary.
 
3397
        It will also make sure that the original exception is raised, rather
 
3398
        than the RetryWithNewPacks exception.
 
3399
 
 
3400
        If this function returns, then the calling function should retry
 
3401
        whatever operation was being performed. Otherwise an exception will
 
3402
        be raised.
 
3403
 
 
3404
        :param retry_exc: A RetryWithNewPacks exception.
 
3405
        """
 
3406
        is_error = False
 
3407
        if self._reload_func is None:
 
3408
            is_error = True
 
3409
        elif not self._reload_func():
 
3410
            # The reload claimed that nothing changed
 
3411
            if not retry_exc.reload_occurred:
 
3412
                # If there wasn't an earlier reload, then we really were
 
3413
                # expecting to find changes. We didn't find them, so this is a
 
3414
                # hard error
 
3415
                is_error = True
 
3416
        if is_error:
 
3417
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
 
3418
            raise exc_class, exc_value, exc_traceback
 
3419
 
 
3420
 
 
3421
def annotate_knit(knit, revision_id):
 
3422
    """Annotate a knit with no cached annotations.
 
3423
 
 
3424
    This implementation is for knits with no cached annotations.
 
3425
    It will work for knits with cached annotations, but this is not
 
3426
    recommended.
 
3427
    """
 
3428
    annotator = _KnitAnnotator(knit)
 
3429
    return iter(annotator.annotate_flat(revision_id))
 
3430
 
 
3431
 
 
3432
class _KnitAnnotator(annotate.Annotator):
 
3433
    """Build up the annotations for a text."""
 
3434
 
 
3435
    def __init__(self, vf):
 
3436
        annotate.Annotator.__init__(self, vf)
 
3437
 
 
3438
        # TODO: handle Nodes which cannot be extracted
 
3439
        # self._ghosts = set()
 
3440
 
 
3441
        # Map from (key, parent_key) => matching_blocks, should be 'use once'
 
3442
        self._matching_blocks = {}
 
3443
 
 
3444
        # KnitContent objects
 
3445
        self._content_objects = {}
 
3446
        # The number of children that depend on this fulltext content object
 
3447
        self._num_compression_children = {}
 
3448
        # Delta records that need their compression parent before they can be
 
3449
        # expanded
 
3450
        self._pending_deltas = {}
 
3451
        # Fulltext records that are waiting for their parents fulltexts before
 
3452
        # they can be yielded for annotation
 
3453
        self._pending_annotation = {}
 
3454
 
 
3455
        self._all_build_details = {}
 
3456
 
 
3457
    def _get_build_graph(self, key):
 
3458
        """Get the graphs for building texts and annotations.
 
3459
 
 
3460
        The data you need for creating a full text may be different than the
 
3461
        data you need to annotate that text. (At a minimum, you need both
 
3462
        parents to create an annotation, but only need 1 parent to generate the
 
3463
        fulltext.)
 
3464
 
 
3465
        :return: A list of (key, index_memo) records, suitable for
 
3466
            passing to read_records_iter to start reading in the raw data from
 
3467
            the pack file.
 
3468
        """
 
3469
        pending = set([key])
 
3470
        records = []
 
3471
        ann_keys = set()
 
3472
        self._num_needed_children[key] = 1
 
3473
        while pending:
 
3474
            # get all pending nodes
 
3475
            this_iteration = pending
 
3476
            build_details = self._vf._index.get_build_details(this_iteration)
 
3477
            self._all_build_details.update(build_details)
 
3478
            # new_nodes = self._vf._index._get_entries(this_iteration)
 
3479
            pending = set()
 
3480
            for key, details in build_details.iteritems():
 
3481
                (index_memo, compression_parent, parent_keys,
 
3482
                 record_details) = details
 
3483
                self._parent_map[key] = parent_keys
 
3484
                self._heads_provider = None
 
3485
                records.append((key, index_memo))
 
3486
                # Do we actually need to check _annotated_lines?
 
3487
                pending.update([p for p in parent_keys
 
3488
                                   if p not in self._all_build_details])
 
3489
                if parent_keys:
 
3490
                    for parent_key in parent_keys:
 
3491
                        if parent_key in self._num_needed_children:
 
3492
                            self._num_needed_children[parent_key] += 1
 
3493
                        else:
 
3494
                            self._num_needed_children[parent_key] = 1
 
3495
                if compression_parent:
 
3496
                    if compression_parent in self._num_compression_children:
 
3497
                        self._num_compression_children[compression_parent] += 1
 
3498
                    else:
 
3499
                        self._num_compression_children[compression_parent] = 1
 
3500
 
 
3501
            missing_versions = this_iteration.difference(build_details.keys())
 
3502
            if missing_versions:
 
3503
                for key in missing_versions:
 
3504
                    if key in self._parent_map and key in self._text_cache:
 
3505
                        # We already have this text ready, we just need to
 
3506
                        # yield it later so we get it annotated
 
3507
                        ann_keys.add(key)
 
3508
                        parent_keys = self._parent_map[key]
 
3509
                        for parent_key in parent_keys:
 
3510
                            if parent_key in self._num_needed_children:
 
3511
                                self._num_needed_children[parent_key] += 1
 
3512
                            else:
 
3513
                                self._num_needed_children[parent_key] = 1
 
3514
                        pending.update([p for p in parent_keys
 
3515
                                           if p not in self._all_build_details])
 
3516
                    else:
 
3517
                        raise errors.RevisionNotPresent(key, self._vf)
 
3518
        # Generally we will want to read the records in reverse order, because
 
3519
        # we find the parent nodes after the children
 
3520
        records.reverse()
 
3521
        return records, ann_keys
 
3522
 
 
3523
    def _get_needed_texts(self, key, pb=None):
 
3524
        # if True or len(self._vf._fallback_vfs) > 0:
 
3525
        if len(self._vf._fallback_vfs) > 0:
 
3526
            # If we have fallbacks, go to the generic path
 
3527
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
 
3528
                yield v
 
3529
            return
 
3530
        while True:
 
3531
            try:
 
3532
                records, ann_keys = self._get_build_graph(key)
 
3533
                for idx, (sub_key, text, num_lines) in enumerate(
 
3534
                                                self._extract_texts(records)):
 
3535
                    if pb is not None:
 
3536
                        pb.update('annotating', idx, len(records))
 
3537
                    yield sub_key, text, num_lines
 
3538
                for sub_key in ann_keys:
 
3539
                    text = self._text_cache[sub_key]
 
3540
                    num_lines = len(text) # bad assumption
 
3541
                    yield sub_key, text, num_lines
 
3542
                return
 
3543
            except errors.RetryWithNewPacks, e:
 
3544
                self._vf._access.reload_or_raise(e)
 
3545
                # The cached build_details are no longer valid
 
3546
                self._all_build_details.clear()
 
3547
 
 
3548
    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
 
3549
        parent_lines = self._text_cache[compression_parent]
 
3550
        blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
 
3551
        self._matching_blocks[(key, compression_parent)] = blocks
 
3552
 
 
3553
    def _expand_record(self, key, parent_keys, compression_parent, record,
 
3554
                       record_details):
 
3555
        delta = None
 
3556
        if compression_parent:
 
3557
            if compression_parent not in self._content_objects:
 
3558
                # Waiting for the parent
 
3559
                self._pending_deltas.setdefault(compression_parent, []).append(
 
3560
                    (key, parent_keys, record, record_details))
 
3561
                return None
 
3562
            # We have the basis parent, so expand the delta
 
3563
            num = self._num_compression_children[compression_parent]
 
3564
            num -= 1
 
3565
            if num == 0:
 
3566
                base_content = self._content_objects.pop(compression_parent)
 
3567
                self._num_compression_children.pop(compression_parent)
 
3568
            else:
 
3569
                self._num_compression_children[compression_parent] = num
 
3570
                base_content = self._content_objects[compression_parent]
 
3571
            # It is tempting to want to copy_base_content=False for the last
 
3572
            # child object. However, whenever noeol=False,
 
3573
            # self._text_cache[parent_key] is content._lines. So mutating it
 
3574
            # gives very bad results.
 
3575
            # The alternative is to copy the lines into text cache, but then we
 
3576
            # are copying anyway, so just do it here.
 
3577
            content, delta = self._vf._factory.parse_record(
 
3578
                key, record, record_details, base_content,
 
3579
                copy_base_content=True)
 
3580
        else:
 
3581
            # Fulltext record
 
3582
            content, _ = self._vf._factory.parse_record(
 
3583
                key, record, record_details, None)
 
3584
        if self._num_compression_children.get(key, 0) > 0:
 
3585
            self._content_objects[key] = content
 
3586
        lines = content.text()
 
3587
        self._text_cache[key] = lines
 
3588
        if delta is not None:
 
3589
            self._cache_delta_blocks(key, compression_parent, delta, lines)
 
3590
        return lines
 
3591
 
 
3592
    def _get_parent_annotations_and_matches(self, key, text, parent_key):
 
3593
        """Get the list of annotations for the parent, and the matching lines.
 
3594
 
 
3595
        :param text: The opaque value given by _get_needed_texts
 
3596
        :param parent_key: The key for the parent text
 
3597
        :return: (parent_annotations, matching_blocks)
 
3598
            parent_annotations is a list as long as the number of lines in
 
3599
                parent
 
3600
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
 
3601
                indicating which lines match between the two texts
 
3602
        """
 
3603
        block_key = (key, parent_key)
 
3604
        if block_key in self._matching_blocks:
 
3605
            blocks = self._matching_blocks.pop(block_key)
 
3606
            parent_annotations = self._annotations_cache[parent_key]
 
3607
            return parent_annotations, blocks
 
3608
        return annotate.Annotator._get_parent_annotations_and_matches(self,
 
3609
            key, text, parent_key)
 
3610
 
 
3611
    def _process_pending(self, key):
 
3612
        """The content for 'key' was just processed.
 
3613
 
 
3614
        Determine if there is any more pending work to be processed.
 
3615
        """
 
3616
        to_return = []
 
3617
        if key in self._pending_deltas:
 
3618
            compression_parent = key
 
3619
            children = self._pending_deltas.pop(key)
 
3620
            for child_key, parent_keys, record, record_details in children:
 
3621
                lines = self._expand_record(child_key, parent_keys,
 
3622
                                            compression_parent,
 
3623
                                            record, record_details)
 
3624
                if self._check_ready_for_annotations(child_key, parent_keys):
 
3625
                    to_return.append(child_key)
 
3626
        # Also check any children that are waiting for this parent to be
 
3627
        # annotation ready
 
3628
        if key in self._pending_annotation:
 
3629
            children = self._pending_annotation.pop(key)
 
3630
            to_return.extend([c for c, p_keys in children
 
3631
                              if self._check_ready_for_annotations(c, p_keys)])
 
3632
        return to_return
 
3633
 
 
3634
    def _check_ready_for_annotations(self, key, parent_keys):
 
3635
        """return true if this text is ready to be yielded.
 
3636
 
 
3637
        Otherwise, this will return False, and queue the text into
 
3638
        self._pending_annotation
 
3639
        """
 
3640
        for parent_key in parent_keys:
 
3641
            if parent_key not in self._annotations_cache:
 
3642
                # still waiting on at least one parent text, so queue it up
 
3643
                # Note that if there are multiple parents, we need to wait
 
3644
                # for all of them.
 
3645
                self._pending_annotation.setdefault(parent_key,
 
3646
                    []).append((key, parent_keys))
 
3647
                return False
 
3648
        return True
 
3649
 
 
3650
    def _extract_texts(self, records):
 
3651
        """Extract the various texts needed based on records"""
 
3652
        # We iterate in the order read, rather than a strict order requested
 
3653
        # However, process what we can, and put off to the side things that
 
3654
        # still need parents, cleaning them up when those parents are
 
3655
        # processed.
 
3656
        # Basic data flow:
 
3657
        #   1) As 'records' are read, see if we can expand these records into
 
3658
        #      Content objects (and thus lines)
 
3659
        #   2) If a given line-delta is waiting on its compression parent, it
 
3660
        #      gets queued up into self._pending_deltas, otherwise we expand
 
3661
        #      it, and put it into self._text_cache and self._content_objects
 
3662
        #   3) If we expanded the text, we will then check to see if all
 
3663
        #      parents have also been processed. If so, this text gets yielded,
 
3664
        #      else this record gets set aside into pending_annotation
 
3665
        #   4) Further, if we expanded the text in (2), we will then check to
 
3666
        #      see if there are any children in self._pending_deltas waiting to
 
3667
        #      also be processed. If so, we go back to (2) for those
 
3668
        #   5) Further again, if we yielded the text, we can then check if that
 
3669
        #      'unlocks' any of the texts in pending_annotations, which should
 
3670
        #      then get yielded as well
 
3671
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
 
3672
        # compression child could unlock yet another, and yielding a fulltext
 
3673
        # will also 'unlock' the children that are waiting on that annotation.
 
3674
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
 
3675
        # if other parents are also waiting.)
 
3676
        # We want to yield content before expanding child content objects, so
 
3677
        # that we know when we can re-use the content lines, and the annotation
 
3678
        # code can know when it can stop caching fulltexts, as well.
 
3679
 
 
3680
        # Children that are missing their compression parent
 
3681
        pending_deltas = {}
 
3682
        for (key, record, digest) in self._vf._read_records_iter(records):
 
3683
            # ghosts?
 
3684
            details = self._all_build_details[key]
 
3685
            (_, compression_parent, parent_keys, record_details) = details
 
3686
            lines = self._expand_record(key, parent_keys, compression_parent,
 
3687
                                        record, record_details)
 
3688
            if lines is None:
 
3689
                # Pending delta should be queued up
 
3690
                continue
 
3691
            # At this point, we may be able to yield this content, if all
 
3692
            # parents are also finished
 
3693
            yield_this_text = self._check_ready_for_annotations(key,
 
3694
                                                                parent_keys)
 
3695
            if yield_this_text:
 
3696
                # All parents present
 
3697
                yield key, lines, len(lines)
 
3698
            to_process = self._process_pending(key)
 
3699
            while to_process:
 
3700
                this_process = to_process
 
3701
                to_process = []
 
3702
                for key in this_process:
 
3703
                    lines = self._text_cache[key]
 
3704
                    yield key, lines, len(lines)
 
3705
                    to_process.extend(self._process_pending(key))
 
3706
 
 
3707
try:
 
3708
    from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
 
3709
except ImportError, e:
 
3710
    osutils.failed_to_load_extension(e)
 
3711
    from bzrlib._knit_load_data_py import _load_data_py as _load_data