~ubuntu-branches/ubuntu/trusty/dlm/trusty

« back to all changes in this revision

Viewing changes to dlm_controld/deadlock.c

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez
  • Date: 2013-07-23 15:50:10 UTC
  • Revision ID: package-import@ubuntu.com-20130723155010-khpwf6vc04wjho2a
Tags: upstream-4.0.1
ImportĀ upstreamĀ versionĀ 4.0.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright 2004-2012 Red Hat, Inc.
 
3
 *
 
4
 * This copyrighted material is made available to anyone wishing to use,
 
5
 * modify, copy, or redistribute it subject to the terms and conditions
 
6
 * of the GNU General Public License v2 or (at your option) any later version.
 
7
 */
 
8
 
 
9
#include "dlm_daemon.h"
 
10
#include "libdlm.h"
 
11
 
 
12
static SaCkptHandleT global_ckpt_h;
 
13
static SaCkptCallbacksT callbacks = { 0, 0 };
 
14
static SaVersionT version = { 'B', 1, 1 };
 
15
static char section_buf[10 * 1024 * 1024];  /* 10MB of pack_lock's enough? */
 
16
static uint32_t section_len;
 
17
static uint32_t section_max;
 
18
 
 
19
struct node {
 
20
        struct list_head        list;
 
21
        int                     nodeid;
 
22
        int                     checkpoint_ready; /* we've read its ckpt */
 
23
        int                     in_cycle;         /* participating in cycle */
 
24
};
 
25
 
 
26
enum {
 
27
        LOCAL_COPY = 1,
 
28
        MASTER_COPY = 2,
 
29
};
 
30
 
 
31
/* from linux/fs/dlm/dlm_internal.h */
 
32
#define DLM_LKSTS_WAITING       1
 
33
#define DLM_LKSTS_GRANTED       2
 
34
#define DLM_LKSTS_CONVERT       3
 
35
 
 
36
struct pack_lock {
 
37
        uint64_t                xid;
 
38
        uint32_t                id;
 
39
        int                     nodeid;
 
40
        uint32_t                remid;
 
41
        int                     ownpid;
 
42
        uint32_t                exflags;
 
43
        uint32_t                flags;
 
44
        int8_t                  status;
 
45
        int8_t                  grmode;
 
46
        int8_t                  rqmode;
 
47
        int8_t                  copy;
 
48
};
 
49
 
 
50
struct dlm_rsb {
 
51
        struct list_head        list;
 
52
        struct list_head        locks;
 
53
        char                    name[DLM_RESNAME_MAXLEN];
 
54
        int                     len;
 
55
};
 
56
 
 
57
/* information is saved in the lkb, and lkb->lock, from the perspective of the
 
58
   local or master copy, not the process copy */
 
59
 
 
60
struct dlm_lkb {
 
61
        struct list_head        list;       /* r->locks */
 
62
        struct pack_lock        lock;       /* data from debugfs/checkpoint */
 
63
        int                     home;       /* node where the lock owner lives*/
 
64
        struct dlm_rsb          *rsb;       /* lock is on resource */
 
65
        struct trans            *trans;     /* lock owned by this transaction */
 
66
        struct list_head        trans_list; /* tr->locks */
 
67
        struct trans            *waitfor_trans; /* the trans that's holding the
 
68
                                                   lock that's blocking us */
 
69
};
 
70
 
 
71
/* waitfor pointers alloc'ed 4 at at time */
 
72
#define TR_NALLOC               4
 
73
 
 
74
struct trans {
 
75
        struct list_head        list;
 
76
        struct list_head        locks;
 
77
        uint64_t                xid;
 
78
        int                     others_waiting_on_us; /* count of trans's
 
79
                                                         pointing to us in
 
80
                                                         waitfor */
 
81
        int                     waitfor_alloc;
 
82
        int                     waitfor_count;        /* count of in-use
 
83
                                                         waitfor slots and
 
84
                                                         num of trans's we're
 
85
                                                         waiting on */
 
86
        struct trans            **waitfor;            /* waitfor_alloc trans
 
87
                                                         pointers */
 
88
};
 
89
 
 
90
static const int __dlm_compat_matrix[8][8] = {
 
91
      /* UN NL CR CW PR PW EX PD */
 
92
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
 
93
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
 
94
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
 
95
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
 
96
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
 
97
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
 
98
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
 
99
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 
100
};
 
101
 
 
102
static inline int dlm_modes_compat(int mode1, int mode2)
 
103
{
 
104
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
 
105
}
 
106
 
 
107
static const char *status_str(int lksts)
 
108
{
 
109
        switch (lksts) {
 
110
        case DLM_LKSTS_WAITING:
 
111
                return "W";
 
112
        case DLM_LKSTS_GRANTED:
 
113
                return "G";
 
114
        case DLM_LKSTS_CONVERT:
 
115
                return "C";
 
116
        }
 
117
        return "?";
 
118
}
 
119
 
 
120
static void free_resources(struct lockspace *ls)
 
121
{
 
122
        struct dlm_rsb *r, *r_safe;
 
123
        struct dlm_lkb *lkb, *lkb_safe;
 
124
 
 
125
        list_for_each_entry_safe(r, r_safe, &ls->resources, list) {
 
126
                list_for_each_entry_safe(lkb, lkb_safe, &r->locks, list) {
 
127
                        list_del(&lkb->list);
 
128
                        if (!list_empty(&lkb->trans_list))
 
129
                                list_del(&lkb->trans_list);
 
130
                        free(lkb);
 
131
                }
 
132
                list_del(&r->list);
 
133
                free(r);
 
134
        }
 
135
}
 
136
 
 
137
static void free_transactions(struct lockspace *ls)
 
138
{
 
139
        struct trans *tr, *tr_safe;
 
140
 
 
141
        list_for_each_entry_safe(tr, tr_safe, &ls->transactions, list) {
 
142
                list_del(&tr->list);
 
143
                if (tr->waitfor)
 
144
                        free(tr->waitfor);
 
145
                free(tr);
 
146
        }
 
147
}
 
148
 
 
149
static void disable_deadlock(void)
 
150
{
 
151
        log_error("FIXME: deadlock detection disabled");
 
152
}
 
153
 
 
154
void setup_deadlock(void)
 
155
{
 
156
        SaAisErrorT rv;
 
157
 
 
158
        if (!cfgd_enable_deadlk)
 
159
                return;
 
160
 
 
161
        rv = saCkptInitialize(&global_ckpt_h, &callbacks, &version);
 
162
        if (rv != SA_AIS_OK)
 
163
                log_error("ckpt init error %d", rv);
 
164
}
 
165
 
 
166
static struct dlm_rsb *get_resource(struct lockspace *ls, char *name, int len)
 
167
{
 
168
        struct dlm_rsb *r;
 
169
 
 
170
        list_for_each_entry(r, &ls->resources, list) {
 
171
                if (r->len == len && !strncmp(r->name, name, len))
 
172
                        return r;
 
173
        }
 
174
 
 
175
        r = malloc(sizeof(struct dlm_rsb));
 
176
        if (!r) {
 
177
                log_error("get_resource: no memory");
 
178
                disable_deadlock();
 
179
                return NULL;
 
180
        }
 
181
        memset(r, 0, sizeof(struct dlm_rsb));
 
182
        memcpy(r->name, name, len);
 
183
        r->len = len;
 
184
        INIT_LIST_HEAD(&r->locks);
 
185
        list_add(&r->list, &ls->resources);
 
186
        return r;
 
187
}
 
188
 
 
189
static struct dlm_lkb *create_lkb(void)
 
190
{
 
191
        struct dlm_lkb *lkb;
 
192
 
 
193
        lkb = malloc(sizeof(struct dlm_lkb));
 
194
        if (!lkb) {
 
195
                log_error("create_lkb: no memory");
 
196
                disable_deadlock();
 
197
        } else {
 
198
                memset(lkb, 0, sizeof(struct dlm_lkb));
 
199
                INIT_LIST_HEAD(&lkb->list);
 
200
                INIT_LIST_HEAD(&lkb->trans_list);
 
201
        }
 
202
        return lkb;
 
203
}
 
204
 
 
205
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
 
