~brz/brz/trunk

« back to all changes in this revision

Viewing changes to breezy/bzr/groupcompress.py

  • Committer: The Breezy Bot
  • Author(s): Jelmer Vernooij
  • Date: 2023-11-17 16:36:00 UTC
  • mfrom: (7912.1.3 gc-refactor)
  • Revision ID: the_breezy_bot-20231117163600-ox87pr2r4fwvkml8
Refactor GroupCompressor

by jelmer review by jelmer

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
import time
20
20
import zlib
21
 
from typing import Type
22
21
 
23
22
from breezy import debug
24
23
from breezy.i18n import gettext
833
832
    return manager.get_record_stream()
834
833
 
835
834
 
836
 
class _CommonGroupCompressor:
 
835
class PyrexGroupCompressor:
 
836
    """Produce a serialised group of compressed texts.
 
837
 
 
838
    It contains code very similar to SequenceMatcher because of having a similar
 
839
    task. However some key differences apply:
 
840
 
 
841
    * there is no junk, we want a minimal edit not a human readable diff.
 
842
    * we don't filter very common lines (because we don't know where a good
 
843
      range will start, and after the first text we want to be emitting minmal
 
844
      edits only.
 
845
    * we chain the left side, not the right side
 
846
    * we incrementally update the adjacency matrix as new lines are provided.
 
847
    * we look for matches in all of the left side, so the routine which does
 
848
      the analagous task of find_longest_match does not need to filter on the
 
849
      left side.
 
850
    """
 
851
 
837
852
    chunks: list[bytes]
838
853
 
839
854
    def __init__(self, settings=None):
848
863
            self._settings = {}
849
864
        else:
850
865
            self._settings = settings
 
866
        self.chunks = []
 
867
        max_bytes_to_index = self._settings.get("max_bytes_to_index", 0)
 
868
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
851
869
 
852
870
    def compress(self, key, chunks, length, expected_sha, nostore_sha=None, soft=False):
853
871
        """Compress lines with label key.
886
904
        return sha1, start, end, type
887
905
 
888
906
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
889
 
        """Compress lines with label key.
890
 
 
891
 
        :param key: A key tuple. It is stored in the output for identification
892
 
            of the text during decompression.
893
 
 
894
 
        :param chunks: The chunks of bytes to be compressed
895
 
 
896
 
        :param input_len: The length of the chunks
897
 
 
898
 
        :param max_delta_size: The size above which we issue a fulltext instead
899
 
            of a delta.
900
 
 
901
 
        :param soft: Do a 'soft' compression. This means that we require larger
902
 
            ranges to match to be considered for a copy command.
903
 
 
904
 
        :return: The sha1 of lines, the start and end offsets in the delta, and
905
 
            the type ('fulltext' or 'delta').
906
 
        """
907
 
        raise NotImplementedError(self._compress)
908
 
 
909
 
    def extract(self, key):
910
 
        """Extract a key previously added to the compressor.
911
 
 
912
 
        :param key: The key to extract.
913
 
        :return: An iterable over chunks and the sha1.
914
 
        """
915
 
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
916
 
        delta_chunks = self.chunks[start_chunk:end_chunk]
917
 
        stored_bytes = b"".join(delta_chunks)
918
 
        kind = stored_bytes[:1]
919
 
        if kind == b"f":
920
 
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
921
 
            data_len = fulltext_len + 1 + offset
922
 
            if data_len != len(stored_bytes):
923
 
                raise ValueError(
924
 
                    "Index claimed fulltext len, but stored bytes"
925
 
                    f" claim {len(stored_bytes)} != {data_len}"
926
 
                )
927
 
            data = [stored_bytes[offset + 1 :]]
928
 
        else:
929
 
            if kind != b"d":
930
 
                raise ValueError(f"Unknown content kind, bytes claim {kind}")
931
 
            # XXX: This is inefficient at best
932
 
            source = b"".join(self.chunks[:start_chunk])
933
 
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
934
 
            data_len = delta_len + 1 + offset
935
 
            if data_len != len(stored_bytes):
936
 
                raise ValueError(
937
 
                    "Index claimed delta len, but stored bytes"
938
 
                    f" claim {len(stored_bytes)} != {data_len}"
939
 
                )
940
 
            data = [apply_delta(source, stored_bytes[offset + 1 :])]
941
 
        data_sha1 = osutils.sha_strings(data)
942
 
        return data, data_sha1
943
 
 
944
 
    def flush(self):
945
 
        """Finish this group, creating a formatted stream.
946
 
 
947
 
        After calling this, the compressor should no longer be used
948
 
        """
949
 
        self._block.set_chunked_content(self.chunks, self.endpoint)
950
 
        self._delta_index = None
951
 
        return self._block
952
 
 
953
 
    def ratio(self):
954
 
        """Return the overall compression ratio."""
955
 
        return float(self.input_bytes) / float(self.endpoint)
956
 
 
957
 
 
958
 
class PyrexGroupCompressor(_CommonGroupCompressor):
959
 
    """Produce a serialised group of compressed texts.
960
 
 
961
 
    It contains code very similar to SequenceMatcher because of having a similar
962
 
    task. However some key differences apply:
963
 
 
964
 
    * there is no junk, we want a minimal edit not a human readable diff.
965
 
    * we don't filter very common lines (because we don't know where a good
966
 
      range will start, and after the first text we want to be emitting minmal
967
 
      edits only.
968
 
    * we chain the left side, not the right side
969
 
    * we incrementally update the adjacency matrix as new lines are provided.
970
 
    * we look for matches in all of the left side, so the routine which does
