~jteh/bzr-fastimport/plainAuthorsBugs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# Copyright (C) 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""A manager of caches."""

import atexit
import os
import shutil
import tempfile
import weakref

from bzrlib import lru_cache, trace
from bzrlib.plugins.fastimport import (
    branch_mapper,
    )
from bzrlib.plugins.fastimport.reftracker import (
    RefTracker,
    )
from fastimport.helpers import (
    single_plural,
    )


class _Cleanup(object):
    """This class makes sure we clean up when CacheManager goes away.

    We use a helper class to ensure that we are never in a refcycle.
    """

    def __init__(self, disk_blobs):
        self.disk_blobs = disk_blobs
        self.tempdir = None
        self.small_blobs = None

    def __del__(self):
        self.finalize()

    def finalize(self):
        if self.disk_blobs is not None:
            for info in self.disk_blobs.itervalues():
                if info[-1] is not None:
                    os.unlink(info[-1])
            self.disk_blobs = None
        if self.small_blobs is not None:
            self.small_blobs.close()
            self.small_blobs = None
        if self.tempdir is not None:
            shutil.rmtree(self.tempdir)


class CacheManager(object):

    _small_blob_threshold = 25*1024
    _sticky_cache_size = 300*1024*1024
    _sticky_flushed_size = 100*1024*1024

    def __init__(self, info=None, verbose=False, inventory_cache_size=10):
        """Create a manager of caches.

        :param info: a ConfigObj holding the output from
            the --info processor, or None if no hints are available
        """
        self.verbose = verbose

        # dataref -> data. datref is either :mark or the sha-1.
        # Sticky blobs are referenced more than once, and are saved until their
        # refcount goes to 0
        self._blobs = {}
        self._sticky_blobs = {}
        self._sticky_memory_bytes = 0
        # if we overflow our memory cache, then we will dump large blobs to
        # disk in this directory
        self._tempdir = None
        # id => (offset, n_bytes, fname)
        #   if fname is None, then the content is stored in the small file
        self._disk_blobs = {}
        self._cleanup = _Cleanup(self._disk_blobs)

        # revision-id -> Inventory cache
        # these are large and we probably don't need too many as
        # most parents are recent in history
        self.inventories = lru_cache.LRUCache(inventory_cache_size)

        # import commmit-ids -> revision-id lookup table
        # we need to keep all of these but they are small
        self.marks = {}

        # (path, branch_ref) -> file-ids - as generated.
        # (Use store_file_id/fetch_fileid methods rather than direct access.)

        # Work out the blobs to make sticky - None means all
        self._blob_ref_counts = {}
        if info is not None:
            try:
                blobs_by_counts = info['Blob reference counts']
                # The parser hands values back as lists, already parsed
                for count, blob_list in blobs_by_counts.items():
                    n = int(count)
                    for b in blob_list:
                        self._blob_ref_counts[b] = n
            except KeyError:
                # info not in file - possible when no blobs used
                pass

        # BranchMapper has no state (for now?), but we keep it around rather
        # than reinstantiate on every usage
        self.branch_mapper = branch_mapper.BranchMapper()

        self.reftracker = RefTracker()

    def add_mark(self, mark, commit_id):
        assert mark[0] != ':'
        self.marks[mark] = commit_id

    def lookup_committish(self, committish):
        """Resolve a 'committish' to a revision id.

        :param committish: A "committish" string
        :return: Bazaar revision id
        """
        assert committish[0] == ':'
        return self.marks[committish.lstrip(':')]

    def dump_stats(self, note=trace.note):
        """Dump some statistics about what we cached."""
        # TODO: add in inventory stastistics
        note("Cache statistics:")
        self._show_stats_for(self._sticky_blobs, "sticky blobs", note=note)
        self._show_stats_for(self.marks, "revision-ids", note=note)
        # These aren't interesting so omit from the output, at least for now
        #self._show_stats_for(self._blobs, "other blobs", note=note)
        #self.reftracker.dump_stats(note=note)

    def _show_stats_for(self, dict, label, note=trace.note, tuple_key=False):
        """Dump statistics about a given dictionary.

        By the key and value need to support len().
        """
        count = len(dict)
        if tuple_key:
            size = sum(map(len, (''.join(k) for k in dict.keys())))
        else:
            size = sum(map(len, dict.keys()))
        size += sum(map(len, dict.values()))
        size = size * 1.0 / 1024
        unit = 'K'
        if size > 1024:
            size = size / 1024
            unit = 'M'
            if size > 1024:
                size = size / 1024
                unit = 'G'
        note("    %-12s: %8.1f %s (%d %s)" % (label, size, unit, count,
            single_plural(count, "item", "items")))

    def clear_all(self):
        """Free up any memory used by the caches."""
        self._blobs.clear()
        self._sticky_blobs.clear()
        self.marks.clear()
        self.reftracker.clear()
        self.inventories.clear()

    def _flush_blobs_to_disk(self):
        blobs = self._sticky_blobs.keys()
        sticky_blobs = self._sticky_blobs
        total_blobs = len(sticky_blobs)
        blobs.sort(key=lambda k:len(sticky_blobs[k]))
        if self._tempdir is None:
            tempdir = tempfile.mkdtemp(prefix='fastimport_blobs-')
            self._tempdir = tempdir
            self._cleanup.tempdir = self._tempdir
            self._cleanup.small_blobs = tempfile.TemporaryFile(
                prefix='small-blobs-', dir=self._tempdir)
            small_blob_ref = weakref.ref(self._cleanup.small_blobs)
            # Even though we add it to _Cleanup it seems that the object can be
            # destroyed 'too late' for cleanup to actually occur. Probably a
            # combination of bzr's "die directly, don't clean up" and how
            # exceptions close the running stack.
            def exit_cleanup():
                small_blob = small_blob_ref()
                if small_blob is not None:
                    small_blob.close()
                shutil.rmtree(tempdir, ignore_errors=True)
            atexit.register(exit_cleanup)
        count = 0
        bytes = 0
        n_small_bytes = 0
        while self._sticky_memory_bytes > self._sticky_flushed_size:
            id = blobs.pop()
            blob = self._sticky_blobs.pop(id)
            n_bytes = len(blob)
            self._sticky_memory_bytes -= n_bytes
            if n_bytes < self._small_blob_threshold:
                f = self._cleanup.small_blobs
                f.seek(0, os.SEEK_END)
                self._disk_blobs[id] = (f.tell(), n_bytes, None)
                f.write(blob)
                n_small_bytes += n_bytes
            else:
                fd, name = tempfile.mkstemp(prefix='blob-', dir=self._tempdir)
                os.write(fd, blob)
                os.close(fd)
                self._disk_blobs[id] = (0, n_bytes, name)
            bytes += n_bytes
            del blob
            count += 1
        trace.note('flushed %d/%d blobs w/ %.1fMB (%.1fMB small) to disk'
                   % (count, total_blobs, bytes / 1024. / 1024,
                      n_small_bytes / 1024. / 1024))

    def store_blob(self, id, data):
        """Store a blob of data."""
        # Note: If we're not reference counting, everything has to be sticky
        if not self._blob_ref_counts or id in self._blob_ref_counts:
            self._sticky_blobs[id] = data
            self._sticky_memory_bytes += len(data)
            if self._sticky_memory_bytes > self._sticky_cache_size:
                self._flush_blobs_to_disk()
        elif data == '':
            # Empty data is always sticky
            self._sticky_blobs[id] = data
        else:
            self._blobs[id] = data

    def _decref(self, id, cache, fn):
        if not self._blob_ref_counts:
            return False
        count = self._blob_ref_counts.get(id, None)
        if count is not None:
            count -= 1
            if count <= 0:
                del cache[id]
                if fn is not None:
                    os.unlink(fn)
                del self._blob_ref_counts[id]
                return True
            else:
                self._blob_ref_counts[id] = count
        return False

    def fetch_blob(self, id):
        """Fetch a blob of data."""
        if id in self._blobs:
            return self._blobs.pop(id)
        if id in self._disk_blobs:
            (offset, n_bytes, fn) = self._disk_blobs[id]
            if fn is None:
                f = self._cleanup.small_blobs
                f.seek(offset)
                content = f.read(n_bytes)
            else:
                fp = open(fn, 'rb')
                try:
                    content = fp.read()
                finally:
                    fp.close()
            self._decref(id, self._disk_blobs, fn)
            return content
        content = self._sticky_blobs[id]
        if self._decref(id, self._sticky_blobs, None):
            self._sticky_memory_bytes -= len(content)
        return content