206
{
 
207
        list_add(&lkb->list, &r->locks);
 
208
        lkb->rsb = r;
 
209
}
 
210
 
 
211
/* from linux/fs/dlm/dlm_internal.h */
 
212
#define IFL_MSTCPY 0x00010000
 
213
 
 
214
/* called on a lock that's just been read from debugfs */
 
215
 
 
216
static void set_copy(struct pack_lock *lock)
 
217
{
 
218
        uint32_t id, remid;
 
219
 
 
220
        if (!lock->nodeid)
 
221
                lock->copy = LOCAL_COPY;
 
222
        else if (lock->flags & IFL_MSTCPY)
 
223
                lock->copy = MASTER_COPY;
 
224
        else {
 
225
                /* process copy lock is converted to a partial master copy
 
226
                   lock that will be combined with the real master copy */
 
227
                lock->copy = MASTER_COPY;
 
228
                id = lock->id;
 
229
                remid = lock->remid;
 
230
                lock->id = remid;
 
231
                lock->remid = id;
 
232
                lock->nodeid = our_nodeid;
 
233
        }
 
234
}
 
235
 
 
236
/* xid is always zero in the real master copy, xid should always be non-zero
 
237
   in the partial master copy (what was a process copy) */
 
238
/* TODO: confirm or enforce that the partial will always have non-zero xid */
 
239
 
 
240
static int partial_master_copy(struct pack_lock *lock)
 
241
{
 
242
        return (lock->xid != 0);
 
243
}
 
244
 
 
245
static struct dlm_lkb *get_lkb(struct dlm_rsb *r, struct pack_lock *lock)
 
246
{
 
247
        struct dlm_lkb *lkb;
 
248
 
 
249
        if (lock->copy != MASTER_COPY)
 
250
                goto out;
 
251
 
 
252
        list_for_each_entry(lkb, &r->locks, list) {
 
253
                if (lkb->lock.nodeid == lock->nodeid &&
 
254
                    lkb->lock.id == lock->id)
 
255
                        return lkb;
 
256
        }
 
257
 out:
 
258
        return create_lkb();
 
259
}
 
260
 
 
261
static struct dlm_lkb *add_lock(struct lockspace *ls, struct dlm_rsb *r,
 
262
                                int from_nodeid, struct pack_lock *lock)
 
263
{
 
264
        struct dlm_lkb *lkb;
 
265
 
 
266
        lkb = get_lkb(r, lock);
 
267
        if (!lkb)
 
268
                return NULL;
 
269
 
 
270
        switch (lock->copy) {
 
271
        case LOCAL_COPY:
 
272
                lkb->lock.xid     = lock->xid;
 
273
                lkb->lock.nodeid  = lock->nodeid;
 
274
                lkb->lock.id      = lock->id;
 
275
                lkb->lock.remid   = lock->remid;
 
276
                lkb->lock.ownpid  = lock->ownpid;
 
277
                lkb->lock.exflags = lock->exflags;
 
278
                lkb->lock.flags   = lock->flags;
 
279
                lkb->lock.status  = lock->status;
 
280
                lkb->lock.grmode  = lock->grmode;
 
281
                lkb->lock.rqmode  = lock->rqmode;
 
282
                lkb->lock.copy    = LOCAL_COPY;
 
283
                lkb->home = from_nodeid;
 
284
 
 
285
                log_group(ls, "add %s local nodeid %d id %x remid %x xid %llx",
 
286
                          r->name, lock->nodeid, lock->id, lock->remid,
 
287
                          (unsigned long long)lock->xid);
 
288
                break;
 
289
 
 
290
        case MASTER_COPY:
 
291
                if (partial_master_copy(lock)) {
 
292
                        lkb->lock.xid     = lock->xid;
 
293
                        lkb->lock.nodeid  = lock->nodeid;
 
294
                        lkb->lock.id      = lock->id;
 
295
                        lkb->lock.remid   = lock->remid;
 
296
                        lkb->lock.copy    = MASTER_COPY;
 
297
                } else {
 
298
                        /* only set xid from partial master copy above */
 
299
                        lkb->lock.nodeid  = lock->nodeid;
 
300
                        lkb->lock.id      = lock->id;
 
301
                        lkb->lock.remid   = lock->remid;
 
302
                        lkb->lock.copy    = MASTER_COPY;
 
303
                        /* set other fields from real master copy */
 
304
                        lkb->lock.ownpid  = lock->ownpid;
 
305
                        lkb->lock.exflags = lock->exflags;
 
306
                        lkb->lock.flags   = lock->flags;
 
307
                        lkb->lock.status  = lock->status;
 
308
                        lkb->lock.grmode  = lock->grmode;
 
309
                        lkb->lock.rqmode  = lock->rqmode;
 
310
                }
 
311
                lkb->home = lock->nodeid;
 
312
 
 
313
                log_group(ls, "add %s master nodeid %d id %x remid %x xid %llx",
 
314
                          r->name, lock->nodeid, lock->id, lock->remid,
 
315
                          (unsigned long long)lock->xid);
 
316
                break;
 
317
        }
 
318
 
 
319
        if (list_empty(&lkb->list))
 
320
                add_lkb(r, lkb);
 
321
        return lkb;
 
322
}
 
323
 
 
324
static void parse_r_name(char *line, char *name)
 
325
{
 
326
        char *p;
 
327
        int i = 0;
 
328
        int begin = 0;
 
329
 
 
330
        for (p = line; ; p++) {
 
331
                if (*p == '"') {
 
332
                        if (begin)
 
333
                                break;
 
334
                        begin = 1;
 
335
                        continue;
 
336
                }
 
337
                if (begin)
 
338
                        name[i++] = *p;
 
339
        }
 
340
}
 
341
 
 
342
#define LOCK_LINE_MAX 1024
 
343
 
 
344
static int read_debugfs_locks(struct lockspace *ls)
 
345
{
 
346
        FILE *file;
 
347
        char path[PATH_MAX];
 
348
        char line[LOCK_LINE_MAX];
 
349
        struct dlm_rsb *r;
 
350
        struct pack_lock lock;
 
351
        char r_name[65];
 
352
        unsigned long long xid;
 
353
        unsigned int waiting;
 
354
        int r_nodeid;
 
355
        int r_len;
 
356
        int rv;
 
357
 
 
358
        snprintf(path, PATH_MAX, "/sys/kernel/debug/dlm/%s_locks", ls->name);
 
359
 
 
360
        file = fopen(path, "r");
 
361
        if (!file)
 
362
                return -1;
 
363
 
 
364
        /* skip the header on the first line */
 
365
        if (!fgets(line, LOCK_LINE_MAX, file)) {
 
366
                log_error("Unable to read %s: %d", path, errno);
 
367
                goto out;
 
368
        }
 
369
 
 
370
        while (fgets(line, LOCK_LINE_MAX, file)) {
 
371
                memset(&lock, 0, sizeof(struct pack_lock));
 
372
 
 
373
                rv = sscanf(line, "%x %d %x %u %llu %x %x %hhd %hhd %hhd %u %d %d",
 
374
                            &lock.id,
 
375
                            &lock.nodeid,
 
376
                            &lock.remid,
 
377
                            &lock.ownpid,
 
378
                            &xid,
 
379
                            &lock.exflags,
 
380
                            &lock.flags,
 
381
                            &lock.status,
 
382
                            &lock.grmode,
 
383
                            &lock.rqmode,
 
384
                            &waiting,
 
385
                            &r_nodeid,
 
386
                            &r_len);
 
387
 
 
388
                lock.xid = xid; /* hack to avoid warning */
 
389
 
 
390
                if (rv != 13) {
 
391
                        log_error("invalid debugfs line %d: %s", rv, line);
 
392
                        goto out;
 
393
                }
 
394
 
 
395
                memset(r_name, 0, sizeof(r_name));
 
396
                parse_r_name(line, r_name);
 
397
 
 
398
                r = get_resource(ls, r_name, r_len);
 
399
                if (!r)
 
400
                        break;
 
401
 
 
402
                set_copy(&lock);
 
403
                add_lock(ls, r, our_nodeid, &lock);
 
404
        }
 
405
 out:
 
406
        fclose(file);
 
407
        return 0;
 
408
}
 
