~wb-munzinger/+junk/sanlock

« back to all changes in this revision

Viewing changes to paxos_lease.c

  • Committer: David Weber
  • Date: 2012-01-18 13:00:36 UTC
  • Revision ID: wb@munzinger.de-20120118130036-9a7wvhhmfuip7zx5
Tags: upstream-1.9
Import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright 2010-2011 Red Hat, Inc.
 
3
 *
 
4
 * This copyrighted material is made available to anyone wishing to use,
 
5
 * modify, copy, or redistribute it subject to the terms and conditions
 
6
 * of the GNU General Public License v2 or (at your option) any later version.
 
7
 */
 
8
 
 
9
#include <inttypes.h>
 
10
#include <unistd.h>
 
11
#include <stdio.h>
 
12
#include <stdlib.h>
 
13
#include <stdint.h>
 
14
#include <stddef.h>
 
15
#include <fcntl.h>
 
16
#include <string.h>
 
17
#include <errno.h>
 
18
#include <limits.h>
 
19
#include <time.h>
 
20
#include <syslog.h>
 
21
#include <sys/types.h>
 
22
#include <sys/time.h>
 
23
 
 
24
#include "sanlock_internal.h"
 
25
#include "diskio.h"
 
26
#include "direct.h"
 
27
#include "log.h"
 
28
#include "lockspace.h"
 
29
#include "delta_lease.h"
 
30
#include "paxos_lease.h"
 
31
 
 
32
uint32_t crc32c(uint32_t crc, uint8_t *data, size_t length);
 
33
int get_rand(int a, int b);
 
34
 
 
35
/*
 * Number of leading bytes of struct paxos_dblock covered by the checksum:
 * the six uint64_t fields that precede the checksum field (6 * 8 = 48).
 */
#define DBLOCK_CHECKSUM_LEN 48

/*
 * Per-host ballot block.  dblock_checksum() computes the crc over the first
 * DBLOCK_CHECKSUM_LEN bytes and the result is stored in the checksum field.
 */
struct paxos_dblock {
	uint64_t mbal;		/* ballot number being attempted */
	uint64_t bal;		/* ballot number of the value in inp */
	uint64_t inp;		/* host_id */
	uint64_t inp2;		/* host_id generation */
	uint64_t inp3;		/* host_id's timestamp */
	uint64_t lver;		/* lease version this ballot is for */
	uint32_t checksum;	/* crc of the fields above */
};
 
46
 
 
47
/*
 * Round val up to the nearest power of two.  A value that already is a
 * power of two is returned unchanged.  0 and values above 2^31 wrap to 0.
 */
static uint32_t roundup_power_of_two(uint32_t val)
{
	uint32_t bits = val - 1;
	unsigned int shift;

	/* smear the highest set bit into every lower bit position */
	for (shift = 1; shift < 32; shift <<= 1)
		bits |= bits >> shift;

	return bits + 1;
}
 
58
 
 
59
int majority_disks(struct token *token, int num)
 
60
{
 
61
        int num_disks = token->r.num_disks;
 
62
 
 
63
        /* odd number of disks */
 
64
 
 
65
        if (num_disks % 2)
 
66
                return num >= ((num_disks / 2) + 1);
 
67
 
 
68
        /* even number of disks */
 
69
 
 
70
        if (num > (num_disks / 2))
 
71
                return 1;
 
72
 
 
73
        if (num < (num_disks / 2))
 
74
                return 0;
 
75
 
 
76
        /* TODO: half of disks are majority if tiebreaker disk is present */
 
77
        return 0;
 
78
}
 
79
 
 
80
int paxos_lease_request_read(struct task *task, struct token *token,
 
81
                             struct request_record *rr)
 
82
{
 
83
        int rv;
 
84
 
 
85
        /* 1 = request record is second sector */
 
86
 
 
87
        rv = read_sectors(&token->disks[0], 1, 1, (char *)rr,
 
88
                          sizeof(struct request_record),
 
89
                          task, "request");
 
90
        if (rv < 0)
 
91
                return rv;
 
92
        return SANLK_OK;
 
93
}
 
94
 
 
95
int paxos_lease_request_write(struct task *task, struct token *token,
 
96
                              struct request_record *rr)
 
97
{
 
98
        int rv;
 
99
 
 
100
        rv = write_sector(&token->disks[0], 1, (char *)rr,
 
101
                          sizeof(struct request_record),
 
102
                          task, "request");
 
103
        if (rv < 0)
 
104
                return rv;
 
105
        return SANLK_OK;
 
106
}
 
107
 
 
108
static int write_dblock(struct task *task,
 
109
                        struct sync_disk *disk,
 
110
                        uint64_t host_id,
 
111
                        struct paxos_dblock *pd)
 
112
{
 
113
        int rv;
 
114
 
 
115
        /* 1 leader block + 1 request block;
 
116
           host_id N is block offset N-1 */
 
117
 
 
118
        rv = write_sector(disk, 2 + host_id - 1, (char *)pd, sizeof(struct paxos_dblock),
 
119
                          task, "dblock");
 
120
        return rv;
 
121
}
 
122
 
 
123
static int write_leader(struct task *task,
 
124
                        struct sync_disk *disk,
 
125
                        struct leader_record *lr)
 
126
{
 
127
        int rv;
 
128
 
 
129
        rv = write_sector(disk, 0, (char *)lr, sizeof(struct leader_record),
 
130
                          task, "leader");
 
131
        return rv;
 
132
}
 
133
 
 
134
static int read_dblock(struct task *task,
 
135
                       struct sync_disk *disk,
 
136
                       uint64_t host_id,
 
137
                       struct paxos_dblock *pd)
 
138
{
 
139
        int rv;
 
140
 
 
141
        /* 1 leader block + 1 request block; host_id N is block offset N-1 */
 
142
 
 
143
        rv = read_sectors(disk, 2 + host_id - 1, 1, (char *)pd, sizeof(struct paxos_dblock),
 
144
                          task, "dblock");
 
145
        return rv;
 
146
}
 
147
 
 
148
#if 0
/*
 * Currently compiled out.
 *
 * Read pds_count consecutive dblock sectors from disk in a single i/o and
 * copy the leading sizeof(struct paxos_dblock) bytes of each sector into
 * pds[].  Returns 0 on success, -ENOMEM if the bounce buffer cannot be
 * allocated, or the negative value returned by read_sectors().
 */
static int read_dblocks(struct task *task,
			struct sync_disk *disk,
			struct paxos_dblock *pds,
			int pds_count)
{
	char *data;
	int data_len = pds_count * disk->sector_size;
	int rv, i;

	data = malloc(data_len);
	if (!data) {
		log_error("read_dblocks malloc %d %s", data_len, disk->path);
		return -ENOMEM;
	}

	/* 2 = 1 leader block + 1 request block */
	rv = read_sectors(disk, 2, pds_count, data, data_len,
			  task, "dblocks");
	if (rv >= 0) {
		/* only the start of each sector holds a paxos_dblock */
		for (i = 0; i < pds_count; i++)
			memcpy(&pds[i], data + (i * disk->sector_size),
			       sizeof(struct paxos_dblock));
		rv = 0;
	}

	free(data);
	return rv;
}
#endif
 
188
 
 
189
static int read_leader(struct task *task,
 
190
                       struct sync_disk *disk,
 
191
                       struct leader_record *lr)
 
192
{
 
193
        int rv;
 
194
 
 
195
        /* 0 = leader record is first sector */
 
196
 
 
197
        rv = read_sectors(disk, 0, 1, (char *)lr, sizeof(struct leader_record),
 
198
                          task, "leader");
 
199
 
 
200
        return rv;
 
201
}
 
202
 
 
203
static uint32_t dblock_checksum(struct paxos_dblock *pd)
 
204
{
 
205
        return crc32c((uint32_t)~1, (uint8_t *)pd, DBLOCK_CHECKSUM_LEN);
 
206
}
 
207
 
 
208
static int verify_dblock(struct token *token, struct paxos_dblock *pd)
 
209
{
 
210
        uint32_t sum;
 
211
 
 
212
        if (!pd->checksum && !pd->mbal && !pd->bal && !pd->inp && !pd->lver)
 
213
                return SANLK_OK;
 
214
 
 
215
        sum = dblock_checksum(pd);
 
216
 
 
217
        if (pd->checksum != sum) {
 
218
                log_errot(token, "verify_dblock wrong checksum %x %x",
 
219
                          pd->checksum, sum);
 
220
                return SANLK_DBLOCK_CHECKSUM;
 
221
        }
 
222
 
 
223
        return SANLK_OK;
 
224
}
 
