/*
 * Copyright 2004-2012 Red Hat, Inc.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License v2 or (at your option) any later version.
 */
9
#include "dlm_daemon.h"
12
static SaCkptHandleT global_ckpt_h;
13
static SaCkptCallbacksT callbacks = { 0, 0 };
14
static SaVersionT version = { 'B', 1, 1 };
15
static char section_buf[10 * 1024 * 1024]; /* 10MB of pack_lock's enough? */
16
static uint32_t section_len;
17
static uint32_t section_max;
20
struct list_head list;
22
int checkpoint_ready; /* we've read its ckpt */
23
int in_cycle; /* participating in cycle */
31
/* from linux/fs/dlm/dlm_internal.h */
32
#define DLM_LKSTS_WAITING 1
33
#define DLM_LKSTS_GRANTED 2
34
#define DLM_LKSTS_CONVERT 3
51
struct list_head list;
52
struct list_head locks;
53
char name[DLM_RESNAME_MAXLEN];
57
/* information is saved in the lkb, and lkb->lock, from the perspective of the
58
local or master copy, not the process copy */
61
struct list_head list; /* r->locks */
62
struct pack_lock lock; /* data from debugfs/checkpoint */
63
int home; /* node where the lock owner lives*/
64
struct dlm_rsb *rsb; /* lock is on resource */
65
struct trans *trans; /* lock owned by this transaction */
66
struct list_head trans_list; /* tr->locks */
67
struct trans *waitfor_trans; /* the trans that's holding the
68
lock that's blocking us */
71
/* waitfor pointers alloc'ed 4 at at time */
75
struct list_head list;
76
struct list_head locks;
78
int others_waiting_on_us; /* count of trans's
82
int waitfor_count; /* count of in-use
86
struct trans **waitfor; /* waitfor_alloc trans
90
/* Lock mode compatibility matrix, from linux/fs/dlm.  Indexed by
   [mode + 1] so that mode -1 (UN, unlocked) maps to row/column 0.
   Entry is 1 when the two modes are compatible, 0 when they conflict. */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/* Return nonzero if a lock held in mode1 is compatible with a request
   for mode2.  Modes range from -1 (UN) through 5 (EX); callers must not
   pass values outside that range (no bounds check is done here). */