409
 
 
410
static int read_checkpoint_locks(struct lockspace *ls, int from_nodeid,
 
411
                                 char *numbuf, int buflen)
 
412
{
 
413
        struct dlm_rsb *r;
 
414
        struct pack_lock *lock;
 
415
        int count = section_len / sizeof(struct pack_lock);
 
416
        int i;
 
417
 
 
418
        r = get_resource(ls, numbuf, buflen - 1);
 
419
        if (!r)
 
420
                return -1;
 
421
 
 
422
        lock = (struct pack_lock *) &section_buf;
 
423
 
 
424
        for (i = 0; i < count; i++) {
 
425
                lock->xid     = le64_to_cpu(lock->xid);
 
426
                lock->id      = le32_to_cpu(lock->id);
 
427
                lock->nodeid  = le32_to_cpu(lock->nodeid);
 
428
                lock->remid   = le32_to_cpu(lock->remid);
 
429
                lock->ownpid  = le32_to_cpu(lock->ownpid);
 
430
                lock->exflags = le32_to_cpu(lock->exflags);
 
431
                lock->flags   = le32_to_cpu(lock->flags);
 
432
 
 
433
                add_lock(ls, r, from_nodeid, lock);
 
434
                lock++;
 
435
        }
 
436
        return 0;
 
437
}
 
438
 
 
439
static int pack_lkb_list(struct list_head *q, struct pack_lock **lockp)
 
440
{
 
441
        struct dlm_lkb *lkb;
 
442
        struct pack_lock *lock = *lockp;
 
443
        int count = 0;
 
444
 
 
445
        list_for_each_entry(lkb, q, list) {
 
446
                if (count + 1 > section_max) {
 
447
                        log_error("too many locks %d for ckpt buf", count);
 
448
                        break;
 
449
                }
 
450
 
 
451
                lock->xid     = cpu_to_le64(lkb->lock.xid);
 
452
                lock->id      = cpu_to_le32(lkb->lock.id);
 
453
                lock->nodeid  = cpu_to_le32(lkb->lock.nodeid);
 
454
                lock->remid   = cpu_to_le32(lkb->lock.remid);
 
455
                lock->ownpid  = cpu_to_le32(lkb->lock.ownpid);
 
456
                lock->exflags = cpu_to_le32(lkb->lock.exflags);
 
457
                lock->flags   = cpu_to_le32(lkb->lock.flags);
 
458
                lock->status  = lkb->lock.status;
 
459
                lock->grmode  = lkb->lock.grmode;
 
460
                lock->rqmode  = lkb->lock.rqmode;
 
461
                lock->copy    = lkb->lock.copy;
 
462
 
 
463
                lock++;
 
464
                count++;
 
465
        }
 
466
        return count;
 
467
}
 
468
 
 
469
static void pack_section_buf(struct lockspace *ls, struct dlm_rsb *r)
 
470
{
 
471
        struct pack_lock *lock;
 
472
        int count;
 
473
 
 
474
        memset(&section_buf, 0, sizeof(section_buf));
 
475
        section_max = sizeof(section_buf) / sizeof(struct pack_lock);
 
476
 
 
477
        lock = (struct pack_lock *) &section_buf;
 
478
 
 
479
        count = pack_lkb_list(&r->locks, &lock);
 
480
 
 
481
        section_len = count * sizeof(struct pack_lock);
 
482
}
 
483
 
 
484
static int _unlink_checkpoint(struct lockspace *ls, SaNameT *name)
 
485
{
 
486
        SaCkptCheckpointHandleT h;
 
487
        SaCkptCheckpointDescriptorT s;
 
488
        SaAisErrorT rv;
 
489
        int ret = 0;
 
490
        int retries;
 
491
 
 
492
        h = (SaCkptCheckpointHandleT) ls->deadlk_ckpt_handle;
 
493
        log_group(ls, "unlink ckpt %llx", (unsigned long long)h);
 
494
 
 
495
        retries = 0;
 
496
 unlink_retry:
 
497
        rv = saCkptCheckpointUnlink(global_ckpt_h, name);
 
498
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
499
                log_group(ls, "unlink ckpt retry");
 
500
                sleep(1);
 
501
                if (retries++ < 10)
 
502
                        goto unlink_retry;
 
503
        }
 
504
        if (rv == SA_AIS_OK)
 
505
                goto out_close;
 
506
        if (!h)
 
507
                goto out;
 
508
 
 
509
        log_error("unlink ckpt error %d %s", rv, ls->name);
 
510
        ret = -1;
 
511
 
 
512
        retries = 0;
 
513
 status_retry:
 
514
        rv = saCkptCheckpointStatusGet(h, &s);
 
515
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
516
                log_group(ls, "unlink ckpt status retry");
 
517
                sleep(1);
 
518
                if (retries++ < 10)
 
519
                        goto status_retry;
 
520
        }
 
521
        if (rv != SA_AIS_OK) {
 
522
                log_error("unlink ckpt status error %d %s", rv, ls->name);
 
523
                goto out_close;
 
524
        }
 
525
 
 
526
        log_group(ls, "unlink ckpt status: size %llu, max sections %u, "
 
527
                      "max section size %llu, section count %u, mem %u",
 
528
                 (unsigned long long)s.checkpointCreationAttributes.checkpointSize,
 
529
                 s.checkpointCreationAttributes.maxSections,
 
530
                 (unsigned long long)s.checkpointCreationAttributes.maxSectionSize,
 
531
                 s.numberOfSections, s.memoryUsed);
 
532
 
 
533
 out_close:
 
534
        retries = 0;
 
535
 close_retry:
 
536
        rv = saCkptCheckpointClose(h);
 
537
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
538
                log_group(ls, "unlink ckpt close retry");
 
539
                sleep(1);
 
540
                if (retries++ < 10)
 
541
                        goto close_retry;
 
542
        }
 
543
        if (rv != SA_AIS_OK) {
 
544
                log_error("unlink ckpt %llx close err %d %s",
 
545
                          (unsigned long long)h, rv, ls->name);
 
546
        }
 
547
 out:
 
548
        ls->deadlk_ckpt_handle = 0;
 
549
        return ret;
 
550
}
 
551
 
 
552
static int unlink_checkpoint(struct lockspace *ls)
 
553
{
 
554
        SaNameT name;
 
555
        int len;
 
556
 
 
557
        len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
 
558
                       ls->name, our_nodeid);
 
559
        name.length = len;
 
560
 
 
561
        return _unlink_checkpoint(ls, &name);
 
562
}
 
563
 
 
564
static void read_checkpoint(struct lockspace *ls, int nodeid)
 
565
{
 
566
        SaCkptCheckpointHandleT h;
 
567
        SaCkptSectionIterationHandleT itr;
 
568
        SaCkptSectionDescriptorT desc;
 
569
        SaCkptIOVectorElementT iov;
 
570
        SaNameT name;
 
571
        SaAisErrorT rv;
 
572
        char buf[DLM_RESNAME_MAXLEN];
 
573
        int len;
 
574
        int retries;
 
575
 
 
576
        if (nodeid == our_nodeid)
 
577
                return;
 
578
 
 
579
        log_group(ls, "read_checkpoint %d", nodeid);
 
580
 
 
581
        len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
 
582
                       ls->name, nodeid);
 
583
        name.length = len;
 
584
 
 
585
        retries = 0;
 
586
 open_retry:
 
587
        rv = saCkptCheckpointOpen(global_ckpt_h, &name, NULL,
 
588
                                  SA_CKPT_CHECKPOINT_READ, 0, &h);
 
589
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
590
                log_group(ls, "read_checkpoint: %d ckpt open retry", nodeid);
 
591
                sleep(1);
 
592
                if (retries++ < 10)
 
593
                        goto open_retry;
 
594
        }
 
595
        if (rv != SA_AIS_OK) {
 
596
                log_error("read_checkpoint: %d ckpt open error %d", nodeid, rv);
 
597
                return;
 
598
        }
 
599
 
 
600
        retries = 0;
 
601
 init_retry:
 
602
        rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
 
