~ubuntu-branches/ubuntu/karmic/tahoe-lafs/karmic

« back to all changes in this revision

Viewing changes to src/allmydata/mutable/checker.py

  • Committer: Bazaar Package Importer
  • Author(s): Zooko O'Whielacronx (Hacker)
  • Date: 2009-09-24 00:00:05 UTC
  • Revision ID: james.westby@ubuntu.com-20090924000005-ixe2n4yngmk49ysz
Tags: upstream-1.5.0
Import upstream version 1.5.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
from twisted.internet import defer
 
3
from twisted.python import failure
 
4
from allmydata import hashtree
 
5
from allmydata.uri import from_string
 
6
from allmydata.util import hashutil, base32, idlib, log
 
7
from allmydata.check_results import CheckAndRepairResults, CheckResults
 
8
 
 
9
from common import MODE_CHECK, CorruptShareError
 
10
from servermap import ServerMap, ServermapUpdater
 
11
from layout import unpack_share, SIGNED_PREFIX_LENGTH
 
12
 
 
13
class MutableChecker:
    """Check the health of one mutable file.

    Drives a MODE_CHECK servermap update and (optionally) a byte-level
    verification of every share of the best recoverable version, then
    fills in a CheckResults instance describing what was found. The
    Deferred returned by check() fires with that CheckResults.
    """

    def __init__(self, node, monitor):
        # node: the mutable filenode being checked; monitor: used to
        # abort early via raise_if_cancelled()
        self._node = node
        self._monitor = monitor
        self.bad_shares = [] # list of (nodeid,shnum,failure)
        self._storage_index = self._node.get_storage_index()
        self.results = CheckResults(from_string(node.get_uri()), self._storage_index)
        # set to True whenever we notice anything a repair could improve
        self.need_repair = False
        self.responded = set() # set of (binary) nodeids

    def check(self, verify=False, add_lease=False):
        """Run the check and return a Deferred that fires with self.results.

        If verify=True, additionally read and validate every byte of each
        share of the best recoverable version. If add_lease=True, the
        servermap update also adds/renews leases on the shares it touches.
        """
        servermap = ServerMap()
        u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK,
                             add_lease=add_lease)
        history = self._node._client.get_history()
        if history:
            # let the status-history display know a mapupdate is running
            history.notify_mapupdate(u.get_status())
        d = u.update()
        d.addCallback(self._got_mapupdate_results)
        if verify:
            d.addCallback(self._verify_all_shares)
        # _verify_all_shares returns a DeferredList, so re-inject the
        # servermap before filling in the results
        d.addCallback(lambda res: servermap)
        d.addCallback(self._fill_checker_results, self.results)
        d.addCallback(lambda res: self.results)
        return d

    def _got_mapupdate_results(self, servermap):
        """Record best_version and a preliminary need_repair verdict.

        Returns the servermap unchanged so later callbacks can use it.
        """
        # the file is healthy if there is exactly one recoverable version, it
        # has at least N distinct shares, and there are no unrecoverable
        # versions: all existing shares will be for the same version.
        self._monitor.raise_if_cancelled()
        self.best_version = None
        num_recoverable = len(servermap.recoverable_versions())
        if num_recoverable:
            self.best_version = servermap.best_recoverable_version()

        if servermap.unrecoverable_versions():
            self.need_repair = True
        if num_recoverable != 1:
            self.need_repair = True
        if self.best_version:
            available_shares = servermap.shares_available()
            (num_distinct_shares, k, N) = available_shares[self.best_version]
            if num_distinct_shares < N:
                # fewer distinct shares than the encoding expects
                self.need_repair = True

        return servermap

    def _verify_all_shares(self, servermap):
        """Read and validate every share of the best recoverable version.

        Returns a DeferredList over one read per share (or None if there
        is no recoverable version to verify). Corrupt shares are recorded
        by _got_answer as the reads complete.
        """
        # read every byte of each share
        if not self.best_version:
            return
        versionmap = servermap.make_versionmap()
        shares = versionmap[self.best_version]
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        offsets = dict(offsets_tuple)
        # one read vector covering the whole share, [0, EOF)
        readv = [ (0, offsets["EOF"]) ]
        dl = []
        for (shnum, peerid, timestamp) in shares:
            ss = servermap.connections[peerid]
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
            d.addCallback(self._got_answer, peerid, servermap)
            dl.append(d)
        return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        """Issue the remote slot_readv call for the given shares/readv."""
        # isolate the callRemote to a separate method, so tests can subclass
        # Publish and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_answer(self, datavs, peerid, servermap):
        """Validate each share returned by one server.

        datavs maps shnum -> list of read results (we sent a single readv,
        so datav[0] is the whole share). A CorruptShareError marks the
        share bad in the servermap, records it in self.bad_shares, and
        advises the offending server.
        """
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                self._got_results_one_share(shnum, peerid, data)
            except CorruptShareError:
                f = failure.Failure()
                self.need_repair = True
                self.bad_shares.append( (peerid, shnum, f) )
                prefix = data[:SIGNED_PREFIX_LENGTH]
                servermap.mark_bad_share(peerid, shnum, prefix)
                ss = servermap.connections[peerid]
                self.notify_server_corruption(ss, shnum, str(f.value))

    def check_prefix(self, peerid, shnum, data):
        """Raise CorruptShareError if the share's signed prefix differs
        from the version chosen at mapupdate time (i.e. the share changed
        between mapupdate and this read)."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.best_version
        got_prefix = data[:SIGNED_PREFIX_LENGTH]
        if got_prefix != prefix:
            raise CorruptShareError(peerid, shnum,
                                    "prefix mismatch: share changed while we were reading it")

    def _got_results_one_share(self, shnum, peerid, data):
        """Fully validate one share, raising CorruptShareError on failure.

        Checks: signed prefix, block hash tree, share hash chain against
        root_hash, and (when we hold a writecap) that the encrypted
        private key matches our writekey.
        """
        self.check_prefix(peerid, shnum, data)

        # the [seqnum:signature] pieces are validated by _compare_prefix,
        # which checks their signature against the pubkey known to be
        # associated with this file.

        (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature,
         share_hash_chain, block_hash_tree, share_data,
         enc_privkey) = unpack_share(data)

        # validate [share_hash_chain,block_hash_tree,share_data]

        # mutable files currently hold all data in one segment, so the
        # block hash tree here has a single leaf
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError), e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)

        # validate enc_privkey: only possible if we have a write-cap
        if not self._node.is_readonly():
            alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
            if alleged_writekey != self._node.get_writekey():
                raise CorruptShareError(peerid, shnum, "invalid privkey")

    def notify_server_corruption(self, ss, shnum, reason):
        """Fire-and-forget advisory to the server holding a corrupt share
        (callRemoteOnly: we do not wait for, or check, the response)."""
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _count_shares(self, smap, version):
        """Return the count-shares-*/count-good-share-hosts/
        count-wrong-shares counters for the given version."""
        available_shares = smap.shares_available()
        (num_distinct_shares, k, N) = available_shares[version]
        counters = {}
        counters["count-shares-good"] = num_distinct_shares
        counters["count-shares-needed"] = k
        counters["count-shares-expected"] = N
        good_hosts = smap.all_peers_for_version(version)
        counters["count-good-share-hosts"] = len(good_hosts)
        vmap = smap.make_versionmap()
        # shares belonging to any version other than the one asked about
        counters["count-wrong-shares"] = sum([len(shares)
                                          for verinfo,shares in vmap.items()
                                          if verinfo != version])

        return counters

    def _fill_checker_results(self, smap, r):
        """Populate CheckResults r from the servermap: health verdict,
        share counters, corrupt-share list, sharemap, human-readable
        report and summary."""
        self._monitor.raise_if_cancelled()
        r.set_servermap(smap.copy())
        healthy = True
        data = {}
        report = []
        summary = []
        vmap = smap.make_versionmap()
        recoverable = smap.recoverable_versions()
        unrecoverable = smap.unrecoverable_versions()
        data["count-recoverable-versions"] = len(recoverable)
        data["count-unrecoverable-versions"] = len(unrecoverable)

        if recoverable:
            report.append("Recoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in recoverable]))
        if unrecoverable:
            report.append("Unrecoverable Versions: " +
                          "/".join(["%d*%s" % (len(vmap[v]),
                                               smap.summarize_version(v))
                                    for v in unrecoverable]))
        # healthy requires: no unrecoverable versions, and exactly one
        # recoverable version with at least N shares
        if smap.unrecoverable_versions():
            healthy = False
            summary.append("some versions are unrecoverable")
            report.append("Unhealthy: some versions are unrecoverable")
        if len(recoverable) == 0:
            healthy = False
            summary.append("no versions are recoverable")
            report.append("Unhealthy: no versions are recoverable")
        if len(recoverable) > 1:
            healthy = False
            summary.append("multiple versions are recoverable")
            report.append("Unhealthy: there are multiple recoverable versions")

        needs_rebalancing = False
        if recoverable:
            best_version = smap.best_recoverable_version()
            report.append("Best Recoverable Version: " +
                          smap.summarize_version(best_version))
            counters = self._count_shares(smap, best_version)
            data.update(counters)
            s = counters["count-shares-good"]
            k = counters["count-shares-needed"]
            N = counters["count-shares-expected"]
            if s < N:
                healthy = False
                report.append("Unhealthy: best version has only %d shares "
                              "(encoding is %d-of-%d)" % (s, k, N))
                summary.append("%d shares (enc %d-of-%d)" % (s, k, N))
            hosts = smap.all_peers_for_version(best_version)
            # rebalancing is suggested when shares are doubled up on
            # fewer than N distinct hosts
            needs_rebalancing = bool( len(hosts) < N )
        elif unrecoverable:
            healthy = False
            # find a k and N from somewhere
            first = list(unrecoverable)[0]
            # not exactly the best version, but that doesn't matter too much
            data.update(self._count_shares(smap, first))
            # leave needs_rebalancing=False: the file being unrecoverable is
            # the bigger problem
        else:
            # couldn't find anything at all
            data["count-shares-good"] = 0
            data["count-shares-needed"] = 3 # arbitrary defaults
            data["count-shares-expected"] = 10
            data["count-good-share-hosts"] = 0
            data["count-wrong-shares"] = 0

        if self.bad_shares:
            data["count-corrupt-shares"] = len(self.bad_shares)
            data["list-corrupt-shares"] = locators = []
            report.append("Corrupt Shares:")
            summary.append("Corrupt Shares:")
            for (peerid, shnum, f) in sorted(self.bad_shares):
                locators.append( (peerid, self._storage_index, shnum) )
                s = "%s-sh%d" % (idlib.shortnodeid_b2a(peerid), shnum)
                if f.check(CorruptShareError):
                    ft = f.value.reason
                else:
                    ft = str(f)
                report.append(" %s: %s" % (s, ft))
                summary.append(s)
                p = (peerid, self._storage_index, shnum, f)
                r.problems.append(p)
                msg = ("CorruptShareError during mutable verify, "
                       "peerid=%(peerid)s, si=%(si)s, shnum=%(shnum)d, "
                       "where=%(where)s")
                log.msg(format=msg, peerid=idlib.nodeid_b2a(peerid),
                        si=base32.b2a(self._storage_index),
                        shnum=shnum,
                        where=ft,
                        level=log.WEIRD, umid="EkK8QA")
        else:
            data["count-corrupt-shares"] = 0
            data["list-corrupt-shares"] = []

        sharemap = {}
        for verinfo in vmap:
            for (shnum, peerid, timestamp) in vmap[verinfo]:
                shareid = "%s-sh%d" % (smap.summarize_version(verinfo), shnum)
                if shareid not in sharemap:
                    sharemap[shareid] = []
                sharemap[shareid].append(peerid)
        data["sharemap"] = sharemap
        data["servers-responding"] = list(smap.reachable_peers)

        r.set_healthy(healthy)
        r.set_recoverable(bool(recoverable))
        r.set_needs_rebalancing(needs_rebalancing)
        r.set_data(data)
        if healthy:
            r.set_summary("Healthy")
        else:
            r.set_summary("Unhealthy: " + " ".join(summary))
        r.set_report(report)
 
280
 
 
281
 
 
282
class MutableCheckAndRepairer(MutableChecker):
    """A MutableChecker that follows an unhealthy check with a repair.

    Wraps the pre-repair and post-repair CheckResults in a single
    CheckAndRepairResults instance; check() fires with that wrapper.
    """

    def __init__(self, node, monitor):
        MutableChecker.__init__(self, node, monitor)
        cr = CheckAndRepairResults(self._storage_index)
        cr.pre_repair_results = self.results
        self.cr_results = cr
        self.need_repair = False

    def check(self, verify=False, add_lease=False):
        """Run the base check, then repair if that check said we need to.

        Fires with self.cr_results instead of the plain CheckResults.
        """
        d = MutableChecker.check(self, verify, add_lease)
        d.addCallback(self._maybe_repair)
        d.addCallback(lambda ignored: self.cr_results)
        return d

    def _maybe_repair(self, res):
        """Kick off a repair when needed, recording the outcome in
        self.cr_results."""
        self._monitor.raise_if_cancelled()
        cr = self.cr_results

        if not self.need_repair:
            # healthy: post-repair state is identical to pre-repair state
            cr.post_repair_results = self.results
            return

        if self._node.is_readonly():
            # ticket #625: we cannot yet repair read-only mutable files
            cr.post_repair_results = self.results
            cr.repair_attempted = False
            return

        cr.repair_attempted = True

        def _repair_finished(repair_results):
            cr.repair_successful = True
            post = CheckResults(from_string(self._node.get_uri()), self._storage_index)
            cr.post_repair_results = post
            self._fill_checker_results(repair_results.servermap, post)
            cr.repair_results = repair_results # TODO?

        def _repair_error(f):
            # I'm not sure if I want to pass through a failure or not.
            cr.repair_successful = False
            cr.repair_failure = f # TODO?
            #self.cr_results.post_repair_results = ??
            return f

        d = self._node.repair(self.results)
        d.addCallbacks(_repair_finished, _repair_error)
        return d