225
 
 
226
/*
 
227
 * It's possible that we pick a bk_max from another host which has our own
 
228
 * inp values in it, and we can end up commiting our own inp values, copied
 
229
 * from another host's dblock:
 
230
 *
 
231
 * host2 leader free
 
232
 * host2 phase1 mbal 14002
 
233
 * host2 writes dblock[1] mbal 14002
 
234
 * host2 reads  no higher mbal
 
235
 * host2 choose own inp 2,1
 
236
 * host2 phase2 mbal 14002 bal 14002 inp 2,1
 
237
 * host2 writes dblock[1] bal 14002 inp 2,1
 
238
 *                                           host1 leader free
 
239
 *                                           host1 phase1 mbal 20001
 
240
 *                                           host1 writes dblock[0] mbal 20001
 
241
 *                                           host1 reads  no higher mbal
 
242
 *                                           host1 choose dblock[1] bal 14002 inp 2,1
 
243
 *                                           host1 phase2 mbal 20001 bal 20001 inp 2,1
 
244
 *                                           host1 writes dblock[0] bal 20001 inp 2,1
 
245
 * host2 reads  dblock[0] mbal 20001 > 14002
 
246
 *              abort2, retry
 
247
 * host2 leader free
 
248
 * host2 phase1 mbal 16002
 
249
 * host2 writes dblock[1] mbal 16002
 
250
 * host2 reads  dblock[0] mbal 20001 > 16002
 
251
 *       abort1 retry
 
252
 * host2 leader free
 
253
 * host2 phase1 mbal 18002
 
254
 * host2 writes dblock[1] mbal 18002
 
255
 * host2 reads  dblock[0] mbal 20001 > 18002
 
256
 *       abort1 retry
 
257
 * host2 leader free
 
258
 * host2 phase1 mbal 20002
 
259
 * host2 writes dblock[1] mbal 20002
 
260
 * host2 reads  no higher mbal
 
261
 * host2 choose dblock[0] bal 20001 inp 2,1
 
262
 *                                           host1 reads  dblock[1] mbal 20002 > 20001
 
263
 *                                                 abort2 retry
 
264
 * host2 phase2 mbal 20002 bal 20002 inp 2,1
 
265
 * host2 writes dblock[1] bal 20002 inp 2,1
 
266
 * host2 reads  no higher mbal
 
267
 * host2 commit inp 2,1
 
268
 * host2 success
 
269
 *                                           host1 leader owner 2,1
 
270
 *                                           host1 fail
 
271
 */
 
272
 
 
273
static int run_ballot(struct task *task, struct token *token, int num_hosts,
 
274
                      uint64_t next_lver, uint64_t our_mbal,
 
275
                      struct paxos_dblock *dblock_out)
 