603
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
604
                log_group(ls, "read_checkpoint: ckpt iterinit retry");
 
605
                sleep(1);
 
606
                if (retries++ < 10)
 
607
                        goto init_retry;
 
608
        }
 
609
        if (rv != SA_AIS_OK) {
 
610
                log_error("read_checkpoint: %d ckpt iterinit error %d", nodeid, rv);
 
611
                goto out;
 
612
        }
 
613
 
 
614
        while (1) {
 
615
                retries = 0;
 
616
         next_retry:
 
617
                rv = saCkptSectionIterationNext(itr, &desc);
 
618
                if (rv == SA_AIS_ERR_NO_SECTIONS)
 
619
                        break;
 
620
                if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
621
                        log_group(ls, "read_checkpoint: ckpt iternext retry");
 
622
                        sleep(1);
 
623
                        if (retries++ < 10)
 
624
                                goto next_retry;
 
625
                }
 
626
                if (rv != SA_AIS_OK) {
 
627
                        log_error("read_checkpoint: %d ckpt iternext error %d",
 
628
                                  nodeid, rv);
 
629
                        goto out_it;
 
630
                }
 
631
 
 
632
                if (!desc.sectionSize)
 
633
                        continue;
 
634
 
 
635
                iov.sectionId = desc.sectionId;
 
636
                iov.dataBuffer = &section_buf;
 
637
                iov.dataSize = desc.sectionSize;
 
638
                iov.dataOffset = 0;
 
639
 
 
640
                memset(&buf, 0, sizeof(buf));
 
641
                snprintf(buf, sizeof(buf), "%s", desc.sectionId.id);
 
642
 
 
643
                log_group(ls, "read_checkpoint: section size %llu id %u \"%s\"",
 
644
                          (unsigned long long)iov.dataSize,
 
645
                          iov.sectionId.idLen, buf);
 
646
 
 
647
                retries = 0;
 
648
         read_retry:
 
649
                rv = saCkptCheckpointRead(h, &iov, 1, NULL);
 
650
                if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
651
                        log_group(ls, "read_checkpoint: ckpt read retry");
 
652
                        sleep(1);
 
653
                        if (retries++ < 10)
 
654
                                goto read_retry;
 
655
                }
 
656
                if (rv != SA_AIS_OK) {
 
657
                        log_error("read_checkpoint: %d ckpt read error %d",
 
658
                                  nodeid, rv);
 
659
                        goto out_it;
 
660
                }
 
661
 
 
662
                section_len = iov.readSize;
 
663
 
 
664
                if (!section_len)
 
665
                       continue;
 
666
 
 
667
                if (section_len % sizeof(struct pack_lock)) {
 
668
                        log_error("read_checkpoint: %d bad section len %d",
 
669
                                  nodeid, section_len);
 
670
                        continue;
 
671
                }
 
672
 
 
673
                read_checkpoint_locks(ls, nodeid, (char *)desc.sectionId.id,
 
674
                                      desc.sectionId.idLen);
 
675
        }
 
676
 
 
677
 out_it:
 
678
        saCkptSectionIterationFinalize(itr);
 
679
        retries = 0;
 
680
 out:
 
681
        rv = saCkptCheckpointClose(h);
 
682
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
683
                log_group(ls, "read_checkpoint: unlink ckpt close retry");
 
684
                sleep(1);
 
685
                if (retries++ < 10)
 
686
                        goto out;
 
687
        }
 
688
        if (rv != SA_AIS_OK)
 
689
                log_error("read_checkpoint: %d close error %d", nodeid, rv);
 
690
}
 
691
 
 
692
static void write_checkpoint(struct lockspace *ls)
 
693
{
 
694
        SaCkptCheckpointCreationAttributesT attr;
 
695
        SaCkptCheckpointHandleT h;
 
696
        SaCkptSectionIdT section_id;
 
697
        SaCkptSectionCreationAttributesT section_attr;
 
698
        SaCkptCheckpointOpenFlagsT flags;
 
699
        SaNameT name;
 
700
        SaAisErrorT rv;
 
701
        char buf[DLM_RESNAME_MAXLEN];
 
702
        struct dlm_rsb *r;
 
703
        struct dlm_lkb *lkb;
 
704
        int r_count, lock_count, total_size, section_size, max_section_size;
 
705
        int len;
 
706
 
 
707
        len = snprintf((char *)name.value, SA_MAX_NAME_LENGTH, "dlmdeadlk.%s.%d",
 
708
                      ls->name, our_nodeid);
 
709
        name.length = len;
 
710
 
 
711
        /* unlink an old checkpoint before we create a new one */
 
712
        if (ls->deadlk_ckpt_handle) {
 
713
                log_error("write_checkpoint: old ckpt");
 
714
                if (_unlink_checkpoint(ls, &name))
 
715
                        return;
 
716
        }
 
717
 
 
718
        /* loop through all locks to figure out sizes to set in
 
719
           the attr fields */
 
720
 
 
721
        r_count = 0;
 
722
        lock_count = 0;
 
723
        total_size = 0;
 
724
        max_section_size = 0;
 
725
 
 
726
        list_for_each_entry(r, &ls->resources, list) {
 
727
                r_count++;
 
728
                section_size = 0;
 
729
                list_for_each_entry(lkb, &r->locks, list) {
 
730
                        section_size += sizeof(struct pack_lock);
 
731
                        lock_count++;
 
732
                }
 
733
                total_size += section_size;
 
734
                if (section_size > max_section_size)
 
735
                        max_section_size = section_size;
 
736
        }
 
737
 
 
738
        log_group(ls, "write_checkpoint: r_count %d, lock_count %d",
 
739
                  r_count, lock_count);
 
740
 
 
741
        log_group(ls, "write_checkpoint: total %d bytes, max_section %d bytes",
 
742
                  total_size, max_section_size);
 
743
 
 
744
        attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
 
745
        attr.checkpointSize = total_size;
 
746
        attr.retentionDuration = SA_TIME_MAX;
 
747
        attr.maxSections = r_count + 1;      /* don't know why we need +1 */
 
748
        attr.maxSectionSize = max_section_size;
 
749
        attr.maxSectionIdSize = DLM_RESNAME_MAXLEN;
 
750
 
 
751
        flags = SA_CKPT_CHECKPOINT_READ |
 
752
                SA_CKPT_CHECKPOINT_WRITE |
 
753
                SA_CKPT_CHECKPOINT_CREATE;
 
754
 
 
755
 open_retry:
 
756
        rv = saCkptCheckpointOpen(global_ckpt_h, &name, &attr, flags, 0, &h);
 
757
        if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
758
                log_group(ls, "write_checkpoint: ckpt open retry");
 
759
                sleep(1);
 
760
                goto open_retry;
 
761
        }
 
762
        if (rv == SA_AIS_ERR_EXIST) {
 
763
                log_group(ls, "write_checkpoint: ckpt already exists");
 
764
                return;
 
765
        }
 
766
        if (rv != SA_AIS_OK) {
 
767
                log_group(ls, "write_checkpoint: ckpt open error %d", rv);
 
768
                return;
 
769
        }
 
770
 
 
771
        log_group(ls, "write_checkpoint: open ckpt handle %llx",
 
772
                  (unsigned long long)h);
 
773
        ls->deadlk_ckpt_handle = (uint64_t) h;
 
774
 
 
775
        list_for_each_entry(r, &ls->resources, list) {
 
776
                memset(buf, 0, sizeof(buf));
 
777
                len = snprintf(buf, sizeof(buf), "%s", r->name);
 
778
 
 
779
                section_id.id = (void *)buf;
 
780
                section_id.idLen = len + 1;
 
781
                section_attr.sectionId = &section_id;
 
782
                section_attr.expirationTime = SA_TIME_END;
 
783
 
 
784
                pack_section_buf(ls, r);
 
785
 
 
786
                log_group(ls, "write_checkpoint: section size %u id %u \"%s\"",
 
787
                          section_len, section_id.idLen, buf);
 
788
 
 
789
         create_retry:
 
790
                rv = saCkptSectionCreate(h, &section_attr, &section_buf,
 
791
                                         section_len);
 
792
                if (rv == SA_AIS_ERR_TRY_AGAIN) {
 
793
                        log_group(ls, "write_checkpoint: ckpt create retry");
 
794
                        sleep(1);
 
795
                        goto create_retry;
 
796
                }
 
797
                if (rv == SA_AIS_ERR_EXIST) {
 
798
                        /* this shouldn't happen in general */
 
799
                        log_error("write_checkpoint: clearing old ckpt");
 
800
                        saCkptCheckpointClose(h);
 
801
                        _unlink_checkpoint(ls, &name);
 
802
                        goto open_retry;
 
803
                }
 
804
                if (rv != SA_AIS_OK) {
 
805
                        log_error("write_checkpoint: section create %d", rv);
 
806
                        break;
 
807
                }
 
808
        }
 
