2
* Copyright 2010-2011 Red Hat, Inc.
4
* This copyrighted material is made available to anyone wishing to use,
5
* modify, copy, or redistribute it subject to the terms and conditions
6
* of the GNU General Public License v2 or (at your option) any later version.
21
#include <sys/types.h>
24
#include "sanlock_internal.h"
28
#include "lockspace.h"
29
#include "delta_lease.h"
30
#include "paxos_lease.h"
32
uint32_t crc32c(uint32_t crc, uint8_t *data, size_t length);
33
int get_rand(int a, int b);
35
#define DBLOCK_CHECKSUM_LEN 48 /* ends before checksum field */
40
uint64_t inp; /* host_id */
41
uint64_t inp2; /* host_id generation */
42
uint64_t inp3; /* host_id's timestamp */
47
static uint32_t roundup_power_of_two(uint32_t val)
59
int majority_disks(struct token *token, int num)
61
int num_disks = token->r.num_disks;
63
/* odd number of disks */
66
return num >= ((num_disks / 2) + 1);
68
/* even number of disks */
70
if (num > (num_disks / 2))
73
if (num < (num_disks / 2))
76
/* TODO: half of disks are majority if tiebreaker disk is present */
80
int paxos_lease_request_read(struct task *task, struct token *token,
81
struct request_record *rr)
85
/* 1 = request record is second sector */
87
rv = read_sectors(&token->disks[0], 1, 1, (char *)rr,
88
sizeof(struct request_record),
95
int paxos_lease_request_write(struct task *task, struct token *token,
96
struct request_record *rr)
100
rv = write_sector(&token->disks[0], 1, (char *)rr,
101
sizeof(struct request_record),
108
static int write_dblock(struct task *task,
109
struct sync_disk *disk,
111
struct paxos_dblock *pd)
115
/* 1 leader block + 1 request block;
116
host_id N is block offset N-1 */
118
rv = write_sector(disk, 2 + host_id - 1, (char *)pd, sizeof(struct paxos_dblock),
123
static int write_leader(struct task *task,
124
struct sync_disk *disk,
125
struct leader_record *lr)
129
rv = write_sector(disk, 0, (char *)lr, sizeof(struct leader_record),
134
static int read_dblock(struct task *task,
135
struct sync_disk *disk,
137
struct paxos_dblock *pd)
141
/* 1 leader block + 1 request block; host_id N is block offset N-1 */
143
rv = read_sectors(disk, 2 + host_id - 1, 1, (char *)pd, sizeof(struct paxos_dblock),
149
static int read_dblocks(struct task *task,
150
struct sync_disk *disk,
151
struct paxos_dblock *pds,
157
data_len = pds_count * disk->sector_size;
159
data = malloc(data_len);
161
log_error("read_dblocks malloc %d %s", data_len, disk->path);
166
/* 2 = 1 leader block + 1 request block */
168
rv = read_sectors(disk, 2, pds_count, data, data_len,
173
/* copy the first N bytes from each sector, where N is size of
176
for (i = 0; i < pds_count; i++) {
177
memcpy(&pds[i], data + (i * disk->sector_size),
178
sizeof(struct paxos_dblock));
189
static int read_leader(struct task *task,
190
struct sync_disk *disk,
191
struct leader_record *lr)
195
/* 0 = leader record is first sector */
197
rv = read_sectors(disk, 0, 1, (char *)lr, sizeof(struct leader_record),
203
static uint32_t dblock_checksum(struct paxos_dblock *pd)
205
return crc32c((uint32_t)~1, (uint8_t *)pd, DBLOCK_CHECKSUM_LEN);
208
static int verify_dblock(struct token *token, struct paxos_dblock *pd)
212
if (!pd->checksum && !pd->mbal && !pd->bal && !pd->inp && !pd->lver)
215
sum = dblock_checksum(pd);
217
if (pd->checksum != sum) {
218
log_errot(token, "verify_dblock wrong checksum %x %x",
220
return SANLK_DBLOCK_CHECKSUM;
227
* It's possible that we pick a bk_max from another host which has our own
228
* inp values in it, and we can end up committing our own inp values, copied
229
* from another host's dblock:
232
* host2 phase1 mbal 14002
233
* host2 writes dblock[1] mbal 14002
234
* host2 reads no higher mbal
235
* host2 choose own inp 2,1
236
* host2 phase2 mbal 14002 bal 14002 inp 2,1
237
* host2 writes dblock[1] bal 14002 inp 2,1
239
* host1 phase1 mbal 20001
240
* host1 writes dblock[0] mbal 20001
241
* host1 reads no higher mbal
242
* host1 choose dblock[1] bal 14002 inp 2,1
243
* host1 phase2 mbal 20001 bal 20001 inp 2,1
244
* host1 writes dblock[0] bal 20001 inp 2,1
245
* host2 reads dblock[0] mbal 20001 > 14002
248
* host2 phase1 mbal 16002
249
* host2 writes dblock[1] mbal 16002
250
* host2 reads dblock[0] mbal 20001 > 16002
253
* host2 phase1 mbal 18002
254
* host2 writes dblock[1] mbal 18002
255
* host2 reads dblock[0] mbal 20001 > 18002
258
* host2 phase1 mbal 20002
259
* host2 writes dblock[1] mbal 20002
260
* host2 reads no higher mbal
261
* host2 choose dblock[0] bal 20001 inp 2,1
262
* host1 reads dblock[1] mbal 20002 > 20001
264
* host2 phase2 mbal 20002 bal 20002 inp 2,1
265
* host2 writes dblock[1] bal 20002 inp 2,1
266
* host2 reads no higher mbal
267
* host2 commit inp 2,1
269
* host1 leader owner 2,1
273
static int run_ballot(struct task *task, struct token *token, int num_hosts,
274
uint64_t next_lver, uint64_t our_mbal,
275
struct paxos_dblock *dblock_out)
277
struct paxos_dblock dblock;
278
struct paxos_dblock bk_max;
279
struct paxos_dblock *bk;
280
struct sync_disk *disk;
281
char *iobuf[SANLK_MAX_DISKS];
282
char **p_iobuf[SANLK_MAX_DISKS];
283
int num_disks = token->r.num_disks;
284
int num_writes, num_reads;
285
int sector_size = token->disks[0].sector_size;
292
sector_count = roundup_power_of_two(num_hosts + 2);
294
iobuf_len = sector_count * sector_size;
299
for (d = 0; d < num_disks; d++) {
300
p_iobuf[d] = &iobuf[d];
302
rv = posix_memalign((void *)p_iobuf[d], getpagesize(), iobuf_len);
311
* "For each disk d, it tries first to write dblock[p] to disk[d][p]
312
* and then to read disk[d][q] for all other processors q. It aborts
313
* the ballot if, for any d and q, it finds disk[d][q].mbal >
314
* dblock[p].mbal. The phase completes when p has written and read a
315
* majority of the disks, without reading any block whose mbal
316
* component is greater than dblock[p].mbal."
319
log_token(token, "ballot %llu phase1 mbal %llu",
320
(unsigned long long)next_lver,
321
(unsigned long long)our_mbal);
323
memset(&dblock, 0, sizeof(struct paxos_dblock));
324
dblock.mbal = our_mbal;
325
dblock.lver = next_lver;
326
dblock.checksum = dblock_checksum(&dblock);
328
memset(&bk_max, 0, sizeof(struct paxos_dblock));
332
for (d = 0; d < num_disks; d++) {
333
rv = write_dblock(task, &token->disks[d], token->host_id, &dblock);
339
if (!majority_disks(token, num_writes)) {
340
log_errot(token, "ballot %llu dblock write error %d",
341
(unsigned long long)next_lver, rv);
342
error = SANLK_DBLOCK_WRITE;
348
for (d = 0; d < num_disks; d++) {
349
disk = &token->disks[d];
353
memset(iobuf[d], 0, iobuf_len);
355
rv = read_iobuf(disk->fd, disk->offset, iobuf[d], iobuf_len, task);
356
if (rv == SANLK_AIO_TIMEOUT)
363
for (q = 0; q < num_hosts; q++) {
364
bk = (struct paxos_dblock *)(iobuf[d] + ((2 + q)*sector_size));
366
rv = verify_dblock(token, bk);
370
if (bk->lver < dblock.lver)
373
if (bk->lver > dblock.lver) {
374
/* I don't think this should happen */
375
log_errot(token, "ballot %llu larger1 lver[%d] %llu",
376
(unsigned long long)next_lver, q,
377
(unsigned long long)bk->lver);
378
error = SANLK_DBLOCK_LVER;
382
/* see "It aborts the ballot" in comment above */
384
if (bk->mbal > dblock.mbal) {
385
log_errot(token, "ballot %llu abort1 mbal %llu mbal[%d] %llu",
386
(unsigned long long)next_lver,
387
(unsigned long long)our_mbal, q,
388
(unsigned long long)bk->mbal);
389
error = SANLK_DBLOCK_MBAL;
393
/* see choosing inp for phase 2 in comment below */
399
log_errot(token, "ballot %llu zero bal inp[%d] %llu",
400
(unsigned long long)next_lver, q,
401
(unsigned long long)bk->inp);
405
if (bk->bal > bk_max.bal) {
412
if (!majority_disks(token, num_reads)) {
413
log_errot(token, "ballot %llu dblock read error %d",
414
(unsigned long long)next_lver, rv);
415
error = SANLK_DBLOCK_READ;
421
* "When it completes phase 1, p chooses a new value of dblock[p].inp,
422
* sets dblock[p].bal to dblock[p].mbal (its current ballot number),
423
* and begins phase 2."
425
* "We now describe how processor p chooses the value of dblock[p].inp
426
* that it tries to commit in phase 2. Let blocksSeen be the set
427
* consisting of dblock[p] and all the records disk[d][q] read by p in
428
* phase 1. Let nonInitBlks be the subset of blocksSeen consisting of
429
* those records whose inp field is not NotAnInput. If nonInitBlks is
430
* empty, then p sets dblock[p].inp to its own input value input[p].
431
* Otherwise, it sets dblock[p].inp to bk.inp for some record bk in
432
* nonInitBlks having the largest value of bk.bal."
436
/* lver and mbal are already set */
437
dblock.inp = bk_max.inp;
438
dblock.inp2 = bk_max.inp2;
439
dblock.inp3 = bk_max.inp3;
441
/* lver and mbal are already set */
442
dblock.inp = token->host_id;
443
dblock.inp2 = token->host_generation;
444
dblock.inp3 = monotime();
446
dblock.bal = dblock.mbal;
447
dblock.checksum = dblock_checksum(&dblock);
450
/* not a problem, but interesting to see, so use log_error */
451
log_errot(token, "ballot %llu choose bk_max[%d] lver %llu mbal %llu bal %llu inp %llu %llu %llu",
452
(unsigned long long)next_lver, q_max,
453
(unsigned long long)bk_max.lver,
454
(unsigned long long)bk_max.mbal,
455
(unsigned long long)bk_max.bal,
456
(unsigned long long)bk_max.inp,
457
(unsigned long long)bk_max.inp2,
458
(unsigned long long)bk_max.inp3);
465
* Same description as phase 1, same sequence of writes/reads.
468
log_token(token, "ballot %llu phase2 bal %llu inp %llu %llu %llu q_max %d",
469
(unsigned long long)dblock.lver,
470
(unsigned long long)dblock.bal,
471
(unsigned long long)dblock.inp,
472
(unsigned long long)dblock.inp2,
473
(unsigned long long)dblock.inp3,
478
for (d = 0; d < num_disks; d++) {
479
rv = write_dblock(task, &token->disks[d], token->host_id, &dblock);
485
if (!majority_disks(token, num_writes)) {
486
log_errot(token, "ballot %llu our dblock write2 error %d",
487
(unsigned long long)next_lver, rv);
488
error = SANLK_DBLOCK_WRITE;
494
for (d = 0; d < num_disks; d++) {
495
disk = &token->disks[d];
499
memset(iobuf[d], 0, iobuf_len);
501
rv = read_iobuf(disk->fd, disk->offset, iobuf[d], iobuf_len, task);
502
if (rv == SANLK_AIO_TIMEOUT)
508
for (q = 0; q < num_hosts; q++) {
509
bk = (struct paxos_dblock *)(iobuf[d] + ((2 + q)*sector_size));
511
rv = verify_dblock(token, bk);
515
if (bk->lver < dblock.lver)
518
if (bk->lver > dblock.lver) {
519
/* I don't think this should happen */
520
log_errot(token, "ballot %llu larger2 lver[%d] %llu",
521
(unsigned long long)next_lver, q,
522
(unsigned long long)bk->lver);
523
error = SANLK_DBLOCK_LVER;
527
/* see "It aborts the ballot" in comment above */
529
if (bk->mbal > dblock.mbal) {
530
log_errot(token, "ballot %llu abort2 mbal %llu mbal[%d] %llu",
531
(unsigned long long)next_lver,
532
(unsigned long long)our_mbal, q,
533
(unsigned long long)bk->mbal);
534
error = SANLK_DBLOCK_MBAL;
540
if (!majority_disks(token, num_reads)) {
541
log_errot(token, "ballot %llu dblock read2 error %d",
542
(unsigned long long)next_lver, rv);
543
error = SANLK_DBLOCK_READ;
547
/* "When it completes phase 2, p has committed dblock[p].inp." */
549
memcpy(dblock_out, &dblock, sizeof(struct paxos_dblock));
552
for (d = 0; d < num_disks; d++) {
553
/* don't free iobufs that have timed out */
561
uint32_t leader_checksum(struct leader_record *lr)
563
return crc32c((uint32_t)~1, (uint8_t *)lr, LEADER_CHECKSUM_LEN);
566
static void log_leader_error(int result,
568
struct sync_disk *disk,
569
struct leader_record *lr,
572
log_errot(token, "leader1 %s error %d sn %.48s rn %.48s",
573
caller ? caller : "unknown",
575
token->r.lockspace_name,
578
log_errot(token, "leader2 path %s offset %llu fd %d",
580
(unsigned long long)disk->offset,
583
log_errot(token, "leader3 m %x v %x ss %u nh %llu mh %llu oi %llu og %llu lv %llu",
587
(unsigned long long)lr->num_hosts,
588
(unsigned long long)lr->max_hosts,
589
(unsigned long long)lr->owner_id,
590
(unsigned long long)lr->owner_generation,
591
(unsigned long long)lr->lver);
593
log_errot(token, "leader4 sn %.48s rn %.48s ts %llu cs %x",
596
(unsigned long long)lr->timestamp,
599
log_errot(token, "leader5 wi %llu wg %llu wt %llu",
600
(unsigned long long)lr->write_id,
601
(unsigned long long)lr->write_generation,
602
(unsigned long long)lr->write_timestamp);
605
static int verify_leader(struct token *token,
606
struct sync_disk *disk,
607
struct leader_record *lr,
610
struct leader_record leader_rr;
614
if (lr->magic != PAXOS_DISK_MAGIC) {
615
log_errot(token, "verify_leader wrong magic %x %s",
616
lr->magic, disk->path);
617
result = SANLK_LEADER_MAGIC;
621
if ((lr->version & 0xFFFF0000) != PAXOS_DISK_VERSION_MAJOR) {
622
log_errot(token, "verify_leader wrong version %x %s",
623
lr->version, disk->path);
624
result = SANLK_LEADER_VERSION;
628
if (lr->sector_size != disk->sector_size) {
629
log_errot(token, "verify_leader wrong sector size %d %d %s",
630
lr->sector_size, disk->sector_size, disk->path);
631
result = SANLK_LEADER_SECTORSIZE;
635
if (strncmp(lr->space_name, token->r.lockspace_name, NAME_ID_SIZE)) {
636
log_errot(token, "verify_leader wrong space name %.48s %.48s %s",
637
lr->space_name, token->r.lockspace_name, disk->path);
638
result = SANLK_LEADER_LOCKSPACE;
642
if (strncmp(lr->resource_name, token->r.name, NAME_ID_SIZE)) {
643
log_errot(token, "verify_leader wrong resource name %.48s %.48s %s",
644
lr->resource_name, token->r.name, disk->path);
645
result = SANLK_LEADER_RESOURCE;
649
if (lr->num_hosts < token->host_id) {
650
log_errot(token, "verify_leader num_hosts too small %llu %llu %s",
651
(unsigned long long)lr->num_hosts,
652
(unsigned long long)token->host_id, disk->path);
653
result = SANLK_LEADER_NUMHOSTS;
657
sum = leader_checksum(lr);
659
if (lr->checksum != sum) {
660
log_errot(token, "verify_leader wrong checksum %x %x %s",
661
lr->checksum, sum, disk->path);
662
result = SANLK_LEADER_CHECKSUM;
669
log_leader_error(result, token, disk, lr, caller);
671
memset(&leader_rr, 0, sizeof(leader_rr));
673
rv = read_sectors(disk, 0, 1, (char *)&leader_rr,
674
sizeof(struct leader_record),
675
NULL, "paxos_verify");
677
log_leader_error(rv, token, disk, &leader_rr, "paxos_verify");
682
static int leaders_match(struct leader_record *a, struct leader_record *b)
684
if (!memcmp(a, b, LEADER_COMPARE_LEN))
689
static int _leader_read_single(struct task *task,
691
struct leader_record *leader_ret,
694
struct leader_record leader;
697
memset(&leader, 0, sizeof(struct leader_record));
699
rv = read_leader(task, &token->disks[0], &leader);
703
rv = verify_leader(token, &token->disks[0], &leader, caller);
705
/* copy what we read even if verify finds a problem */
707
memcpy(leader_ret, &leader, sizeof(struct leader_record));
711
static int _leader_read_multiple(struct task *task,
713
struct leader_record *leader_ret,
716
struct leader_record leader;
717
struct leader_record *leaders;
719
int leaders_len, leader_reps_len;
721
int num_disks = token->r.num_disks;
722
int rv = 0, d, i, found;
725
leaders_len = num_disks * sizeof(struct leader_record);
726
leader_reps_len = num_disks * sizeof(int);
728
leaders = malloc(leaders_len);
732
leader_reps = malloc(leader_reps_len);
739
* find a leader block that's consistent on the majority of disks,
740
* so we can use as the basis for the new leader
743
memset(&leader, 0, sizeof(struct leader_record));
744
memset(leaders, 0, leaders_len);
745
memset(leader_reps, 0, leader_reps_len);
749
for (d = 0; d < num_disks; d++) {
750
rv = read_leader(task, &token->disks[d], &leaders[d]);
754
rv = verify_leader(token, &token->disks[d], &leaders[d], caller);
762
/* count how many times the same leader block repeats */
764
for (i = 0; i < d; i++) {
765
if (leaders_match(&leaders[d], &leaders[i])) {
772
if (!majority_disks(token, num_reads)) {
773
log_errot(token, "%s leader read error %d", caller, rv);
774
error = SANLK_LEADER_READ;
778
/* check that a majority of disks have the same leader */
782
for (d = 0; d < num_disks; d++) {
783
if (!majority_disks(token, leader_reps[d]))
786
/* leader on d is the same on a majority of disks,
787
leader becomes the prototype for new_leader */
789
memcpy(&leader, &leaders[d], sizeof(struct leader_record));
795
log_errot(token, "%s leader inconsistent", caller);
796
error = SANLK_LEADER_DIFF;
802
memcpy(leader_ret, &leader, sizeof(struct leader_record));
808
int paxos_lease_leader_read(struct task *task,
810
struct leader_record *leader_ret,
815
/* _leader_read_multiple works fine for the single disk case, but
816
we can cut out a bunch of stuff when we know there's one disk */
818
if (token->r.num_disks > 1)
819
rv = _leader_read_multiple(task, token, leader_ret, caller);
821
rv = _leader_read_single(task, token, leader_ret, caller);
824
log_token(token, "%s leader %llu owner %llu %llu %llu", caller,
825
(unsigned long long)leader_ret->lver,
826
(unsigned long long)leader_ret->owner_id,
827
(unsigned long long)leader_ret->owner_generation,
828
(unsigned long long)leader_ret->timestamp);
833
static int _leader_dblock_read_single(struct task *task,
835
struct leader_record *leader_ret,
836
struct paxos_dblock *our_dblock,
839
struct sync_disk *disk = &token->disks[0];
840
char *iobuf, **p_iobuf;
841
uint32_t host_id = token->host_id;
842
int sector_size = disk->sector_size;
846
/* sector 0: leader record
848
sector 2: dblock host_id 1
849
sector 3: dblock host_id 2
850
sector 4: dblock host_id 3
851
for host_id N we need to read N+2 sectors */
853
sector_count = roundup_power_of_two(host_id + 2);
855
iobuf_len = sector_count * sector_size;
862
rv = posix_memalign((void *)p_iobuf, getpagesize(), iobuf_len);
866
memset(iobuf, 0, iobuf_len);
868
rv = read_iobuf(disk->fd, disk->offset, iobuf, iobuf_len, task);
872
memcpy(leader_ret, iobuf, sizeof(struct leader_record));
874
rv = verify_leader(token, &token->disks[0], leader_ret, caller);
876
memcpy(our_dblock, iobuf + (sector_size * (host_id + 1)),
877
sizeof(struct paxos_dblock));
879
if (rv != SANLK_AIO_TIMEOUT)
884
/* TODO: the point of a combined leader+dblock read is to reduce iops by
885
reading the leader and our dblock in a single read covering both, which
886
this function obviously does not do. */
888
static int _leader_dblock_read_multiple(struct task *task,
890
struct leader_record *leader_ret,
891
struct paxos_dblock *our_dblock,
894
struct paxos_dblock dblock;
895
uint64_t our_mbal = 0;
899
rv = _leader_read_multiple(task, token, leader_ret, caller);
905
for (d = 0; d < token->r.num_disks; d++) {
906
rv = read_dblock(task, &token->disks[d], token->host_id, &dblock);
911
if (dblock.mbal > our_mbal) {
912
our_mbal = dblock.mbal;
913
memcpy(our_dblock, &dblock, sizeof(struct paxos_dblock));
918
log_errot(token, "paxos_acquire cannot read our dblock %d", rv);
919
rv = SANLK_DBLOCK_READ;
925
/* read the leader_record and our own dblock in a single larger read op
926
instead of two smaller read ops */
928
static int paxos_lease_leader_dblock_read(struct task *task,
930
struct leader_record *leader_ret,
931
struct paxos_dblock *our_dblock,
936
if (token->r.num_disks > 1)
937
rv = _leader_dblock_read_multiple(task, token, leader_ret, our_dblock, caller);
939
rv = _leader_dblock_read_single(task, token, leader_ret, our_dblock, caller);
942
log_token(token, "%s leader %llu owner %llu %llu %llu "
943
"our_dblock %llu %llu %llu %llu %llu %llu",
945
(unsigned long long)leader_ret->lver,
946
(unsigned long long)leader_ret->owner_id,
947
(unsigned long long)leader_ret->owner_generation,
948
(unsigned long long)leader_ret->timestamp,
949
(unsigned long long)our_dblock->mbal,
950
(unsigned long long)our_dblock->bal,
951
(unsigned long long)our_dblock->inp,
952
(unsigned long long)our_dblock->inp2,
953
(unsigned long long)our_dblock->inp3,
954
(unsigned long long)our_dblock->lver);
959
static int write_new_leader(struct task *task,
961
struct leader_record *nl,
964
int num_disks = token->r.num_disks;
966
int error = SANLK_OK;
969
for (d = 0; d < num_disks; d++) {
970
rv = write_leader(task, &token->disks[d], nl);
976
if (!majority_disks(token, num_writes)) {
977
log_errot(token, "%s write_new_leader error %d owner %llu %llu %llu",
979
(unsigned long long)nl->owner_id,
980
(unsigned long long)nl->owner_generation,
981
(unsigned long long)nl->timestamp);
982
error = SANLK_LEADER_WRITE;
989
* If we hang or crash after completing a ballot successfully, but before
990
* committing the leader_record, then the next host that runs a ballot (with the
991
* same lver since we did not commit the new lver to the leader_record) will
992
* commit the same inp values that we were about to commit. If the inp values
993
* they commit indicate we (who crashed or hung) are the new owner, then the
994
* other hosts will begin monitoring the liveness of our host_id. Once enough
995
* time has passed, they assume we're dead, and go on with new versions. The
996
* "enough time" ensures that if we hung before writing the leader, that we
997
* won't wake up and finally write what will then be an old invalid leader.
1000
int paxos_lease_acquire(struct task *task,
1001
struct token *token,
1003
struct leader_record *leader_ret,
1004
uint64_t acquire_lver,
1007
struct sync_disk host_id_disk;
1008
struct leader_record host_id_leader;
1009
struct leader_record cur_leader;
1010
struct leader_record tmp_leader;
1011
struct leader_record new_leader;
1012
struct paxos_dblock our_dblock;
1013
struct paxos_dblock dblock;
1014
struct host_status hs;
1015
uint64_t wait_start, now;
1016
uint64_t last_timestamp;
1018
uint64_t our_mbal = 0;
1019
int copy_cur_leader = 0;
1023
log_token(token, "paxos_acquire begin lver %llu flags %x",
1024
(unsigned long long)acquire_lver, flags);
1027
error = paxos_lease_leader_dblock_read(task, token, &cur_leader, &our_dblock,
1032
if (flags & PAXOS_ACQUIRE_FORCE) {
1033
copy_cur_leader = 1;
1037
if (acquire_lver && cur_leader.lver != acquire_lver) {
1038
log_errot(token, "paxos_acquire acquire_lver %llu cur_leader %llu",
1039
(unsigned long long)acquire_lver,
1040
(unsigned long long)cur_leader.lver);
1041
error = SANLK_ACQUIRE_LVER;
1045
if (cur_leader.timestamp == LEASE_FREE) {
1046
log_token(token, "paxos_acquire leader %llu free",
1047
(unsigned long long)cur_leader.lver);
1048
copy_cur_leader = 1;
1052
if (cur_leader.owner_id == token->host_id &&
1053
cur_leader.owner_generation == token->host_generation) {
1054
log_token(token, "paxos_acquire already owner id %llu gen %llu",
1055
(unsigned long long)token->host_id,
1056
(unsigned long long)token->host_generation);
1057
copy_cur_leader = 1;
1062
* Check if current owner is alive based on its host_id renewals.
1063
* If the current owner has been dead long enough we can assume that
1064
* its watchdog has triggered and we can go for the paxos lease.
1068
memset(&host_id_disk, 0, sizeof(host_id_disk));
1070
rv = lockspace_disk(cur_leader.space_name, &host_id_disk);
1072
log_errot(token, "paxos_acquire no lockspace info %.48s",
1073
cur_leader.space_name);
1074
error = SANLK_ACQUIRE_LOCKSPACE;
1077
host_id_disk.fd = -1;
1079
disk_open = open_disks_fd(&host_id_disk, 1);
1080
if (disk_open != 1) {
1081
log_errot(token, "paxos_acquire cannot open host_id_disk");
1082
error = SANLK_ACQUIRE_IDDISK;
1087
rv = host_info(cur_leader.space_name, cur_leader.owner_id, &hs);
1088
if (!rv && hs.last_check && hs.last_live &&
1089
hs.owner_id == cur_leader.owner_id &&
1090
hs.owner_generation == cur_leader.owner_generation) {
1091
wait_start = hs.last_live;
1092
last_timestamp = hs.timestamp;
1094
wait_start = monotime();
1098
log_token(token, "paxos_acquire owner %llu %llu %llu "
1099
"host_status %llu %llu %llu wait_start %llu",
1100
(unsigned long long)cur_leader.owner_id,
1101
(unsigned long long)cur_leader.owner_generation,
1102
(unsigned long long)cur_leader.timestamp,
1103
(unsigned long long)hs.owner_id,
1104
(unsigned long long)hs.owner_generation,
1105
(unsigned long long)hs.timestamp,
1106
(unsigned long long)wait_start);
1109
error = delta_lease_leader_read(task, &host_id_disk,
1110
cur_leader.space_name,
1111
cur_leader.owner_id,
1115
log_errot(token, "paxos_acquire owner %llu %llu %llu "
1116
"delta read %d fd %d path %s off %llu ss %u",
1117
(unsigned long long)cur_leader.owner_id,
1118
(unsigned long long)cur_leader.owner_generation,
1119
(unsigned long long)cur_leader.timestamp,
1120
error, host_id_disk.fd, host_id_disk.path,
1121
(unsigned long long)host_id_disk.offset,
1122
host_id_disk.sector_size);
1126
/* a host_id cannot become free in less than
1127
host_dead_seconds after the final renewal because
1128
a host_id must first be acquired before being freed,
1129
and acquiring cannot take less than host_dead_seconds */
1131
if (host_id_leader.timestamp == LEASE_FREE) {
1132
log_token(token, "paxos_acquire owner %llu delta free",
1133
(unsigned long long)cur_leader.owner_id);
1137
/* another host has acquired the host_id of the host that
1138
owned this paxos lease; acquiring a host_id also cannot be
1139
done in less than host_dead_seconds, or
1141
the host_id that owns this lease may be alive, but it
1142
owned the lease in a previous generation without freeing it,
1143
and no longer owns it */
1145
if (host_id_leader.owner_id != cur_leader.owner_id ||
1146
host_id_leader.owner_generation > cur_leader.owner_generation) {
1147
log_token(token, "paxos_acquire owner %llu %llu %llu "
1148
"delta %llu %llu %llu mismatch",
1149
(unsigned long long)cur_leader.owner_id,
1150
(unsigned long long)cur_leader.owner_generation,
1151
(unsigned long long)cur_leader.timestamp,
1152
(unsigned long long)host_id_leader.owner_id,
1153
(unsigned long long)host_id_leader.owner_generation,
1154
(unsigned long long)host_id_leader.timestamp);
1158
if (!last_timestamp) {
1159
last_timestamp = host_id_leader.timestamp;
1160
goto skip_live_check;
1163
/* the owner is renewing its host_id so it's alive */
1165
if (host_id_leader.timestamp != last_timestamp) {
1166
if (flags & PAXOS_ACQUIRE_QUIET_FAIL) {
1167
log_token(token, "paxos_acquire owner %llu "
1168
"delta %llu %llu %llu alive",
1169
(unsigned long long)cur_leader.owner_id,
1170
(unsigned long long)host_id_leader.owner_id,
1171
(unsigned long long)host_id_leader.owner_generation,
1172
(unsigned long long)host_id_leader.timestamp);
1174
log_errot(token, "paxos_acquire owner %llu "
1175
"delta %llu %llu %llu alive",
1176
(unsigned long long)cur_leader.owner_id,
1177
(unsigned long long)host_id_leader.owner_id,
1178
(unsigned long long)host_id_leader.owner_generation,
1179
(unsigned long long)host_id_leader.timestamp);
1181
error = SANLK_ACQUIRE_IDLIVE;
1186
/* if the owner hasn't renewed its host_id lease for
1187
host_dead_seconds then its watchdog should have fired
1192
if (now - wait_start > task->host_dead_seconds) {
1193
log_token(token, "paxos_acquire owner %llu %llu %llu "
1194
"delta %llu %llu %llu dead %llu-%llu>%d",
1195
(unsigned long long)cur_leader.owner_id,
1196
(unsigned long long)cur_leader.owner_generation,
1197
(unsigned long long)cur_leader.timestamp,
1198
(unsigned long long)host_id_leader.owner_id,
1199
(unsigned long long)host_id_leader.owner_generation,
1200
(unsigned long long)host_id_leader.timestamp,
1201
(unsigned long long)now,
1202
(unsigned long long)wait_start,
1203
task->host_dead_seconds);
1208
/* TODO: test with sleep(2) here */
1211
if (external_shutdown) {
1216
error = paxos_lease_leader_read(task, token, &tmp_leader, "paxos_acquire");
1220
if (memcmp(&cur_leader, &tmp_leader, sizeof(struct leader_record))) {
1221
log_token(token, "paxos_acquire restart leader changed");
1227
* Use the disk paxos algorithm to attempt to commit a new leader.
1229
* If we complete a ballot successfully, we can commit a leader record
1230
* with next_lver. If we find a higher mbal during a ballot, we increase
1231
* our own mbal and try the ballot again.
1233
* next_lver is derived from cur_leader with a zero or timed out owner.
1234
* We need to monitor the leader record to see if another host commits
1235
* a new leader_record with next_lver.
1237
* TODO: may not need to increase mbal if dblock.inp and inp2 match
1238
* current host_id and generation?
1241
next_lver = cur_leader.lver + 1;
1243
if (!our_dblock.mbal)
1244
our_mbal = token->host_id;
1246
our_mbal = our_dblock.mbal + cur_leader.max_hosts;
1250
if (copy_cur_leader) {
1251
/* reusing the initial read removes an iop in the common case */
1252
copy_cur_leader = 0;
1253
memcpy(&tmp_leader, &cur_leader, sizeof(struct leader_record));
1255
error = paxos_lease_leader_read(task, token, &tmp_leader, "paxos_acquire");
1260
if (tmp_leader.lver == next_lver) {
1262
* another host has committed a leader_record for next_lver,
1263
* check which inp (owner_id) they committed (possibly us).
1266
if (tmp_leader.owner_id == token->host_id &&
1267
tmp_leader.owner_generation == token->host_generation) {
1268
/* not a problem, but interesting to see, so use log_error */
1270
log_errot(token, "paxos_acquire %llu owner our inp "
1271
"%llu %llu %llu commited by %llu",
1272
(unsigned long long)next_lver,
1273
(unsigned long long)tmp_leader.owner_id,
1274
(unsigned long long)tmp_leader.owner_generation,
1275
(unsigned long long)tmp_leader.timestamp,
1276
(unsigned long long)tmp_leader.write_id);
1278
memcpy(leader_ret, &tmp_leader, sizeof(struct leader_record));
1281
/* not a problem, but interesting to see, so use log_error */
1283
log_errot(token, "paxos_acquire %llu owner is %llu",
1284
(unsigned long long)next_lver,
1285
(unsigned long long)tmp_leader.owner_id);
1287
error = SANLK_ACQUIRE_OWNED;
1292
error = run_ballot(task, token, cur_leader.num_hosts, next_lver, our_mbal,
1295
if (error == SANLK_DBLOCK_MBAL) {
1296
us = get_rand(0, 1000000);
1298
us = token->host_id * 100;
1300
/* not a problem, but interesting to see, so use log_error */
1301
log_errot(token, "paxos_acquire %llu retry delay %d us",
1302
(unsigned long long)next_lver, us);
1305
our_mbal += cur_leader.max_hosts;
1310
log_errot(token, "paxos_acquire %llu ballot error %d",
1311
(unsigned long long)next_lver, error);
1315
/* ballot success, commit next_lver with dblock values */
1317
memcpy(&new_leader, &cur_leader, sizeof(struct leader_record));
1318
new_leader.lver = dblock.lver;
1319
new_leader.owner_id = dblock.inp;
1320
new_leader.owner_generation = dblock.inp2;
1321
new_leader.timestamp = dblock.inp3;
1323
new_leader.write_id = token->host_id;
1324
new_leader.write_generation = token->host_generation;
1325
new_leader.write_timestamp = monotime();
1328
new_leader.num_hosts = new_num_hosts;
1329
new_leader.checksum = leader_checksum(&new_leader);
1331
error = write_new_leader(task, token, &new_leader, "paxos_acquire");
1335
if (new_leader.owner_id != token->host_id) {
1336
/* not a problem, but interesting to see, so use log_error */
1338
log_errot(token, "ballot %llu commit other owner %llu %llu %llu",
1339
(unsigned long long)new_leader.lver,
1340
(unsigned long long)new_leader.owner_id,
1341
(unsigned long long)new_leader.owner_generation,
1342
(unsigned long long)new_leader.timestamp);
1344
error = SANLK_ACQUIRE_OTHER;
1348
log_token(token, "ballot %llu commit self owner %llu %llu %llu",
1349
(unsigned long long)next_lver,
1350
(unsigned long long)new_leader.owner_id,
1351
(unsigned long long)new_leader.owner_generation,
1352
(unsigned long long)new_leader.timestamp);
1354
memcpy(leader_ret, &new_leader, sizeof(struct leader_record));
1359
close_disks(&host_id_disk, 1);
1365
int paxos_lease_renew(struct task *task,
1366
struct token *token,
1367
struct leader_record *leader_last,
1368
struct leader_record *leader_ret)
1370
struct leader_record new_leader;
1374
for (d = 0; d < token->r.num_disks; d++) {
1375
memset(&new_leader, 0, sizeof(struct leader_record));
1377
rv = read_leader(task, &token->disks[d], &new_leader);
1381
if (memcmp(&new_leader, leader_last,
1382
sizeof(struct leader_record))) {
1383
log_errot(token, "leader changed between renewals");
1384
return SANLK_BAD_LEADER;
1388
new_leader.timestamp = monotime();
1389
new_leader.checksum = leader_checksum(&new_leader);
1391
error = write_new_leader(task, token, &new_leader);
1395
memcpy(leader_ret, &new_leader, sizeof(struct leader_record));
1401
int paxos_lease_release(struct task *task,
1402
struct token *token,
1403
struct leader_record *leader_last,
1404
struct leader_record *leader_ret)
1406
struct leader_record leader;
1409
error = paxos_lease_leader_read(task, token, &leader, "paxos_release");
1411
log_errot(token, "release error cannot read leader");
1415
if (leader.lver != leader_last->lver) {
1416
log_errot(token, "paxos_release %llu other lver %llu",
1417
(unsigned long long)leader_last->lver,
1418
(unsigned long long)leader.lver);
1419
return SANLK_RELEASE_LVER;
1422
if (leader.owner_id != token->host_id ||
1423
leader.owner_generation != token->host_generation) {
1424
log_errot(token, "paxos_release %llu other owner %llu %llu %llu",
1425
(unsigned long long)leader_last->lver,
1426
(unsigned long long)leader.owner_id,
1427
(unsigned long long)leader.owner_generation,
1428
(unsigned long long)leader.timestamp);
1429
return SANLK_RELEASE_OWNER;
1432
if (memcmp(&leader, leader_last, sizeof(struct leader_record))) {
1434
* This will happen when two hosts finish the same ballot
1435
* successfully, the second committing the same inp values
1436
* that the first did, as it should. But the second will
1437
* write its own write_id/gen/timestamp, which will differ
1438
* from what the first host wrote. So when the first host
1439
* rereads here in the release, it will find different
1440
* write_id/gen/timestamp from what it wrote. This is
1441
* perfectly fine (use log_error since it's interesting
1442
* to see when this happens.)
1444
log_errot(token, "paxos_release %llu leader different "
1445
"write %llu %llu %llu vs %llu %llu %llu",
1446
(unsigned long long)leader_last->lver,
1447
(unsigned long long)leader_last->write_id,
1448
(unsigned long long)leader_last->write_generation,
1449
(unsigned long long)leader_last->write_timestamp,
1450
(unsigned long long)leader.write_id,
1451
(unsigned long long)leader.write_generation,
1452
(unsigned long long)leader.write_timestamp);
1454
log_leader_error(0, token, &token->disks[0], leader_last, "paxos_release");
1455
log_leader_error(0, token, &token->disks[0], &leader, "paxos_release");
1459
leader.timestamp = LEASE_FREE;
1460
leader.write_id = token->host_id;
1461
leader.write_generation = token->host_generation;
1462
leader.write_timestamp = monotime();
1463
leader.checksum = leader_checksum(&leader);
1465
error = write_new_leader(task, token, &leader, "paxos_release");
1469
memcpy(leader_ret, &leader, sizeof(struct leader_record));
1474
int paxos_lease_init(struct task *task,
1475
struct token *token,
1476
int num_hosts, int max_hosts)
1478
char *iobuf, **p_iobuf;
1479
struct leader_record *leader;
1480
struct request_record *rr;
1484
int aio_timeout = 0;
1488
num_hosts = DEFAULT_MAX_HOSTS;
1490
max_hosts = DEFAULT_MAX_HOSTS;
1492
sector_size = token->disks[0].sector_size;
1494
align_size = direct_align(&token->disks[0]);
1498
if (sector_size * (2 + max_hosts) > align_size)
1501
iobuf_len = align_size;
1505
rv = posix_memalign((void *)p_iobuf, getpagesize(), iobuf_len);
1509
memset(iobuf, 0, iobuf_len);
1511
leader = (struct leader_record *)iobuf;
1512
leader->magic = PAXOS_DISK_MAGIC;
1513
leader->version = PAXOS_DISK_VERSION_MAJOR | PAXOS_DISK_VERSION_MINOR;
1514
leader->sector_size = sector_size;
1515
leader->num_hosts = num_hosts;
1516
leader->max_hosts = max_hosts;
1517
leader->timestamp = LEASE_FREE;
1518
strncpy(leader->space_name, token->r.lockspace_name, NAME_ID_SIZE);
1519
strncpy(leader->resource_name, token->r.name, NAME_ID_SIZE);
1520
leader->checksum = leader_checksum(leader);
1522
rr = (struct request_record *)(iobuf + sector_size);
1523
rr->magic = REQ_DISK_MAGIC;
1524
rr->version = REQ_DISK_VERSION_MAJOR | REQ_DISK_VERSION_MINOR;
1526
for (d = 0; d < token->r.num_disks; d++) {
1527
rv = write_iobuf(token->disks[d].fd, token->disks[d].offset,
1528
iobuf, iobuf_len, task);
1530
if (rv == SANLK_AIO_TIMEOUT)