276
{
 
277
        struct paxos_dblock dblock;
 
278
        struct paxos_dblock bk_max;
 
279
        struct paxos_dblock *bk;
 
280
        struct sync_disk *disk;
 
281
        char *iobuf[SANLK_MAX_DISKS];
 
282
        char **p_iobuf[SANLK_MAX_DISKS];
 
283
        int num_disks = token->r.num_disks;
 
284
        int num_writes, num_reads;
 
285
        int sector_size = token->disks[0].sector_size;
 
286
        int sector_count;
 
287
        int iobuf_len;
 
288
        int d, q, rv;
 
289
        int q_max = -1;
 
290
        int error;
 
291
 
 
292
        sector_count = roundup_power_of_two(num_hosts + 2);
 
293
 
 
294
        iobuf_len = sector_count * sector_size;
 
295
 
 
296
        if (!iobuf_len)
 
297
                return -EINVAL;
 
298
 
 
299
        for (d = 0; d < num_disks; d++) {
 
300
                p_iobuf[d] = &iobuf[d];
 
301
 
 
302
                rv = posix_memalign((void *)p_iobuf[d], getpagesize(), iobuf_len);
 
303
                if (rv)
 
304
                        return rv;
 
305
        }
 
306
 
 
307
 
 
308
        /*
 
309
         * phase 1
 
310
         *
 
311
         * "For each disk d, it tries first to write dblock[p] to disk[d][p]
 
312
         * and then to read disk[d][q] for all other processors q.  It aborts
 
313
         * the ballot if, for any d and q, it finds disk[d][q].mbal >
 
314
         * dblock[p].mbal. The phase completes when p has written and read a
 
315
         * majority of the disks, without reading any block whose mbal
 
316
         * component is greater than dblock[p].mbal."
 
317
         */
 
318
 
 
319
        log_token(token, "ballot %llu phase1 mbal %llu",
 
320
                  (unsigned long long)next_lver,
 
321
                  (unsigned long long)our_mbal);
 
322
 
 
323
        memset(&dblock, 0, sizeof(struct paxos_dblock));
 
324
        dblock.mbal = our_mbal;
 
325
        dblock.lver = next_lver;
 
326
        dblock.checksum = dblock_checksum(&dblock);
 
327
 
 
328
        memset(&bk_max, 0, sizeof(struct paxos_dblock));
 
329
 
 
330
        num_writes = 0;
 
331
 
 
332
        for (d = 0; d < num_disks; d++) {
 
333
                rv = write_dblock(task, &token->disks[d], token->host_id, &dblock);
 
334
                if (rv < 0)
 
335
                        continue;
 
336
                num_writes++;
 
337
        }
 
338
 
 
339
        if (!majority_disks(token, num_writes)) {
 
340
                log_errot(token, "ballot %llu dblock write error %d",
 
341
                          (unsigned long long)next_lver, rv);
 
342
                error = SANLK_DBLOCK_WRITE;
 
343
                goto out;
 
344
        }
 
345
 
 
346
        num_reads = 0;
 
347
 
 
348
        for (d = 0; d < num_disks; d++) {
 
349
                disk = &token->disks[d];
 
350
 
 
351
                if (!iobuf[d])
 
352
                        continue;
 
353
                memset(iobuf[d], 0, iobuf_len);
 
354
 
 
355
                rv = read_iobuf(disk->fd, disk->offset, iobuf[d], iobuf_len, task);
 
356
                if (rv == SANLK_AIO_TIMEOUT)
 
357
                        iobuf[d] = NULL;
 
358
                if (rv < 0)
 
359
                        continue;
 
360
                num_reads++;
 
361
 
 
362
 
 
363
                for (q = 0; q < num_hosts; q++) {
 
364
                        bk = (struct paxos_dblock *)(iobuf[d] + ((2 + q)*sector_size));
 
365
 
 
366
                        rv = verify_dblock(token, bk);
 
367
                        if (rv < 0)
 
368
                                continue;
 
369
 
 
370
                        if (bk->lver < dblock.lver)
 
371
                                continue;
 
372
 
 
373
                        if (bk->lver > dblock.lver) {
 
374
                                /* I don't think this should happen */
 
375
                                log_errot(token, "ballot %llu larger1 lver[%d] %llu",
 
376
                                          (unsigned long long)next_lver, q,
 
377
                                          (unsigned long long)bk->lver);
 
378
                                error = SANLK_DBLOCK_LVER;
 
379
                                goto out;
 
380
                        }
 
381
 
 
382
                        /* see "It aborts the ballot" in comment above */
 
383
 
 
384
                        if (bk->mbal > dblock.mbal) {
 
385
                                log_errot(token, "ballot %llu abort1 mbal %llu mbal[%d] %llu",
 
386
                                          (unsigned long long)next_lver,
 
387
                                          (unsigned long long)our_mbal, q,
 
388
                                          (unsigned long long)bk->mbal);
 
389
                                error = SANLK_DBLOCK_MBAL;
 
390
                                goto out;
 
391
                        }
 
392
 
 
393
                        /* see choosing inp for phase 2 in comment below */
 
394
 
 
395
                        if (!bk->inp)
 
396
                                continue;
 
397
 
 
398
                        if (!bk->bal) {
 
399
                                log_errot(token, "ballot %llu zero bal inp[%d] %llu",
 
400
                                          (unsigned long long)next_lver, q,
 
401
                                          (unsigned long long)bk->inp);
 
402
                                continue;
 
403
                        }
 
404
 
 
405
                        if (bk->bal > bk_max.bal) {
 
406
                                bk_max = *bk;
 
407
                                q_max = q;
 
408
                        }
 
409
                }
 
410
        }
 
411
 
 
412
        if (!majority_disks(token, num_reads)) {
 
413
                log_errot(token, "ballot %llu dblock read error %d",
 
414
                          (unsigned long long)next_lver, rv);
 
415
                error = SANLK_DBLOCK_READ;
 
416
                goto out;
 
417
        }
 
418
 
 
419
 
 
420
        /*
 
421
         * "When it completes phase 1, p chooses a new value of dblock[p].inp,
 
422
         * sets dblock[p].bal to dblock[p].mbal (its current ballot number),
 
423
         * and begins phase 2."
 
424
         *
 
425
         * "We now describe how processor p chooses the value of dblock[p].inp
 
426
         * that it tries to commit in phase 2. Let blocksSeen be the set
 
427
         * consisting of dblock[p] and all the records disk[d][q] read by p in
 
428
         * phase 1. Let nonInitBlks be the subset of blocksSeen consisting of
 
429
         * those records whose inp field is not NotAnInput.  If nonInitBlks is
 
430
         * empty, then p sets dblock[p].inp to its own input value input[p].
 
431
         * Otherwise, it sets dblock[p].inp to bk.inp for some record bk in
 
432
         * nonInitBlks having the largest value of bk.bal."
 
433
         */
 
434
 
 
435
        if (bk_max.inp) {
 
436
                /* lver and mbal are already set */
 
437
                dblock.inp = bk_max.inp;
 
438
                dblock.inp2 = bk_max.inp2;
 
439
                dblock.inp3 = bk_max.inp3;
 
440
        } else {
 
441
                /* lver and mbal are already set */
 
442
                dblock.inp = token->host_id;
 
443
                dblock.inp2 = token->host_generation;
 
444
                dblock.inp3 = monotime();
 
445
        }
 
446
        dblock.bal = dblock.mbal;
 
447
        dblock.checksum = dblock_checksum(&dblock);
 
448
 
 
449
        if (bk_max.inp) {
 
450
                /* not a problem, but interesting to see, so use log_error */
 
451
                log_errot(token, "ballot %llu choose bk_max[%d] lver %llu mbal %llu bal %llu inp %llu %llu %llu",
 
452
                          (unsigned long long)next_lver, q_max,
 
453
                          (unsigned long long)bk_max.lver,
 
454
                          (unsigned long long)bk_max.mbal,
 
455
                          (unsigned long long)bk_max.bal,
 
456
                          (unsigned long long)bk_max.inp,
 
457
                          (unsigned long long)bk_max.inp2,
 
458
                          (unsigned long long)bk_max.inp3);
 
459
        }
 
460
 
 
461
 
 
462
        /*
 
463
         * phase 2
 
464
         *
 
465
         * Same description as phase 1, same sequence of writes/reads.
 
466
         */
 
467
 
 
468
        log_token(token, "ballot %llu phase2 bal %llu inp %llu %llu %llu q_max %d",
 
469
                  (unsigned long long)dblock.lver,
 
470
                  (unsigned long long)dblock.bal,
 
471
                  (unsigned long long)dblock.inp,
 
472
                  (unsigned long long)dblock.inp2,
 
473
                  (unsigned long long)dblock.inp3,
 
474
                  q_max);
 
475
 
 
476
        num_writes = 0;
 
477
 
 
478
        for (d = 0; d < num_disks; d++) {
 
479
                rv = write_dblock(task, &token->disks[d], token->host_id, &dblock);
 
480
                if (rv < 0)
 
481
                        continue;
 
482
                num_writes++;
 
483
        }
 
484
 
 
485
        if (!majority_disks(token, num_writes)) {
 
486
                log_errot(token, "ballot %llu our dblock write2 error %d",
 
487
                          (unsigned long long)next_lver, rv);
 
488
                error = SANLK_DBLOCK_WRITE;
 
489
                goto out;
 
490
        }
 
491
 
 
492
        num_reads = 0;
 
493
 
 
494
        for (d = 0; d < num_disks; d++) {
 
495
                disk = &token->disks[d];
 
496
 
 
497
                if (!iobuf[d])
 
498
                        continue;
 
499
                memset(iobuf[d], 0, iobuf_len);
 
500
 
 
501
                rv = read_iobuf(disk->fd, disk->offset, iobuf[d], iobuf_len, task);
 
502
                if (rv == SANLK_AIO_TIMEOUT)
 
503
                        iobuf[d] = NULL;
 
504
                if (rv < 0)
 
505
                        continue;
 
506
                num_reads++;
 
507
 
 
508
                for (q = 0; q < num_hosts; q++) {
 
509
                        bk = (struct paxos_dblock *)(iobuf[d] + ((2 + q)*sector_size));
 
510
 
 
511
                        rv = verify_dblock(token, bk);
 
512
                        if (rv < 0)
 
513
                                continue;
 
514
 
 
515
                        if (bk->lver < dblock.lver)
 
516
                                continue;
 
517
 
 
518
                        if (bk->lver > dblock.lver) {
 
519
                                /* I don't think this should happen */
 
520
                                log_errot(token, "ballot %llu larger2 lver[%d] %llu",
 
521
                                          (unsigned long long)next_lver, q,
 
522
                                          (unsigned long long)bk->lver);
 
523
                                error = SANLK_DBLOCK_LVER;
 
524
                                goto out;
 
525
                        }
 
526
 
 
527
                        /* see "It aborts the ballot" in comment above */
 
528
 
 
529
                        if (bk->mbal > dblock.mbal) {
 
530
                                log_errot(token, "ballot %llu abort2 mbal %llu mbal[%d] %llu",
 
531
                                          (unsigned long long)next_lver,
 
532
                                          (unsigned long long)our_mbal, q,
 
533
                                          (unsigned long long)bk->mbal);
 
534
                                error = SANLK_DBLOCK_MBAL;
 
535
                                goto out;
 
536
                        }
 
537
                }
 
538
        }
 
539
 
 
540
        if (!majority_disks(token, num_reads)) {
 
541
                log_errot(token, "ballot %llu dblock read2 error %d",
 
542
                          (unsigned long long)next_lver, rv);
 
543
                error = SANLK_DBLOCK_READ;
 
544
                goto out;
 
545
        }
 
546
 
 
547
        /* "When it completes phase 2, p has committed dblock[p].inp." */
 
548
 
 
549
        memcpy(dblock_out, &dblock, sizeof(struct paxos_dblock));
 
550
        error = SANLK_OK;
 
551
 out:
 
552
        for (d = 0; d < num_disks; d++) {
 
553
                /* don't free iobufs that have timed out */
 
554
                if (!iobuf[d])
 
555
                        continue;
 
556
                free(iobuf[d]);
 
557
        }
 
558
        return error;
 
559
}
 
560
 
 
561
uint32_t leader_checksum(struct leader_record *lr)
 
562
{
 
563
        return crc32c((uint32_t)~1, (uint8_t *)lr, LEADER_CHECKSUM_LEN);
 
564
}
 
565
 
 
566
static void log_leader_error(int result,
 
567
                             struct token *token,
 
568
                             struct sync_disk *disk,
 
569
                             struct leader_record *lr,
 
570
                             const char *caller)
 
571
{
 
572
        log_errot(token, "leader1 %s error %d sn %.48s rn %.48s",
 
573
                  caller ? caller : "unknown",
 
574
                  result,
 
575
                  token->r.lockspace_name,
 
576
                  token->r.name);
 
577
 
 
578
        log_errot(token, "leader2 path %s offset %llu fd %d",
 
579
                  disk->path,
 
580
                  (unsigned long long)disk->offset,
 
581
                  disk->fd);
 
582
 
 
583
        log_errot(token, "leader3 m %x v %x ss %u nh %llu mh %llu oi %llu og %llu lv %llu",
 
584
                  lr->magic,
 
585
                  lr->version,
 
586
                  lr->sector_size,
 
587
                  (unsigned long long)lr->num_hosts,
 
588
                  (unsigned long long)lr->max_hosts,
 
589
                  (unsigned long long)lr->owner_id,
 
590
                  (unsigned long long)lr->owner_generation,
 
591
                  (unsigned long long)lr->lver);
 
592
 
 
593
        log_errot(token, "leader4 sn %.48s rn %.48s ts %llu cs %x",
 
594
                  lr->space_name,
 
595
                  lr->resource_name,
 
596
                  (unsigned long long)lr->timestamp,
 
597
                  lr->checksum);
 
598
 
 
599
        log_errot(token, "leader5 wi %llu wg %llu wt %llu",
 
600
                  (unsigned long long)lr->write_id,
 
601
                  (unsigned long long)lr->write_generation,
 
602
                  (unsigned long long)lr->write_timestamp);
 
603
}
 