809
}
 
810
 
 
811
static void send_message(struct lockspace *ls, int type,
 
812
                         uint32_t to_nodeid, uint32_t msgdata)
 
813
{
 
814
        struct dlm_header *hd;
 
815
        int len;
 
816
        char *buf;
 
817
 
 
818
        len = sizeof(struct dlm_header);
 
819
        buf = malloc(len);
 
820
        if (!buf) {
 
821
                log_error("send_message: no memory");
 
822
                disable_deadlock();
 
823
                return;
 
824
        }
 
825
        memset(buf, 0, len);
 
826
 
 
827
        hd = (struct dlm_header *)buf;
 
828
        hd->type = type;
 
829
        hd->to_nodeid = to_nodeid;
 
830
        hd->msgdata = msgdata;
 
831
 
 
832
        dlm_send_message(ls, buf, len);
 
833
 
 
834
        free(buf);
 
835
}
 
836
 
 
837
static void send_checkpoint_ready(struct lockspace *ls)
 
838
{
 
839
        log_group(ls, "send_checkpoint_ready");
 
840
        send_message(ls, DLM_MSG_DEADLK_CHECKPOINT_READY, 0, 0);
 
841
}
 
842
 
 
843
void send_cycle_start(struct lockspace *ls)
 
844
{
 
845
        log_group(ls, "send_cycle_start");
 
846
        send_message(ls, DLM_MSG_DEADLK_CYCLE_START, 0, 0);
 
847
}
 
848
 
 
849
static void send_cycle_end(struct lockspace *ls)
 
850
{
 
851
        log_group(ls, "send_cycle_end");
 
852
        send_message(ls, DLM_MSG_DEADLK_CYCLE_END, 0, 0);
 
853
}
 
854
 
 
855
static void send_cancel_lock(struct lockspace *ls, struct trans *tr,
 
856
                             struct dlm_lkb *lkb)
 
857
{
 
858
        int to_nodeid;
 
859
        uint32_t lkid;
 
860
 
 
861
        if (!lkb->lock.nodeid)
 
862
                lkid = lkb->lock.id;
 
863
        else
 
864
                lkid = lkb->lock.remid;
 
865
        to_nodeid = lkb->home;
 
866
 
 
867
        log_group(ls, "send_cancel_lock to nodeid %d rsb %s id %x xid %llx",
 
868
                  to_nodeid, lkb->rsb->name, lkid,
 
869
                  (unsigned long long)lkb->lock.xid);
 
870
 
 
871
        send_message(ls, DLM_MSG_DEADLK_CANCEL_LOCK, to_nodeid, lkid);
 
872
}
 
873
 
 
874
static void dump_resources(struct lockspace *ls)
 
875
{
 
876
        struct dlm_rsb *r;
 
877
        struct dlm_lkb *lkb;
 
878
 
 
879
        log_group(ls, "Resource dump:");
 
880
 
 
881
        list_for_each_entry(r, &ls->resources, list) {
 
882
                log_group(ls, "\"%s\" len %d", r->name, r->len);
 
883
                list_for_each_entry(lkb, &r->locks, list) {
 
884
                        log_group(ls, "  %s: nodeid %d id %08x remid %08x gr %s rq %s pid %u xid %llx",
 
885
                                  status_str(lkb->lock.status),
 
886
                                  lkb->lock.nodeid,
 
887
                                  lkb->lock.id,
 
888
                                  lkb->lock.remid,
 
889
                                  dlm_mode_str(lkb->lock.grmode),
 
890
                                  dlm_mode_str(lkb->lock.rqmode),
 
891
                                  lkb->lock.ownpid,
 
892
                                  (unsigned long long)lkb->lock.xid);
 
893
                }
 
894
        }
 
895
}
 
896
 
 
897
static void find_deadlock(struct lockspace *ls);
 
898
 
 
899
static void run_deadlock(struct lockspace *ls)
 
900
{
 
901
        struct node *node;
 
902
        int not_ready = 0;
 
903
        int low = -1;
 
904
 
 
905
        if (ls->all_checkpoints_ready)
 
906
                log_group(ls, "WARNING: run_deadlock all_checkpoints_ready");
 
907
 
 
908
        list_for_each_entry(node, &ls->deadlk_nodes, list) {
 
909
                if (!node->in_cycle)
 
910
                        continue;
 
911
                if (!node->checkpoint_ready)
 
912
                        not_ready++;
 
913
 
 
914
                log_group(ls, "nodeid %d checkpoint_ready = %d",
 
915
                          node->nodeid, node->checkpoint_ready);
 
916
        }
 
917
        if (not_ready)
 
918
                return;
 
919
 
 
920
        ls->all_checkpoints_ready = 1;
 
921
 
 
922
        list_for_each_entry(node, &ls->deadlk_nodes, list) {
 
923
                if (!node->in_cycle)
 
924
                        continue;
 
925
                if (node->nodeid < low || low == -1)
 
926
                        low = node->nodeid;
 
927
        }
 
928
        ls->deadlk_low_nodeid = low;
 
929
 
 
930
        if (low == our_nodeid)
 
931
                find_deadlock(ls);
 
932
        else
 
933
                log_group(ls, "defer resolution to low nodeid %d", low);
 
934
}
 
935
 
 
936
void receive_checkpoint_ready(struct lockspace *ls, struct dlm_header *hd,
 
937
                              int len)
 
938
{
 
939
        struct node *node;
 
940
        int nodeid = hd->nodeid;
 
941
 
 
942
        log_group(ls, "receive_checkpoint_ready from %d", nodeid);
 
943
 
 
944
        read_checkpoint(ls, nodeid);
 
945
 
 
946
        list_for_each_entry(node, &ls->deadlk_nodes, list) {
 
947
                if (node->nodeid == nodeid) {
 
948
                        node->checkpoint_ready = 1;
 
949
                        break;
 
950
                }
 
951
        }
 
952
 
 
953
        run_deadlock(ls);
 
954
}
 
955
 
 
956
void receive_cycle_start(struct lockspace *ls, struct dlm_header *hd, int len)
 
957
{
 
958
        struct node *node;
 
959
        int nodeid = hd->nodeid;
 
960
        int rv;
 
961
 
 
962
        log_group(ls, "receive_cycle_start from %d", nodeid);
 
963
 
 
964
        if (ls->cycle_running) {
 
965
                log_group(ls, "cycle already running");
 
966
                return;
 
967
        }
 
968
        ls->cycle_running = 1;
 
969
        gettimeofday(&ls->cycle_start_time, NULL);
 
970
 
 
971
        list_for_each_entry(node, &ls->deadlk_nodes, list)
 
972
                node->in_cycle = 1;
 
973
 
 
974
        rv = read_debugfs_locks(ls);
 
975
        if (rv < 0) {
 
976
                log_error("can't read dlm debugfs file: %s", strerror(errno));
 
977
                return;
 
978
        }
 
979
 
 
980
        write_checkpoint(ls);
 
981
        send_checkpoint_ready(ls);
 
982
}
 
983
 
 
984
static uint64_t dt_usec(struct timeval *start, struct timeval *stop)
 
985
{
 
986
        uint64_t dt;
 
987
 
 
988
        dt = stop->tv_sec - start->tv_sec;
 
989
        dt *= 1000000;
 
990
        dt += stop->tv_usec - start->tv_usec;
 
991
        return dt;
 
992
}
 