971
 
      the analagous task of find_longest_match does not need to filter on the
972
 
      left side.
973
 
    """
974
 
 
975
 
    def __init__(self, settings=None):
976
 
        super().__init__(settings)
977
 
        self.chunks = []
978
 
        max_bytes_to_index = self._settings.get("max_bytes_to_index", 0)
979
 
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
980
 
 
981
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
982
907
        """See _CommonGroupCompressor._compress."""
983
908
        # By having action/label/sha1/len, we can parse the group if the index
984
909
        # was ever destroyed, we have the key in 'label', we know the final
1035
960
        endpoint += sum(map(len, new_chunks))
1036
961
        self.endpoint = endpoint
1037
962
 
 
963
    def extract(self, key):
 
964
        """Extract a key previously added to the compressor.
 
965
 
 
966
        :param key: The key to extract.
 
967
        :return: An iterable over chunks and the sha1.
 
968
        """
 
969
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
 
970
        delta_chunks = self.chunks[start_chunk:end_chunk]
 
971
        stored_bytes = b"".join(delta_chunks)
 
972
        kind = stored_bytes[:1]
 
973
        if kind == b"f":
 
974
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
 
975
            data_len = fulltext_len + 1 + offset
 
976
            if data_len != len(stored_bytes):
 
977
                raise ValueError(
 
978
                    "Index claimed fulltext len, but stored bytes"
 
979
                    f" claim {len(stored_bytes)} != {data_len}"
 
980
                )
 
981
            data = [stored_bytes[offset + 1 :]]
 
982
        else:
 
983
            if kind != b"d":
 
984
                raise ValueError(f"Unknown content kind, bytes claim {kind}")
 
985
            # XXX: This is inefficient at best
 
986
            source = b"".join(self.chunks[:start_chunk])
 
987
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
 
988
            data_len = delta_len + 1 + offset
 
989
            if data_len != len(stored_bytes):
 
990
                raise ValueError(
 
991
                    "Index claimed delta len, but stored bytes"
 
992
                    f" claim {len(stored_bytes)} != {data_len}"
 
993
                )
 
994
            data = [apply_delta(source, stored_bytes[offset + 1 :])]
 
995
        data_sha1 = osutils.sha_strings(data)
 
996
        return data, data_sha1
 
997
 
1038
998
    def flush(self):
1039
 
        ret = super().flush()
 
999
        """Finish this group, creating a formatted stream.
 
1000
 
 
1001
        After calling this, the compressor should no longer be used
 
1002
        """
 
1003
        self._block.set_chunked_content(self.chunks, self.endpoint)
 
1004
        self._delta_index = None
1040
1005
        self.chunks = None
1041
 
        return ret
 
1006
        return self._block
 
1007
 
 
1008
    def flush_without_last(self):
 
1009
        self._pop_last()
 
1010
        return self.flush()
 
1011
 
 
1012
    def _pop_last(self):
 
1013
        """Call this if you want to 'revoke' the last compression.
 
1014
 
 
1015
        After this, the data structures will be rolled back, but you cannot do
 
1016
        more compression.
 
1017
        """
 
1018
        self._delta_index = None
 
1019
        del self.chunks[self._last[0] :]
 
1020
        self.endpoint = self._last[1]
 
1021
        self._last = None
 
1022
 
 
1023
    def ratio(self):
 
1024
        """Return the overall compression ratio."""
 
1025
        return float(self.input_bytes) / float(self.endpoint)
1042
1026
 
1043
1027
 
1044
1028
def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
1756
1740
        self._unadded_refs = {}
1757
1741
        keys_to_add = []
1758
1742
 
1759
 
        def flush():
1760
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1743
        def flush(block):
 
1744
            bytes_len, chunks = block.to_chunks()
1761
1745
            self._compressor = self._make_group_compressor()
1762
1746
            # Note: At this point we still have 1 copy of the fulltext (in
1763
1747
            #       record and the var 'bytes'), and this generates 2 copies of
1889
1873
                start_new_block = False
1890
1874
            last_prefix = prefix
1891
1875
            if start_new_block:
1892
 
                self._compressor.pop_last()
1893
 
                flush()
 
1876
                flush(self._compressor.flush_without_last())
1894
1877
                max_fulltext_len = chunks_len
1895
1878
                (found_sha1, start_point, end_point, type) = self._compressor.compress(
1896
1879
                    record.key, chunks, chunks_len, record.sha1
1909
1892
            refs = static_tuple.StaticTuple(parents)
1910
1893
            keys_to_add.append((key, b"%d %d" % (start_point, end_point), refs))
1911
1894
        if len(keys_to_add):
1912
 
            flush()
 
1895
            flush(self._compressor.flush())
1913
1896
        self._compressor = None
1914
1897
 
1915
1898
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
2282
2265
            key_dependencies.add_references(node[1], node[3][0])
2283
2266
 
2284
2267
 
2285
 
GroupCompressor: Type[_CommonGroupCompressor]
2286
 
 
2287
 
 
2288
2268
from .._bzr_rs import groupcompress
2289
2269
 
2290
2270
encode_base128_int = groupcompress.encode_base128_int
2302
2282
try:
2303
2283
    from ._groupcompress_pyx import DeltaIndex
2304
2284
 
2305
 
    GroupCompressor = PyrexGroupCompressor
 
2285
    GroupCompressor = PyrexGroupCompressor  # type: ignore
2306
2286
except ImportError as e:
2307
2287
    osutils.failed_to_load_extension(e)
2308
 
    GroupCompressor = PythonGroupCompressor
 
2288
    GroupCompressor = PythonGroupCompressor  # type: ignore