604
 
 
605
static int verify_leader(struct token *token,
 
606
                         struct sync_disk *disk,
 
607
                         struct leader_record *lr,
 
608
                         const char *caller)
 
609
{
 
610
        struct leader_record leader_rr;
 
611
        uint32_t sum;
 
612
        int result, rv;
 
613
 
 
614
        if (lr->magic != PAXOS_DISK_MAGIC) {
 
615
                log_errot(token, "verify_leader wrong magic %x %s",
 
616
                          lr->magic, disk->path);
 
617
                result = SANLK_LEADER_MAGIC;
 
618
                goto fail;
 
619
        }
 
620
 
 
621
        if ((lr->version & 0xFFFF0000) != PAXOS_DISK_VERSION_MAJOR) {
 
622
                log_errot(token, "verify_leader wrong version %x %s",
 
623
                          lr->version, disk->path);
 
624
                result = SANLK_LEADER_VERSION;
 
625
                goto fail;
 
626
        }
 
627
 
 
628
        if (lr->sector_size != disk->sector_size) {
 
629
                log_errot(token, "verify_leader wrong sector size %d %d %s",
 
630
                          lr->sector_size, disk->sector_size, disk->path);
 
631
                result = SANLK_LEADER_SECTORSIZE;
 
632
                goto fail;
 
633
        }
 
634
 
 
635
        if (strncmp(lr->space_name, token->r.lockspace_name, NAME_ID_SIZE)) {
 
636
                log_errot(token, "verify_leader wrong space name %.48s %.48s %s",
 
637
                          lr->space_name, token->r.lockspace_name, disk->path);
 
638
                result = SANLK_LEADER_LOCKSPACE;
 
639
                goto fail;
 
640
        }
 
641
 
 
642
        if (strncmp(lr->resource_name, token->r.name, NAME_ID_SIZE)) {
 
643
                log_errot(token, "verify_leader wrong resource name %.48s %.48s %s",
 
644
                          lr->resource_name, token->r.name, disk->path);
 
645
                result = SANLK_LEADER_RESOURCE;
 
646
                goto fail;
 
647
        }
 
648
 
 
649
        if (lr->num_hosts < token->host_id) {
 
650
                log_errot(token, "verify_leader num_hosts too small %llu %llu %s",
 
651
                          (unsigned long long)lr->num_hosts,
 
652
                          (unsigned long long)token->host_id, disk->path);
 
653
                result = SANLK_LEADER_NUMHOSTS;
 
654
                goto fail;
 
655
        }
 
656
 
 
657
        sum = leader_checksum(lr);
 
658
 
 
659
        if (lr->checksum != sum) {
 
660
                log_errot(token, "verify_leader wrong checksum %x %x %s",
 
661
                          lr->checksum, sum, disk->path);
 
662
                result = SANLK_LEADER_CHECKSUM;
 
663
                goto fail;
 
664
        }
 
665
 
 
666
        return SANLK_OK;
 
667
 
 
668
 fail:
 
669
        log_leader_error(result, token, disk, lr, caller);
 
670
 
 
671
        memset(&leader_rr, 0, sizeof(leader_rr));
 
672
 
 
673
        rv = read_sectors(disk, 0, 1, (char *)&leader_rr,
 
674
                          sizeof(struct leader_record),
 
675
                          NULL, "paxos_verify");
 
676
 
 
677
        log_leader_error(rv, token, disk, &leader_rr, "paxos_verify");
 
678
 
 
679
        return result;
 
680
}
 
681
 
 
682
static int leaders_match(struct leader_record *a, struct leader_record *b)
 
683
{
 
684
        if (!memcmp(a, b, LEADER_COMPARE_LEN))
 
685
                return 1;
 
686
        return 0;
 
687
}
 
688
 
 
689
static int _leader_read_single(struct task *task,
 
690
                               struct token *token,
 
691
                               struct leader_record *leader_ret,
 
692
                               const char *caller)
 
693
{
 
694
        struct leader_record leader;
 
695
        int rv;
 
696
 
 
697
        memset(&leader, 0, sizeof(struct leader_record));
 
698
 
 
699
        rv = read_leader(task, &token->disks[0], &leader);
 
700
        if (rv < 0)
 
701
                return rv;
 
702
 
 
703
        rv = verify_leader(token, &token->disks[0], &leader, caller);
 
704
 
 
705
        /* copy what we read even if verify finds a problem */
 
706
 
 
707
        memcpy(leader_ret, &leader, sizeof(struct leader_record));
 
708
        return rv;
 
709
}
 
710
 
 
711
static int _leader_read_multiple(struct task *task,
 
712
                                 struct token *token,
 
713
                                 struct leader_record *leader_ret,
 
714
                                 const char *caller)
 
715
{
 
716
        struct leader_record leader;
 
717
        struct leader_record *leaders;
 
718
        int *leader_reps;
 
719
        int leaders_len, leader_reps_len;
 
720
        int num_reads;
 
721
        int num_disks = token->r.num_disks;
 
722
        int rv = 0, d, i, found;
 
723
        int error;
 
724
 
 
725
        leaders_len = num_disks * sizeof(struct leader_record);
 
726
        leader_reps_len = num_disks * sizeof(int);
 
727
 
 
728
        leaders = malloc(leaders_len);
 
729
        if (!leaders)
 
730
                return -ENOMEM;
 
731
 
 
732
        leader_reps = malloc(leader_reps_len);
 
733
        if (!leader_reps) {
 
734
                free(leaders);
 
735
                return -ENOMEM;
 
736
        }
 
737
 
 
738
        /*
 
739
         * find a leader block that's consistent on the majority of disks,
 
740
         * so we can use as the basis for the new leader
 
741
         */
 
742
 
 
743
        memset(&leader, 0, sizeof(struct leader_record));
 
744
        memset(leaders, 0, leaders_len);
 
745
        memset(leader_reps, 0, leader_reps_len);
 
746
 
 
747
        num_reads = 0;
 
748
 
 
749
        for (d = 0; d < num_disks; d++) {
 
750
                rv = read_leader(task, &token->disks[d], &leaders[d]);
 
751
                if (rv < 0)
 
752
                        continue;
 
753
 
 
754
                rv = verify_leader(token, &token->disks[d], &leaders[d], caller);
 
755
                if (rv < 0)
 
756
                        continue;
 
757
 
 
758
                num_reads++;
 
759
 
 
760
                leader_reps[d] = 1;
 
761
 
 
762
                /* count how many times the same leader block repeats */
 
763
 
 
764
                for (i = 0; i < d; i++) {
 
765
                        if (leaders_match(&leaders[d], &leaders[i])) {
 
766
                                leader_reps[i]++;
 
767
                                break;
 
768
                        }
 
769
                }
 
770
        }
 
771
 
 
772
        if (!majority_disks(token, num_reads)) {
 
773
                log_errot(token, "%s leader read error %d", caller, rv);
 
774
                error = SANLK_LEADER_READ;
 
775
                goto out;
 
776
        }
 
777
 
 
778
        /* check that a majority of disks have the same leader */
 
779
 
 
780
        found = 0;
 
781
 
 
782
        for (d = 0; d < num_disks; d++) {
 
783
                if (!majority_disks(token, leader_reps[d]))
 
784
                        continue;
 
785
 
 
786
                /* leader on d is the same on a majority of disks,
 
787
                   leader becomes the prototype for new_leader */
 
788
 
 
789
                memcpy(&leader, &leaders[d], sizeof(struct leader_record));
 
790
                found = 1;
 
791
                break;
 
792
        }
 
793
 
 
794
        if (!found) {
 
795
                log_errot(token, "%s leader inconsistent", caller);
 
796
                error = SANLK_LEADER_DIFF;
 
797
                goto out;
 
798
        }
 
799
 
 
800
        error = SANLK_OK;
 
801
 out:
 
802
        memcpy(leader_ret, &leader, sizeof(struct leader_record));
 
803
        free(leaders);
 
804
        free(leader_reps);
 
805
        return error;
 
806
}
 
807
 
 
808
int paxos_lease_leader_read(struct task *task,
 
809
                            struct token *token,
 
810
                            struct leader_record *leader_ret,
 
811
                            const char *caller)
 