993
 
 
994
/* TODO: nodes added during a cycle - what will they do with messages
 
995
   they recv from other nodes running the cycle? */
 
996
 
 
997
void receive_cycle_end(struct lockspace *ls, struct dlm_header *hd, int len)
 
998
{
 
999
        struct node *node;
 
1000
        int nodeid = hd->nodeid;
 
1001
        uint64_t usec;
 
1002
 
 
1003
        if (!ls->cycle_running) {
 
1004
                log_error("receive_cycle_end %s from %d: no cycle running",
 
1005
                          ls->name, nodeid);
 
1006
                return;
 
1007
        }
 
1008
 
 
1009
        gettimeofday(&ls->cycle_end_time, NULL);
 
1010
        usec = dt_usec(&ls->cycle_start_time, &ls->cycle_end_time);
 
1011
        log_group(ls, "receive_cycle_end: from %d cycle time %.2f s",
 
1012
                  nodeid, usec * 1.e-6);
 
1013
 
 
1014
        ls->cycle_running = 0;
 
1015
        ls->all_checkpoints_ready = 0;
 
1016
 
 
1017
        list_for_each_entry(node, &ls->deadlk_nodes, list)
 
1018
                node->checkpoint_ready = 0;
 
1019
 
 
1020
        free_resources(ls);
 
1021
        free_transactions(ls);
 
1022
        unlink_checkpoint(ls);
 
1023
}
 
1024
 
 
1025
void receive_cancel_lock(struct lockspace *ls, struct dlm_header *hd, int len)
 
1026
{
 
1027
        dlm_lshandle_t h;
 
1028
        int nodeid = hd->nodeid;
 
1029
        uint32_t lkid = hd->msgdata;
 
1030
        int rv;
 
1031
 
 
1032
        if (nodeid != our_nodeid)
 
1033
                return;
 
1034
 
 
1035
        h = dlm_open_lockspace(ls->name);
 
1036
        if (!h) {
 
1037
                log_error("deadlock cancel %x from %d can't open lockspace %s",
 
1038
                          lkid, nodeid, ls->name);
 
1039
                return;
 
1040
        }
 
1041
 
 
1042
        log_group(ls, "receive_cancel_lock %x from %d", lkid, nodeid);
 
1043
 
 
1044
        rv = dlm_ls_deadlock_cancel(h, lkid, 0);
 
1045
        if (rv < 0) {
 
1046
                log_error("deadlock cancel %x from %x lib cancel errno %d",
 
1047
                          lkid, nodeid, errno);
 
1048
        }
 
1049
 
 
1050
        dlm_close_lockspace(h);
 
1051
}
 
1052
 
 
1053
static void node_joined(struct lockspace *ls, int nodeid)
 
1054
{
 
1055
        struct node *node;
 
1056
 
 
1057
        node = malloc(sizeof(struct node));
 
1058
        if (!node) {
 
1059
                log_error("node_joined: no memory");
 
1060
                disable_deadlock();
 
1061
                return;
 
1062
        }
 
1063
        memset(node, 0, sizeof(struct node));
 
1064
        node->nodeid = nodeid;
 
1065
        list_add_tail(&node->list, &ls->deadlk_nodes);
 
1066
        log_group(ls, "node %d joined deadlock cpg", nodeid);
 
1067
}
 
1068
 
 
1069
static void node_left(struct lockspace *ls, int nodeid, int reason)
 
1070
{
 
1071
        struct node *node, *safe;
 
1072
 
 
1073
        list_for_each_entry_safe(node, safe, &ls->deadlk_nodes, list) {
 
1074
                if (node->nodeid != nodeid)
 
1075
                        continue;
 
1076
 
 
1077
                list_del(&node->list);
 
1078
                free(node);
 
1079
                log_group(ls, "node %d left deadlock cpg", nodeid);
 
1080
        }
 
1081
}
 
1082
 
 
1083
static void purge_locks(struct lockspace *ls, int nodeid);
 
1084
 
 
1085
void deadlk_confchg(struct lockspace *ls,
 
1086
                const struct cpg_address *member_list,
 
1087
                size_t member_list_entries,
 
1088
                const struct cpg_address *left_list,
 
1089
                size_t left_list_entries,
 
1090
                const struct cpg_address *joined_list,
 
1091
                size_t joined_list_entries)
 
1092
{
 
1093
        int i;
 
1094
 
 
1095
        if (!cfgd_enable_deadlk)
 
1096
                return;
 
1097
 
 
1098
        if (!ls->deadlk_confchg_init) {
 
1099
                ls->deadlk_confchg_init = 1;
 
1100
                for (i = 0; i < member_list_entries; i++)
 
1101
                        node_joined(ls, member_list[i].nodeid);
 
1102
                return;
 
1103
        }
 
1104
 
 
1105
        /* nodes added during a cycle won't have node->in_cycle set so they
 
1106
           won't be included in any of the cycle processing */
 
1107
 
 
1108
        for (i = 0; i < joined_list_entries; i++)
 
1109
                node_joined(ls, joined_list[i].nodeid);
 
1110
 
 
1111
        for (i = 0; i < left_list_entries; i++)
 
1112
                node_left(ls, left_list[i].nodeid, left_list[i].reason);
 
1113
 
 
1114
        if (!ls->cycle_running)
 
1115
                return;
 
1116
 
 
1117
        if (!left_list_entries)
 
1118
                return;
 
1119
 
 
1120
        if (!ls->all_checkpoints_ready) {
 
1121
                run_deadlock(ls);
 
1122
                return;
 
1123
        }
 
1124
 
 
1125
        for (i = 0; i < left_list_entries; i++)
 
1126
                purge_locks(ls, left_list[i].nodeid);
 
1127
 
 
1128
        for (i = 0; i < left_list_entries; i++) {
 
1129
                if (left_list[i].nodeid != ls->deadlk_low_nodeid)
 
1130
                        continue;
 
1131
                /* this will set a new low node which will call find_deadlock */
 
1132
                run_deadlock(ls);
 
1133
                break;
 
1134
        }
 
1135
}
 
1136
 
 
1137
/* would we ever call this after we've created the transaction lists?
 
1138
   I don't think so; I think it can only be called between reading
 
1139
   checkpoints */
 
1140
 
 
1141
static void purge_locks(struct lockspace *ls, int nodeid)
 
1142
{
 
1143
        struct dlm_rsb *r;
 
1144
        struct dlm_lkb *lkb, *safe;
 
1145
 
 
1146
        list_for_each_entry(r, &ls->resources, list) {
 
1147
                list_for_each_entry_safe(lkb, safe, &r->locks, list) {
 
1148
                        if (lkb->home == nodeid) {
 
1149
                                list_del(&lkb->list);
 
1150
                                if (list_empty(&lkb->trans_list))
 
1151
                                        free(lkb);
 
1152
                                else
 
1153
                                        log_group(ls, "purge %d %x on trans",
 
1154
                                                  nodeid, lkb->lock.id);
 
1155
                        }
 
1156
                }
 
1157
        }
 
1158
}
 
1159
 
 
1160
static void add_lkb_trans(struct trans *tr, struct dlm_lkb *lkb)
 
1161
{
 
1162
        list_add(&lkb->trans_list, &tr->locks);
 
1163
        lkb->trans = tr;
 
1164
}
 
1165
 
 
1166
static struct trans *get_trans(struct lockspace *ls, uint64_t xid)
 
1167
{
 
1168
        struct trans *tr;
 
1169
 
 
1170
        list_for_each_entry(tr, &ls->transactions, list) {
 
1171
                if (tr->xid == xid)
 
1172
                        return tr;
 
1173
        }
 
1174
 
 
1175
        tr = malloc(sizeof(struct trans));
 
1176
        if (!tr) {
 
1177
                log_error("get_trans: no memory");
 
1178
                disable_deadlock();
 
1179
                return NULL;
 
1180
        }
 
1181
        memset(tr, 0, sizeof(struct trans));
 
1182
        tr->xid = xid;
 
1183
        tr->waitfor = NULL;
 
1184
        tr->waitfor_alloc = 0;
 
1185
        tr->waitfor_count = 0;
 
1186
        INIT_LIST_HEAD(&tr->locks);
 
1187
        list_add(&tr->list, &ls->transactions);
 
1188
        return tr;
 
1189
}
 