static inline int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
static const char *status_str(int lksts)
110
case DLM_LKSTS_WAITING:
112
case DLM_LKSTS_GRANTED:
114
case DLM_LKSTS_CONVERT:
120
static void free_resources(struct lockspace *ls)
122
struct dlm_rsb *r, *r_safe;
123
struct dlm_lkb *lkb, *lkb_safe;
125
list_for_each_entry_safe(r, r_safe, &ls->resources, list) {
126
list_for_each_entry_safe(lkb, lkb_safe, &r->locks, list) {
127
list_del(&lkb->list);
128
if (!list_empty(&lkb->trans_list))
129
list_del(&lkb->trans_list);
137
static void free_transactions(struct lockspace *ls)
139
struct trans *tr, *tr_safe;
141
list_for_each_entry_safe(tr, tr_safe, &ls->transactions, list) {
149
static void disable_deadlock(void)
151
log_error("FIXME: deadlock detection disabled");
154
void setup_deadlock(void)
158
if (!cfgd_enable_deadlk)
161
rv = saCkptInitialize(&global_ckpt_h, &callbacks, &version);
163
log_error("ckpt init error %d", rv);
166
static struct dlm_rsb *get_resource(struct lockspace *ls, char *name, int len)
170
list_for_each_entry(r, &ls->resources, list) {
171
if (r->len == len && !strncmp(r->name, name, len))
175
r = malloc(sizeof(struct dlm_rsb));
177
log_error("get_resource: no memory");
181
memset(r, 0, sizeof(struct dlm_rsb));
182
memcpy(r->name, name, len);
184
INIT_LIST_HEAD(&r->locks);
185
list_add(&r->list, &ls->resources);
189
static struct dlm_lkb *create_lkb(void)
193
lkb = malloc(sizeof(struct dlm_lkb));
195
log_error("create_lkb: no memory");
198
memset(lkb, 0, sizeof(struct dlm_lkb));
199
INIT_LIST_HEAD(&lkb->list);
200
INIT_LIST_HEAD(&lkb->trans_list);
205
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
207
list_add(&lkb->list, &r->locks);
211
/* from linux/fs/dlm/dlm_internal.h */
212
#define IFL_MSTCPY 0x00010000
214
/* called on a lock that's just been read from debugfs */
216
static void set_copy(struct pack_lock *lock)
221
lock->copy = LOCAL_COPY;
222
else if (lock->flags & IFL_MSTCPY)
223
lock->copy = MASTER_COPY;
225
/* process copy lock is converted to a partial master copy
226
lock that will be combined with the real master copy */
227
lock->copy = MASTER_COPY;
232
lock->nodeid = our_nodeid;
236
/* xid is always zero in the real master copy, xid should always be non-zero
237
in the partial master copy (what was a process copy) */
238
/* TODO: confirm or enforce that the partial will always have non-zero xid */
240
static int partial_master_copy(struct pack_lock *lock)
242
return (lock->xid != 0);
245
static struct dlm_lkb *get_lkb(struct dlm_rsb *r, struct pack_lock *lock)
249
if (lock->copy != MASTER_COPY)
252
list_for_each_entry(lkb, &r->locks, list) {
253
if (lkb->lock.nodeid == lock->nodeid &&
254
lkb->lock.id == lock->id)
261
static struct dlm_lkb *add_lock(struct lockspace *ls, struct dlm_rsb *r,
262
int from_nodeid, struct pack_lock *lock)
266
lkb = get_lkb(r, lock);
270
switch (lock->copy) {
272
lkb->lock.xid = lock->xid;
273
lkb->lock.nodeid = lock->nodeid;
274
lkb->lock.id = lock->id;
275
lkb->lock.remid = lock->remid;
276
lkb->lock.ownpid = lock->ownpid;
277
lkb->lock.exflags = lock->exflags;
278
lkb->lock.flags = lock->flags;
279
lkb->lock.status = lock->status;
280
lkb->lock.grmode = lock->grmode;
281
lkb->lock.rqmode = lock->rqmode;
282
lkb->lock.copy = LOCAL_COPY;
283
lkb->home = from_nodeid;
285
log_group(ls, "add %s local nodeid %d id %x remid %x xid %llx",
286
r->name, lock->nodeid, lock->id, lock->remid,
287
(unsigned long long)lock->xid);
291
if (partial_master_copy(lock)) {
292
lkb->lock.xid = lock->xid;
293
lkb->lock.nodeid = lock->nodeid;
294
lkb->lock.id = lock->id;
295
lkb->lock.remid = lock->remid;
296
lkb->lock.copy = MASTER_COPY;
298
/* only set xid from partial master copy above */
299
lkb->lock.nodeid = lock->nodeid;
300
lkb->lock.id = lock->id;
301
lkb->lock.remid = lock->remid;
302
lkb->lock.copy = MASTER_COPY;
303
/* set other fields from real master copy */
304
lkb->lock.ownpid = lock->ownpid;
305
lkb->lock.exflags = lock->exflags;
306
lkb->lock.flags = lock->flags;
307
lkb->lock.status = lock->status;
308
lkb->lock.grmode = lock->grmode;
309
lkb->lock.rqmode = lock->rqmode;
311
lkb->home = lock->nodeid;
313
log_group(ls, "add %s master nodeid %d id %x remid %x xid %llx",
314
r->name, lock->nodeid, lock->id, lock->remid,
315
(unsigned long long)lock->xid);
319
if (list_empty(&lkb->list))
324
static void parse_r_name(char *line, char *name)
330
for (p = line; ; p++) {
342
#define LOCK_LINE_MAX 1024
344
static int read_debugfs_locks(struct lockspace *ls)
348
char line[LOCK_LINE_MAX];
350
struct pack_lock lock;
352
unsigned long long xid;
353
unsigned int waiting;
358
snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_locks", ls->name);
360
file = fopen(path, "r");
364
/* skip the header on the first line */
365
if (!fgets(line, LOCK_LINE_MAX, file)) {
366
log_error("Unable to read %s: %d", path, errno);
370
while (fgets(line, LOCK_LINE_MAX, file)) {
371
memset(&lock, 0, sizeof(struct pack_lock));
373
rv = sscanf(line, "%x %d %x %u %llu %x %x %hhd %hhd %hhd %u %d %d",
388
lock.xid = xid; /* hack to avoid warning */
391
log_error("invalid debugfs line %d: %s", rv, line);
395
memset(r_name, 0, sizeof(r_name));
396
parse_r_name(line, r_name);
398
r = get_resource(ls, r_name, r_len);
403
add_lock(ls, r, our_nodeid, &lock);
410
static int read_checkpoint_locks(struct lockspace *ls, int from_nodeid,
411
char *numbuf, int buflen)
414
struct pack_lock *lock;
415
int count = section_len / sizeof(struct pack_lock);
418
r = get_resource(ls, numbuf, buflen - 1);
422
lock = (struct pack_lock *) §ion_buf;
424
for (i = 0; i < count; i++) {
425
lock->xid = le64_to_cpu(lock->xid);
426
lock->id = le32_to_cpu(lock->id);
427
lock->nodeid = le32_to_cpu(lock->nodeid);
428
lock->remid = le32_to_cpu(lock->remid);
429
lock->ownpid = le32_to_cpu(lock->ownpid);
430
lock->exflags = le32_to_cpu(lock->exflags);
431
lock->flags = le32_to_cpu(lock->flags);
433
add_lock(ls, r, from_nodeid, lock);
439
static int pack_lkb_list(struct list_head *q, struct pack_lock **lockp)
442
struct pack_lock *lock = *lockp;
445
list_for_each_entry(lkb, q, list) {
446
if (count + 1 > section_max) {
447
log_error("too many locks %d for ckpt buf", count);
451
lock->xid = cpu_to_le64(lkb->lock.xid);
452
lock->id = cpu_to_le32(lkb->lock.id);
453
lock->nodeid = cpu_to_le32(lkb->lock.nodeid);
454
lock->remid = cpu_to_le32(lkb->lock.remid);
455
lock->ownpid = cpu_to_le32(lkb->lock.ownpid);
456
lock->exflags = cpu_to_le32(lkb->lock.exflags);
457
lock->flags = cpu_to_le32(lkb->lock.flags);
458
lock->status = lkb->lock.status;
459
lock->grmode = lkb->lock.grmode;
460
lock->rqmode = lkb->lock.rqmode;
461
lock->copy = lkb->lock.copy;
469
static void pack_section_buf(struct lockspace *ls, struct dlm_rsb *r)
471
struct pack_lock *lock;
474
memset(§ion_buf, 0, sizeof(section_buf));
475
section_max = sizeof(section_buf) / sizeof(struct pack_lock);
477
lock = (struct pack_lock *) §ion_buf;
479
count = pack_lkb_list(&r->locks, &lock);
481
section_len = count * sizeof(struct pack_lock);
484
static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
486
SaCkptCheckpointHandleT h;
487
SaCkptCheckpointDescriptorT s;
492
h = (SaCkptCheckpointHandleT) ls->deadlk_ckpt_handle;
493
log_group(ls, "unlink ckpt %llx", (unsigned long long)h);
497
rv = saCkptCheckpointUnlink(global_ckpt_h, name);
498
if (rv == SA_AIS_ERR_TRY_AGAIN) {
499
log_group(ls, "unlink ckpt retry");
509
log_error("unlink ckpt error %d %s", rv, ls->name);
514
rv = saCkptCheckpointStatusGet(h, &s);
515
if (rv == SA_AIS_ERR_TRY_AGAIN) {
516
log_group(ls, "unlink ckpt status retry");
521
if (rv != SA_AIS_OK) {
522
log_error("unlink ckpt status error %d %s", rv, ls->name);
526
log_group(ls, "unlink ckpt status: size %llu, max sections %u, "
527
"max section size %llu, section count %u, mem %u",
528
(unsigned long long)s.checkpointCreationAttributes.checkpointSize,
529
s.checkpointCreationAttributes.maxSections,
530
(unsigned long long)s.checkpointCreationAttributes.maxSectionSize,
531
s.numberOfSections, s.memoryUsed);
536
rv = saCkptCheckpointClose(h);
537
if (rv == SA_AIS_ERR_TRY_AGAIN) {
538
log_group(ls, "unlink ckpt close retry");
543
if (rv != SA_AIS_OK) {
544
log_error("unlink ckpt %llx close err %d %s",
545
(unsigned long long)h, rv, ls->name);
548
ls->deadlk_ckpt_handle = 0;
552
static int unlink_checkpoint(struct lockspace *ls)
557
len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
558
ls->name, our_nodeid);
561
return _unlink_checkpoint(ls, &name);
564
static void read_checkpoint(struct lockspace *ls, int nodeid)
566
SaCkptCheckpointHandleT h;
567
SaCkptSectionIterationHandleT itr;
568
SaCkptSectionDescriptorT desc;
569
SaCkptIOVectorElementT iov;
572
char buf[DLM_RESNAME_MAXLEN];
576
if (nodeid == our_nodeid)
579
log_group(ls, "read_checkpoint %d", nodeid);
581
len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
587
rv = saCkptCheckpointOpen(global_ckpt_h, &name, NULL,
588
SA_CKPT_CHECKPOINT_READ, 0, &h);
589
if (rv == SA_AIS_ERR_TRY_AGAIN) {
590
log_group(ls, "read_checkpoint: %d ckpt open retry", nodeid);
595
if (rv != SA_AIS_OK) {
596
log_error("read_checkpoint: %d ckpt open error %d", nodeid, rv);
602
rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
603
if (rv == SA_AIS_ERR_TRY_AGAIN) {
604
log_group(ls, "read_checkpoint: ckpt iterinit retry");
609
if (rv != SA_AIS_OK) {
610
log_error("read_checkpoint: %d ckpt iterinit error %d", nodeid, rv);
617
rv = saCkptSectionIterationNext(itr, &desc);
618
if (rv == SA_AIS_ERR_NO_SECTIONS)
620
if (rv == SA_AIS_ERR_TRY_AGAIN) {
621
log_group(ls, "read_checkpoint: ckpt iternext retry");
626
if (rv != SA_AIS_OK) {
627
log_error("read_checkpoint: %d ckpt iternext error %d",
632
if (!desc.sectionSize)
635
iov.sectionId = desc.sectionId;
636
iov.dataBuffer = §ion_buf;
637
iov.dataSize = desc.sectionSize;
640
memset(&buf, 0, sizeof(buf));
641
snprintf(buf, sizeof(buf), "%s", desc.sectionId.id);
643
log_group(ls, "read_checkpoint: section size %llu id %u \"%s\"",
644
(unsigned long long)iov.dataSize,
645
iov.sectionId.idLen, buf);
649
rv = saCkptCheckpointRead(h, &iov, 1, NULL);
650
if (rv == SA_AIS_ERR_TRY_AGAIN) {
651
log_group(ls, "read_checkpoint: ckpt read retry");
656
if (rv != SA_AIS_OK) {
657
log_error("read_checkpoint: %d ckpt read error %d",
662
section_len = iov.readSize;
667
if (section_len % sizeof(struct pack_lock)) {
668
log_error("read_checkpoint: %d bad section len %d",
669
nodeid, section_len);
673
read_checkpoint_locks(ls, nodeid, (char *)desc.sectionId.id,
674
desc.sectionId.idLen);
678
saCkptSectionIterationFinalize(itr);
681
rv = saCkptCheckpointClose(h);
682
if (rv == SA_AIS_ERR_TRY_AGAIN) {
683
log_group(ls, "read_checkpoint: unlink ckpt close retry");
689
log_error("read_checkpoint: %d close error %d", nodeid, rv);
692
static void write_checkpoint(struct lockspace *ls)
694
SaCkptCheckpointCreationAttributesT attr;
695
SaCkptCheckpointHandleT h;
696
SaCkptSectionIdT section_id;
697
SaCkptSectionCreationAttributesT section_attr;
698
SaCkptCheckpointOpenFlagsT flags;
701
char buf[DLM_RESNAME_MAXLEN];
704
int r_count, lock_count, total_size, section_size, max_section_size;
707
len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
708
ls->name, our_nodeid);
711
/* unlink an old checkpoint before we create a new one */
712
if (ls->deadlk_ckpt_handle) {
713
log_error("write_checkpoint: old ckpt");
714
if (_unlink_checkpoint(ls, &name))
718
/* loop through all locks to figure out sizes to set in
724
max_section_size = 0;
726
list_for_each_entry(r, &ls->resources, list) {
729
list_for_each_entry(lkb, &r->locks, list) {
730
section_size += sizeof(struct pack_lock);
733
total_size += section_size;
734
if (section_size > max_section_size)
735
max_section_size = section_size;
738
log_group(ls, "write_checkpoint: r_count %d, lock_count %d",
739
r_count, lock_count);
741
log_group(ls, "write_checkpoint: total %d bytes, max_section %d bytes",
742
total_size, max_section_size);
744
attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
745
attr.checkpointSize = total_size;
746
attr.retentionDuration = SA_TIME_MAX;
747
attr.maxSections = r_count + 1; /* don't know why we need +1 */
748
attr.maxSectionSize = max_section_size;
749
attr.maxSectionIdSize = DLM_RESNAME_MAXLEN;
751
flags = SA_CKPT_CHECKPOINT_READ |
752
SA_CKPT_CHECKPOINT_WRITE |
753
SA_CKPT_CHECKPOINT_CREATE;
756
rv = saCkptCheckpointOpen(global_ckpt_h, &name, &attr, flags, 0, &h);
757
if (rv == SA_AIS_ERR_TRY_AGAIN) {
758
log_group(ls, "write_checkpoint: ckpt open retry");
762
if (rv == SA_AIS_ERR_EXIST) {
763
log_group(ls, "write_checkpoint: ckpt already exists");
766
if (rv != SA_AIS_OK) {
767
log_group(ls, "write_checkpoint: ckpt open error %d", rv);
771
log_group(ls, "write_checkpoint: open ckpt handle %llx",
772
(unsigned long long)h);
773
ls->deadlk_ckpt_handle = (uint64_t) h;
775
list_for_each_entry(r, &ls->resources, list) {
776
memset(buf, 0, sizeof(buf));
777
len = snprintf(buf, sizeof(buf), "%s", r->name);
779
section_id.id = (void *)buf;
780
section_id.idLen = len + 1;
781
section_attr.sectionId = §ion_id;
782
section_attr.expirationTime = SA_TIME_END;
784
pack_section_buf(ls, r);
786
log_group(ls, "write_checkpoint: section size %u id %u \"%s\"",
787
section_len, section_id.idLen, buf);
790
rv = saCkptSectionCreate(h, §ion_attr, §ion_buf,
792
if (rv == SA_AIS_ERR_TRY_AGAIN) {
793
log_group(ls, "write_checkpoint: ckpt create retry");
797
if (rv == SA_AIS_ERR_EXIST) {
798
/* this shouldn't happen in general */
799
log_error("write_checkpoint: clearing old ckpt");
800
saCkptCheckpointClose(h);
801
_unlink_checkpoint(ls, &name);
804
if (rv != SA_AIS_OK) {
805
log_error("write_checkpoint: section create %d", rv);
811
static void send_message(struct lockspace *ls, int type,
812
uint32_t to_nodeid, uint32_t msgdata)
814
struct dlm_header *hd;
818
len = sizeof(struct dlm_header);
821
log_error("send_message: no memory");
827
hd = (struct dlm_header *)buf;
829
hd->to_nodeid = to_nodeid;
830
hd->msgdata = msgdata;
832
dlm_send_message(ls, buf, len);
837
static void send_checkpoint_ready(struct lockspace *ls)
839
log_group(ls, "send_checkpoint_ready");
840
send_message(ls, DLM_MSG_DEADLK_CHECKPOINT_READY, 0, 0);
843
void send_cycle_start(struct lockspace *ls)
845
log_group(ls, "send_cycle_start");
846
send_message(ls, DLM_MSG_DEADLK_CYCLE_START, 0, 0);
849
static void send_cycle_end(struct lockspace *ls)
851
log_group(ls, "send_cycle_end");
852
send_message(ls, DLM_MSG_DEADLK_CYCLE_END, 0, 0);
855
static void send_cancel_lock(struct lockspace *ls, struct trans *tr,
861
if (!lkb->lock.nodeid)
864
lkid = lkb->lock.remid;
865
to_nodeid = lkb->home;
867
log_group(ls, "send_cancel_lock to nodeid %d rsb %s id %x xid %llx",
868
to_nodeid, lkb->rsb->name, lkid,
869
(unsigned long long)lkb->lock.xid);
871
send_message(ls, DLM_MSG_DEADLK_CANCEL_LOCK, to_nodeid, lkid);
874
static void dump_resources(struct lockspace *ls)
879
log_group(ls, "Resource dump:");
881
list_for_each_entry(r, &ls->resources, list) {
882
log_group(ls, "\"%s\" len %d", r->name, r->len);
883
list_for_each_entry(lkb, &r->locks, list) {
884
log_group(ls, " %s: nodeid %d id %08x remid %08x gr %s rq %s pid %u xid %llx",
885
status_str(lkb->lock.status),
889
dlm_mode_str(lkb->lock.grmode),
890
dlm_mode_str(lkb->lock.rqmode),
892
(unsigned long long)lkb->lock.xid);
897
static void find_deadlock(struct lockspace *ls);
899
static void run_deadlock(struct lockspace *ls)
905
if (ls->all_checkpoints_ready)
906
log_group(ls, "WARNING: run_deadlock all_checkpoints_ready");
908
list_for_each_entry(node, &ls->deadlk_nodes, list) {
911
if (!node->checkpoint_ready)
914
log_group(ls, "nodeid %d checkpoint_ready = %d",
915
node->nodeid, node->checkpoint_ready);
920
ls->all_checkpoints_ready = 1;
922
list_for_each_entry(node, &ls->deadlk_nodes, list) {
925
if (node->nodeid < low || low == -1)
928
ls->deadlk_low_nodeid = low;
930
if (low == our_nodeid)
933
log_group(ls, "defer resolution to low nodeid %d", low);
936
void receive_checkpoint_ready(struct lockspace *ls, struct dlm_header *hd,
940
int nodeid = hd->nodeid;
942
log_group(ls, "receive_checkpoint_ready from %d", nodeid);
944
read_checkpoint(ls, nodeid);
946
list_for_each_entry(node, &ls->deadlk_nodes, list) {
947
if (node->nodeid == nodeid) {
948
node->checkpoint_ready = 1;
956
void receive_cycle_start(struct lockspace *ls, struct dlm_header *hd, int len)
959
int nodeid = hd->nodeid;
962
log_group(ls, "receive_cycle_start from %d", nodeid);
964
if (ls->cycle_running) {
965
log_group(ls, "cycle already running");
968
ls->cycle_running = 1;
969
gettimeofday(&ls->cycle_start_time, NULL);
971
list_for_each_entry(node, &ls->deadlk_nodes, list)
974
rv = read_debugfs_locks(ls);
976
log_error("can't read dlm debugfs file: %s", strerror(errno));
980
write_checkpoint(ls);
981
send_checkpoint_ready(ls);
984
/* Elapsed time from start to stop, in microseconds.  Assumes
   stop >= start (values come from paired gettimeofday() calls). */

static uint64_t dt_usec(struct timeval *start, struct timeval *stop)
{
	uint64_t dt;

	dt = stop->tv_sec - start->tv_sec;
	dt *= 1000000;
	dt += stop->tv_usec - start->tv_usec;
	return dt;
}
994
/* TODO: nodes added during a cycle - what will they do with messages
995
they recv from other nodes running the cycle? */
997
void receive_cycle_end(struct lockspace *ls, struct dlm_header *hd, int len)
1000
int nodeid = hd->nodeid;
1003
if (!ls->cycle_running) {
1004
log_error("receive_cycle_end %s from %d: no cycle running",
1009
gettimeofday(&ls->cycle_end_time, NULL);
1010
usec = dt_usec(&ls->cycle_start_time, &ls->cycle_end_time);
1011
log_group(ls, "receive_cycle_end: from %d cycle time %.2f s",
1012
nodeid, usec * 1.e-6);
1014
ls->cycle_running = 0;
1015
ls->all_checkpoints_ready = 0;
1017
list_for_each_entry(node, &ls->deadlk_nodes, list)
1018
node->checkpoint_ready = 0;
1021
free_transactions(ls);
1022
unlink_checkpoint(ls);
1025
void receive_cancel_lock(struct lockspace *ls, struct dlm_header *hd, int len)
1028
int nodeid = hd->nodeid;
1029
uint32_t lkid = hd->msgdata;
1032
if (nodeid != our_nodeid)
1035
h = dlm_open_lockspace(ls->name);
1037
log_error("deadlock cancel %x from %d can't open lockspace %s",
1038
lkid, nodeid, ls->name);
1042
log_group(ls, "receive_cancel_lock %x from %d", lkid, nodeid);
1044
rv = dlm_ls_deadlock_cancel(h, lkid, 0);
1046
log_error("deadlock cancel %x from %x lib cancel errno %d",
1047
lkid, nodeid, errno);
1050
dlm_close_lockspace(h);
1053
static void node_joined(struct lockspace *ls, int nodeid)
1057
node = malloc(sizeof(struct node));
1059
log_error("node_joined: no memory");
1063
memset(node, 0, sizeof(struct node));
1064
node->nodeid = nodeid;
1065
list_add_tail(&node->list, &ls->deadlk_nodes);
1066
log_group(ls, "node %d joined deadlock cpg", nodeid);
1069
static void node_left(struct lockspace *ls, int nodeid, int reason)
1071
struct node *node, *safe;
1073
list_for_each_entry_safe(node, safe, &ls->deadlk_nodes, list) {
1074
if (node->nodeid != nodeid)
1077
list_del(&node->list);
1079
log_group(ls, "node %d left deadlock cpg", nodeid);
1083
static void purge_locks(struct lockspace *ls, int nodeid);
1085
void deadlk_confchg(struct lockspace *ls,
1086
const struct cpg_address *member_list,
1087
size_t member_list_entries,
1088
const struct cpg_address *left_list,
1089
size_t left_list_entries,
1090
const struct cpg_address *joined_list,
1091
size_t joined_list_entries)
1095
if (!cfgd_enable_deadlk)
1098
if (!ls->deadlk_confchg_init) {
1099
ls->deadlk_confchg_init = 1;
1100
for (i = 0; i < member_list_entries; i++)
1101
node_joined(ls, member_list[i].nodeid);
1105
/* nodes added during a cycle won't have node->in_cycle set so they
1106
won't be included in any of the cycle processing */
1108
for (i = 0; i < joined_list_entries; i++)
1109
node_joined(ls, joined_list[i].nodeid);
1111
for (i = 0; i < left_list_entries; i++)
1112
node_left(ls, left_list[i].nodeid, left_list[i].reason);
1114
if (!ls->cycle_running)
1117
if (!left_list_entries)
1120
if (!ls->all_checkpoints_ready) {
1125
for (i = 0; i < left_list_entries; i++)
1126
purge_locks(ls, left_list[i].nodeid);
1128
for (i = 0; i < left_list_entries; i++) {
1129
if (left_list[i].nodeid != ls->deadlk_low_nodeid)
1131
/* this will set a new low node which will call find_deadlock */
1137
/* would we ever call this after we've created the transaction lists?
1138
I don't think so; I think it can only be called between reading
1141
static void purge_locks(struct lockspace *ls, int nodeid)
1144
struct dlm_lkb *lkb, *safe;
1146
list_for_each_entry(r, &ls->resources, list) {
1147
list_for_each_entry_safe(lkb, safe, &r->locks, list) {
1148
if (lkb->home == nodeid) {
1149
list_del(&lkb->list);
1150
if (list_empty(&lkb->trans_list))
1153
log_group(ls, "purge %d %x on trans",
1154
nodeid, lkb->lock.id);
1160
static void add_lkb_trans(struct trans *tr, struct dlm_lkb *lkb)
1162
list_add(&lkb->trans_list, &tr->locks);
1166
static struct trans *get_trans(struct lockspace *ls, uint64_t xid)
1170
list_for_each_entry(tr, &ls->transactions, list) {
1175
tr = malloc(sizeof(struct trans));
1177
log_error("get_trans: no memory");
1181
memset(tr, 0, sizeof(struct trans));
1184
tr->waitfor_alloc = 0;
1185
tr->waitfor_count = 0;
1186
INIT_LIST_HEAD(&tr->locks);
1187
list_add(&tr->list, &ls->transactions);
1191
/* for each rsb, for each lock, find/create trans, add lkb to the trans list */
1193
static void create_trans_list(struct lockspace *ls)
1196
struct dlm_lkb *lkb;
1198
int r_count = 0, lkb_count = 0;
1200
list_for_each_entry(r, &ls->resources, list) {
1202
list_for_each_entry(lkb, &r->locks, list) {
1204
tr = get_trans(ls, lkb->lock.xid);
1207
add_lkb_trans(tr, lkb);
1211
log_group(ls, "create_trans_list: r_count %d lkb_count %d",
1212
r_count, lkb_count);
1215
static int locks_compat(struct dlm_lkb *waiting_lkb,
1216
struct dlm_lkb *granted_lkb)
1218
if (waiting_lkb == granted_lkb) {
1219
log_debug("waiting and granted same lock");
1223
if (waiting_lkb->trans->xid == granted_lkb->trans->xid) {
1224
log_debug("waiting and granted same trans %llx",
1225
(unsigned long long)waiting_lkb->trans->xid);
1229
return dlm_modes_compat(granted_lkb->lock.grmode,
1230
waiting_lkb->lock.rqmode);
1233
static int in_waitfor(struct trans *tr, struct trans *add_tr)
1237
for (i = 0; i < tr->waitfor_alloc; i++) {
1238
if (!tr->waitfor[i])
1240
if (tr->waitfor[i] == add_tr)
1246
static void add_waitfor(struct lockspace *ls, struct dlm_lkb *waiting_lkb,
1247
struct dlm_lkb *granted_lkb)
1249
struct trans *tr = waiting_lkb->trans;
1252
if (locks_compat(waiting_lkb, granted_lkb))
1255
/* this shouldn't happen AFAIK */
1256
if (tr == granted_lkb->trans) {
1257
log_group(ls, "trans %llx waiting on self",
1258
(unsigned long long)tr->xid);
1262
/* don't add the same trans to the waitfor list multiple times */
1263
if (tr->waitfor_count && in_waitfor(tr, granted_lkb->trans)) {
1264
log_group(ls, "trans %llx already waiting for trans %llx, "
1265
"waiting %x %s, granted %x %s",
1266
(unsigned long long)waiting_lkb->trans->xid,
1267
(unsigned long long)granted_lkb->trans->xid,
1268
waiting_lkb->lock.id, waiting_lkb->rsb->name,
1269
granted_lkb->lock.id, granted_lkb->rsb->name);
1273
if (tr->waitfor_count == tr->waitfor_alloc) {
1274
struct trans **old_waitfor = tr->waitfor;
1275
tr->waitfor_alloc += TR_NALLOC;
1276
tr->waitfor = malloc(tr->waitfor_alloc * sizeof(tr));
1278
log_error("add_waitfor no mem %u", tr->waitfor_alloc);
1281
memset(tr->waitfor, 0, tr->waitfor_alloc * sizeof(tr));
1283
/* copy then free old set of pointers */
1284
for (i = 0; i < tr->waitfor_count; i++)
1285
tr->waitfor[i] = old_waitfor[i];
1290
tr->waitfor[tr->waitfor_count++] = granted_lkb->trans;
1291
granted_lkb->trans->others_waiting_on_us++;
1292
waiting_lkb->waitfor_trans = granted_lkb->trans;
1295
/* for each trans, for each waiting lock, go to rsb of the lock,
1296
find granted locks on that rsb, then find the trans the
1297
granted lock belongs to, add that trans to our waitfor list */
1299
static void create_waitfor_graph(struct lockspace *ls)
1301
struct dlm_lkb *waiting_lkb, *granted_lkb;
1304
int depend_count = 0;
1306
list_for_each_entry(tr, &ls->transactions, list) {
1307
list_for_each_entry(waiting_lkb, &tr->locks, trans_list) {
1308
if (waiting_lkb->lock.status == DLM_LKSTS_GRANTED)
1310
/* waiting_lkb status is CONVERT or WAITING */
1312
r = waiting_lkb->rsb;
1314
list_for_each_entry(granted_lkb, &r->locks, list) {
1315
if (granted_lkb->lock.status==DLM_LKSTS_WAITING)
1317
/* granted_lkb status is GRANTED or CONVERT */
1318
add_waitfor(ls, waiting_lkb, granted_lkb);
1324
log_group(ls, "create_waitfor_graph: depend_count %d", depend_count);
1327
/* Assume a transaction that's not waiting on any locks will complete, release
1328
all the locks it currently holds, and exit. Other transactions that were
1329
blocked waiting on the removed transaction's now-released locks may now be
1330
unblocked, complete, release all held locks and exit. Repeat this until
1331
no more transactions can be removed. If there are transactions remaining,
1332
then they are deadlocked. */
1334
static void remove_waitfor(struct trans *tr, struct trans *remove_tr)
1338
for (i = 0; i < tr->waitfor_alloc; i++) {
1339
if (!tr->waitfor_count)
1342
if (!tr->waitfor[i])
1345
if (tr->waitfor[i] == remove_tr) {
1346
tr->waitfor[i] = NULL;
1347
tr->waitfor_count--;
1348
remove_tr->others_waiting_on_us--;
1353
/* remove_tr is not waiting for anything, assume it completes and goes away
1354
and remove it from any other transaction's waitfor list */
1356
static void remove_trans(struct lockspace *ls, struct trans *remove_tr)
1360
list_for_each_entry(tr, &ls->transactions, list) {
1361
if (tr == remove_tr)
1363
if (!remove_tr->others_waiting_on_us)
1365
remove_waitfor(tr, remove_tr);
1368
if (remove_tr->others_waiting_on_us)
1369
log_group(ls, "trans %llx removed others waiting %d",
1370
(unsigned long long)remove_tr->xid,
1371
remove_tr->others_waiting_on_us);
1374
static int reduce_waitfor_graph(struct lockspace *ls)
1376
struct trans *tr, *safe;
1380
list_for_each_entry_safe(tr, safe, &ls->transactions, list) {
1381
if (tr->waitfor_count) {
1385
remove_trans(ls, tr);
1386
list_del(&tr->list);
1393
log_group(ls, "reduce_waitfor_graph: %d blocked, %d removed",
1398
static void reduce_waitfor_graph_loop(struct lockspace *ls)
1403
removed = reduce_waitfor_graph(ls);
1409
static struct trans *find_trans_to_cancel(struct lockspace *ls)
1413
list_for_each_entry(tr, &ls->transactions, list) {
1414
if (!tr->others_waiting_on_us)
1421
static void cancel_trans(struct lockspace *ls)
1424
struct dlm_lkb *lkb;
1427
tr = find_trans_to_cancel(ls);
1429
log_group(ls, "cancel_trans: no trans found");
1433
list_for_each_entry(lkb, &tr->locks, trans_list) {
1434
if (lkb->lock.status == DLM_LKSTS_GRANTED)
1436
send_cancel_lock(ls, tr, lkb);
1438
/* When this canceled trans has multiple locks all blocked by
1439
locks held by one other trans, that other trans is only
1440
added to tr->waitfor once, and only one of these waiting
1441
locks will have waitfor_trans set. So, the lkb with
1442
non-null waitfor_trans was the first one responsible
1443
for adding waitfor_trans to tr->waitfor.
1445
We could potentially forget about keeping track of lkb->
1446
waitfor_trans, forget about calling remove_waitfor()
1447
here and just set tr->waitfor_count = 0 after this loop.
1448
The loss would be that waitfor_trans->others_waiting_on_us
1449
would not get decremented. */
1451
if (lkb->waitfor_trans)
1452
remove_waitfor(tr, lkb->waitfor_trans);
1455
/* this shouldn't happen, if it does something's not working right */
1456
if (tr->waitfor_count) {
1457
log_group(ls, "cancel_trans: %llx non-zero waitfor_count %d",
1458
(unsigned long long)tr->xid, tr->waitfor_count);
1461
/* this should now remove the canceled trans since it now has a zero
1463
removed = reduce_waitfor_graph(ls);
1466
log_group(ls, "canceled trans not removed from graph");
1468
/* now call reduce_waitfor_graph() in another loop and it
1469
should completely reduce */
1472
static void dump_trans(struct lockspace *ls, struct trans *tr)
1474
struct dlm_lkb *lkb;
1478
log_group(ls, "trans xid %llx waitfor_count %d others_waiting_on_us %d",
1479
(unsigned long long)tr->xid, tr->waitfor_count,
1480
tr->others_waiting_on_us);
1482
log_group(ls, "locks:");
1484
list_for_each_entry(lkb, &tr->locks, trans_list) {
1485
log_group(ls, " %s: id %08x gr %s rq %s pid %u:%u \"%s\"",
1486
status_str(lkb->lock.status),
1488
dlm_mode_str(lkb->lock.grmode),
1489
dlm_mode_str(lkb->lock.rqmode),
1495
if (!tr->waitfor_count)
1498
log_group(ls, "waitfor:");
1500
for (i = 0; i < tr->waitfor_alloc; i++) {
1501
if (!tr->waitfor[i])
1503
wf = tr->waitfor[i];
1504
log_group(ls, " xid %llx", (unsigned long long)wf->xid);
1508
static void dump_all_trans(struct lockspace *ls)
1512
log_group(ls, "Transaction dump:");
1514
list_for_each_entry(tr, &ls->transactions, list)
1518
static void find_deadlock(struct lockspace *ls)
1520
if (list_empty(&ls->resources)) {
1521
log_group(ls, "no deadlock: no resources");
1525
if (!list_empty(&ls->transactions)) {
1526
log_group(ls, "transactions list should be empty");
1531
create_trans_list(ls);
1532
create_waitfor_graph(ls);
1534
reduce_waitfor_graph_loop(ls);
1536
if (list_empty(&ls->transactions)) {
1537
log_group(ls, "no deadlock: all transactions reduced");
1541
log_group(ls, "found deadlock");
1545
reduce_waitfor_graph_loop(ls);
1547
if (list_empty(&ls->transactions)) {
1548
log_group(ls, "resolved deadlock with cancel");
1552
log_error("deadlock resolution failed");