812
{
 
813
        int rv;
 
814
 
 
815
        /* _leader_read_multiple works fine for the single disk case, but
 
816
           we can cut out a bunch of stuff when we know there's one disk */
 
817
 
 
818
        if (token->r.num_disks > 1)
 
819
                rv = _leader_read_multiple(task, token, leader_ret, caller);
 
820
        else
 
821
                rv = _leader_read_single(task, token, leader_ret, caller);
 
822
 
 
823
        if (rv == SANLK_OK)
 
824
                log_token(token, "%s leader %llu owner %llu %llu %llu", caller,
 
825
                          (unsigned long long)leader_ret->lver,
 
826
                          (unsigned long long)leader_ret->owner_id,
 
827
                          (unsigned long long)leader_ret->owner_generation,
 
828
                          (unsigned long long)leader_ret->timestamp);
 
829
 
 
830
        return rv;
 
831
}
 
832
 
 
833
static int _leader_dblock_read_single(struct task *task,
 
834
                                      struct token *token,
 
835
                                      struct leader_record *leader_ret,
 
836
                                      struct paxos_dblock *our_dblock,
 
837
                                      const char *caller)
 
838
{
 
839
        struct sync_disk *disk = &token->disks[0];
 
840
        char *iobuf, **p_iobuf;
 
841
        uint32_t host_id = token->host_id;
 
842
        int sector_size = disk->sector_size;
 
843
        int sector_count;
 
844
        int rv, iobuf_len;
 
845
 
 
846
        /* sector 0: leader record
 
847
           sector 1: empty
 
848
           sector 2: dblock host_id 1
 
849
           sector 3: dblock host_id 2
 
850
           sector 4: dblock host_id 3
 
851
           for host_id N we need to read N+2 sectors */
 
852
 
 
853
        sector_count = roundup_power_of_two(host_id + 2);
 
854
 
 
855
        iobuf_len = sector_count * sector_size;
 
856
 
 
857
        if (!iobuf_len)
 
858
                return -EINVAL;
 
859
 
 
860
        p_iobuf = &iobuf;
 
861
 
 
862
        rv = posix_memalign((void *)p_iobuf, getpagesize(), iobuf_len);
 
863
        if (rv)
 
864
                return rv;
 
865
 
 
866
        memset(iobuf, 0, iobuf_len);
 
867
 
 
868
        rv = read_iobuf(disk->fd, disk->offset, iobuf, iobuf_len, task);
 
869
        if (rv < 0)
 
870
                goto out;
 
871
 
 
872
        memcpy(leader_ret, iobuf, sizeof(struct leader_record));
 
873
 
 
874
        rv = verify_leader(token, &token->disks[0], leader_ret, caller);
 
875
 
 
876
        memcpy(our_dblock, iobuf + (sector_size * (host_id + 1)),
 
877
               sizeof(struct paxos_dblock));
 
878
 out:
 
879
        if (rv != SANLK_AIO_TIMEOUT)
 
880
                free(iobuf);
 
881
        return rv;
 
882
}
 
883
 
 
884
/* TODO: the point of a combined leader+dblock read is to reduce iops by
 
885
   reading the leader and our dblock in a single read covering both, which
 
886
   this function obviously does not do. */
 
887
 
 
888
static int _leader_dblock_read_multiple(struct task *task,
 
889
                                        struct token *token,
 
890
                                        struct leader_record *leader_ret,
 
891
                                        struct paxos_dblock *our_dblock,
 
892
                                        const char *caller)
 
893
{
 
894
        struct paxos_dblock dblock;
 
895
        uint64_t our_mbal = 0;
 
896
        int d, num_reads;
 
897
        int rv;
 
898
 
 
899
        rv = _leader_read_multiple(task, token, leader_ret, caller);
 
900
        if (rv < 0)
 
901
                return rv;
 
902
 
 
903
        num_reads = 0;
 
904
 
 
905
        for (d = 0; d < token->r.num_disks; d++) {
 
906
                rv = read_dblock(task, &token->disks[d], token->host_id, &dblock);
 
907
                if (rv < 0)
 
908
                        continue;
 
909
                num_reads++;
 
910
 
 
911
                if (dblock.mbal > our_mbal) {
 
912
                        our_mbal = dblock.mbal;
 
913
                        memcpy(our_dblock, &dblock, sizeof(struct paxos_dblock));
 
914
                }
 
915
        }
 
916
 
 
917
        if (!num_reads) {
 
918
                log_errot(token, "paxos_acquire cannot read our dblock %d", rv);
 
919
                rv = SANLK_DBLOCK_READ;
 
920
        }
 
921
 
 
922
        return rv;
 
923
}
 
924
 
 
925
/* read the leader_record and our own dblock in a single larger read op
 
926
   instead of two smaller read ops */
 
927
 
 
928
static int paxos_lease_leader_dblock_read(struct task *task,
 
929
                                          struct token *token,
 
930
                                          struct leader_record *leader_ret,
 
931
                                          struct paxos_dblock *our_dblock,
 
932
                                          const char *caller)
 
933
{
 
934
        int rv;
 
935
 
 
936
        if (token->r.num_disks > 1)
 
937
                rv = _leader_dblock_read_multiple(task, token, leader_ret, our_dblock, caller);
 
938
        else
 
939
                rv = _leader_dblock_read_single(task, token, leader_ret, our_dblock, caller);
 
940
 
 
941
        if (rv == SANLK_OK)
 
942
                log_token(token, "%s leader %llu owner %llu %llu %llu "
 
943
                          "our_dblock %llu %llu %llu %llu %llu %llu",
 
944
                          caller,
 
945
                          (unsigned long long)leader_ret->lver,
 
946
                          (unsigned long long)leader_ret->owner_id,
 
947
                          (unsigned long long)leader_ret->owner_generation,
 
948
                          (unsigned long long)leader_ret->timestamp,
 
949
                          (unsigned long long)our_dblock->mbal,
 
950
                          (unsigned long long)our_dblock->bal,
 
951
                          (unsigned long long)our_dblock->inp,
 
952
                          (unsigned long long)our_dblock->inp2,
 
953
                          (unsigned long long)our_dblock->inp3,
 
954
                          (unsigned long long)our_dblock->lver);
 
955
 
 
956
        return rv;
 
957
}
 
958
 
 
959
static int write_new_leader(struct task *task,
 
960
                            struct token *token,
 
961
                            struct leader_record *nl,
 
962
                            const char *caller)
 
963
{
 
964
        int num_disks = token->r.num_disks;
 
965
        int num_writes = 0;
 
966
        int error = SANLK_OK;
 
967
        int rv = 0, d;
 
968
 
 
969
        for (d = 0; d < num_disks; d++) {
 
970
                rv = write_leader(task, &token->disks[d], nl);
 
971
                if (rv < 0)
 
972
                        continue;
 
973
                num_writes++;
 
974
        }
 
975
 
 
976
        if (!majority_disks(token, num_writes)) {
 
977
                log_errot(token, "%s write_new_leader error %d owner %llu %llu %llu",
 
978
                          caller, rv,
 
979
                          (unsigned long long)nl->owner_id,
 
980
                          (unsigned long long)nl->owner_generation,
 
981
                          (unsigned long long)nl->timestamp);
 
982
                error = SANLK_LEADER_WRITE;
 
983
        }
 
984
 
 
985
        return error;
 
986
}
 
987
 
 
988
/*
 * If we hang or crash after completing a ballot successfully, but before
 * committing the leader_record, then the next host that runs a ballot (with
 * the same lver, since we did not commit the new lver to the leader_record)
 * will commit the same inp values that we were about to commit.  If the inp
 * values they commit indicate that we (who crashed or hung) are the new owner,
 * then the other hosts will begin monitoring the liveness of our host_id.
 * Once enough time has passed, they assume we're dead, and go on with new
 * versions.  The "enough time" ensures that if we hung before writing the
 * leader, we won't wake up and finally write what will by then be an old,
 * invalid leader.
 */
 
999
 
 
1000
int paxos_lease_acquire(struct task *task,
 
1001
                        struct token *token,
 
1002
                        uint32_t flags,
 
1003
                        struct leader_record *leader_ret,
 
1004
                        uint64_t acquire_lver,
 
1005
                        int new_num_hosts)
 