1190
 
 
1191
/* for each rsb, for each lock, find/create trans, add lkb to the trans list */
 
1192
 
 
1193
static void create_trans_list(struct lockspace *ls)
 
1194
{
 
1195
        struct dlm_rsb *r;
 
1196
        struct dlm_lkb *lkb;
 
1197
        struct trans *tr;
 
1198
        int r_count = 0, lkb_count = 0;
 
1199
 
 
1200
        list_for_each_entry(r, &ls->resources, list) {
 
1201
                r_count++;
 
1202
                list_for_each_entry(lkb, &r->locks, list) {
 
1203
                        lkb_count++;
 
1204
                        tr = get_trans(ls, lkb->lock.xid);
 
1205
                        if (!tr)
 
1206
                                goto out;
 
1207
                        add_lkb_trans(tr, lkb);
 
1208
                }
 
1209
        }
 
1210
 out:
 
1211
        log_group(ls, "create_trans_list: r_count %d lkb_count %d",
 
1212
                  r_count, lkb_count);
 
1213
}
 
1214
 
 
1215
static int locks_compat(struct dlm_lkb *waiting_lkb,
 
1216
                        struct dlm_lkb *granted_lkb)
 
1217
{
 
1218
        if (waiting_lkb == granted_lkb) {
 
1219
                log_debug("waiting and granted same lock");
 
1220
                return 0;
 
1221
        }
 
1222
 
 
1223
        if (waiting_lkb->trans->xid == granted_lkb->trans->xid) {
 
1224
                log_debug("waiting and granted same trans %llx",
 
1225
                          (unsigned long long)waiting_lkb->trans->xid);
 
1226
                return 0;
 
1227
        }
 
1228
 
 
1229
        return dlm_modes_compat(granted_lkb->lock.grmode,
 
1230
                                waiting_lkb->lock.rqmode);
 
1231
}
 
1232
 
 
1233
static int in_waitfor(struct trans *tr, struct trans *add_tr)
 
1234
{
 
1235
        int i;
 
1236
 
 
1237
        for (i = 0; i < tr->waitfor_alloc; i++) {
 
1238
                if (!tr->waitfor[i])
 
1239
                        continue;
 
1240
                if (tr->waitfor[i] == add_tr)
 
1241
                        return 1;
 
1242
        }
 
1243
        return 0;
 
1244
}
 
1245
 
 
1246
static void add_waitfor(struct lockspace *ls, struct dlm_lkb *waiting_lkb,
 
1247
                        struct dlm_lkb *granted_lkb)
 
1248
{
 
1249
        struct trans *tr = waiting_lkb->trans;
 
1250
        int i;
 
1251
 
 
1252
        if (locks_compat(waiting_lkb, granted_lkb))
 
1253
                return;
 
1254
 
 
1255
        /* this shouldn't happen AFAIK */
 
1256
        if (tr == granted_lkb->trans) {
 
1257
                log_group(ls, "trans %llx waiting on self",
 
1258
                          (unsigned long long)tr->xid);
 
1259
                return;
 
1260
        }
 
1261
 
 
1262
        /* don't add the same trans to the waitfor list multiple times */
 
1263
        if (tr->waitfor_count && in_waitfor(tr, granted_lkb->trans)) {
 
1264
                log_group(ls, "trans %llx already waiting for trans %llx, "
 
1265
                          "waiting %x %s, granted %x %s",
 
1266
                          (unsigned long long)waiting_lkb->trans->xid,
 
1267
                          (unsigned long long)granted_lkb->trans->xid,
 
1268
                          waiting_lkb->lock.id, waiting_lkb->rsb->name,
 
1269
                          granted_lkb->lock.id, granted_lkb->rsb->name);
 
1270
                return;
 
1271
        }
 
1272
 
 
1273
        if (tr->waitfor_count == tr->waitfor_alloc) {
 
1274
                struct trans **old_waitfor = tr->waitfor;
 
1275
                tr->waitfor_alloc += TR_NALLOC;
 
1276
                tr->waitfor = malloc(tr->waitfor_alloc * sizeof(tr));
 
1277
                if (!tr->waitfor) {
 
1278
                        log_error("add_waitfor no mem %u", tr->waitfor_alloc);
 
1279
                        return;
 
1280
                }
 
1281
                memset(tr->waitfor, 0, tr->waitfor_alloc * sizeof(tr));
 
1282
 
 
1283
                /* copy then free old set of pointers */
 
1284
                for (i = 0; i < tr->waitfor_count; i++)
 
1285
                        tr->waitfor[i] = old_waitfor[i];
 
1286
                if (old_waitfor)
 
1287
                        free(old_waitfor);
 
1288
        }
 
1289
 
 
1290
        tr->waitfor[tr->waitfor_count++] = granted_lkb->trans;
 
1291
        granted_lkb->trans->others_waiting_on_us++;
 
1292
        waiting_lkb->waitfor_trans = granted_lkb->trans;
 
1293
}
 
1294
 
 
1295
/* for each trans, for each waiting lock, go to rsb of the lock,
 
1296
   find granted locks on that rsb, then find the trans the
 
1297
   granted lock belongs to, add that trans to our waitfor list */
 
1298
 
 
1299
static void create_waitfor_graph(struct lockspace *ls)
 
1300
{
 
1301
        struct dlm_lkb *waiting_lkb, *granted_lkb;
 
1302
        struct dlm_rsb *r;
 
1303
        struct trans *tr;
 
1304
        int depend_count = 0;
 
1305
 
 
1306
        list_for_each_entry(tr, &ls->transactions, list) {
 
1307
                list_for_each_entry(waiting_lkb, &tr->locks, trans_list) {
 
1308
                        if (waiting_lkb->lock.status == DLM_LKSTS_GRANTED)
 
1309
                                continue;
 
1310
                        /* waiting_lkb status is CONVERT or WAITING */
 
1311
 
 
1312
                        r = waiting_lkb->rsb;
 
1313
 
 
1314
                        list_for_each_entry(granted_lkb, &r->locks, list) {
 
1315
                                if (granted_lkb->lock.status==DLM_LKSTS_WAITING)
 
1316
                                        continue;
 
1317
                                /* granted_lkb status is GRANTED or CONVERT */
 
1318
                                add_waitfor(ls, waiting_lkb, granted_lkb);
 
1319
                                depend_count++;
 
1320
                        }
 
1321
                }
 
1322
        }
 
1323
 
 
1324
        log_group(ls, "create_waitfor_graph: depend_count %d", depend_count);
 
1325
}
 
1326
 
 
1327
/* Assume a transaction that's not waiting on any locks will complete, release
 
1328
   all the locks it currently holds, and exit.  Other transactions that were
 
1329
   blocked waiting on the removed transaction's now-released locks may now be
 
1330
   unblocked, complete, release all held locks and exit.  Repeat this until
 
1331
   no more transactions can be removed.  If there are transactions remaining,
 
1332
   then they are deadlocked. */
 
1333
 
 
1334
static void remove_waitfor(struct trans *tr, struct trans *remove_tr)
 
1335
{
 
1336
        int i;
 
1337
 
 
1338
        for (i = 0; i < tr->waitfor_alloc; i++) {
 
1339
                if (!tr->waitfor_count)
 
1340
                        break;
 
1341
 
 
1342
                if (!tr->waitfor[i])
 
1343
                        continue;
 
1344
 
 
1345
                if (tr->waitfor[i] == remove_tr) {
 
1346
                        tr->waitfor[i] = NULL;
 
1347
                        tr->waitfor_count--;
 
1348
                        remove_tr->others_waiting_on_us--;
 
1349
                }
 
1350
        }
 
1351
}
 
1352
 
 
1353
/* remove_tr is not waiting for anything, assume it completes and goes away
 
1354
   and remove it from any other transaction's waitfor list */
 
1355
 
 
1356
static void remove_trans(struct lockspace *ls, struct trans *remove_tr)
 