1006
{
 
1007
        struct sync_disk host_id_disk;
 
1008
        struct leader_record host_id_leader;
 
1009
        struct leader_record cur_leader;
 
1010
        struct leader_record tmp_leader;
 
1011
        struct leader_record new_leader;
 
1012
        struct paxos_dblock our_dblock;
 
1013
        struct paxos_dblock dblock;
 
1014
        struct host_status hs;
 
1015
        uint64_t wait_start, now;
 
1016
        uint64_t last_timestamp;
 
1017
        uint64_t next_lver;
 
1018
        uint64_t our_mbal = 0;
 
1019
        int copy_cur_leader = 0;
 
1020
        int disk_open = 0;
 
1021
        int error, rv, us;
 
1022
 
 
1023
        log_token(token, "paxos_acquire begin lver %llu flags %x",
 
1024
                  (unsigned long long)acquire_lver, flags);
 
1025
 restart:
 
1026
 
 
1027
        error = paxos_lease_leader_dblock_read(task, token, &cur_leader, &our_dblock,
 
1028
                                               "paxos_acquire");
 
1029
        if (error < 0)
 
1030
                goto out;
 
1031
 
 
1032
        if (flags & PAXOS_ACQUIRE_FORCE) {
 
1033
                copy_cur_leader = 1;
 
1034
                goto run;
 
1035
        }
 
1036
 
 
1037
        if (acquire_lver && cur_leader.lver != acquire_lver) {
 
1038
                log_errot(token, "paxos_acquire acquire_lver %llu cur_leader %llu",
 
1039
                          (unsigned long long)acquire_lver,
 
1040
                          (unsigned long long)cur_leader.lver);
 
1041
                error = SANLK_ACQUIRE_LVER;
 
1042
                goto out;
 
1043
        }
 
1044
 
 
1045
        if (cur_leader.timestamp == LEASE_FREE) {
 
1046
                log_token(token, "paxos_acquire leader %llu free",
 
1047
                          (unsigned long long)cur_leader.lver);
 
1048
                copy_cur_leader = 1;
 
1049
                goto run;
 
1050
        }
 
1051
 
 
1052
        if (cur_leader.owner_id == token->host_id &&
 
1053
            cur_leader.owner_generation == token->host_generation) {
 
1054
                log_token(token, "paxos_acquire already owner id %llu gen %llu",
 
1055
                          (unsigned long long)token->host_id,
 
1056
                          (unsigned long long)token->host_generation);
 
1057
                copy_cur_leader = 1;
 
1058
                goto run;
 
1059
        }
 
1060
 
 
1061
        /*
 
1062
         * Check if current owner is alive based on its host_id renewals.
 
1063
         * If the current owner has been dead long enough we can assume that
 
1064
         * its watchdog has triggered and we can go for the paxos lease.
 
1065
         */
 
1066
 
 
1067
        if (!disk_open) {
 
1068
                memset(&host_id_disk, 0, sizeof(host_id_disk));
 
1069
 
 
1070
                rv = lockspace_disk(cur_leader.space_name, &host_id_disk);
 
1071
                if (rv < 0) {
 
1072
                        log_errot(token, "paxos_acquire no lockspace info %.48s",
 
1073
                                  cur_leader.space_name);
 
1074
                        error = SANLK_ACQUIRE_LOCKSPACE;
 
1075
                        goto out;
 
1076
                }
 
1077
                host_id_disk.fd = -1;
 
1078
 
 
1079
                disk_open = open_disks_fd(&host_id_disk, 1);
 
1080
                if (disk_open != 1) {
 
1081
                        log_errot(token, "paxos_acquire cannot open host_id_disk");
 
1082
                        error = SANLK_ACQUIRE_IDDISK;
 
1083
                        goto out;
 
1084
                }
 
1085
        }
 
1086
 
 
1087
        rv = host_info(cur_leader.space_name, cur_leader.owner_id, &hs);
 
1088
        if (!rv && hs.last_check && hs.last_live &&
 
1089
            hs.owner_id == cur_leader.owner_id &&
 
1090
            hs.owner_generation == cur_leader.owner_generation) {
 
1091
                wait_start = hs.last_live;
 
1092
                last_timestamp = hs.timestamp;
 
1093
        } else {
 
1094
                wait_start = monotime();
 
1095
                last_timestamp = 0;
 
1096
        }
 
1097
 
 
1098
        log_token(token, "paxos_acquire owner %llu %llu %llu "
 
1099
                  "host_status %llu %llu %llu wait_start %llu",
 
1100
                  (unsigned long long)cur_leader.owner_id,
 
1101
                  (unsigned long long)cur_leader.owner_generation,
 
1102
                  (unsigned long long)cur_leader.timestamp,
 
1103
                  (unsigned long long)hs.owner_id,
 
1104
                  (unsigned long long)hs.owner_generation,
 
1105
                  (unsigned long long)hs.timestamp,
 
1106
                  (unsigned long long)wait_start);
 
1107
 
 
1108
        while (1) {
 
1109
                error = delta_lease_leader_read(task, &host_id_disk,
 
1110
                                                cur_leader.space_name,
 
1111
                                                cur_leader.owner_id,
 
1112
                                                &host_id_leader,
 
1113
                                                "paxos_acquire");
 
1114
                if (error < 0) {
 
1115
                        log_errot(token, "paxos_acquire owner %llu %llu %llu "
 
1116
                                  "delta read %d fd %d path %s off %llu ss %u",
 
1117
                                  (unsigned long long)cur_leader.owner_id,
 
1118
                                  (unsigned long long)cur_leader.owner_generation,
 
1119
                                  (unsigned long long)cur_leader.timestamp,
 
1120
                                  error, host_id_disk.fd, host_id_disk.path,
 
1121
                                  (unsigned long long)host_id_disk.offset,
 
1122
                                  host_id_disk.sector_size);
 
1123
                        goto out;
 
1124
                }
 
1125
 
 
1126
                /* a host_id cannot become free in less than
 
1127
                   host_dead_seconds after the final renewal because
 
1128
                   a host_id must first be acquired before being freed,
 
1129
                   and acquiring cannot take less than host_dead_seconds */
 
1130
 
 
1131
                if (host_id_leader.timestamp == LEASE_FREE) {
 
1132
                        log_token(token, "paxos_acquire owner %llu delta free",
 
1133
                                  (unsigned long long)cur_leader.owner_id);
 
1134
                        goto run;
 
1135
                }
 
1136
 
 
1137
                /* another host has acquired the host_id of the host that
 
1138
                   owned this paxos lease; acquiring a host_id also cannot be
 
1139
                   done in less than host_dead_seconds, or
 
1140
 
 
1141
                   the host_id that owns this lease may be alive, but it
 
1142
                   owned the lease in a previous generation without freeing it,
 
1143
                   and no longer owns it */
 
1144
 
 
1145
                if (host_id_leader.owner_id != cur_leader.owner_id ||
 
1146
                    host_id_leader.owner_generation > cur_leader.owner_generation) {
 
1147
                        log_token(token, "paxos_acquire owner %llu %llu %llu "
 
1148
                                  "delta %llu %llu %llu mismatch",
 
1149
                                  (unsigned long long)cur_leader.owner_id,
 
1150
                                  (unsigned long long)cur_leader.owner_generation,
 
1151
                                  (unsigned long long)cur_leader.timestamp,
 
1152
                                  (unsigned long long)host_id_leader.owner_id,
 
1153
                                  (unsigned long long)host_id_leader.owner_generation,
 
1154
                                  (unsigned long long)host_id_leader.timestamp);
 
1155
                        goto run;
 
1156
                }
 
1157
 
 
1158
                if (!last_timestamp) {
 
1159
                        last_timestamp = host_id_leader.timestamp;
 
1160
                        goto skip_live_check;
 
1161
                }
 
1162
 
 
1163
                /* the owner is renewing its host_id so it's alive */
 
1164
 
 
1165
                if (host_id_leader.timestamp != last_timestamp) {
 
1166
                        if (flags & PAXOS_ACQUIRE_QUIET_FAIL) {
 
1167
                                log_token(token, "paxos_acquire owner %llu "
 
1168
                                          "delta %llu %llu %llu alive",
 
1169
                                          (unsigned long long)cur_leader.owner_id,
 
1170
                                          (unsigned long long)host_id_leader.owner_id,
 
1171
                                          (unsigned long long)host_id_leader.owner_generation,
 
1172
                                          (unsigned long long)host_id_leader.timestamp);
 
1173
                        } else {
 
1174
                                log_errot(token, "paxos_acquire owner %llu "
 
1175
                                          "delta %llu %llu %llu alive",
 
1176
                                          (unsigned long long)cur_leader.owner_id,
 
1177
                                          (unsigned long long)host_id_leader.owner_id,
 
1178
                                          (unsigned long long)host_id_leader.owner_generation,
 
1179
                                          (unsigned long long)host_id_leader.timestamp);
 
1180
                        }
 
1181
                        error = SANLK_ACQUIRE_IDLIVE;
 
1182
                        goto out;
 
1183
                }
 
1184
 
 
1185
 
 
1186
                /* if the owner hasn't renewed its host_id lease for
 
1187
                   host_dead_seconds then its watchdog should have fired
 
1188
                   by now */
 
1189
 
 
1190
                now = monotime();
 
1191
 
 
1192
                if (now - wait_start > task->host_dead_seconds) {
 
1193
                        log_token(token, "paxos_acquire owner %llu %llu %llu "
 
1194
                                  "delta %llu %llu %llu dead %llu-%llu>%d",
 
1195
                                  (unsigned long long)cur_leader.owner_id,
 
1196
                                  (unsigned long long)cur_leader.owner_generation,
 
1197
                                  (unsigned long long)cur_leader.timestamp,
 
1198
                                  (unsigned long long)host_id_leader.owner_id,
 
1199
                                  (unsigned long long)host_id_leader.owner_generation,
 
1200
                                  (unsigned long long)host_id_leader.timestamp,
 
1201
                                  (unsigned long long)now,
 
1202
                                  (unsigned long long)wait_start,
 
1203
                                  task->host_dead_seconds);
 
1204
                        goto run;
 
1205
                }
 
1206
 
 
1207
 skip_live_check:
 
1208
                /* TODO: test with sleep(2) here */
 
1209
                sleep(1);
 
1210
 
 
1211
                if (external_shutdown) {
 
1212
                        error = -1;
 
1213
                        goto out;
 
1214
                }
 
1215
 
 
1216
                error = paxos_lease_leader_read(task, token, &tmp_leader, "paxos_acquire");
 
1217
                if (error < 0)
 
1218
                        goto out;
 
1219
 
 
1220
                if (memcmp(&cur_leader, &tmp_leader, sizeof(struct leader_record))) {
 
1221
                        log_token(token, "paxos_acquire restart leader changed");
 
1222
                        goto restart;
 
1223
                }
 
1224
        }
 
1225
 run:
 
1226
        /*
 
1227
         * Use the disk paxos algorithm to attempt to commit a new leader.
 
1228
         *
 
1229
         * If we complete a ballot successfully, we can commit a leader record
 
1230
         * with next_lver.  If we find a higher mbal during a ballot, we increase
 
1231
         * our own mbal and try the ballot again.
 
1232
         *
 
1233
         * next_lver is derived from cur_leader with a zero or timed out owner.
 
1234
         * We need to monitor the leader record to see if another host commits
 
1235
         * a new leader_record with next_lver.
 
1236
         *
 
1237
         * TODO: may not need to increase mbal if dblock.inp and inp2 match
 
1238
         * current host_id and generation?
 
1239
         */
 
1240
 
 
1241
        next_lver = cur_leader.lver + 1;
 
1242
 
 
1243
        if (!our_dblock.mbal)
 
1244
                our_mbal = token->host_id;
 
1245
        else
 
1246
                our_mbal = our_dblock.mbal + cur_leader.max_hosts;
 
1247
 
 
1248
 retry_ballot:
 
1249
 
 
1250
        if (copy_cur_leader) {
 
1251
                /* reusing the initial read removes an iop in the common case */
 
1252
                copy_cur_leader = 0;
 
1253
                memcpy(&tmp_leader, &cur_leader, sizeof(struct leader_record));
 
1254
        } else {
 
1255
                error = paxos_lease_leader_read(task, token, &tmp_leader, "paxos_acquire");
 
1256
                if (error < 0)
 
1257
                        goto out;
 
1258
        }
 
1259
 
 
1260
        if (tmp_leader.lver == next_lver) {
 
1261
                /*
 
1262
                 * another host has commited a leader_record for next_lver,
 
1263
                 * check which inp (owner_id) they commited (possibly us).
 
1264
                 */
 
1265
 
 
1266
                if (tmp_leader.owner_id == token->host_id &&
 
1267
                    tmp_leader.owner_generation == token->host_generation) {
 
1268
                        /* not a problem, but interesting to see, so use log_error */
 
1269
 
 
1270
                        log_errot(token, "paxos_acquire %llu owner our inp "
 
1271
                                  "%llu %llu %llu commited by %llu",
 
1272
                                  (unsigned long long)next_lver,
 
1273
                                  (unsigned long long)tmp_leader.owner_id,
 
1274
                                  (unsigned long long)tmp_leader.owner_generation,
 
1275
                                  (unsigned long long)tmp_leader.timestamp,
 
1276
                                  (unsigned long long)tmp_leader.write_id);
 
1277
 
 
1278
                        memcpy(leader_ret, &tmp_leader, sizeof(struct leader_record));
 
1279
                        error = SANLK_OK;
 
1280
                } else {
 
1281
                        /* not a problem, but interesting to see, so use log_error */
 
1282
 
 
1283
                        log_errot(token, "paxos_acquire %llu owner is %llu",
 
1284
                                  (unsigned long long)next_lver,
 
1285
                                  (unsigned long long)tmp_leader.owner_id);
 
1286
 
 
1287
                        error = SANLK_ACQUIRE_OWNED;
 
1288
                }
 
1289
                goto out;
 
1290
        }
 
1291
 
 
1292
        error = run_ballot(task, token, cur_leader.num_hosts, next_lver, our_mbal,
 
1293
                           &dblock);
 
1294
 
 
1295
        if (error == SANLK_DBLOCK_MBAL) {
 
1296
                us = get_rand(0, 1000000);
 
1297
                if (us < 0)
 
1298
                        us = token->host_id * 100;
 
1299
 
 
1300
                /* not a problem, but interesting to see, so use log_error */
 
1301
                log_errot(token, "paxos_acquire %llu retry delay %d us",
 
1302
                          (unsigned long long)next_lver, us);
 
1303
 
 
1304
                usleep(us);
 
1305
                our_mbal += cur_leader.max_hosts;
 
1306
                goto retry_ballot;
 
1307
        }
 
1308
 
 
1309
        if (error < 0) {
 
1310
                log_errot(token, "paxos_acquire %llu ballot error %d",
 
1311
                          (unsigned long long)next_lver, error);
 
1312
                goto out;
 
1313
        }
 
1314
 
 
1315
        /* ballot success, commit next_lver with dblock values */
 
1316
 
 
1317
        memcpy(&new_leader, &cur_leader, sizeof(struct leader_record));
 
1318
        new_leader.lver = dblock.lver;
 
1319
        new_leader.owner_id = dblock.inp;
 
1320
        new_leader.owner_generation = dblock.inp2;
 
1321
        new_leader.timestamp = dblock.inp3;
 
1322
 
 
1323
        new_leader.write_id = token->host_id;
 
1324
        new_leader.write_generation = token->host_generation;
 
1325
        new_leader.write_timestamp = monotime();
 
1326
 
 
1327
        if (new_num_hosts)
 
1328
                new_leader.num_hosts = new_num_hosts;
 
1329
        new_leader.checksum = leader_checksum(&new_leader);
 
1330
 
 
1331
        error = write_new_leader(task, token, &new_leader, "paxos_acquire");
 
1332
        if (error < 0)
 
1333
                goto out;
 
1334
 
 
1335
        if (new_leader.owner_id != token->host_id) {
 
1336
                /* not a problem, but interesting to see, so use log_error */
 
1337
 
 
1338
                log_errot(token, "ballot %llu commit other owner %llu %llu %llu",
 
1339
                          (unsigned long long)new_leader.lver,
 
1340
                          (unsigned long long)new_leader.owner_id,
 
1341
                          (unsigned long long)new_leader.owner_generation,
 
1342
                          (unsigned long long)new_leader.timestamp);
 
1343
 
 
1344
                error = SANLK_ACQUIRE_OTHER;
 
1345
                goto out;
 
1346
        }
 
1347
 
 
1348
        log_token(token, "ballot %llu commit self owner %llu %llu %llu",
 
1349
                  (unsigned long long)next_lver,
 
1350
                  (unsigned long long)new_leader.owner_id,
 
1351
                  (unsigned long long)new_leader.owner_generation,
 
1352
                  (unsigned long long)new_leader.timestamp);
 
1353
 
 
1354
        memcpy(leader_ret, &new_leader, sizeof(struct leader_record));
 
1355
        error = SANLK_OK;
 
1356
 
 
1357
 out:
 
1358
        if (disk_open)
 
1359
                close_disks(&host_id_disk, 1);
 
1360
 
 
1361
        return error;
 
1362
}
 