1357
{
 
1358
        struct trans *tr;
 
1359
 
 
1360
        list_for_each_entry(tr, &ls->transactions, list) {
 
1361
                if (tr == remove_tr)
 
1362
                        continue;
 
1363
                if (!remove_tr->others_waiting_on_us)
 
1364
                        break;
 
1365
                remove_waitfor(tr, remove_tr);
 
1366
        }
 
1367
 
 
1368
        if (remove_tr->others_waiting_on_us)
 
1369
                log_group(ls, "trans %llx removed others waiting %d",
 
1370
                          (unsigned long long)remove_tr->xid,
 
1371
                          remove_tr->others_waiting_on_us);
 
1372
}
 
1373
 
 
1374
static int reduce_waitfor_graph(struct lockspace *ls)
 
1375
{
 
1376
        struct trans *tr, *safe;
 
1377
        int blocked = 0;
 
1378
        int removed = 0;
 
1379
 
 
1380
        list_for_each_entry_safe(tr, safe, &ls->transactions, list) {
 
1381
                if (tr->waitfor_count) {
 
1382
                        blocked++;
 
1383
                        continue;
 
1384
                }
 
1385
                remove_trans(ls, tr);
 
1386
                list_del(&tr->list);
 
1387
                if (tr->waitfor)
 
1388
                        free(tr->waitfor);
 
1389
                free(tr);
 
1390
                removed++;
 
1391
        }
 
1392
 
 
1393
        log_group(ls, "reduce_waitfor_graph: %d blocked, %d removed",
 
1394
                  blocked, removed);
 
1395
        return removed;
 
1396
}
 
1397
 
 
1398
static void reduce_waitfor_graph_loop(struct lockspace *ls)
 
1399
{
 
1400
        int removed;
 
1401
 
 
1402
        while (1) {
 
1403
                removed = reduce_waitfor_graph(ls);
 
1404
                if (!removed)
 
1405
                        break;
 
1406
        }
 
1407
}
 
1408
 
 
1409
static struct trans *find_trans_to_cancel(struct lockspace *ls)
 
1410
{
 
1411
        struct trans *tr;
 
1412
 
 
1413
        list_for_each_entry(tr, &ls->transactions, list) {
 
1414
                if (!tr->others_waiting_on_us)
 
1415
                        continue;
 
1416
                return tr;
 
1417
        }
 
1418
        return NULL;
 
1419
}
 
1420
 
 
1421
static void cancel_trans(struct lockspace *ls)
 
1422
{
 
1423
        struct trans *tr;
 
1424
        struct dlm_lkb *lkb;
 
1425
        int removed;
 
1426
 
 
1427
        tr = find_trans_to_cancel(ls);
 
1428
        if (!tr) {
 
1429
                log_group(ls, "cancel_trans: no trans found");
 
1430
                return;
 
1431
        }
 
1432
 
 
1433
        list_for_each_entry(lkb, &tr->locks, trans_list) {
 
1434
                if (lkb->lock.status == DLM_LKSTS_GRANTED)
 
1435
                        continue;
 
1436
                send_cancel_lock(ls, tr, lkb);
 
1437
 
 
1438
                /* When this canceled trans has multiple locks all blocked by
 
1439
                   locks held by one other trans, that other trans is only
 
1440
                   added to tr->waitfor once, and only one of these waiting
 
1441
                   locks will have waitfor_trans set.  So, the lkb with
 
1442
                   non-null waitfor_trans was the first one responsible
 
1443
                   for adding waitfor_trans to tr->waitfor.
 
1444
 
 
1445
                   We could potentially forget about keeping track of lkb->
 
1446
                   waitfor_trans, forget about calling remove_waitfor()
 
1447
                   here and just set tr->waitfor_count = 0 after this loop.
 
1448
                   The loss would be that waitfor_trans->others_waiting_on_us
 
1449
                   would not get decremented. */
 
1450
 
 
1451
                if (lkb->waitfor_trans)
 
1452
                        remove_waitfor(tr, lkb->waitfor_trans);
 
1453
        }
 
1454
 
 
1455
        /* this shouldn't happen, if it does something's not working right */
 
1456
        if (tr->waitfor_count) {
 
1457
                log_group(ls, "cancel_trans: %llx non-zero waitfor_count %d",
 
1458
                          (unsigned long long)tr->xid, tr->waitfor_count);
 
1459
        }
 
1460
 
 
1461
        /* this should now remove the canceled trans since it now has a zero
 
1462
           waitfor_count */
 
1463
        removed = reduce_waitfor_graph(ls);
 
1464
 
 
1465
        if (!removed)
 
1466
                log_group(ls, "canceled trans not removed from graph");
 
1467
 
 
1468
        /* now call reduce_waitfor_graph() in another loop and it
 
1469
           should completely reduce */
 
1470
}
 
1471
 
 
1472
static void dump_trans(struct lockspace *ls, struct trans *tr)
 
1473
{
 
1474
        struct dlm_lkb *lkb;
 
1475
        struct trans *wf;
 
1476
        int i;
 
1477
 
 
1478
        log_group(ls, "trans xid %llx waitfor_count %d others_waiting_on_us %d",
 
1479
                  (unsigned long long)tr->xid, tr->waitfor_count,
 
1480
                  tr->others_waiting_on_us);
 
1481
 
 
1482
        log_group(ls, "locks:");
 
1483
        
 
1484
        list_for_each_entry(lkb, &tr->locks, trans_list) {
 
1485
                log_group(ls, "  %s: id %08x gr %s rq %s pid %u:%u \"%s\"",
 
1486
                          status_str(lkb->lock.status),
 
1487
                          lkb->lock.id,
 
1488
                          dlm_mode_str(lkb->lock.grmode),
 
1489
                          dlm_mode_str(lkb->lock.rqmode),
 
1490
                          lkb->home,
 
1491
                          lkb->lock.ownpid,
 
1492
                          lkb->rsb->name);
 
1493
        }
 
1494
 
 
1495
        if (!tr->waitfor_count)
 
1496
                return;
 
1497
 
 
1498
        log_group(ls, "waitfor:");
 
1499
 
 
1500
        for (i = 0; i < tr->waitfor_alloc; i++) {
 
1501
                if (!tr->waitfor[i])
 
1502
                        continue;
 
1503
                wf = tr->waitfor[i];
 
1504
                log_group(ls, "  xid %llx", (unsigned long long)wf->xid);
 
1505
        }
 
1506
}
 
1507
 
 
1508
static void dump_all_trans(struct lockspace *ls)
 
1509
{
 
1510
        struct trans *tr;
 
1511
 
 
1512
        log_group(ls, "Transaction dump:");
 
1513
 
 
1514
        list_for_each_entry(tr, &ls->transactions, list)
 
1515
                dump_trans(ls, tr);
 
1516
}
 
1517
 
 
1518
static void find_deadlock(struct lockspace *ls)
 
1519
{
 
1520
        if (list_empty(&ls->resources)) {
 
1521
                log_group(ls, "no deadlock: no resources");
 
1522
                goto out;
 
1523
        }
 
1524
 
 
1525
        if (!list_empty(&ls->transactions)) {
 
1526
                log_group(ls, "transactions list should be empty");
 
1527
                goto out;
 
1528
        }
 
1529
 
 
1530
        dump_resources(ls);
 
1531
        create_trans_list(ls);
 
1532
        create_waitfor_graph(ls);
 
1533
        dump_all_trans(ls);
 
1534
        reduce_waitfor_graph_loop(ls);
 
1535
 
 
1536
        if (list_empty(&ls->transactions)) {
 
1537
                log_group(ls, "no deadlock: all transactions reduced");
 
1538
                goto out;
 
1539
        }
 
1540
 
 
1541
        log_group(ls, "found deadlock");
 
1542
        dump_all_trans(ls);
 
1543
 
 
1544
        cancel_trans(ls);
 
1545
        reduce_waitfor_graph_loop(ls);
 
1546
 
 
1547
        if (list_empty(&ls->transactions)) {
 
1548
                log_group(ls, "resolved deadlock with cancel");
 
1549
                goto out;
 
1550
        }
 
1551
 
 
1552
        log_error("deadlock resolution failed");
 
1553
        dump_all_trans(ls);
 
1554
 out:
 
1555
        send_cycle_end(ls);
 
1556
}
 
1557