1363
 
 
1364
#if 0
/*
 * Disabled code, kept for reference.
 *
 * Renew a paxos lease: check that the leader on every readable disk still
 * equals leader_last, then write leader_last back with a new timestamp and
 * checksum.  The record that was written is returned in leader_ret.
 *
 * Returns SANLK_BAD_LEADER if any readable disk holds a different leader,
 * otherwise the result of write_new_leader().
 */
int paxos_lease_renew(struct task *task,
                      struct token *token,
                      struct leader_record *leader_last,
                      struct leader_record *leader_ret)
{
        struct leader_record new_leader;
        int rv, d;
        int error;

        for (d = 0; d < token->r.num_disks; d++) {
                memset(&new_leader, 0, sizeof(struct leader_record));

                rv = read_leader(task, &token->disks[d], &new_leader);
                if (rv < 0)
                        continue;

                if (memcmp(&new_leader, leader_last,
                           sizeof(struct leader_record))) {
                        log_errot(token, "leader changed between renewals");
                        return SANLK_BAD_LEADER;
                }
        }

        /* build the renewed record from leader_last rather than from
           whatever the last disk read left behind, which is an all-zero
           record when that read failed */

        memcpy(&new_leader, leader_last, sizeof(struct leader_record));
        new_leader.timestamp = monotime();
        new_leader.checksum = leader_checksum(&new_leader);

        /* write_new_leader() takes the caller name as its last argument */

        error = write_new_leader(task, token, &new_leader, "paxos_renew");
        if (error < 0)
                return error;

        memcpy(leader_ret, &new_leader, sizeof(struct leader_record));
        return error;
}
#endif
 
1400
 
 
1401
int paxos_lease_release(struct task *task,
 
1402
                        struct token *token,
 
1403
                        struct leader_record *leader_last,
 
1404
                        struct leader_record *leader_ret)
 
1405
{
 
1406
        struct leader_record leader;
 
1407
        int error;
 
1408
 
 
1409
        error = paxos_lease_leader_read(task, token, &leader, "paxos_release");
 
1410
        if (error < 0) {
 
1411
                log_errot(token, "release error cannot read leader");
 
1412
                goto out;
 
1413
        }
 
1414
 
 
1415
        if (leader.lver != leader_last->lver) {
 
1416
                log_errot(token, "paxos_release %llu other lver %llu",
 
1417
                          (unsigned long long)leader_last->lver,
 
1418
                          (unsigned long long)leader.lver);
 
1419
                return SANLK_RELEASE_LVER;
 
1420
        }
 
1421
 
 
1422
        if (leader.owner_id != token->host_id ||
 
1423
            leader.owner_generation != token->host_generation) {
 
1424
                log_errot(token, "paxos_release %llu other owner %llu %llu %llu",
 
1425
                          (unsigned long long)leader_last->lver,
 
1426
                          (unsigned long long)leader.owner_id,
 
1427
                          (unsigned long long)leader.owner_generation,
 
1428
                          (unsigned long long)leader.timestamp);
 
1429
                return SANLK_RELEASE_OWNER;
 
1430
        }
 
1431
 
 
1432
        if (memcmp(&leader, leader_last, sizeof(struct leader_record))) {
 
1433
                /*
 
1434
                 * This will happen when two hosts finish the same ballot
 
1435
                 * successfully, the second commiting the same inp values
 
1436
                 * that the first did, as it should.  But the second will
 
1437
                 * write it's own write_id/gen/timestap, which will differ
 
1438
                 * from what the first host wrote.  So when the first host
 
1439
                 * rereads here in the release, it will find different
 
1440
                 * write_id/gen/timestamp from what it wrote.  This is
 
1441
                 * perfectly fine (use log_error since it's interesting
 
1442
                 * to see when this happens.)
 
1443
                 */
 
1444
                log_errot(token, "paxos_release %llu leader different "
 
1445
                          "write %llu %llu %llu vs %llu %llu %llu",
 
1446
                          (unsigned long long)leader_last->lver,
 
1447
                          (unsigned long long)leader_last->write_id,
 
1448
                          (unsigned long long)leader_last->write_generation,
 
1449
                          (unsigned long long)leader_last->write_timestamp,
 
1450
                          (unsigned long long)leader.write_id,
 
1451
                          (unsigned long long)leader.write_generation,
 
1452
                          (unsigned long long)leader.write_timestamp);
 
1453
                /*
 
1454
                log_leader_error(0, token, &token->disks[0], leader_last, "paxos_release");
 
1455
                log_leader_error(0, token, &token->disks[0], &leader, "paxos_release");
 
1456
                */
 
1457
        }
 
1458
 
 
1459
        leader.timestamp = LEASE_FREE;
 
1460
        leader.write_id = token->host_id;
 
1461
        leader.write_generation = token->host_generation;
 
1462
        leader.write_timestamp = monotime();
 
1463
        leader.checksum = leader_checksum(&leader);
 
1464
 
 
1465
        error = write_new_leader(task, token, &leader, "paxos_release");
 
1466
        if (error < 0)
 
1467
                goto out;
 
1468
 
 
1469
        memcpy(leader_ret, &leader, sizeof(struct leader_record));
 
1470
 out:
 
1471
        return error;
 
1472
}
 
1473
 
 
1474
int paxos_lease_init(struct task *task,
 
1475
                     struct token *token,
 
1476
                     int num_hosts, int max_hosts)
 
1477
{
 
1478
        char *iobuf, **p_iobuf;
 
1479
        struct leader_record *leader;
 
1480
        struct request_record *rr;
 
1481
        int iobuf_len;
 
1482
        int sector_size;
 
1483
        int align_size;
 
1484
        int aio_timeout = 0;
 
1485
        int rv, d;
 
1486
 
 
1487
        if (!num_hosts)
 
1488
                num_hosts = DEFAULT_MAX_HOSTS;
 
1489
        if (!max_hosts)
 
1490
                max_hosts = DEFAULT_MAX_HOSTS;
 
1491
 
 
1492
        sector_size = token->disks[0].sector_size;
 
1493
 
 
1494
        align_size = direct_align(&token->disks[0]);
 
1495
        if (align_size < 0)
 
1496
                return align_size;
 
1497
 
 
1498
        if (sector_size * (2 + max_hosts) > align_size)
 
1499
                return -E2BIG;
 
1500
 
 
1501
        iobuf_len = align_size;
 
1502
 
 
1503
        p_iobuf = &iobuf;
 
1504
 
 
1505
        rv = posix_memalign((void *)p_iobuf, getpagesize(), iobuf_len);
 
1506
        if (rv)
 
1507
                return rv;
 
1508
 
 
1509
        memset(iobuf, 0, iobuf_len);
 
1510
 
 
1511
        leader = (struct leader_record *)iobuf;
 
1512
        leader->magic = PAXOS_DISK_MAGIC;
 
1513
        leader->version = PAXOS_DISK_VERSION_MAJOR | PAXOS_DISK_VERSION_MINOR;
 
1514
        leader->sector_size = sector_size;
 
1515
        leader->num_hosts = num_hosts;
 
1516
        leader->max_hosts = max_hosts;
 
1517
        leader->timestamp = LEASE_FREE;
 
1518
        strncpy(leader->space_name, token->r.lockspace_name, NAME_ID_SIZE);
 
1519
        strncpy(leader->resource_name, token->r.name, NAME_ID_SIZE);
 
1520
        leader->checksum = leader_checksum(leader);
 
1521
 
 
1522
        rr = (struct request_record *)(iobuf + sector_size);
 
1523
        rr->magic = REQ_DISK_MAGIC;
 
1524
        rr->version = REQ_DISK_VERSION_MAJOR | REQ_DISK_VERSION_MINOR;
 
1525
 
 
1526
        for (d = 0; d < token->r.num_disks; d++) {
 
1527
                rv = write_iobuf(token->disks[d].fd, token->disks[d].offset,
 
1528
                                 iobuf, iobuf_len, task);
 
1529
 
 
1530
                if (rv == SANLK_AIO_TIMEOUT)
 
1531
                        aio_timeout = 1;
 
1532
 
 
1533
                if (rv < 0)
 
1534
                        return rv;
 
1535
        }
 
1536
 
 
1537
        if (!aio_timeout)
 
1538
                free(iobuf);
 
1539
 
 
1540
        return 0;
 
1541
}
 
1542