~ubuntu-branches/ubuntu/utopic/linux-ti-omap/utopic

« back to all changes in this revision

Viewing changes to fs/ocfs2/dlm/dlmmaster.c

  • Committer: Bazaar Package Importer
  • Author(s): Amit Kucheria, Amit Kucheria
  • Date: 2010-03-10 02:28:15 UTC
  • Revision ID: james.westby@ubuntu.com-20100310022815-7sd3gwvn5kenaq33
Tags: 2.6.33-500.1
[ Amit Kucheria ]

* Initial release of a 2.6.33-based OMAP kernel
* UBUNTU: [Upstream] Fix omap 1-wire driver compilation
* UBUNTU: ubuntu: AppArmor -- update to mainline 2010-03-04

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c; c-basic-offset: 8; -*-
 
2
 * vim: noexpandtab sw=8 ts=8 sts=0:
 
3
 *
 
4
 * dlmmod.c
 
5
 *
 
6
 * standalone DLM module
 
7
 *
 
8
 * Copyright (C) 2004 Oracle.  All rights reserved.
 
9
 *
 
10
 * This program is free software; you can redistribute it and/or
 
11
 * modify it under the terms of the GNU General Public
 
12
 * License as published by the Free Software Foundation; either
 
13
 * version 2 of the License, or (at your option) any later version.
 
14
 *
 
15
 * This program is distributed in the hope that it will be useful,
 
16
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
17
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
18
 * General Public License for more details.
 
19
 *
 
20
 * You should have received a copy of the GNU General Public
 
21
 * License along with this program; if not, write to the
 
22
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 
23
 * Boston, MA 021110-1307, USA.
 
24
 *
 
25
 */
 
26
 
 
27
 
 
28
#include <linux/module.h>
 
29
#include <linux/fs.h>
 
30
#include <linux/types.h>
 
31
#include <linux/slab.h>
 
32
#include <linux/highmem.h>
 
33
#include <linux/init.h>
 
34
#include <linux/sysctl.h>
 
35
#include <linux/random.h>
 
36
#include <linux/blkdev.h>
 
37
#include <linux/socket.h>
 
38
#include <linux/inet.h>
 
39
#include <linux/spinlock.h>
 
40
#include <linux/delay.h>
 
41
 
 
42
 
 
43
#include "cluster/heartbeat.h"
 
44
#include "cluster/nodemanager.h"
 
45
#include "cluster/tcp.h"
 
46
 
 
47
#include "dlmapi.h"
 
48
#include "dlmcommon.h"
 
49
#include "dlmdomain.h"
 
50
#include "dlmdebug.h"
 
51
 
 
52
#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
 
53
#include "cluster/masklog.h"
 
54
 
 
55
static void dlm_mle_node_down(struct dlm_ctxt *dlm,
 
56
                              struct dlm_master_list_entry *mle,
 
57
                              struct o2nm_node *node,
 
58
                              int idx);
 
59
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
 
60
                            struct dlm_master_list_entry *mle,
 
61
                            struct o2nm_node *node,
 
62
                            int idx);
 
63
 
 
64
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
 
65
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
 
66
                                struct dlm_lock_resource *res,
 
67
                                void *nodemap, u32 flags);
 
68
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
 
69
 
 
70
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
 
71
                                struct dlm_master_list_entry *mle,
 
72
                                const char *name,
 
73
                                unsigned int namelen)
 
74
{
 
75
        if (dlm != mle->dlm)
 
76
                return 0;
 
77
 
 
78
        if (namelen != mle->mnamelen ||
 
79
            memcmp(name, mle->mname, namelen) != 0)
 
80
                return 0;
 
81
 
 
82
        return 1;
 
83
}
 
84
 
 
85
static struct kmem_cache *dlm_lockres_cache = NULL;
 
86
static struct kmem_cache *dlm_lockname_cache = NULL;
 
87
static struct kmem_cache *dlm_mle_cache = NULL;
 
88
 
 
89
static void dlm_mle_release(struct kref *kref);
 
90
static void dlm_init_mle(struct dlm_master_list_entry *mle,
 
91
                        enum dlm_mle_type type,
 
92
                        struct dlm_ctxt *dlm,
 
93
                        struct dlm_lock_resource *res,
 
94
                        const char *name,
 
95
                        unsigned int namelen);
 
96
static void dlm_put_mle(struct dlm_master_list_entry *mle);
 
97
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
 
98
static int dlm_find_mle(struct dlm_ctxt *dlm,
 
99
                        struct dlm_master_list_entry **mle,
 
100
                        char *name, unsigned int namelen);
 
101
 
 
102
static int dlm_do_master_request(struct dlm_lock_resource *res,
 
103
                                 struct dlm_master_list_entry *mle, int to);
 
104
 
 
105
 
 
106
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
 
107
                                     struct dlm_lock_resource *res,
 
108
                                     struct dlm_master_list_entry *mle,
 
109
                                     int *blocked);
 
110
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
 
111
                                    struct dlm_lock_resource *res,
 
112
                                    struct dlm_master_list_entry *mle,
 
113
                                    int blocked);
 
114
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
 
115
                                 struct dlm_lock_resource *res,
 
116
                                 struct dlm_master_list_entry *mle,
 
117
                                 struct dlm_master_list_entry **oldmle,
 
118
                                 const char *name, unsigned int namelen,
 
119
                                 u8 new_master, u8 master);
 
120
 
 
121
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
 
122
                                    struct dlm_lock_resource *res);
 
123
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 
124
                                      struct dlm_lock_resource *res);
 
125
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
 
126
                                       struct dlm_lock_resource *res,
 
127
                                       u8 target);
 
128
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
 
129
                                       struct dlm_lock_resource *res);
 
130
 
 
131
 
 
132
int dlm_is_host_down(int errno)
 
133
{
 
134
        switch (errno) {
 
135
                case -EBADF:
 
136
                case -ECONNREFUSED:
 
137
                case -ENOTCONN:
 
138
                case -ECONNRESET:
 
139
                case -EPIPE:
 
140
                case -EHOSTDOWN:
 
141
                case -EHOSTUNREACH:
 
142
                case -ETIMEDOUT:
 
143
                case -ECONNABORTED:
 
144
                case -ENETDOWN:
 
145
                case -ENETUNREACH:
 
146
                case -ENETRESET:
 
147
                case -ESHUTDOWN:
 
148
                case -ENOPROTOOPT:
 
149
                case -EINVAL:   /* if returned from our tcp code,
 
150
                                   this means there is no socket */
 
151
                        return 1;
 
152
        }
 
153
        return 0;
 
154
}
 
155
 
 
156
 
 
157
/*
 
158
 * MASTER LIST FUNCTIONS
 
159
 */
 
160
 
 
161
 
 
162
/*
 
163
 * regarding master list entries and heartbeat callbacks:
 
164
 *
 
165
 * in order to avoid sleeping and allocation that occurs in
 
166
 * heartbeat, master list entries are simply attached to the
 
167
 * dlm's established heartbeat callbacks.  the mle is attached
 
168
 * when it is created, and since the dlm->spinlock is held at
 
169
 * that time, any heartbeat event will be properly discovered
 
170
 * by the mle.  the mle needs to be detached from the
 
171
 * dlm->mle_hb_events list as soon as heartbeat events are no
 
172
 * longer useful to the mle, and before the mle is freed.
 
173
 *
 
174
 * as a general rule, heartbeat events are no longer needed by
 
175
 * the mle once an "answer" regarding the lock master has been
 
176
 * received.
 
177
 */
 
178
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
 
179
                                              struct dlm_master_list_entry *mle)
 
180
{
 
181
        assert_spin_locked(&dlm->spinlock);
 
182
 
 
183
        list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
 
184
}
 
185
 
 
186
 
 
187
static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
 
188
                                              struct dlm_master_list_entry *mle)
 
189
{
 
190
        if (!list_empty(&mle->hb_events))
 
191
                list_del_init(&mle->hb_events);
 
192
}
 
193
 
 
194
 
 
195
static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
 
196
                                            struct dlm_master_list_entry *mle)
 
197
{
 
198
        spin_lock(&dlm->spinlock);
 
199
        __dlm_mle_detach_hb_events(dlm, mle);
 
200
        spin_unlock(&dlm->spinlock);
 
201
}
 
202
 
 
203
static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
 
204
{
 
205
        struct dlm_ctxt *dlm;
 
206
        dlm = mle->dlm;
 
207
 
 
208
        assert_spin_locked(&dlm->spinlock);
 
209
        assert_spin_locked(&dlm->master_lock);
 
210
        mle->inuse++;
 
211
        kref_get(&mle->mle_refs);
 
212
}
 
213
 
 
214
static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
 
215
{
 
216
        struct dlm_ctxt *dlm;
 
217
        dlm = mle->dlm;
 
218
 
 
219
        spin_lock(&dlm->spinlock);
 
220
        spin_lock(&dlm->master_lock);
 
221
        mle->inuse--;
 
222
        __dlm_put_mle(mle);
 
223
        spin_unlock(&dlm->master_lock);
 
224
        spin_unlock(&dlm->spinlock);
 
225
 
 
226
}
 
227
 
 
228
/* remove from list and free */
 
229
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
 
230
{
 
231
        struct dlm_ctxt *dlm;
 
232
        dlm = mle->dlm;
 
233
 
 
234
        assert_spin_locked(&dlm->spinlock);
 
235
        assert_spin_locked(&dlm->master_lock);
 
236
        if (!atomic_read(&mle->mle_refs.refcount)) {
 
237
                /* this may or may not crash, but who cares.
 
238
                 * it's a BUG. */
 
239
                mlog(ML_ERROR, "bad mle: %p\n", mle);
 
240
                dlm_print_one_mle(mle);
 
241
                BUG();
 
242
        } else
 
243
                kref_put(&mle->mle_refs, dlm_mle_release);
 
244
}
 
245
 
 
246
 
 
247
/* must not have any spinlocks coming in */
 
248
static void dlm_put_mle(struct dlm_master_list_entry *mle)
 
249
{
 
250
        struct dlm_ctxt *dlm;
 
251
        dlm = mle->dlm;
 
252
 
 
253
        spin_lock(&dlm->spinlock);
 
254
        spin_lock(&dlm->master_lock);
 
255
        __dlm_put_mle(mle);
 
256
        spin_unlock(&dlm->master_lock);
 
257
        spin_unlock(&dlm->spinlock);
 
258
}
 
259
 
 
260
static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
 
261
{
 
262
        kref_get(&mle->mle_refs);
 
263
}
 
264
 
 
265
static void dlm_init_mle(struct dlm_master_list_entry *mle,
 
266
                        enum dlm_mle_type type,
 
267
                        struct dlm_ctxt *dlm,
 
268
                        struct dlm_lock_resource *res,
 
269
                        const char *name,
 
270
                        unsigned int namelen)
 
271
{
 
272
        assert_spin_locked(&dlm->spinlock);
 
273
 
 
274
        mle->dlm = dlm;
 
275
        mle->type = type;
 
276
        INIT_HLIST_NODE(&mle->master_hash_node);
 
277
        INIT_LIST_HEAD(&mle->hb_events);
 
278
        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
 
279
        spin_lock_init(&mle->spinlock);
 
280
        init_waitqueue_head(&mle->wq);
 
281
        atomic_set(&mle->woken, 0);
 
282
        kref_init(&mle->mle_refs);
 
283
        memset(mle->response_map, 0, sizeof(mle->response_map));
 
284
        mle->master = O2NM_MAX_NODES;
 
285
        mle->new_master = O2NM_MAX_NODES;
 
286
        mle->inuse = 0;
 
287
 
 
288
        BUG_ON(mle->type != DLM_MLE_BLOCK &&
 
289
               mle->type != DLM_MLE_MASTER &&
 
290
               mle->type != DLM_MLE_MIGRATION);
 
291
 
 
292
        if (mle->type == DLM_MLE_MASTER) {
 
293
                BUG_ON(!res);
 
294
                mle->mleres = res;
 
295
                memcpy(mle->mname, res->lockname.name, res->lockname.len);
 
296
                mle->mnamelen = res->lockname.len;
 
297
                mle->mnamehash = res->lockname.hash;
 
298
        } else {
 
299
                BUG_ON(!name);
 
300
                mle->mleres = NULL;
 
301
                memcpy(mle->mname, name, namelen);
 
302
                mle->mnamelen = namelen;
 
303
                mle->mnamehash = dlm_lockid_hash(name, namelen);
 
304
        }
 
305
 
 
306
        atomic_inc(&dlm->mle_tot_count[mle->type]);
 
307
        atomic_inc(&dlm->mle_cur_count[mle->type]);
 
308
 
 
309
        /* copy off the node_map and register hb callbacks on our copy */
 
310
        memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
 
311
        memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
 
312
        clear_bit(dlm->node_num, mle->vote_map);
 
313
        clear_bit(dlm->node_num, mle->node_map);
 
314
 
 
315
        /* attach the mle to the domain node up/down events */
 
316
        __dlm_mle_attach_hb_events(dlm, mle);
 
317
}
 
318
 
 
319
void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
 
320
{
 
321
        assert_spin_locked(&dlm->spinlock);
 
322
        assert_spin_locked(&dlm->master_lock);
 
323
 
 
324
        if (!hlist_unhashed(&mle->master_hash_node))
 
325
                hlist_del_init(&mle->master_hash_node);
 
326
}
 
327
 
 
328
void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
 
329
{
 
330
        struct hlist_head *bucket;
 
331
 
 
332
        assert_spin_locked(&dlm->master_lock);
 
333
 
 
334
        bucket = dlm_master_hash(dlm, mle->mnamehash);
 
335
        hlist_add_head(&mle->master_hash_node, bucket);
 
336
}
 
337
 
 
338
/* returns 1 if found, 0 if not */
 
339
static int dlm_find_mle(struct dlm_ctxt *dlm,
 
340
                        struct dlm_master_list_entry **mle,
 
341
                        char *name, unsigned int namelen)
 
342
{
 
343
        struct dlm_master_list_entry *tmpmle;
 
344
        struct hlist_head *bucket;
 
345
        struct hlist_node *list;
 
346
        unsigned int hash;
 
347
 
 
348
        assert_spin_locked(&dlm->master_lock);
 
349
 
 
350
        hash = dlm_lockid_hash(name, namelen);
 
351
        bucket = dlm_master_hash(dlm, hash);
 
352
        hlist_for_each(list, bucket) {
 
353
                tmpmle = hlist_entry(list, struct dlm_master_list_entry,
 
354
                                     master_hash_node);
 
355
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
 
356
                        continue;
 
357
                dlm_get_mle(tmpmle);
 
358
                *mle = tmpmle;
 
359
                return 1;
 
360
        }
 
361
        return 0;
 
362
}
 
363
 
 
364
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
 
365
{
 
366
        struct dlm_master_list_entry *mle;
 
367
 
 
368
        assert_spin_locked(&dlm->spinlock);
 
369
 
 
370
        list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
 
371
                if (node_up)
 
372
                        dlm_mle_node_up(dlm, mle, NULL, idx);
 
373
                else
 
374
                        dlm_mle_node_down(dlm, mle, NULL, idx);
 
375
        }
 
376
}
 
377
 
 
378
static void dlm_mle_node_down(struct dlm_ctxt *dlm,
 
379
                              struct dlm_master_list_entry *mle,
 
380
                              struct o2nm_node *node, int idx)
 
381
{
 
382
        spin_lock(&mle->spinlock);
 
383
 
 
384
        if (!test_bit(idx, mle->node_map))
 
385
                mlog(0, "node %u already removed from nodemap!\n", idx);
 
386
        else
 
387
                clear_bit(idx, mle->node_map);
 
388
 
 
389
        spin_unlock(&mle->spinlock);
 
390
}
 
391
 
 
392
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
 
393
                            struct dlm_master_list_entry *mle,
 
394
                            struct o2nm_node *node, int idx)
 
395
{
 
396
        spin_lock(&mle->spinlock);
 
397
 
 
398
        if (test_bit(idx, mle->node_map))
 
399
                mlog(0, "node %u already in node map!\n", idx);
 
400
        else
 
401
                set_bit(idx, mle->node_map);
 
402
 
 
403
        spin_unlock(&mle->spinlock);
 
404
}
 
405
 
 
406
 
 
407
int dlm_init_mle_cache(void)
 
408
{
 
409
        dlm_mle_cache = kmem_cache_create("o2dlm_mle",
 
410
                                          sizeof(struct dlm_master_list_entry),
 
411
                                          0, SLAB_HWCACHE_ALIGN,
 
412
                                          NULL);
 
413
        if (dlm_mle_cache == NULL)
 
414
                return -ENOMEM;
 
415
        return 0;
 
416
}
 
417
 
 
418
void dlm_destroy_mle_cache(void)
 
419
{
 
420
        if (dlm_mle_cache)
 
421
                kmem_cache_destroy(dlm_mle_cache);
 
422
}
 
423
 
 
424
static void dlm_mle_release(struct kref *kref)
 
425
{
 
426
        struct dlm_master_list_entry *mle;
 
427
        struct dlm_ctxt *dlm;
 
428
 
 
429
        mlog_entry_void();
 
430
 
 
431
        mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
 
432
        dlm = mle->dlm;
 
433
 
 
434
        assert_spin_locked(&dlm->spinlock);
 
435
        assert_spin_locked(&dlm->master_lock);
 
436
 
 
437
        mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
 
438
             mle->type);
 
439
 
 
440
        /* remove from list if not already */
 
441
        __dlm_unlink_mle(dlm, mle);
 
442
 
 
443
        /* detach the mle from the domain node up/down events */
 
444
        __dlm_mle_detach_hb_events(dlm, mle);
 
445
 
 
446
        atomic_dec(&dlm->mle_cur_count[mle->type]);
 
447
 
 
448
        /* NOTE: kfree under spinlock here.
 
449
         * if this is bad, we can move this to a freelist. */
 
450
        kmem_cache_free(dlm_mle_cache, mle);
 
451
}
 
452
 
 
453
 
 
454
/*
 
455
 * LOCK RESOURCE FUNCTIONS
 
456
 */
 
457
 
 
458
int dlm_init_master_caches(void)
 
459
{
 
460
        dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
 
461
                                              sizeof(struct dlm_lock_resource),
 
462
                                              0, SLAB_HWCACHE_ALIGN, NULL);
 
463
        if (!dlm_lockres_cache)
 
464
                goto bail;
 
465
 
 
466
        dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
 
467
                                               DLM_LOCKID_NAME_MAX, 0,
 
468
                                               SLAB_HWCACHE_ALIGN, NULL);
 
469
        if (!dlm_lockname_cache)
 
470
                goto bail;
 
471
 
 
472
        return 0;
 
473
bail:
 
474
        dlm_destroy_master_caches();
 
475
        return -ENOMEM;
 
476
}
 
477
 
 
478
void dlm_destroy_master_caches(void)
 
479
{
 
480
        if (dlm_lockname_cache)
 
481
                kmem_cache_destroy(dlm_lockname_cache);
 
482
 
 
483
        if (dlm_lockres_cache)
 
484
                kmem_cache_destroy(dlm_lockres_cache);
 
485
}
 
486
 
 
487
static void dlm_lockres_release(struct kref *kref)
 
488
{
 
489
        struct dlm_lock_resource *res;
 
490
        struct dlm_ctxt *dlm;
 
491
 
 
492
        res = container_of(kref, struct dlm_lock_resource, refs);
 
493
        dlm = res->dlm;
 
494
 
 
495
        /* This should not happen -- all lockres' have a name
 
496
         * associated with them at init time. */
 
497
        BUG_ON(!res->lockname.name);
 
498
 
 
499
        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
 
500
             res->lockname.name);
 
501
 
 
502
        spin_lock(&dlm->track_lock);
 
503
        if (!list_empty(&res->tracking))
 
504
                list_del_init(&res->tracking);
 
505
        else {
 
506
                mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
 
507
                     res->lockname.len, res->lockname.name);
 
508
                dlm_print_one_lock_resource(res);
 
509
        }
 
510
        spin_unlock(&dlm->track_lock);
 
511
 
 
512
        atomic_dec(&dlm->res_cur_count);
 
513
 
 
514
        dlm_put(dlm);
 
515
 
 
516
        if (!hlist_unhashed(&res->hash_node) ||
 
517
            !list_empty(&res->granted) ||
 
518
            !list_empty(&res->converting) ||
 
519
            !list_empty(&res->blocked) ||
 
520
            !list_empty(&res->dirty) ||
 
521
            !list_empty(&res->recovering) ||
 
522
            !list_empty(&res->purge)) {
 
523
                mlog(ML_ERROR,
 
524
                     "Going to BUG for resource %.*s."
 
525
                     "  We're on a list! [%c%c%c%c%c%c%c]\n",
 
526
                     res->lockname.len, res->lockname.name,
 
527
                     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
 
528
                     !list_empty(&res->granted) ? 'G' : ' ',
 
529
                     !list_empty(&res->converting) ? 'C' : ' ',
 
530
                     !list_empty(&res->blocked) ? 'B' : ' ',
 
531
                     !list_empty(&res->dirty) ? 'D' : ' ',
 
532
                     !list_empty(&res->recovering) ? 'R' : ' ',
 
533
                     !list_empty(&res->purge) ? 'P' : ' ');
 
534
 
 
535
                dlm_print_one_lock_resource(res);
 
536
        }
 
537
 
 
538
        /* By the time we're ready to blow this guy away, we shouldn't
 
539
         * be on any lists. */
 
540
        BUG_ON(!hlist_unhashed(&res->hash_node));
 
541
        BUG_ON(!list_empty(&res->granted));
 
542
        BUG_ON(!list_empty(&res->converting));
 
543
        BUG_ON(!list_empty(&res->blocked));
 
544
        BUG_ON(!list_empty(&res->dirty));
 
545
        BUG_ON(!list_empty(&res->recovering));
 
546
        BUG_ON(!list_empty(&res->purge));
 
547
 
 
548
        kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
 
549
 
 
550
        kmem_cache_free(dlm_lockres_cache, res);
 
551
}
 
552
 
 
553
void dlm_lockres_put(struct dlm_lock_resource *res)
 
554
{
 
555
        kref_put(&res->refs, dlm_lockres_release);
 
556
}
 
557
 
 
558
static void dlm_init_lockres(struct dlm_ctxt *dlm,
 
559
                             struct dlm_lock_resource *res,
 
560
                             const char *name, unsigned int namelen)
 
561
{
 
562
        char *qname;
 
563
 
 
564
        /* If we memset here, we lose our reference to the kmalloc'd
 
565
         * res->lockname.name, so be sure to init every field
 
566
         * correctly! */
 
567
 
 
568
        qname = (char *) res->lockname.name;
 
569
        memcpy(qname, name, namelen);
 
570
 
 
571
        res->lockname.len = namelen;
 
572
        res->lockname.hash = dlm_lockid_hash(name, namelen);
 
573
 
 
574
        init_waitqueue_head(&res->wq);
 
575
        spin_lock_init(&res->spinlock);
 
576
        INIT_HLIST_NODE(&res->hash_node);
 
577
        INIT_LIST_HEAD(&res->granted);
 
578
        INIT_LIST_HEAD(&res->converting);
 
579
        INIT_LIST_HEAD(&res->blocked);
 
580
        INIT_LIST_HEAD(&res->dirty);
 
581
        INIT_LIST_HEAD(&res->recovering);
 
582
        INIT_LIST_HEAD(&res->purge);
 
583
        INIT_LIST_HEAD(&res->tracking);
 
584
        atomic_set(&res->asts_reserved, 0);
 
585
        res->migration_pending = 0;
 
586
        res->inflight_locks = 0;
 
587
 
 
588
        /* put in dlm_lockres_release */
 
589
        dlm_grab(dlm);
 
590
        res->dlm = dlm;
 
591
 
 
592
        kref_init(&res->refs);
 
593
 
 
594
        atomic_inc(&dlm->res_tot_count);
 
595
        atomic_inc(&dlm->res_cur_count);
 
596
 
 
597
        /* just for consistency */
 
598
        spin_lock(&res->spinlock);
 
599
        dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
 
600
        spin_unlock(&res->spinlock);
 
601
 
 
602
        res->state = DLM_LOCK_RES_IN_PROGRESS;
 
603
 
 
604
        res->last_used = 0;
 
605
 
 
606
        spin_lock(&dlm->spinlock);
 
607
        list_add_tail(&res->tracking, &dlm->tracking_list);
 
608
        spin_unlock(&dlm->spinlock);
 
609
 
 
610
        memset(res->lvb, 0, DLM_LVB_LEN);
 
611
        memset(res->refmap, 0, sizeof(res->refmap));
 
612
}
 
613
 
 
614
struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
 
615
                                   const char *name,
 
616
                                   unsigned int namelen)
 
617
{
 
618
        struct dlm_lock_resource *res = NULL;
 
619
 
 
620
        res = (struct dlm_lock_resource *)
 
621
                                kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
 
622
        if (!res)
 
623
                goto error;
 
624
 
 
625
        res->lockname.name = (char *)
 
626
                                kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
 
627
        if (!res->lockname.name)
 
628
                goto error;
 
629
 
 
630
        dlm_init_lockres(dlm, res, name, namelen);
 
631
        return res;
 
632
 
 
633
error:
 
634
        if (res && res->lockname.name)
 
635
                kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
 
636
 
 
637
        if (res)
 
638
                kmem_cache_free(dlm_lockres_cache, res);
 
639
        return NULL;
 
640
}
 
641
 
 
642
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
 
643
                                   struct dlm_lock_resource *res,
 
644
                                   int new_lockres,
 
645
                                   const char *file,
 
646
                                   int line)
 
647
{
 
648
        if (!new_lockres)
 
649
                assert_spin_locked(&res->spinlock);
 
650
 
 
651
        if (!test_bit(dlm->node_num, res->refmap)) {
 
652
                BUG_ON(res->inflight_locks != 0);
 
653
                dlm_lockres_set_refmap_bit(dlm->node_num, res);
 
654
        }
 
655
        res->inflight_locks++;
 
656
        mlog(0, "%s:%.*s: inflight++: now %u\n",
 
657
             dlm->name, res->lockname.len, res->lockname.name,
 
658
             res->inflight_locks);
 
659
}
 
660
 
 
661
void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
 
662
                                   struct dlm_lock_resource *res,
 
663
                                   const char *file,
 
664
                                   int line)
 
665
{
 
666
        assert_spin_locked(&res->spinlock);
 
667
 
 
668
        BUG_ON(res->inflight_locks == 0);
 
669
        res->inflight_locks--;
 
670
        mlog(0, "%s:%.*s: inflight--: now %u\n",
 
671
             dlm->name, res->lockname.len, res->lockname.name,
 
672
             res->inflight_locks);
 
673
        if (res->inflight_locks == 0)
 
674
                dlm_lockres_clear_refmap_bit(dlm->node_num, res);
 
675
        wake_up(&res->wq);
 
676
}
 
677
 
 
678
/*
 
679
 * lookup a lock resource by name.
 
680
 * may already exist in the hashtable.
 
681
 * lockid is null terminated
 
682
 *
 
683
 * if not, allocate enough for the lockres and for
 
684
 * the temporary structure used in doing the mastering.
 
685
 *
 
686
 * also, do a lookup in the dlm->master_list to see
 
687
 * if another node has begun mastering the same lock.
 
688
 * if so, there should be a block entry in there
 
689
 * for this name, and we should *not* attempt to master
 
690
 * the lock here.   need to wait around for that node
 
691
 * to assert_master (or die).
 
692
 *
 
693
 */
 
694
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
 
695
                                          const char *lockid,
 
696
                                          int namelen,
 
697
                                          int flags)
 
698
{
 
699
        struct dlm_lock_resource *tmpres=NULL, *res=NULL;
 
700
        struct dlm_master_list_entry *mle = NULL;
 
701
        struct dlm_master_list_entry *alloc_mle = NULL;
 
702
        int blocked = 0;
 
703
        int ret, nodenum;
 
704
        struct dlm_node_iter iter;
 
705
        unsigned int hash;
 
706
        int tries = 0;
 
707
        int bit, wait_on_recovery = 0;
 
708
        int drop_inflight_if_nonlocal = 0;
 
709
 
 
710
        BUG_ON(!lockid);
 
711
 
 
712
        hash = dlm_lockid_hash(lockid, namelen);
 
713
 
 
714
        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
 
715
 
 
716
lookup:
 
717
        spin_lock(&dlm->spinlock);
 
718
        tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
 
719
        if (tmpres) {
 
720
                int dropping_ref = 0;
 
721
 
 
722
                spin_unlock(&dlm->spinlock);
 
723
 
 
724
                spin_lock(&tmpres->spinlock);
 
725
                /* We wait for the other thread that is mastering the resource */
 
726
                if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
 
727
                        __dlm_wait_on_lockres(tmpres);
 
728
                        BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
 
729
                }
 
730
 
 
731
                if (tmpres->owner == dlm->node_num) {
 
732
                        BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
 
733
                        dlm_lockres_grab_inflight_ref(dlm, tmpres);
 
734
                } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
 
735
                        dropping_ref = 1;
 
736
                spin_unlock(&tmpres->spinlock);
 
737
 
 
738
                /* wait until done messaging the master, drop our ref to allow
 
739
                 * the lockres to be purged, start over. */
 
740
                if (dropping_ref) {
 
741
                        spin_lock(&tmpres->spinlock);
 
742
                        __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
 
743
                        spin_unlock(&tmpres->spinlock);
 
744
                        dlm_lockres_put(tmpres);
 
745
                        tmpres = NULL;
 
746
                        goto lookup;
 
747
                }
 
748
 
 
749
                mlog(0, "found in hash!\n");
 
750
                if (res)
 
751
                        dlm_lockres_put(res);
 
752
                res = tmpres;
 
753
                goto leave;
 
754
        }
 
755
 
 
756
        if (!res) {
 
757
                spin_unlock(&dlm->spinlock);
 
758
                mlog(0, "allocating a new resource\n");
 
759
                /* nothing found and we need to allocate one. */
 
760
                alloc_mle = (struct dlm_master_list_entry *)
 
761
                        kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
 
762
                if (!alloc_mle)
 
763
                        goto leave;
 
764
                res = dlm_new_lockres(dlm, lockid, namelen);
 
765
                if (!res)
 
766
                        goto leave;
 
767
                goto lookup;
 
768
        }
 
769
 
 
770
        mlog(0, "no lockres found, allocated our own: %p\n", res);
 
771
 
 
772
        if (flags & LKM_LOCAL) {
 
773
                /* caller knows it's safe to assume it's not mastered elsewhere
 
774
                 * DONE!  return right away */
 
775
                spin_lock(&res->spinlock);
 
776
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
 
777
                __dlm_insert_lockres(dlm, res);
 
778
                dlm_lockres_grab_inflight_ref(dlm, res);
 
779
                spin_unlock(&res->spinlock);
 
780
                spin_unlock(&dlm->spinlock);
 
781
                /* lockres still marked IN_PROGRESS */
 
782
                goto wake_waiters;
 
783
        }
 
784
 
 
785
        /* check master list to see if another node has started mastering it */
 
786
        spin_lock(&dlm->master_lock);
 
787
 
 
788
        /* if we found a block, wait for lock to be mastered by another node */
 
789
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
 
790
        if (blocked) {
 
791
                int mig;
 
792
                if (mle->type == DLM_MLE_MASTER) {
 
793
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
 
794
                        BUG();
 
795
                }
 
796
                mig = (mle->type == DLM_MLE_MIGRATION);
 
797
                /* if there is a migration in progress, let the migration
 
798
                 * finish before continuing.  we can wait for the absence
 
799
                 * of the MIGRATION mle: either the migrate finished or
 
800
                 * one of the nodes died and the mle was cleaned up.
 
801
                 * if there is a BLOCK here, but it already has a master
 
802
                 * set, we are too late.  the master does not have a ref
 
803
                 * for us in the refmap.  detach the mle and drop it.
 
804
                 * either way, go back to the top and start over. */
 
805
                if (mig || mle->master != O2NM_MAX_NODES) {
 
806
                        BUG_ON(mig && mle->master == dlm->node_num);
 
807
                        /* we arrived too late.  the master does not
 
808
                         * have a ref for us. retry. */
 
809
                        mlog(0, "%s:%.*s: late on %s\n",
 
810
                             dlm->name, namelen, lockid,
 
811
                             mig ?  "MIGRATION" : "BLOCK");
 
812
                        spin_unlock(&dlm->master_lock);
 
813
                        spin_unlock(&dlm->spinlock);
 
814
 
 
815
                        /* master is known, detach */
 
816
                        if (!mig)
 
817
                                dlm_mle_detach_hb_events(dlm, mle);
 
818
                        dlm_put_mle(mle);
 
819
                        mle = NULL;
 
820
                        /* this is lame, but we cant wait on either
 
821
                         * the mle or lockres waitqueue here */
 
822
                        if (mig)
 
823
                                msleep(100);
 
824
                        goto lookup;
 
825
                }
 
826
        } else {
 
827
                /* go ahead and try to master lock on this node */
 
828
                mle = alloc_mle;
 
829
                /* make sure this does not get freed below */
 
830
                alloc_mle = NULL;
 
831
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 
832
                set_bit(dlm->node_num, mle->maybe_map);
 
833
                __dlm_insert_mle(dlm, mle);
 
834
 
 
835
                /* still holding the dlm spinlock, check the recovery map
 
836
                 * to see if there are any nodes that still need to be
 
837
                 * considered.  these will not appear in the mle nodemap
 
838
                 * but they might own this lockres.  wait on them. */
 
839
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
 
840
                if (bit < O2NM_MAX_NODES) {
 
841
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
 
842
                             "recover before lock mastery can begin\n",
 
843
                             dlm->name, namelen, (char *)lockid, bit);
 
844
                        wait_on_recovery = 1;
 
845
                }
 
846
        }
 
847
 
 
848
        /* at this point there is either a DLM_MLE_BLOCK or a
 
849
         * DLM_MLE_MASTER on the master list, so it's safe to add the
 
850
         * lockres to the hashtable.  anyone who finds the lock will
 
851
         * still have to wait on the IN_PROGRESS. */
 
852
 
 
853
        /* finally add the lockres to its hash bucket */
 
854
        __dlm_insert_lockres(dlm, res);
 
855
        /* since this lockres is new it doesnt not require the spinlock */
 
856
        dlm_lockres_grab_inflight_ref_new(dlm, res);
 
857
 
 
858
        /* if this node does not become the master make sure to drop
 
859
         * this inflight reference below */
 
860
        drop_inflight_if_nonlocal = 1;
 
861
 
 
862
        /* get an extra ref on the mle in case this is a BLOCK
 
863
         * if so, the creator of the BLOCK may try to put the last
 
864
         * ref at this time in the assert master handler, so we
 
865
         * need an extra one to keep from a bad ptr deref. */
 
866
        dlm_get_mle_inuse(mle);
 
867
        spin_unlock(&dlm->master_lock);
 
868
        spin_unlock(&dlm->spinlock);
 
869
 
 
870
redo_request:
 
871
        while (wait_on_recovery) {
 
872
                /* any cluster changes that occurred after dropping the
 
873
                 * dlm spinlock would be detectable be a change on the mle,
 
874
                 * so we only need to clear out the recovery map once. */
 
875
                if (dlm_is_recovery_lock(lockid, namelen)) {
 
876
                        mlog(ML_NOTICE, "%s: recovery map is not empty, but "
 
877
                             "must master $RECOVERY lock now\n", dlm->name);
 
878
                        if (!dlm_pre_master_reco_lockres(dlm, res))
 
879
                                wait_on_recovery = 0;
 
880
                        else {
 
881
                                mlog(0, "%s: waiting 500ms for heartbeat state "
 
882
                                    "change\n", dlm->name);
 
883
                                msleep(500);
 
884
                        }
 
885
                        continue;
 
886
                }
 
887
 
 
888
                dlm_kick_recovery_thread(dlm);
 
889
                msleep(1000);
 
890
                dlm_wait_for_recovery(dlm);
 
891
 
 
892
                spin_lock(&dlm->spinlock);
 
893
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
 
894
                if (bit < O2NM_MAX_NODES) {
 
895
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
 
896
                             "recover before lock mastery can begin\n",
 
897
                             dlm->name, namelen, (char *)lockid, bit);
 
898
                        wait_on_recovery = 1;
 
899
                } else
 
900
                        wait_on_recovery = 0;
 
901
                spin_unlock(&dlm->spinlock);
 
902
 
 
903
                if (wait_on_recovery)
 
904
                        dlm_wait_for_node_recovery(dlm, bit, 10000);
 
905
        }
 
906
 
 
907
        /* must wait for lock to be mastered elsewhere */
 
908
        if (blocked)
 
909
                goto wait;
 
910
 
 
911
        ret = -EINVAL;
 
912
        dlm_node_iter_init(mle->vote_map, &iter);
 
913
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
 
914
                ret = dlm_do_master_request(res, mle, nodenum);
 
915
                if (ret < 0)
 
916
                        mlog_errno(ret);
 
917
                if (mle->master != O2NM_MAX_NODES) {
 
918
                        /* found a master ! */
 
919
                        if (mle->master <= nodenum)
 
920
                                break;
 
921
                        /* if our master request has not reached the master
 
922
                         * yet, keep going until it does.  this is how the
 
923
                         * master will know that asserts are needed back to
 
924
                         * the lower nodes. */
 
925
                        mlog(0, "%s:%.*s: requests only up to %u but master "
 
926
                             "is %u, keep going\n", dlm->name, namelen,
 
927
                             lockid, nodenum, mle->master);
 
928
                }
 
929
        }
 
930
 
 
931
wait:
 
932
        /* keep going until the response map includes all nodes */
 
933
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
 
934
        if (ret < 0) {
 
935
                wait_on_recovery = 1;
 
936
                mlog(0, "%s:%.*s: node map changed, redo the "
 
937
                     "master request now, blocked=%d\n",
 
938
                     dlm->name, res->lockname.len,
 
939
                     res->lockname.name, blocked);
 
940
                if (++tries > 20) {
 
941
                        mlog(ML_ERROR, "%s:%.*s: spinning on "
 
942
                             "dlm_wait_for_lock_mastery, blocked=%d\n",
 
943
                             dlm->name, res->lockname.len,
 
944
                             res->lockname.name, blocked);
 
945
                        dlm_print_one_lock_resource(res);
 
946
                        dlm_print_one_mle(mle);
 
947
                        tries = 0;
 
948
                }
 
949
                goto redo_request;
 
950
        }
 
951
 
 
952
        mlog(0, "lockres mastered by %u\n", res->owner);
 
953
        /* make sure we never continue without this */
 
954
        BUG_ON(res->owner == O2NM_MAX_NODES);
 
955
 
 
956
        /* master is known, detach if not already detached */
 
957
        dlm_mle_detach_hb_events(dlm, mle);
 
958
        dlm_put_mle(mle);
 
959
        /* put the extra ref */
 
960
        dlm_put_mle_inuse(mle);
 
961
 
 
962
wake_waiters:
 
963
        spin_lock(&res->spinlock);
 
964
        if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
 
965
                dlm_lockres_drop_inflight_ref(dlm, res);
 
966
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 
967
        spin_unlock(&res->spinlock);
 
968
        wake_up(&res->wq);
 
969
 
 
970
leave:
 
971
        /* need to free the unused mle */
 
972
        if (alloc_mle)
 
973
                kmem_cache_free(dlm_mle_cache, alloc_mle);
 
974
 
 
975
        return res;
 
976
}
 
977
 
 
978
 
 
979
#define DLM_MASTERY_TIMEOUT_MS   5000
 
980
 
 
981
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
 
982
                                     struct dlm_lock_resource *res,
 
983
                                     struct dlm_master_list_entry *mle,
 
984
                                     int *blocked)
 
985
{
 
986
        u8 m;
 
987
        int ret, bit;
 
988
        int map_changed, voting_done;
 
989
        int assert, sleep;
 
990
 
 
991
recheck:
 
992
        ret = 0;
 
993
        assert = 0;
 
994
 
 
995
        /* check if another node has already become the owner */
 
996
        spin_lock(&res->spinlock);
 
997
        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
 
998
                mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
 
999
                     res->lockname.len, res->lockname.name, res->owner);
 
1000
                spin_unlock(&res->spinlock);
 
1001
                /* this will cause the master to re-assert across
 
1002
                 * the whole cluster, freeing up mles */
 
1003
                if (res->owner != dlm->node_num) {
 
1004
                        ret = dlm_do_master_request(res, mle, res->owner);
 
1005
                        if (ret < 0) {
 
1006
                                /* give recovery a chance to run */
 
1007
                                mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
 
1008
                                msleep(500);
 
1009
                                goto recheck;
 
1010
                        }
 
1011
                }
 
1012
                ret = 0;
 
1013
                goto leave;
 
1014
        }
 
1015
        spin_unlock(&res->spinlock);
 
1016
 
 
1017
        spin_lock(&mle->spinlock);
 
1018
        m = mle->master;
 
1019
        map_changed = (memcmp(mle->vote_map, mle->node_map,
 
1020
                              sizeof(mle->vote_map)) != 0);
 
1021
        voting_done = (memcmp(mle->vote_map, mle->response_map,
 
1022
                             sizeof(mle->vote_map)) == 0);
 
1023
 
 
1024
        /* restart if we hit any errors */
 
1025
        if (map_changed) {
 
1026
                int b;
 
1027
                mlog(0, "%s: %.*s: node map changed, restarting\n",
 
1028
                     dlm->name, res->lockname.len, res->lockname.name);
 
1029
                ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
 
1030
                b = (mle->type == DLM_MLE_BLOCK);
 
1031
                if ((*blocked && !b) || (!*blocked && b)) {
 
1032
                        mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
 
1033
                             dlm->name, res->lockname.len, res->lockname.name,
 
1034
                             *blocked, b);
 
1035
                        *blocked = b;
 
1036
                }
 
1037
                spin_unlock(&mle->spinlock);
 
1038
                if (ret < 0) {
 
1039
                        mlog_errno(ret);
 
1040
                        goto leave;
 
1041
                }
 
1042
                mlog(0, "%s:%.*s: restart lock mastery succeeded, "
 
1043
                     "rechecking now\n", dlm->name, res->lockname.len,
 
1044
                     res->lockname.name);
 
1045
                goto recheck;
 
1046
        } else {
 
1047
                if (!voting_done) {
 
1048
                        mlog(0, "map not changed and voting not done "
 
1049
                             "for %s:%.*s\n", dlm->name, res->lockname.len,
 
1050
                             res->lockname.name);
 
1051
                }
 
1052
        }
 
1053
 
 
1054
        if (m != O2NM_MAX_NODES) {
 
1055
                /* another node has done an assert!
 
1056
                 * all done! */
 
1057
                sleep = 0;
 
1058
        } else {
 
1059
                sleep = 1;
 
1060
                /* have all nodes responded? */
 
1061
                if (voting_done && !*blocked) {
 
1062
                        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
 
1063
                        if (dlm->node_num <= bit) {
 
1064
                                /* my node number is lowest.
 
1065
                                 * now tell other nodes that I am
 
1066
                                 * mastering this. */
 
1067
                                mle->master = dlm->node_num;
 
1068
                                /* ref was grabbed in get_lock_resource
 
1069
                                 * will be dropped in dlmlock_master */
 
1070
                                assert = 1;
 
1071
                                sleep = 0;
 
1072
                        }
 
1073
                        /* if voting is done, but we have not received
 
1074
                         * an assert master yet, we must sleep */
 
1075
                }
 
1076
        }
 
1077
 
 
1078
        spin_unlock(&mle->spinlock);
 
1079
 
 
1080
        /* sleep if we haven't finished voting yet */
 
1081
        if (sleep) {
 
1082
                unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
 
1083
 
 
1084
                /*
 
1085
                if (atomic_read(&mle->mle_refs.refcount) < 2)
 
1086
                        mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
 
1087
                        atomic_read(&mle->mle_refs.refcount),
 
1088
                        res->lockname.len, res->lockname.name);
 
1089
                */
 
1090
                atomic_set(&mle->woken, 0);
 
1091
                (void)wait_event_timeout(mle->wq,
 
1092
                                         (atomic_read(&mle->woken) == 1),
 
1093
                                         timeo);
 
1094
                if (res->owner == O2NM_MAX_NODES) {
 
1095
                        mlog(0, "%s:%.*s: waiting again\n", dlm->name,
 
1096
                             res->lockname.len, res->lockname.name);
 
1097
                        goto recheck;
 
1098
                }
 
1099
                mlog(0, "done waiting, master is %u\n", res->owner);
 
1100
                ret = 0;
 
1101
                goto leave;
 
1102
        }
 
1103
 
 
1104
        ret = 0;   /* done */
 
1105
        if (assert) {
 
1106
                m = dlm->node_num;
 
1107
                mlog(0, "about to master %.*s here, this=%u\n",
 
1108
                     res->lockname.len, res->lockname.name, m);
 
1109
                ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
 
1110
                if (ret) {
 
1111
                        /* This is a failure in the network path,
 
1112
                         * not in the response to the assert_master
 
1113
                         * (any nonzero response is a BUG on this node).
 
1114
                         * Most likely a socket just got disconnected
 
1115
                         * due to node death. */
 
1116
                        mlog_errno(ret);
 
1117
                }
 
1118
                /* no longer need to restart lock mastery.
 
1119
                 * all living nodes have been contacted. */
 
1120
                ret = 0;
 
1121
        }
 
1122
 
 
1123
        /* set the lockres owner */
 
1124
        spin_lock(&res->spinlock);
 
1125
        /* mastery reference obtained either during
 
1126
         * assert_master_handler or in get_lock_resource */
 
1127
        dlm_change_lockres_owner(dlm, res, m);
 
1128
        spin_unlock(&res->spinlock);
 
1129
 
 
1130
leave:
 
1131
        return ret;
 
1132
}
 
1133
 
 
1134
struct dlm_bitmap_diff_iter
 
1135
{
 
1136
        int curnode;
 
1137
        unsigned long *orig_bm;
 
1138
        unsigned long *cur_bm;
 
1139
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
1140
};
 
1141
 
 
1142
enum dlm_node_state_change
 
1143
{
 
1144
        NODE_DOWN = -1,
 
1145
        NODE_NO_CHANGE = 0,
 
1146
        NODE_UP
 
1147
};
 
1148
 
 
1149
static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
 
1150
                                      unsigned long *orig_bm,
 
1151
                                      unsigned long *cur_bm)
 
1152
{
 
1153
        unsigned long p1, p2;
 
1154
        int i;
 
1155
 
 
1156
        iter->curnode = -1;
 
1157
        iter->orig_bm = orig_bm;
 
1158
        iter->cur_bm = cur_bm;
 
1159
 
 
1160
        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
 
1161
                p1 = *(iter->orig_bm + i);
 
1162
                p2 = *(iter->cur_bm + i);
 
1163
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
 
1164
        }
 
1165
}
 
1166
 
 
1167
static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
 
1168
                                     enum dlm_node_state_change *state)
 
1169
{
 
1170
        int bit;
 
1171
 
 
1172
        if (iter->curnode >= O2NM_MAX_NODES)
 
1173
                return -ENOENT;
 
1174
 
 
1175
        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
 
1176
                            iter->curnode+1);
 
1177
        if (bit >= O2NM_MAX_NODES) {
 
1178
                iter->curnode = O2NM_MAX_NODES;
 
1179
                return -ENOENT;
 
1180
        }
 
1181
 
 
1182
        /* if it was there in the original then this node died */
 
1183
        if (test_bit(bit, iter->orig_bm))
 
1184
                *state = NODE_DOWN;
 
1185
        else
 
1186
                *state = NODE_UP;
 
1187
 
 
1188
        iter->curnode = bit;
 
1189
        return bit;
 
1190
}
 
1191
 
 
1192
 
 
1193
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
 
1194
                                    struct dlm_lock_resource *res,
 
1195
                                    struct dlm_master_list_entry *mle,
 
1196
                                    int blocked)
 
1197
{
 
1198
        struct dlm_bitmap_diff_iter bdi;
 
1199
        enum dlm_node_state_change sc;
 
1200
        int node;
 
1201
        int ret = 0;
 
1202
 
 
1203
        mlog(0, "something happened such that the "
 
1204
             "master process may need to be restarted!\n");
 
1205
 
 
1206
        assert_spin_locked(&mle->spinlock);
 
1207
 
 
1208
        dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
 
1209
        node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 
1210
        while (node >= 0) {
 
1211
                if (sc == NODE_UP) {
 
1212
                        /* a node came up.  clear any old vote from
 
1213
                         * the response map and set it in the vote map
 
1214
                         * then restart the mastery. */
 
1215
                        mlog(ML_NOTICE, "node %d up while restarting\n", node);
 
1216
 
 
1217
                        /* redo the master request, but only for the new node */
 
1218
                        mlog(0, "sending request to new node\n");
 
1219
                        clear_bit(node, mle->response_map);
 
1220
                        set_bit(node, mle->vote_map);
 
1221
                } else {
 
1222
                        mlog(ML_ERROR, "node down! %d\n", node);
 
1223
                        if (blocked) {
 
1224
                                int lowest = find_next_bit(mle->maybe_map,
 
1225
                                                       O2NM_MAX_NODES, 0);
 
1226
 
 
1227
                                /* act like it was never there */
 
1228
                                clear_bit(node, mle->maybe_map);
 
1229
 
 
1230
                                if (node == lowest) {
 
1231
                                        mlog(0, "expected master %u died"
 
1232
                                            " while this node was blocked "
 
1233
                                            "waiting on it!\n", node);
 
1234
                                        lowest = find_next_bit(mle->maybe_map,
 
1235
                                                        O2NM_MAX_NODES,
 
1236
                                                        lowest+1);
 
1237
                                        if (lowest < O2NM_MAX_NODES) {
 
1238
                                                mlog(0, "%s:%.*s:still "
 
1239
                                                     "blocked. waiting on %u "
 
1240
                                                     "now\n", dlm->name,
 
1241
                                                     res->lockname.len,
 
1242
                                                     res->lockname.name,
 
1243
                                                     lowest);
 
1244
                                        } else {
 
1245
                                                /* mle is an MLE_BLOCK, but
 
1246
                                                 * there is now nothing left to
 
1247
                                                 * block on.  we need to return
 
1248
                                                 * all the way back out and try
 
1249
                                                 * again with an MLE_MASTER.
 
1250
                                                 * dlm_do_local_recovery_cleanup
 
1251
                                                 * has already run, so the mle
 
1252
                                                 * refcount is ok */
 
1253
                                                mlog(0, "%s:%.*s: no "
 
1254
                                                     "longer blocking. try to "
 
1255
                                                     "master this here\n",
 
1256
                                                     dlm->name,
 
1257
                                                     res->lockname.len,
 
1258
                                                     res->lockname.name);
 
1259
                                                mle->type = DLM_MLE_MASTER;
 
1260
                                                mle->mleres = res;
 
1261
                                        }
 
1262
                                }
 
1263
                        }
 
1264
 
 
1265
                        /* now blank out everything, as if we had never
 
1266
                         * contacted anyone */
 
1267
                        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
 
1268
                        memset(mle->response_map, 0, sizeof(mle->response_map));
 
1269
                        /* reset the vote_map to the current node_map */
 
1270
                        memcpy(mle->vote_map, mle->node_map,
 
1271
                               sizeof(mle->node_map));
 
1272
                        /* put myself into the maybe map */
 
1273
                        if (mle->type != DLM_MLE_BLOCK)
 
1274
                                set_bit(dlm->node_num, mle->maybe_map);
 
1275
                }
 
1276
                ret = -EAGAIN;
 
1277
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
 
1278
        }
 
1279
        return ret;
 
1280
}
 
1281
 
 
1282
 
 
1283
/*
 
1284
 * DLM_MASTER_REQUEST_MSG
 
1285
 *
 
1286
 * returns: 0 on success,
 
1287
 *          -errno on a network error
 
1288
 *
 
1289
 * on error, the caller should assume the target node is "dead"
 
1290
 *
 
1291
 */
 
1292
 
 
1293
static int dlm_do_master_request(struct dlm_lock_resource *res,
 
1294
                                 struct dlm_master_list_entry *mle, int to)
 
1295
{
 
1296
        struct dlm_ctxt *dlm = mle->dlm;
 
1297
        struct dlm_master_request request;
 
1298
        int ret, response=0, resend;
 
1299
 
 
1300
        memset(&request, 0, sizeof(request));
 
1301
        request.node_idx = dlm->node_num;
 
1302
 
 
1303
        BUG_ON(mle->type == DLM_MLE_MIGRATION);
 
1304
 
 
1305
        request.namelen = (u8)mle->mnamelen;
 
1306
        memcpy(request.name, mle->mname, request.namelen);
 
1307
 
 
1308
again:
 
1309
        ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
 
1310
                                 sizeof(request), to, &response);
 
1311
        if (ret < 0)  {
 
1312
                if (ret == -ESRCH) {
 
1313
                        /* should never happen */
 
1314
                        mlog(ML_ERROR, "TCP stack not ready!\n");
 
1315
                        BUG();
 
1316
                } else if (ret == -EINVAL) {
 
1317
                        mlog(ML_ERROR, "bad args passed to o2net!\n");
 
1318
                        BUG();
 
1319
                } else if (ret == -ENOMEM) {
 
1320
                        mlog(ML_ERROR, "out of memory while trying to send "
 
1321
                             "network message!  retrying\n");
 
1322
                        /* this is totally crude */
 
1323
                        msleep(50);
 
1324
                        goto again;
 
1325
                } else if (!dlm_is_host_down(ret)) {
 
1326
                        /* not a network error. bad. */
 
1327
                        mlog_errno(ret);
 
1328
                        mlog(ML_ERROR, "unhandled error!");
 
1329
                        BUG();
 
1330
                }
 
1331
                /* all other errors should be network errors,
 
1332
                 * and likely indicate node death */
 
1333
                mlog(ML_ERROR, "link to %d went down!\n", to);
 
1334
                goto out;
 
1335
        }
 
1336
 
 
1337
        ret = 0;
 
1338
        resend = 0;
 
1339
        spin_lock(&mle->spinlock);
 
1340
        switch (response) {
 
1341
                case DLM_MASTER_RESP_YES:
 
1342
                        set_bit(to, mle->response_map);
 
1343
                        mlog(0, "node %u is the master, response=YES\n", to);
 
1344
                        mlog(0, "%s:%.*s: master node %u now knows I have a "
 
1345
                             "reference\n", dlm->name, res->lockname.len,
 
1346
                             res->lockname.name, to);
 
1347
                        mle->master = to;
 
1348
                        break;
 
1349
                case DLM_MASTER_RESP_NO:
 
1350
                        mlog(0, "node %u not master, response=NO\n", to);
 
1351
                        set_bit(to, mle->response_map);
 
1352
                        break;
 
1353
                case DLM_MASTER_RESP_MAYBE:
 
1354
                        mlog(0, "node %u not master, response=MAYBE\n", to);
 
1355
                        set_bit(to, mle->response_map);
 
1356
                        set_bit(to, mle->maybe_map);
 
1357
                        break;
 
1358
                case DLM_MASTER_RESP_ERROR:
 
1359
                        mlog(0, "node %u hit an error, resending\n", to);
 
1360
                        resend = 1;
 
1361
                        response = 0;
 
1362
                        break;
 
1363
                default:
 
1364
                        mlog(ML_ERROR, "bad response! %u\n", response);
 
1365
                        BUG();
 
1366
        }
 
1367
        spin_unlock(&mle->spinlock);
 
1368
        if (resend) {
 
1369
                /* this is also totally crude */
 
1370
                msleep(50);
 
1371
                goto again;
 
1372
        }
 
1373
 
 
1374
out:
 
1375
        return ret;
 
1376
}
 
1377
 
 
1378
/*
 
1379
 * locks that can be taken here:
 
1380
 * dlm->spinlock
 
1381
 * res->spinlock
 
1382
 * mle->spinlock
 
1383
 * dlm->master_list
 
1384
 *
 
1385
 * if possible, TRIM THIS DOWN!!!
 
1386
 */
 
1387
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
 
1388
                               void **ret_data)
 
1389
{
 
1390
        u8 response = DLM_MASTER_RESP_MAYBE;
 
1391
        struct dlm_ctxt *dlm = data;
 
1392
        struct dlm_lock_resource *res = NULL;
 
1393
        struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
 
1394
        struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
 
1395
        char *name;
 
1396
        unsigned int namelen, hash;
 
1397
        int found, ret;
 
1398
        int set_maybe;
 
1399
        int dispatch_assert = 0;
 
1400
 
 
1401
        if (!dlm_grab(dlm))
 
1402
                return DLM_MASTER_RESP_NO;
 
1403
 
 
1404
        if (!dlm_domain_fully_joined(dlm)) {
 
1405
                response = DLM_MASTER_RESP_NO;
 
1406
                goto send_response;
 
1407
        }
 
1408
 
 
1409
        name = request->name;
 
1410
        namelen = request->namelen;
 
1411
        hash = dlm_lockid_hash(name, namelen);
 
1412
 
 
1413
        if (namelen > DLM_LOCKID_NAME_MAX) {
 
1414
                response = DLM_IVBUFLEN;
 
1415
                goto send_response;
 
1416
        }
 
1417
 
 
1418
way_up_top:
 
1419
        spin_lock(&dlm->spinlock);
 
1420
        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
 
1421
        if (res) {
 
1422
                spin_unlock(&dlm->spinlock);
 
1423
 
 
1424
                /* take care of the easy cases up front */
 
1425
                spin_lock(&res->spinlock);
 
1426
                if (res->state & (DLM_LOCK_RES_RECOVERING|
 
1427
                                  DLM_LOCK_RES_MIGRATING)) {
 
1428
                        spin_unlock(&res->spinlock);
 
1429
                        mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
 
1430
                             "being recovered/migrated\n");
 
1431
                        response = DLM_MASTER_RESP_ERROR;
 
1432
                        if (mle)
 
1433
                                kmem_cache_free(dlm_mle_cache, mle);
 
1434
                        goto send_response;
 
1435
                }
 
1436
 
 
1437
                if (res->owner == dlm->node_num) {
 
1438
                        mlog(0, "%s:%.*s: setting bit %u in refmap\n",
 
1439
                             dlm->name, namelen, name, request->node_idx);
 
1440
                        dlm_lockres_set_refmap_bit(request->node_idx, res);
 
1441
                        spin_unlock(&res->spinlock);
 
1442
                        response = DLM_MASTER_RESP_YES;
 
1443
                        if (mle)
 
1444
                                kmem_cache_free(dlm_mle_cache, mle);
 
1445
 
 
1446
                        /* this node is the owner.
 
1447
                         * there is some extra work that needs to
 
1448
                         * happen now.  the requesting node has
 
1449
                         * caused all nodes up to this one to
 
1450
                         * create mles.  this node now needs to
 
1451
                         * go back and clean those up. */
 
1452
                        dispatch_assert = 1;
 
1453
                        goto send_response;
 
1454
                } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
 
1455
                        spin_unlock(&res->spinlock);
 
1456
                        // mlog(0, "node %u is the master\n", res->owner);
 
1457
                        response = DLM_MASTER_RESP_NO;
 
1458
                        if (mle)
 
1459
                                kmem_cache_free(dlm_mle_cache, mle);
 
1460
                        goto send_response;
 
1461
                }
 
1462
 
 
1463
                /* ok, there is no owner.  either this node is
 
1464
                 * being blocked, or it is actively trying to
 
1465
                 * master this lock. */
 
1466
                if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
 
1467
                        mlog(ML_ERROR, "lock with no owner should be "
 
1468
                             "in-progress!\n");
 
1469
                        BUG();
 
1470
                }
 
1471
 
 
1472
                // mlog(0, "lockres is in progress...\n");
 
1473
                spin_lock(&dlm->master_lock);
 
1474
                found = dlm_find_mle(dlm, &tmpmle, name, namelen);
 
1475
                if (!found) {
 
1476
                        mlog(ML_ERROR, "no mle found for this lock!\n");
 
1477
                        BUG();
 
1478
                }
 
1479
                set_maybe = 1;
 
1480
                spin_lock(&tmpmle->spinlock);
 
1481
                if (tmpmle->type == DLM_MLE_BLOCK) {
 
1482
                        // mlog(0, "this node is waiting for "
 
1483
                        // "lockres to be mastered\n");
 
1484
                        response = DLM_MASTER_RESP_NO;
 
1485
                } else if (tmpmle->type == DLM_MLE_MIGRATION) {
 
1486
                        mlog(0, "node %u is master, but trying to migrate to "
 
1487
                             "node %u.\n", tmpmle->master, tmpmle->new_master);
 
1488
                        if (tmpmle->master == dlm->node_num) {
 
1489
                                mlog(ML_ERROR, "no owner on lockres, but this "
 
1490
                                     "node is trying to migrate it to %u?!\n",
 
1491
                                     tmpmle->new_master);
 
1492
                                BUG();
 
1493
                        } else {
 
1494
                                /* the real master can respond on its own */
 
1495
                                response = DLM_MASTER_RESP_NO;
 
1496
                        }
 
1497
                } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
 
1498
                        set_maybe = 0;
 
1499
                        if (tmpmle->master == dlm->node_num) {
 
1500
                                response = DLM_MASTER_RESP_YES;
 
1501
                                /* this node will be the owner.
 
1502
                                 * go back and clean the mles on any
 
1503
                                 * other nodes */
 
1504
                                dispatch_assert = 1;
 
1505
                                dlm_lockres_set_refmap_bit(request->node_idx, res);
 
1506
                                mlog(0, "%s:%.*s: setting bit %u in refmap\n",
 
1507
                                     dlm->name, namelen, name,
 
1508
                                     request->node_idx);
 
1509
                        } else
 
1510
                                response = DLM_MASTER_RESP_NO;
 
1511
                } else {
 
1512
                        // mlog(0, "this node is attempting to "
 
1513
                        // "master lockres\n");
 
1514
                        response = DLM_MASTER_RESP_MAYBE;
 
1515
                }
 
1516
                if (set_maybe)
 
1517
                        set_bit(request->node_idx, tmpmle->maybe_map);
 
1518
                spin_unlock(&tmpmle->spinlock);
 
1519
 
 
1520
                spin_unlock(&dlm->master_lock);
 
1521
                spin_unlock(&res->spinlock);
 
1522
 
 
1523
                /* keep the mle attached to heartbeat events */
 
1524
                dlm_put_mle(tmpmle);
 
1525
                if (mle)
 
1526
                        kmem_cache_free(dlm_mle_cache, mle);
 
1527
                goto send_response;
 
1528
        }
 
1529
 
 
1530
        /*
 
1531
         * lockres doesn't exist on this node
 
1532
         * if there is an MLE_BLOCK, return NO
 
1533
         * if there is an MLE_MASTER, return MAYBE
 
1534
         * otherwise, add an MLE_BLOCK, return NO
 
1535
         */
 
1536
        spin_lock(&dlm->master_lock);
 
1537
        found = dlm_find_mle(dlm, &tmpmle, name, namelen);
 
1538
        if (!found) {
 
1539
                /* this lockid has never been seen on this node yet */
 
1540
                // mlog(0, "no mle found\n");
 
1541
                if (!mle) {
 
1542
                        spin_unlock(&dlm->master_lock);
 
1543
                        spin_unlock(&dlm->spinlock);
 
1544
 
 
1545
                        mle = (struct dlm_master_list_entry *)
 
1546
                                kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
 
1547
                        if (!mle) {
 
1548
                                response = DLM_MASTER_RESP_ERROR;
 
1549
                                mlog_errno(-ENOMEM);
 
1550
                                goto send_response;
 
1551
                        }
 
1552
                        goto way_up_top;
 
1553
                }
 
1554
 
 
1555
                // mlog(0, "this is second time thru, already allocated, "
 
1556
                // "add the block.\n");
 
1557
                dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
 
1558
                set_bit(request->node_idx, mle->maybe_map);
 
1559
                __dlm_insert_mle(dlm, mle);
 
1560
                response = DLM_MASTER_RESP_NO;
 
1561
        } else {
 
1562
                // mlog(0, "mle was found\n");
 
1563
                set_maybe = 1;
 
1564
                spin_lock(&tmpmle->spinlock);
 
1565
                if (tmpmle->master == dlm->node_num) {
 
1566
                        mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
 
1567
                        BUG();
 
1568
                }
 
1569
                if (tmpmle->type == DLM_MLE_BLOCK)
 
1570
                        response = DLM_MASTER_RESP_NO;
 
1571
                else if (tmpmle->type == DLM_MLE_MIGRATION) {
 
1572
                        mlog(0, "migration mle was found (%u->%u)\n",
 
1573
                             tmpmle->master, tmpmle->new_master);
 
1574
                        /* real master can respond on its own */
 
1575
                        response = DLM_MASTER_RESP_NO;
 
1576
                } else
 
1577
                        response = DLM_MASTER_RESP_MAYBE;
 
1578
                if (set_maybe)
 
1579
                        set_bit(request->node_idx, tmpmle->maybe_map);
 
1580
                spin_unlock(&tmpmle->spinlock);
 
1581
        }
 
1582
        spin_unlock(&dlm->master_lock);
 
1583
        spin_unlock(&dlm->spinlock);
 
1584
 
 
1585
        if (found) {
 
1586
                /* keep the mle attached to heartbeat events */
 
1587
                dlm_put_mle(tmpmle);
 
1588
        }
 
1589
send_response:
 
1590
        /*
 
1591
         * __dlm_lookup_lockres() grabbed a reference to this lockres.
 
1592
         * The reference is released by dlm_assert_master_worker() under
 
1593
         * the call to dlm_dispatch_assert_master().  If
 
1594
         * dlm_assert_master_worker() isn't called, we drop it here.
 
1595
         */
 
1596
        if (dispatch_assert) {
 
1597
                if (response != DLM_MASTER_RESP_YES)
 
1598
                        mlog(ML_ERROR, "invalid response %d\n", response);
 
1599
                if (!res) {
 
1600
                        mlog(ML_ERROR, "bad lockres while trying to assert!\n");
 
1601
                        BUG();
 
1602
                }
 
1603
                mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
 
1604
                             dlm->node_num, res->lockname.len, res->lockname.name);
 
1605
                ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
 
1606
                                                 DLM_ASSERT_MASTER_MLE_CLEANUP);
 
1607
                if (ret < 0) {
 
1608
                        mlog(ML_ERROR, "failed to dispatch assert master work\n");
 
1609
                        response = DLM_MASTER_RESP_ERROR;
 
1610
                        dlm_lockres_put(res);
 
1611
                }
 
1612
        } else {
 
1613
                if (res)
 
1614
                        dlm_lockres_put(res);
 
1615
        }
 
1616
 
 
1617
        dlm_put(dlm);
 
1618
        return response;
 
1619
}
 
1620
 
 
1621
/*
 
1622
 * DLM_ASSERT_MASTER_MSG
 
1623
 */
 
1624
 
 
1625
 
 
1626
/*
 
1627
 * NOTE: this can be used for debugging
 
1628
 * can periodically run all locks owned by this node
 
1629
 * and re-assert across the cluster...
 
1630
 */
 
1631
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
 
1632
                                struct dlm_lock_resource *res,
 
1633
                                void *nodemap, u32 flags)
 
1634
{
 
1635
        struct dlm_assert_master assert;
 
1636
        int to, tmpret;
 
1637
        struct dlm_node_iter iter;
 
1638
        int ret = 0;
 
1639
        int reassert;
 
1640
        const char *lockname = res->lockname.name;
 
1641
        unsigned int namelen = res->lockname.len;
 
1642
 
 
1643
        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
 
1644
 
 
1645
        spin_lock(&res->spinlock);
 
1646
        res->state |= DLM_LOCK_RES_SETREF_INPROG;
 
1647
        spin_unlock(&res->spinlock);
 
1648
 
 
1649
again:
 
1650
        reassert = 0;
 
1651
 
 
1652
        /* note that if this nodemap is empty, it returns 0 */
 
1653
        dlm_node_iter_init(nodemap, &iter);
 
1654
        while ((to = dlm_node_iter_next(&iter)) >= 0) {
 
1655
                int r = 0;
 
1656
                struct dlm_master_list_entry *mle = NULL;
 
1657
 
 
1658
                mlog(0, "sending assert master to %d (%.*s)\n", to,
 
1659
                     namelen, lockname);
 
1660
                memset(&assert, 0, sizeof(assert));
 
1661
                assert.node_idx = dlm->node_num;
 
1662
                assert.namelen = namelen;
 
1663
                memcpy(assert.name, lockname, namelen);
 
1664
                assert.flags = cpu_to_be32(flags);
 
1665
 
 
1666
                tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
 
1667
                                            &assert, sizeof(assert), to, &r);
 
1668
                if (tmpret < 0) {
 
1669
                        mlog(0, "assert_master returned %d!\n", tmpret);
 
1670
                        if (!dlm_is_host_down(tmpret)) {
 
1671
                                mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
 
1672
                                BUG();
 
1673
                        }
 
1674
                        /* a node died.  finish out the rest of the nodes. */
 
1675
                        mlog(0, "link to %d went down!\n", to);
 
1676
                        /* any nonzero status return will do */
 
1677
                        ret = tmpret;
 
1678
                        r = 0;
 
1679
                } else if (r < 0) {
 
1680
                        /* ok, something horribly messed.  kill thyself. */
 
1681
                        mlog(ML_ERROR,"during assert master of %.*s to %u, "
 
1682
                             "got %d.\n", namelen, lockname, to, r);
 
1683
                        spin_lock(&dlm->spinlock);
 
1684
                        spin_lock(&dlm->master_lock);
 
1685
                        if (dlm_find_mle(dlm, &mle, (char *)lockname,
 
1686
                                         namelen)) {
 
1687
                                dlm_print_one_mle(mle);
 
1688
                                __dlm_put_mle(mle);
 
1689
                        }
 
1690
                        spin_unlock(&dlm->master_lock);
 
1691
                        spin_unlock(&dlm->spinlock);
 
1692
                        BUG();
 
1693
                }
 
1694
 
 
1695
                if (r & DLM_ASSERT_RESPONSE_REASSERT &&
 
1696
                    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
 
1697
                                mlog(ML_ERROR, "%.*s: very strange, "
 
1698
                                     "master MLE but no lockres on %u\n",
 
1699
                                     namelen, lockname, to);
 
1700
                }
 
1701
 
 
1702
                if (r & DLM_ASSERT_RESPONSE_REASSERT) {
 
1703
                        mlog(0, "%.*s: node %u create mles on other "
 
1704
                             "nodes and requests a re-assert\n",
 
1705
                             namelen, lockname, to);
 
1706
                        reassert = 1;
 
1707
                }
 
1708
                if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
 
1709
                        mlog(0, "%.*s: node %u has a reference to this "
 
1710
                             "lockres, set the bit in the refmap\n",
 
1711
                             namelen, lockname, to);
 
1712
                        spin_lock(&res->spinlock);
 
1713
                        dlm_lockres_set_refmap_bit(to, res);
 
1714
                        spin_unlock(&res->spinlock);
 
1715
                }
 
1716
        }
 
1717
 
 
1718
        if (reassert)
 
1719
                goto again;
 
1720
 
 
1721
        spin_lock(&res->spinlock);
 
1722
        res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
 
1723
        spin_unlock(&res->spinlock);
 
1724
        wake_up(&res->wq);
 
1725
 
 
1726
        return ret;
 
1727
}
 
1728
 
 
1729
/*
 
1730
 * locks that can be taken here:
 
1731
 * dlm->spinlock
 
1732
 * res->spinlock
 
1733
 * mle->spinlock
 
1734
 * dlm->master_list
 
1735
 *
 
1736
 * if possible, TRIM THIS DOWN!!!
 
1737
 */
 
1738
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
 
1739
                              void **ret_data)
 
1740
{
 
1741
        struct dlm_ctxt *dlm = data;
 
1742
        struct dlm_master_list_entry *mle = NULL;
 
1743
        struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
 
1744
        struct dlm_lock_resource *res = NULL;
 
1745
        char *name;
 
1746
        unsigned int namelen, hash;
 
1747
        u32 flags;
 
1748
        int master_request = 0, have_lockres_ref = 0;
 
1749
        int ret = 0;
 
1750
 
 
1751
        if (!dlm_grab(dlm))
 
1752
                return 0;
 
1753
 
 
1754
        name = assert->name;
 
1755
        namelen = assert->namelen;
 
1756
        hash = dlm_lockid_hash(name, namelen);
 
1757
        flags = be32_to_cpu(assert->flags);
 
1758
 
 
1759
        if (namelen > DLM_LOCKID_NAME_MAX) {
 
1760
                mlog(ML_ERROR, "Invalid name length!");
 
1761
                goto done;
 
1762
        }
 
1763
 
 
1764
        spin_lock(&dlm->spinlock);
 
1765
 
 
1766
        if (flags)
 
1767
                mlog(0, "assert_master with flags: %u\n", flags);
 
1768
 
 
1769
        /* find the MLE */
 
1770
        spin_lock(&dlm->master_lock);
 
1771
        if (!dlm_find_mle(dlm, &mle, name, namelen)) {
 
1772
                /* not an error, could be master just re-asserting */
 
1773
                mlog(0, "just got an assert_master from %u, but no "
 
1774
                     "MLE for it! (%.*s)\n", assert->node_idx,
 
1775
                     namelen, name);
 
1776
        } else {
 
1777
                int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
 
1778
                if (bit >= O2NM_MAX_NODES) {
 
1779
                        /* not necessarily an error, though less likely.
 
1780
                         * could be master just re-asserting. */
 
1781
                        mlog(0, "no bits set in the maybe_map, but %u "
 
1782
                             "is asserting! (%.*s)\n", assert->node_idx,
 
1783
                             namelen, name);
 
1784
                } else if (bit != assert->node_idx) {
 
1785
                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
 
1786
                                mlog(0, "master %u was found, %u should "
 
1787
                                     "back off\n", assert->node_idx, bit);
 
1788
                        } else {
 
1789
                                /* with the fix for bug 569, a higher node
 
1790
                                 * number winning the mastery will respond
 
1791
                                 * YES to mastery requests, but this node
 
1792
                                 * had no way of knowing.  let it pass. */
 
1793
                                mlog(0, "%u is the lowest node, "
 
1794
                                     "%u is asserting. (%.*s)  %u must "
 
1795
                                     "have begun after %u won.\n", bit,
 
1796
                                     assert->node_idx, namelen, name, bit,
 
1797
                                     assert->node_idx);
 
1798
                        }
 
1799
                }
 
1800
                if (mle->type == DLM_MLE_MIGRATION) {
 
1801
                        if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
 
1802
                                mlog(0, "%s:%.*s: got cleanup assert"
 
1803
                                     " from %u for migration\n",
 
1804
                                     dlm->name, namelen, name,
 
1805
                                     assert->node_idx);
 
1806
                        } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
 
1807
                                mlog(0, "%s:%.*s: got unrelated assert"
 
1808
                                     " from %u for migration, ignoring\n",
 
1809
                                     dlm->name, namelen, name,
 
1810
                                     assert->node_idx);
 
1811
                                __dlm_put_mle(mle);
 
1812
                                spin_unlock(&dlm->master_lock);
 
1813
                                spin_unlock(&dlm->spinlock);
 
1814
                                goto done;
 
1815
                        }
 
1816
                }
 
1817
        }
 
1818
        spin_unlock(&dlm->master_lock);
 
1819
 
 
1820
        /* ok everything checks out with the MLE
 
1821
         * now check to see if there is a lockres */
 
1822
        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
 
1823
        if (res) {
 
1824
                spin_lock(&res->spinlock);
 
1825
                if (res->state & DLM_LOCK_RES_RECOVERING)  {
 
1826
                        mlog(ML_ERROR, "%u asserting but %.*s is "
 
1827
                             "RECOVERING!\n", assert->node_idx, namelen, name);
 
1828
                        goto kill;
 
1829
                }
 
1830
                if (!mle) {
 
1831
                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
 
1832
                            res->owner != assert->node_idx) {
 
1833
                                mlog(ML_ERROR, "DIE! Mastery assert from %u, "
 
1834
                                     "but current owner is %u! (%.*s)\n",
 
1835
                                     assert->node_idx, res->owner, namelen,
 
1836
                                     name);
 
1837
                                __dlm_print_one_lock_resource(res);
 
1838
                                BUG();
 
1839
                        }
 
1840
                } else if (mle->type != DLM_MLE_MIGRATION) {
 
1841
                        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
 
1842
                                /* owner is just re-asserting */
 
1843
                                if (res->owner == assert->node_idx) {
 
1844
                                        mlog(0, "owner %u re-asserting on "
 
1845
                                             "lock %.*s\n", assert->node_idx,
 
1846
                                             namelen, name);
 
1847
                                        goto ok;
 
1848
                                }
 
1849
                                mlog(ML_ERROR, "got assert_master from "
 
1850
                                     "node %u, but %u is the owner! "
 
1851
                                     "(%.*s)\n", assert->node_idx,
 
1852
                                     res->owner, namelen, name);
 
1853
                                goto kill;
 
1854
                        }
 
1855
                        if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
 
1856
                                mlog(ML_ERROR, "got assert from %u, but lock "
 
1857
                                     "with no owner should be "
 
1858
                                     "in-progress! (%.*s)\n",
 
1859
                                     assert->node_idx,
 
1860
                                     namelen, name);
 
1861
                                goto kill;
 
1862
                        }
 
1863
                } else /* mle->type == DLM_MLE_MIGRATION */ {
 
1864
                        /* should only be getting an assert from new master */
 
1865
                        if (assert->node_idx != mle->new_master) {
 
1866
                                mlog(ML_ERROR, "got assert from %u, but "
 
1867
                                     "new master is %u, and old master "
 
1868
                                     "was %u (%.*s)\n",
 
1869
                                     assert->node_idx, mle->new_master,
 
1870
                                     mle->master, namelen, name);
 
1871
                                goto kill;
 
1872
                        }
 
1873
 
 
1874
                }
 
1875
ok:
 
1876
                spin_unlock(&res->spinlock);
 
1877
        }
 
1878
        spin_unlock(&dlm->spinlock);
 
1879
 
 
1880
        // mlog(0, "woo!  got an assert_master from node %u!\n",
 
1881
        //           assert->node_idx);
 
1882
        if (mle) {
 
1883
                int extra_ref = 0;
 
1884
                int nn = -1;
 
1885
                int rr, err = 0;
 
1886
 
 
1887
                spin_lock(&mle->spinlock);
 
1888
                if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
 
1889
                        extra_ref = 1;
 
1890
                else {
 
1891
                        /* MASTER mle: if any bits set in the response map
 
1892
                         * then the calling node needs to re-assert to clear
 
1893
                         * up nodes that this node contacted */
 
1894
                        while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
 
1895
                                                    nn+1)) < O2NM_MAX_NODES) {
 
1896
                                if (nn != dlm->node_num && nn != assert->node_idx)
 
1897
                                        master_request = 1;
 
1898
                        }
 
1899
                }
 
1900
                mle->master = assert->node_idx;
 
1901
                atomic_set(&mle->woken, 1);
 
1902
                wake_up(&mle->wq);
 
1903
                spin_unlock(&mle->spinlock);
 
1904
 
 
1905
                if (res) {
 
1906
                        int wake = 0;
 
1907
                        spin_lock(&res->spinlock);
 
1908
                        if (mle->type == DLM_MLE_MIGRATION) {
 
1909
                                mlog(0, "finishing off migration of lockres %.*s, "
 
1910
                                        "from %u to %u\n",
 
1911
                                        res->lockname.len, res->lockname.name,
 
1912
                                        dlm->node_num, mle->new_master);
 
1913
                                res->state &= ~DLM_LOCK_RES_MIGRATING;
 
1914
                                wake = 1;
 
1915
                                dlm_change_lockres_owner(dlm, res, mle->new_master);
 
1916
                                BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
 
1917
                        } else {
 
1918
                                dlm_change_lockres_owner(dlm, res, mle->master);
 
1919
                        }
 
1920
                        spin_unlock(&res->spinlock);
 
1921
                        have_lockres_ref = 1;
 
1922
                        if (wake)
 
1923
                                wake_up(&res->wq);
 
1924
                }
 
1925
 
 
1926
                /* master is known, detach if not already detached.
 
1927
                 * ensures that only one assert_master call will happen
 
1928
                 * on this mle. */
 
1929
                spin_lock(&dlm->spinlock);
 
1930
                spin_lock(&dlm->master_lock);
 
1931
 
 
1932
                rr = atomic_read(&mle->mle_refs.refcount);
 
1933
                if (mle->inuse > 0) {
 
1934
                        if (extra_ref && rr < 3)
 
1935
                                err = 1;
 
1936
                        else if (!extra_ref && rr < 2)
 
1937
                                err = 1;
 
1938
                } else {
 
1939
                        if (extra_ref && rr < 2)
 
1940
                                err = 1;
 
1941
                        else if (!extra_ref && rr < 1)
 
1942
                                err = 1;
 
1943
                }
 
1944
                if (err) {
 
1945
                        mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
 
1946
                             "that will mess up this node, refs=%d, extra=%d, "
 
1947
                             "inuse=%d\n", dlm->name, namelen, name,
 
1948
                             assert->node_idx, rr, extra_ref, mle->inuse);
 
1949
                        dlm_print_one_mle(mle);
 
1950
                }
 
1951
                __dlm_unlink_mle(dlm, mle);
 
1952
                __dlm_mle_detach_hb_events(dlm, mle);
 
1953
                __dlm_put_mle(mle);
 
1954
                if (extra_ref) {
 
1955
                        /* the assert master message now balances the extra
 
1956
                         * ref given by the master / migration request message.
 
1957
                         * if this is the last put, it will be removed
 
1958
                         * from the list. */
 
1959
                        __dlm_put_mle(mle);
 
1960
                }
 
1961
                spin_unlock(&dlm->master_lock);
 
1962
                spin_unlock(&dlm->spinlock);
 
1963
        } else if (res) {
 
1964
                if (res->owner != assert->node_idx) {
 
1965
                        mlog(0, "assert_master from %u, but current "
 
1966
                             "owner is %u (%.*s), no mle\n", assert->node_idx,
 
1967
                             res->owner, namelen, name);
 
1968
                }
 
1969
        }
 
1970
 
 
1971
done:
 
1972
        ret = 0;
 
1973
        if (res) {
 
1974
                spin_lock(&res->spinlock);
 
1975
                res->state |= DLM_LOCK_RES_SETREF_INPROG;
 
1976
                spin_unlock(&res->spinlock);
 
1977
                *ret_data = (void *)res;
 
1978
        }
 
1979
        dlm_put(dlm);
 
1980
        if (master_request) {
 
1981
                mlog(0, "need to tell master to reassert\n");
 
1982
                /* positive. negative would shoot down the node. */
 
1983
                ret |= DLM_ASSERT_RESPONSE_REASSERT;
 
1984
                if (!have_lockres_ref) {
 
1985
                        mlog(ML_ERROR, "strange, got assert from %u, MASTER "
 
1986
                             "mle present here for %s:%.*s, but no lockres!\n",
 
1987
                             assert->node_idx, dlm->name, namelen, name);
 
1988
                }
 
1989
        }
 
1990
        if (have_lockres_ref) {
 
1991
                /* let the master know we have a reference to the lockres */
 
1992
                ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
 
1993
                mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
 
1994
                     dlm->name, namelen, name, assert->node_idx);
 
1995
        }
 
1996
        return ret;
 
1997
 
 
1998
kill:
 
1999
        /* kill the caller! */
 
2000
        mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
 
2001
             "and killing the other node now!  This node is OK and can continue.\n");
 
2002
        __dlm_print_one_lock_resource(res);
 
2003
        spin_unlock(&res->spinlock);
 
2004
        spin_unlock(&dlm->spinlock);
 
2005
        *ret_data = (void *)res;
 
2006
        dlm_put(dlm);
 
2007
        return -EINVAL;
 
2008
}
 
2009
 
 
2010
void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
 
2011
{
 
2012
        struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
 
2013
 
 
2014
        if (ret_data) {
 
2015
                spin_lock(&res->spinlock);
 
2016
                res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
 
2017
                spin_unlock(&res->spinlock);
 
2018
                wake_up(&res->wq);
 
2019
                dlm_lockres_put(res);
 
2020
        }
 
2021
        return;
 
2022
}
 
2023
 
 
2024
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 
2025
                               struct dlm_lock_resource *res,
 
2026
                               int ignore_higher, u8 request_from, u32 flags)
 
2027
{
 
2028
        struct dlm_work_item *item;
 
2029
        item = kzalloc(sizeof(*item), GFP_NOFS);
 
2030
        if (!item)
 
2031
                return -ENOMEM;
 
2032
 
 
2033
 
 
2034
        /* queue up work for dlm_assert_master_worker */
 
2035
        dlm_grab(dlm);  /* get an extra ref for the work item */
 
2036
        dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
 
2037
        item->u.am.lockres = res; /* already have a ref */
 
2038
        /* can optionally ignore node numbers higher than this node */
 
2039
        item->u.am.ignore_higher = ignore_higher;
 
2040
        item->u.am.request_from = request_from;
 
2041
        item->u.am.flags = flags;
 
2042
 
 
2043
        if (ignore_higher)
 
2044
                mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
 
2045
                     res->lockname.name);
 
2046
 
 
2047
        spin_lock(&dlm->work_lock);
 
2048
        list_add_tail(&item->list, &dlm->work_list);
 
2049
        spin_unlock(&dlm->work_lock);
 
2050
 
 
2051
        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 
2052
        return 0;
 
2053
}
 
2054
 
 
2055
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
 
2056
{
 
2057
        struct dlm_ctxt *dlm = data;
 
2058
        int ret = 0;
 
2059
        struct dlm_lock_resource *res;
 
2060
        unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
2061
        int ignore_higher;
 
2062
        int bit;
 
2063
        u8 request_from;
 
2064
        u32 flags;
 
2065
 
 
2066
        dlm = item->dlm;
 
2067
        res = item->u.am.lockres;
 
2068
        ignore_higher = item->u.am.ignore_higher;
 
2069
        request_from = item->u.am.request_from;
 
2070
        flags = item->u.am.flags;
 
2071
 
 
2072
        spin_lock(&dlm->spinlock);
 
2073
        memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
 
2074
        spin_unlock(&dlm->spinlock);
 
2075
 
 
2076
        clear_bit(dlm->node_num, nodemap);
 
2077
        if (ignore_higher) {
 
2078
                /* if is this just to clear up mles for nodes below
 
2079
                 * this node, do not send the message to the original
 
2080
                 * caller or any node number higher than this */
 
2081
                clear_bit(request_from, nodemap);
 
2082
                bit = dlm->node_num;
 
2083
                while (1) {
 
2084
                        bit = find_next_bit(nodemap, O2NM_MAX_NODES,
 
2085
                                            bit+1);
 
2086
                        if (bit >= O2NM_MAX_NODES)
 
2087
                                break;
 
2088
                        clear_bit(bit, nodemap);
 
2089
                }
 
2090
        }
 
2091
 
 
2092
        /*
 
2093
         * If we're migrating this lock to someone else, we are no
 
2094
         * longer allowed to assert out own mastery.  OTOH, we need to
 
2095
         * prevent migration from starting while we're still asserting
 
2096
         * our dominance.  The reserved ast delays migration.
 
2097
         */
 
2098
        spin_lock(&res->spinlock);
 
2099
        if (res->state & DLM_LOCK_RES_MIGRATING) {
 
2100
                mlog(0, "Someone asked us to assert mastery, but we're "
 
2101
                     "in the middle of migration.  Skipping assert, "
 
2102
                     "the new master will handle that.\n");
 
2103
                spin_unlock(&res->spinlock);
 
2104
                goto put;
 
2105
        } else
 
2106
                __dlm_lockres_reserve_ast(res);
 
2107
        spin_unlock(&res->spinlock);
 
2108
 
 
2109
        /* this call now finishes out the nodemap
 
2110
         * even if one or more nodes die */
 
2111
        mlog(0, "worker about to master %.*s here, this=%u\n",
 
2112
                     res->lockname.len, res->lockname.name, dlm->node_num);
 
2113
        ret = dlm_do_assert_master(dlm, res, nodemap, flags);
 
2114
        if (ret < 0) {
 
2115
                /* no need to restart, we are done */
 
2116
                if (!dlm_is_host_down(ret))
 
2117
                        mlog_errno(ret);
 
2118
        }
 
2119
 
 
2120
        /* Ok, we've asserted ourselves.  Let's let migration start. */
 
2121
        dlm_lockres_release_ast(dlm, res);
 
2122
 
 
2123
put:
 
2124
        dlm_lockres_put(res);
 
2125
 
 
2126
        mlog(0, "finished with dlm_assert_master_worker\n");
 
2127
}
 
2128
 
 
2129
/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
 
2130
 * We cannot wait for node recovery to complete to begin mastering this
 
2131
 * lockres because this lockres is used to kick off recovery! ;-)
 
2132
 * So, do a pre-check on all living nodes to see if any of those nodes
 
2133
 * think that $RECOVERY is currently mastered by a dead node.  If so,
 
2134
 * we wait a short time to allow that node to get notified by its own
 
2135
 * heartbeat stack, then check again.  All $RECOVERY lock resources
 
2136
 * mastered by dead nodes are purged when the hearbeat callback is
 
2137
 * fired, so we can know for sure that it is safe to continue once
 
2138
 * the node returns a live node or no node.  */
 
2139
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
 
2140
                                       struct dlm_lock_resource *res)
 
2141
{
 
2142
        struct dlm_node_iter iter;
 
2143
        int nodenum;
 
2144
        int ret = 0;
 
2145
        u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
 
2146
 
 
2147
        spin_lock(&dlm->spinlock);
 
2148
        dlm_node_iter_init(dlm->domain_map, &iter);
 
2149
        spin_unlock(&dlm->spinlock);
 
2150
 
 
2151
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
 
2152
                /* do not send to self */
 
2153
                if (nodenum == dlm->node_num)
 
2154
                        continue;
 
2155
                ret = dlm_do_master_requery(dlm, res, nodenum, &master);
 
2156
                if (ret < 0) {
 
2157
                        mlog_errno(ret);
 
2158
                        if (!dlm_is_host_down(ret))
 
2159
                                BUG();
 
2160
                        /* host is down, so answer for that node would be
 
2161
                         * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
 
2162
                        ret = 0;
 
2163
                }
 
2164
 
 
2165
                if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
 
2166
                        /* check to see if this master is in the recovery map */
 
2167
                        spin_lock(&dlm->spinlock);
 
2168
                        if (test_bit(master, dlm->recovery_map)) {
 
2169
                                mlog(ML_NOTICE, "%s: node %u has not seen "
 
2170
                                     "node %u go down yet, and thinks the "
 
2171
                                     "dead node is mastering the recovery "
 
2172
                                     "lock.  must wait.\n", dlm->name,
 
2173
                                     nodenum, master);
 
2174
                                ret = -EAGAIN;
 
2175
                        }
 
2176
                        spin_unlock(&dlm->spinlock);
 
2177
                        mlog(0, "%s: reco lock master is %u\n", dlm->name,
 
2178
                             master);
 
2179
                        break;
 
2180
                }
 
2181
        }
 
2182
        return ret;
 
2183
}
 
2184
 
 
2185
/*
 
2186
 * DLM_DEREF_LOCKRES_MSG
 
2187
 */
 
2188
 
 
2189
int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 
2190
{
 
2191
        struct dlm_deref_lockres deref;
 
2192
        int ret = 0, r;
 
2193
        const char *lockname;
 
2194
        unsigned int namelen;
 
2195
 
 
2196
        lockname = res->lockname.name;
 
2197
        namelen = res->lockname.len;
 
2198
        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
 
2199
 
 
2200
        mlog(0, "%s:%.*s: sending deref to %d\n",
 
2201
             dlm->name, namelen, lockname, res->owner);
 
2202
        memset(&deref, 0, sizeof(deref));
 
2203
        deref.node_idx = dlm->node_num;
 
2204
        deref.namelen = namelen;
 
2205
        memcpy(deref.name, lockname, namelen);
 
2206
 
 
2207
        ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
 
2208
                                 &deref, sizeof(deref), res->owner, &r);
 
2209
        if (ret < 0)
 
2210
                mlog_errno(ret);
 
2211
        else if (r < 0) {
 
2212
                /* BAD.  other node says I did not have a ref. */
 
2213
                mlog(ML_ERROR,"while dropping ref on %s:%.*s "
 
2214
                    "(master=%u) got %d.\n", dlm->name, namelen,
 
2215
                    lockname, res->owner, r);
 
2216
                dlm_print_one_lock_resource(res);
 
2217
                BUG();
 
2218
        }
 
2219
        return ret;
 
2220
}
 
2221
 
 
2222
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
 
2223
                              void **ret_data)
 
2224
{
 
2225
        struct dlm_ctxt *dlm = data;
 
2226
        struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
 
2227
        struct dlm_lock_resource *res = NULL;
 
2228
        char *name;
 
2229
        unsigned int namelen;
 
2230
        int ret = -EINVAL;
 
2231
        u8 node;
 
2232
        unsigned int hash;
 
2233
        struct dlm_work_item *item;
 
2234
        int cleared = 0;
 
2235
        int dispatch = 0;
 
2236
 
 
2237
        if (!dlm_grab(dlm))
 
2238
                return 0;
 
2239
 
 
2240
        name = deref->name;
 
2241
        namelen = deref->namelen;
 
2242
        node = deref->node_idx;
 
2243
 
 
2244
        if (namelen > DLM_LOCKID_NAME_MAX) {
 
2245
                mlog(ML_ERROR, "Invalid name length!");
 
2246
                goto done;
 
2247
        }
 
2248
        if (deref->node_idx >= O2NM_MAX_NODES) {
 
2249
                mlog(ML_ERROR, "Invalid node number: %u\n", node);
 
2250
                goto done;
 
2251
        }
 
2252
 
 
2253
        hash = dlm_lockid_hash(name, namelen);
 
2254
 
 
2255
        spin_lock(&dlm->spinlock);
 
2256
        res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
 
2257
        if (!res) {
 
2258
                spin_unlock(&dlm->spinlock);
 
2259
                mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
 
2260
                     dlm->name, namelen, name);
 
2261
                goto done;
 
2262
        }
 
2263
        spin_unlock(&dlm->spinlock);
 
2264
 
 
2265
        spin_lock(&res->spinlock);
 
2266
        if (res->state & DLM_LOCK_RES_SETREF_INPROG)
 
2267
                dispatch = 1;
 
2268
        else {
 
2269
                BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
 
2270
                if (test_bit(node, res->refmap)) {
 
2271
                        dlm_lockres_clear_refmap_bit(node, res);
 
2272
                        cleared = 1;
 
2273
                }
 
2274
        }
 
2275
        spin_unlock(&res->spinlock);
 
2276
 
 
2277
        if (!dispatch) {
 
2278
                if (cleared)
 
2279
                        dlm_lockres_calc_usage(dlm, res);
 
2280
                else {
 
2281
                        mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
 
2282
                        "but it is already dropped!\n", dlm->name,
 
2283
                        res->lockname.len, res->lockname.name, node);
 
2284
                        dlm_print_one_lock_resource(res);
 
2285
                }
 
2286
                ret = 0;
 
2287
                goto done;
 
2288
        }
 
2289
 
 
2290
        item = kzalloc(sizeof(*item), GFP_NOFS);
 
2291
        if (!item) {
 
2292
                ret = -ENOMEM;
 
2293
                mlog_errno(ret);
 
2294
                goto done;
 
2295
        }
 
2296
 
 
2297
        dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
 
2298
        item->u.dl.deref_res = res;
 
2299
        item->u.dl.deref_node = node;
 
2300
 
 
2301
        spin_lock(&dlm->work_lock);
 
2302
        list_add_tail(&item->list, &dlm->work_list);
 
2303
        spin_unlock(&dlm->work_lock);
 
2304
 
 
2305
        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 
2306
        return 0;
 
2307
 
 
2308
done:
 
2309
        if (res)
 
2310
                dlm_lockres_put(res);
 
2311
        dlm_put(dlm);
 
2312
 
 
2313
        return ret;
 
2314
}
 
2315
 
 
2316
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
 
2317
{
 
2318
        struct dlm_ctxt *dlm;
 
2319
        struct dlm_lock_resource *res;
 
2320
        u8 node;
 
2321
        u8 cleared = 0;
 
2322
 
 
2323
        dlm = item->dlm;
 
2324
        res = item->u.dl.deref_res;
 
2325
        node = item->u.dl.deref_node;
 
2326
 
 
2327
        spin_lock(&res->spinlock);
 
2328
        BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
 
2329
        if (test_bit(node, res->refmap)) {
 
2330
                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
 
2331
                dlm_lockres_clear_refmap_bit(node, res);
 
2332
                cleared = 1;
 
2333
        }
 
2334
        spin_unlock(&res->spinlock);
 
2335
 
 
2336
        if (cleared) {
 
2337
                mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
 
2338
                     dlm->name, res->lockname.len, res->lockname.name, node);
 
2339
                dlm_lockres_calc_usage(dlm, res);
 
2340
        } else {
 
2341
                mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
 
2342
                     "but it is already dropped!\n", dlm->name,
 
2343
                     res->lockname.len, res->lockname.name, node);
 
2344
                dlm_print_one_lock_resource(res);
 
2345
        }
 
2346
 
 
2347
        dlm_lockres_put(res);
 
2348
}
 
2349
 
 
2350
/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
 
2351
 * if not. If 0, numlocks is set to the number of locks in the lockres.
 
2352
 */
 
2353
static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
 
2354
                                      struct dlm_lock_resource *res,
 
2355
                                      int *numlocks)
 
2356
{
 
2357
        int ret;
 
2358
        int i;
 
2359
        int count = 0;
 
2360
        struct list_head *queue;
 
2361
        struct dlm_lock *lock;
 
2362
 
 
2363
        assert_spin_locked(&res->spinlock);
 
2364
 
 
2365
        ret = -EINVAL;
 
2366
        if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
 
2367
                mlog(0, "cannot migrate lockres with unknown owner!\n");
 
2368
                goto leave;
 
2369
        }
 
2370
 
 
2371
        if (res->owner != dlm->node_num) {
 
2372
                mlog(0, "cannot migrate lockres this node doesn't own!\n");
 
2373
                goto leave;
 
2374
        }
 
2375
 
 
2376
        ret = 0;
 
2377
        queue = &res->granted;
 
2378
        for (i = 0; i < 3; i++) {
 
2379
                list_for_each_entry(lock, queue, list) {
 
2380
                        ++count;
 
2381
                        if (lock->ml.node == dlm->node_num) {
 
2382
                                mlog(0, "found a lock owned by this node still "
 
2383
                                     "on the %s queue!  will not migrate this "
 
2384
                                     "lockres\n", (i == 0 ? "granted" :
 
2385
                                                   (i == 1 ? "converting" :
 
2386
                                                    "blocked")));
 
2387
                                ret = -ENOTEMPTY;
 
2388
                                goto leave;
 
2389
                        }
 
2390
                }
 
2391
                queue++;
 
2392
        }
 
2393
 
 
2394
        *numlocks = count;
 
2395
        mlog(0, "migrateable lockres having %d locks\n", *numlocks);
 
2396
 
 
2397
leave:
 
2398
        return ret;
 
2399
}
 
2400
 
 
2401
/*
 
2402
 * DLM_MIGRATE_LOCKRES
 
2403
 */
 
2404
 
 
2405
 
 
2406
static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
 
2407
                               struct dlm_lock_resource *res,
 
2408
                               u8 target)
 
2409
{
 
2410
        struct dlm_master_list_entry *mle = NULL;
 
2411
        struct dlm_master_list_entry *oldmle = NULL;
 
2412
        struct dlm_migratable_lockres *mres = NULL;
 
2413
        int ret = 0;
 
2414
        const char *name;
 
2415
        unsigned int namelen;
 
2416
        int mle_added = 0;
 
2417
        int numlocks;
 
2418
        int wake = 0;
 
2419
 
 
2420
        if (!dlm_grab(dlm))
 
2421
                return -EINVAL;
 
2422
 
 
2423
        name = res->lockname.name;
 
2424
        namelen = res->lockname.len;
 
2425
 
 
2426
        mlog(0, "migrating %.*s to %u\n", namelen, name, target);
 
2427
 
 
2428
        /*
 
2429
         * ensure this lockres is a proper candidate for migration
 
2430
         */
 
2431
        spin_lock(&res->spinlock);
 
2432
        ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
 
2433
        if (ret < 0) {
 
2434
                spin_unlock(&res->spinlock);
 
2435
                goto leave;
 
2436
        }
 
2437
        spin_unlock(&res->spinlock);
 
2438
 
 
2439
        /* no work to do */
 
2440
        if (numlocks == 0) {
 
2441
                mlog(0, "no locks were found on this lockres! done!\n");
 
2442
                goto leave;
 
2443
        }
 
2444
 
 
2445
        /*
 
2446
         * preallocate up front
 
2447
         * if this fails, abort
 
2448
         */
 
2449
 
 
2450
        ret = -ENOMEM;
 
2451
        mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
 
2452
        if (!mres) {
 
2453
                mlog_errno(ret);
 
2454
                goto leave;
 
2455
        }
 
2456
 
 
2457
        mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
 
2458
                                                                GFP_NOFS);
 
2459
        if (!mle) {
 
2460
                mlog_errno(ret);
 
2461
                goto leave;
 
2462
        }
 
2463
        ret = 0;
 
2464
 
 
2465
        /*
 
2466
         * find a node to migrate the lockres to
 
2467
         */
 
2468
 
 
2469
        mlog(0, "picking a migration node\n");
 
2470
        spin_lock(&dlm->spinlock);
 
2471
        /* pick a new node */
 
2472
        if (!test_bit(target, dlm->domain_map) ||
 
2473
            target >= O2NM_MAX_NODES) {
 
2474
                target = dlm_pick_migration_target(dlm, res);
 
2475
        }
 
2476
        mlog(0, "node %u chosen for migration\n", target);
 
2477
 
 
2478
        if (target >= O2NM_MAX_NODES ||
 
2479
            !test_bit(target, dlm->domain_map)) {
 
2480
                /* target chosen is not alive */
 
2481
                ret = -EINVAL;
 
2482
        }
 
2483
 
 
2484
        if (ret) {
 
2485
                spin_unlock(&dlm->spinlock);
 
2486
                goto fail;
 
2487
        }
 
2488
 
 
2489
        mlog(0, "continuing with target = %u\n", target);
 
2490
 
 
2491
        /*
 
2492
         * clear any existing master requests and
 
2493
         * add the migration mle to the list
 
2494
         */
 
2495
        spin_lock(&dlm->master_lock);
 
2496
        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
 
2497
                                    namelen, target, dlm->node_num);
 
2498
        spin_unlock(&dlm->master_lock);
 
2499
        spin_unlock(&dlm->spinlock);
 
2500
 
 
2501
        if (ret == -EEXIST) {
 
2502
                mlog(0, "another process is already migrating it\n");
 
2503
                goto fail;
 
2504
        }
 
2505
        mle_added = 1;
 
2506
 
 
2507
        /*
 
2508
         * set the MIGRATING flag and flush asts
 
2509
         * if we fail after this we need to re-dirty the lockres
 
2510
         */
 
2511
        if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
 
2512
                mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
 
2513
                     "the target went down.\n", res->lockname.len,
 
2514
                     res->lockname.name, target);
 
2515
                spin_lock(&res->spinlock);
 
2516
                res->state &= ~DLM_LOCK_RES_MIGRATING;
 
2517
                wake = 1;
 
2518
                spin_unlock(&res->spinlock);
 
2519
                ret = -EINVAL;
 
2520
        }
 
2521
 
 
2522
fail:
 
2523
        if (oldmle) {
 
2524
                /* master is known, detach if not already detached */
 
2525
                dlm_mle_detach_hb_events(dlm, oldmle);
 
2526
                dlm_put_mle(oldmle);
 
2527
        }
 
2528
 
 
2529
        if (ret < 0) {
 
2530
                if (mle_added) {
 
2531
                        dlm_mle_detach_hb_events(dlm, mle);
 
2532
                        dlm_put_mle(mle);
 
2533
                } else if (mle) {
 
2534
                        kmem_cache_free(dlm_mle_cache, mle);
 
2535
                }
 
2536
                goto leave;
 
2537
        }
 
2538
 
 
2539
        /*
 
2540
         * at this point, we have a migration target, an mle
 
2541
         * in the master list, and the MIGRATING flag set on
 
2542
         * the lockres
 
2543
         */
 
2544
 
 
2545
        /* now that remote nodes are spinning on the MIGRATING flag,
 
2546
         * ensure that all assert_master work is flushed. */
 
2547
        flush_workqueue(dlm->dlm_worker);
 
2548
 
 
2549
        /* get an extra reference on the mle.
 
2550
         * otherwise the assert_master from the new
 
2551
         * master will destroy this.
 
2552
         * also, make sure that all callers of dlm_get_mle
 
2553
         * take both dlm->spinlock and dlm->master_lock */
 
2554
        spin_lock(&dlm->spinlock);
 
2555
        spin_lock(&dlm->master_lock);
 
2556
        dlm_get_mle_inuse(mle);
 
2557
        spin_unlock(&dlm->master_lock);
 
2558
        spin_unlock(&dlm->spinlock);
 
2559
 
 
2560
        /* notify new node and send all lock state */
 
2561
        /* call send_one_lockres with migration flag.
 
2562
         * this serves as notice to the target node that a
 
2563
         * migration is starting. */
 
2564
        ret = dlm_send_one_lockres(dlm, res, mres, target,
 
2565
                                   DLM_MRES_MIGRATION);
 
2566
 
 
2567
        if (ret < 0) {
 
2568
                mlog(0, "migration to node %u failed with %d\n",
 
2569
                     target, ret);
 
2570
                /* migration failed, detach and clean up mle */
 
2571
                dlm_mle_detach_hb_events(dlm, mle);
 
2572
                dlm_put_mle(mle);
 
2573
                dlm_put_mle_inuse(mle);
 
2574
                spin_lock(&res->spinlock);
 
2575
                res->state &= ~DLM_LOCK_RES_MIGRATING;
 
2576
                wake = 1;
 
2577
                spin_unlock(&res->spinlock);
 
2578
                goto leave;
 
2579
        }
 
2580
 
 
2581
        /* at this point, the target sends a message to all nodes,
 
2582
         * (using dlm_do_migrate_request).  this node is skipped since
 
2583
         * we had to put an mle in the list to begin the process.  this
 
2584
         * node now waits for target to do an assert master.  this node
 
2585
         * will be the last one notified, ensuring that the migration
 
2586
         * is complete everywhere.  if the target dies while this is
 
2587
         * going on, some nodes could potentially see the target as the
 
2588
         * master, so it is important that my recovery finds the migration
 
2589
         * mle and sets the master to UNKNOWN. */
 
2590
 
 
2591
 
 
2592
        /* wait for new node to assert master */
 
2593
        while (1) {
 
2594
                ret = wait_event_interruptible_timeout(mle->wq,
 
2595
                                        (atomic_read(&mle->woken) == 1),
 
2596
                                        msecs_to_jiffies(5000));
 
2597
 
 
2598
                if (ret >= 0) {
 
2599
                        if (atomic_read(&mle->woken) == 1 ||
 
2600
                            res->owner == target)
 
2601
                                break;
 
2602
 
 
2603
                        mlog(0, "%s:%.*s: timed out during migration\n",
 
2604
                             dlm->name, res->lockname.len, res->lockname.name);
 
2605
                        /* avoid hang during shutdown when migrating lockres
 
2606
                         * to a node which also goes down */
 
2607
                        if (dlm_is_node_dead(dlm, target)) {
 
2608
                                mlog(0, "%s:%.*s: expected migration "
 
2609
                                     "target %u is no longer up, restarting\n",
 
2610
                                     dlm->name, res->lockname.len,
 
2611
                                     res->lockname.name, target);
 
2612
                                ret = -EINVAL;
 
2613
                                /* migration failed, detach and clean up mle */
 
2614
                                dlm_mle_detach_hb_events(dlm, mle);
 
2615
                                dlm_put_mle(mle);
 
2616
                                dlm_put_mle_inuse(mle);
 
2617
                                spin_lock(&res->spinlock);
 
2618
                                res->state &= ~DLM_LOCK_RES_MIGRATING;
 
2619
                                wake = 1;
 
2620
                                spin_unlock(&res->spinlock);
 
2621
                                goto leave;
 
2622
                        }
 
2623
                } else
 
2624
                        mlog(0, "%s:%.*s: caught signal during migration\n",
 
2625
                             dlm->name, res->lockname.len, res->lockname.name);
 
2626
        }
 
2627
 
 
2628
        /* all done, set the owner, clear the flag */
 
2629
        spin_lock(&res->spinlock);
 
2630
        dlm_set_lockres_owner(dlm, res, target);
 
2631
        res->state &= ~DLM_LOCK_RES_MIGRATING;
 
2632
        dlm_remove_nonlocal_locks(dlm, res);
 
2633
        spin_unlock(&res->spinlock);
 
2634
        wake_up(&res->wq);
 
2635
 
 
2636
        /* master is known, detach if not already detached */
 
2637
        dlm_mle_detach_hb_events(dlm, mle);
 
2638
        dlm_put_mle_inuse(mle);
 
2639
        ret = 0;
 
2640
 
 
2641
        dlm_lockres_calc_usage(dlm, res);
 
2642
 
 
2643
leave:
 
2644
        /* re-dirty the lockres if we failed */
 
2645
        if (ret < 0)
 
2646
                dlm_kick_thread(dlm, res);
 
2647
 
 
2648
        /* wake up waiters if the MIGRATING flag got set
 
2649
         * but migration failed */
 
2650
        if (wake)
 
2651
                wake_up(&res->wq);
 
2652
 
 
2653
        /* TODO: cleanup */
 
2654
        if (mres)
 
2655
                free_page((unsigned long)mres);
 
2656
 
 
2657
        dlm_put(dlm);
 
2658
 
 
2659
        mlog(0, "returning %d\n", ret);
 
2660
        return ret;
 
2661
}
 
2662
 
 
2663
#define DLM_MIGRATION_RETRY_MS  100
 
2664
 
 
2665
/* Should be called only after beginning the domain leave process.
 
2666
 * There should not be any remaining locks on nonlocal lock resources,
 
2667
 * and there should be no local locks left on locally mastered resources.
 
2668
 *
 
2669
 * Called with the dlm spinlock held, may drop it to do migration, but
 
2670
 * will re-acquire before exit.
 
2671
 *
 
2672
 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
 
2673
int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 
2674
{
 
2675
        int ret;
 
2676
        int lock_dropped = 0;
 
2677
        int numlocks;
 
2678
 
 
2679
        spin_lock(&res->spinlock);
 
2680
        if (res->owner != dlm->node_num) {
 
2681
                if (!__dlm_lockres_unused(res)) {
 
2682
                        mlog(ML_ERROR, "%s:%.*s: this node is not master, "
 
2683
                             "trying to free this but locks remain\n",
 
2684
                             dlm->name, res->lockname.len, res->lockname.name);
 
2685
                }
 
2686
                spin_unlock(&res->spinlock);
 
2687
                goto leave;
 
2688
        }
 
2689
 
 
2690
        /* No need to migrate a lockres having no locks */
 
2691
        ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
 
2692
        if (ret >= 0 && numlocks == 0) {
 
2693
                spin_unlock(&res->spinlock);
 
2694
                goto leave;
 
2695
        }
 
2696
        spin_unlock(&res->spinlock);
 
2697
 
 
2698
        /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
 
2699
        spin_unlock(&dlm->spinlock);
 
2700
        lock_dropped = 1;
 
2701
        while (1) {
 
2702
                ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
 
2703
                if (ret >= 0)
 
2704
                        break;
 
2705
                if (ret == -ENOTEMPTY) {
 
2706
                        mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
 
2707
                                res->lockname.len, res->lockname.name);
 
2708
                        BUG();
 
2709
                }
 
2710
 
 
2711
                mlog(0, "lockres %.*s: migrate failed, "
 
2712
                     "retrying\n", res->lockname.len,
 
2713
                     res->lockname.name);
 
2714
                msleep(DLM_MIGRATION_RETRY_MS);
 
2715
        }
 
2716
        spin_lock(&dlm->spinlock);
 
2717
leave:
 
2718
        return lock_dropped;
 
2719
}
 
2720
 
 
2721
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
 
2722
{
 
2723
        int ret;
 
2724
        spin_lock(&dlm->ast_lock);
 
2725
        spin_lock(&lock->spinlock);
 
2726
        ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
 
2727
        spin_unlock(&lock->spinlock);
 
2728
        spin_unlock(&dlm->ast_lock);
 
2729
        return ret;
 
2730
}
 
2731
 
 
2732
static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
 
2733
                                     struct dlm_lock_resource *res,
 
2734
                                     u8 mig_target)
 
2735
{
 
2736
        int can_proceed;
 
2737
        spin_lock(&res->spinlock);
 
2738
        can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
 
2739
        spin_unlock(&res->spinlock);
 
2740
 
 
2741
        /* target has died, so make the caller break out of the
 
2742
         * wait_event, but caller must recheck the domain_map */
 
2743
        spin_lock(&dlm->spinlock);
 
2744
        if (!test_bit(mig_target, dlm->domain_map))
 
2745
                can_proceed = 1;
 
2746
        spin_unlock(&dlm->spinlock);
 
2747
        return can_proceed;
 
2748
}
 
2749
 
 
2750
static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
 
2751
                                struct dlm_lock_resource *res)
 
2752
{
 
2753
        int ret;
 
2754
        spin_lock(&res->spinlock);
 
2755
        ret = !!(res->state & DLM_LOCK_RES_DIRTY);
 
2756
        spin_unlock(&res->spinlock);
 
2757
        return ret;
 
2758
}
 
2759
 
 
2760
 
 
2761
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
 
2762
                                       struct dlm_lock_resource *res,
 
2763
                                       u8 target)
 
2764
{
 
2765
        int ret = 0;
 
2766
 
 
2767
        mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
 
2768
               res->lockname.len, res->lockname.name, dlm->node_num,
 
2769
               target);
 
2770
        /* need to set MIGRATING flag on lockres.  this is done by
 
2771
         * ensuring that all asts have been flushed for this lockres. */
 
2772
        spin_lock(&res->spinlock);
 
2773
        BUG_ON(res->migration_pending);
 
2774
        res->migration_pending = 1;
 
2775
        /* strategy is to reserve an extra ast then release
 
2776
         * it below, letting the release do all of the work */
 
2777
        __dlm_lockres_reserve_ast(res);
 
2778
        spin_unlock(&res->spinlock);
 
2779
 
 
2780
        /* now flush all the pending asts */
 
2781
        dlm_kick_thread(dlm, res);
 
2782
        /* before waiting on DIRTY, block processes which may
 
2783
         * try to dirty the lockres before MIGRATING is set */
 
2784
        spin_lock(&res->spinlock);
 
2785
        BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
 
2786
        res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
 
2787
        spin_unlock(&res->spinlock);
 
2788
        /* now wait on any pending asts and the DIRTY state */
 
2789
        wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
 
2790
        dlm_lockres_release_ast(dlm, res);
 
2791
 
 
2792
        mlog(0, "about to wait on migration_wq, dirty=%s\n",
 
2793
               res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
 
2794
        /* if the extra ref we just put was the final one, this
 
2795
         * will pass thru immediately.  otherwise, we need to wait
 
2796
         * for the last ast to finish. */
 
2797
again:
 
2798
        ret = wait_event_interruptible_timeout(dlm->migration_wq,
 
2799
                   dlm_migration_can_proceed(dlm, res, target),
 
2800
                   msecs_to_jiffies(1000));
 
2801
        if (ret < 0) {
 
2802
                mlog(0, "woken again: migrating? %s, dead? %s\n",
 
2803
                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
 
2804
                       test_bit(target, dlm->domain_map) ? "no":"yes");
 
2805
        } else {
 
2806
                mlog(0, "all is well: migrating? %s, dead? %s\n",
 
2807
                       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
 
2808
                       test_bit(target, dlm->domain_map) ? "no":"yes");
 
2809
        }
 
2810
        if (!dlm_migration_can_proceed(dlm, res, target)) {
 
2811
                mlog(0, "trying again...\n");
 
2812
                goto again;
 
2813
        }
 
2814
        /* now that we are sure the MIGRATING state is there, drop
 
2815
         * the unneded state which blocked threads trying to DIRTY */
 
2816
        spin_lock(&res->spinlock);
 
2817
        BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
 
2818
        BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
 
2819
        res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
 
2820
        spin_unlock(&res->spinlock);
 
2821
 
 
2822
        /* did the target go down or die? */
 
2823
        spin_lock(&dlm->spinlock);
 
2824
        if (!test_bit(target, dlm->domain_map)) {
 
2825
                mlog(ML_ERROR, "aha. migration target %u just went down\n",
 
2826
                     target);
 
2827
                ret = -EHOSTDOWN;
 
2828
        }
 
2829
        spin_unlock(&dlm->spinlock);
 
2830
 
 
2831
        /*
 
2832
         * at this point:
 
2833
         *
 
2834
         *   o the DLM_LOCK_RES_MIGRATING flag is set
 
2835
         *   o there are no pending asts on this lockres
 
2836
         *   o all processes trying to reserve an ast on this
 
2837
         *     lockres must wait for the MIGRATING flag to clear
 
2838
         */
 
2839
        return ret;
 
2840
}
 
2841
 
 
2842
/* last step in the migration process.
 
2843
 * original master calls this to free all of the dlm_lock
 
2844
 * structures that used to be for other nodes. */
 
2845
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 
2846
                                      struct dlm_lock_resource *res)
 
2847
{
 
2848
        struct list_head *queue = &res->granted;
 
2849
        int i, bit;
 
2850
        struct dlm_lock *lock, *next;
 
2851
 
 
2852
        assert_spin_locked(&res->spinlock);
 
2853
 
 
2854
        BUG_ON(res->owner == dlm->node_num);
 
2855
 
 
2856
        for (i=0; i<3; i++) {
 
2857
                list_for_each_entry_safe(lock, next, queue, list) {
 
2858
                        if (lock->ml.node != dlm->node_num) {
 
2859
                                mlog(0, "putting lock for node %u\n",
 
2860
                                     lock->ml.node);
 
2861
                                /* be extra careful */
 
2862
                                BUG_ON(!list_empty(&lock->ast_list));
 
2863
                                BUG_ON(!list_empty(&lock->bast_list));
 
2864
                                BUG_ON(lock->ast_pending);
 
2865
                                BUG_ON(lock->bast_pending);
 
2866
                                dlm_lockres_clear_refmap_bit(lock->ml.node, res);
 
2867
                                list_del_init(&lock->list);
 
2868
                                dlm_lock_put(lock);
 
2869
                                /* In a normal unlock, we would have added a
 
2870
                                 * DLM_UNLOCK_FREE_LOCK action. Force it. */
 
2871
                                dlm_lock_put(lock);
 
2872
                        }
 
2873
                }
 
2874
                queue++;
 
2875
        }
 
2876
        bit = 0;
 
2877
        while (1) {
 
2878
                bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
 
2879
                if (bit >= O2NM_MAX_NODES)
 
2880
                        break;
 
2881
                /* do not clear the local node reference, if there is a
 
2882
                 * process holding this, let it drop the ref itself */
 
2883
                if (bit != dlm->node_num) {
 
2884
                        mlog(0, "%s:%.*s: node %u had a ref to this "
 
2885
                             "migrating lockres, clearing\n", dlm->name,
 
2886
                             res->lockname.len, res->lockname.name, bit);
 
2887
                        dlm_lockres_clear_refmap_bit(bit, res);
 
2888
                }
 
2889
                bit++;
 
2890
        }
 
2891
}
 
2892
 
 
2893
/* for now this is not too intelligent.  we will
 
2894
 * need stats to make this do the right thing.
 
2895
 * this just finds the first lock on one of the
 
2896
 * queues and uses that node as the target. */
 
2897
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
 
2898
                                    struct dlm_lock_resource *res)
 
2899
{
 
2900
        int i;
 
2901
        struct list_head *queue = &res->granted;
 
2902
        struct dlm_lock *lock;
 
2903
        int nodenum;
 
2904
 
 
2905
        assert_spin_locked(&dlm->spinlock);
 
2906
 
 
2907
        spin_lock(&res->spinlock);
 
2908
        for (i=0; i<3; i++) {
 
2909
                list_for_each_entry(lock, queue, list) {
 
2910
                        /* up to the caller to make sure this node
 
2911
                         * is alive */
 
2912
                        if (lock->ml.node != dlm->node_num) {
 
2913
                                spin_unlock(&res->spinlock);
 
2914
                                return lock->ml.node;
 
2915
                        }
 
2916
                }
 
2917
                queue++;
 
2918
        }
 
2919
        spin_unlock(&res->spinlock);
 
2920
        mlog(0, "have not found a suitable target yet! checking domain map\n");
 
2921
 
 
2922
        /* ok now we're getting desperate.  pick anyone alive. */
 
2923
        nodenum = -1;
 
2924
        while (1) {
 
2925
                nodenum = find_next_bit(dlm->domain_map,
 
2926
                                        O2NM_MAX_NODES, nodenum+1);
 
2927
                mlog(0, "found %d in domain map\n", nodenum);
 
2928
                if (nodenum >= O2NM_MAX_NODES)
 
2929
                        break;
 
2930
                if (nodenum != dlm->node_num) {
 
2931
                        mlog(0, "picking %d\n", nodenum);
 
2932
                        return nodenum;
 
2933
                }
 
2934
        }
 
2935
 
 
2936
        mlog(0, "giving up.  no master to migrate to\n");
 
2937
        return DLM_LOCK_RES_OWNER_UNKNOWN;
 
2938
}
 
2939
 
 
2940
 
 
2941
 
 
2942
/* this is called by the new master once all lockres
 
2943
 * data has been received */
 
2944
static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
 
2945
                                  struct dlm_lock_resource *res,
 
2946
                                  u8 master, u8 new_master,
 
2947
                                  struct dlm_node_iter *iter)
 
2948
{
 
2949
        struct dlm_migrate_request migrate;
 
2950
        int ret, skip, status = 0;
 
2951
        int nodenum;
 
2952
 
 
2953
        memset(&migrate, 0, sizeof(migrate));
 
2954
        migrate.namelen = res->lockname.len;
 
2955
        memcpy(migrate.name, res->lockname.name, migrate.namelen);
 
2956
        migrate.new_master = new_master;
 
2957
        migrate.master = master;
 
2958
 
 
2959
        ret = 0;
 
2960
 
 
2961
        /* send message to all nodes, except the master and myself */
 
2962
        while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
 
2963
                if (nodenum == master ||
 
2964
                    nodenum == new_master)
 
2965
                        continue;
 
2966
 
 
2967
                /* We could race exit domain. If exited, skip. */
 
2968
                spin_lock(&dlm->spinlock);
 
2969
                skip = (!test_bit(nodenum, dlm->domain_map));
 
2970
                spin_unlock(&dlm->spinlock);
 
2971
                if (skip) {
 
2972
                        clear_bit(nodenum, iter->node_map);
 
2973
                        continue;
 
2974
                }
 
2975
 
 
2976
                ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
 
2977
                                         &migrate, sizeof(migrate), nodenum,
 
2978
                                         &status);
 
2979
                if (ret < 0) {
 
2980
                        mlog(0, "migrate_request returned %d!\n", ret);
 
2981
                        if (!dlm_is_host_down(ret)) {
 
2982
                                mlog(ML_ERROR, "unhandled error=%d!\n", ret);
 
2983
                                BUG();
 
2984
                        }
 
2985
                        clear_bit(nodenum, iter->node_map);
 
2986
                        ret = 0;
 
2987
                } else if (status < 0) {
 
2988
                        mlog(0, "migrate request (node %u) returned %d!\n",
 
2989
                             nodenum, status);
 
2990
                        ret = status;
 
2991
                } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
 
2992
                        /* during the migration request we short-circuited
 
2993
                         * the mastery of the lockres.  make sure we have
 
2994
                         * a mastery ref for nodenum */
 
2995
                        mlog(0, "%s:%.*s: need ref for node %u\n",
 
2996
                             dlm->name, res->lockname.len, res->lockname.name,
 
2997
                             nodenum);
 
2998
                        spin_lock(&res->spinlock);
 
2999
                        dlm_lockres_set_refmap_bit(nodenum, res);
 
3000
                        spin_unlock(&res->spinlock);
 
3001
                }
 
3002
        }
 
3003
 
 
3004
        if (ret < 0)
 
3005
                mlog_errno(ret);
 
3006
 
 
3007
        mlog(0, "returning ret=%d\n", ret);
 
3008
        return ret;
 
3009
}
 
3010
 
 
3011
 
 
3012
/* if there is an existing mle for this lockres, we now know who the master is.
 
3013
 * (the one who sent us *this* message) we can clear it up right away.
 
3014
 * since the process that put the mle on the list still has a reference to it,
 
3015
 * we can unhash it now, set the master and wake the process.  as a result,
 
3016
 * we will have no mle in the list to start with.  now we can add an mle for
 
3017
 * the migration and this should be the only one found for those scanning the
 
3018
 * list.  */
 
3019
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
 
3020
                                void **ret_data)
 
3021
{
 
3022
        struct dlm_ctxt *dlm = data;
 
3023
        struct dlm_lock_resource *res = NULL;
 
3024
        struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
 
3025
        struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
 
3026
        const char *name;
 
3027
        unsigned int namelen, hash;
 
3028
        int ret = 0;
 
3029
 
 
3030
        if (!dlm_grab(dlm))
 
3031
                return -EINVAL;
 
3032
 
 
3033
        name = migrate->name;
 
3034
        namelen = migrate->namelen;
 
3035
        hash = dlm_lockid_hash(name, namelen);
 
3036
 
 
3037
        /* preallocate.. if this fails, abort */
 
3038
        mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
 
3039
                                                         GFP_NOFS);
 
3040
 
 
3041
        if (!mle) {
 
3042
                ret = -ENOMEM;
 
3043
                goto leave;
 
3044
        }
 
3045
 
 
3046
        /* check for pre-existing lock */
 
3047
        spin_lock(&dlm->spinlock);
 
3048
        res = __dlm_lookup_lockres(dlm, name, namelen, hash);
 
3049
        spin_lock(&dlm->master_lock);
 
3050
 
 
3051
        if (res) {
 
3052
                spin_lock(&res->spinlock);
 
3053
                if (res->state & DLM_LOCK_RES_RECOVERING) {
 
3054
                        /* if all is working ok, this can only mean that we got
 
3055
                        * a migrate request from a node that we now see as
 
3056
                        * dead.  what can we do here?  drop it to the floor? */
 
3057
                        spin_unlock(&res->spinlock);
 
3058
                        mlog(ML_ERROR, "Got a migrate request, but the "
 
3059
                             "lockres is marked as recovering!");
 
3060
                        kmem_cache_free(dlm_mle_cache, mle);
 
3061
                        ret = -EINVAL; /* need a better solution */
 
3062
                        goto unlock;
 
3063
                }
 
3064
                res->state |= DLM_LOCK_RES_MIGRATING;
 
3065
                spin_unlock(&res->spinlock);
 
3066
        }
 
3067
 
 
3068
        /* ignore status.  only nonzero status would BUG. */
 
3069
        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
 
3070
                                    name, namelen,
 
3071
                                    migrate->new_master,
 
3072
                                    migrate->master);
 
3073
 
 
3074
unlock:
 
3075
        spin_unlock(&dlm->master_lock);
 
3076
        spin_unlock(&dlm->spinlock);
 
3077
 
 
3078
        if (oldmle) {
 
3079
                /* master is known, detach if not already detached */
 
3080
                dlm_mle_detach_hb_events(dlm, oldmle);
 
3081
                dlm_put_mle(oldmle);
 
3082
        }
 
3083
 
 
3084
        if (res)
 
3085
                dlm_lockres_put(res);
 
3086
leave:
 
3087
        dlm_put(dlm);
 
3088
        return ret;
 
3089
}
 
3090
 
 
3091
/* must be holding dlm->spinlock and dlm->master_lock
 
3092
 * when adding a migration mle, we can clear any other mles
 
3093
 * in the master list because we know with certainty that
 
3094
 * the master is "master".  so we remove any old mle from
 
3095
 * the list after setting it's master field, and then add
 
3096
 * the new migration mle.  this way we can hold with the rule
 
3097
 * of having only one mle for a given lock name at all times. */
 
3098
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
 
3099
                                 struct dlm_lock_resource *res,
 
3100
                                 struct dlm_master_list_entry *mle,
 
3101
                                 struct dlm_master_list_entry **oldmle,
 
3102
                                 const char *name, unsigned int namelen,
 
3103
                                 u8 new_master, u8 master)
 
3104
{
 
3105
        int found;
 
3106
        int ret = 0;
 
3107
 
 
3108
        *oldmle = NULL;
 
3109
 
 
3110
        mlog_entry_void();
 
3111
 
 
3112
        assert_spin_locked(&dlm->spinlock);
 
3113
        assert_spin_locked(&dlm->master_lock);
 
3114
 
 
3115
        /* caller is responsible for any ref taken here on oldmle */
 
3116
        found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
 
3117
        if (found) {
 
3118
                struct dlm_master_list_entry *tmp = *oldmle;
 
3119
                spin_lock(&tmp->spinlock);
 
3120
                if (tmp->type == DLM_MLE_MIGRATION) {
 
3121
                        if (master == dlm->node_num) {
 
3122
                                /* ah another process raced me to it */
 
3123
                                mlog(0, "tried to migrate %.*s, but some "
 
3124
                                     "process beat me to it\n",
 
3125
                                     namelen, name);
 
3126
                                ret = -EEXIST;
 
3127
                        } else {
 
3128
                                /* bad.  2 NODES are trying to migrate! */
 
3129
                                mlog(ML_ERROR, "migration error  mle: "
 
3130
                                     "master=%u new_master=%u // request: "
 
3131
                                     "master=%u new_master=%u // "
 
3132
                                     "lockres=%.*s\n",
 
3133
                                     tmp->master, tmp->new_master,
 
3134
                                     master, new_master,
 
3135
                                     namelen, name);
 
3136
                                BUG();
 
3137
                        }
 
3138
                } else {
 
3139
                        /* this is essentially what assert_master does */
 
3140
                        tmp->master = master;
 
3141
                        atomic_set(&tmp->woken, 1);
 
3142
                        wake_up(&tmp->wq);
 
3143
                        /* remove it so that only one mle will be found */
 
3144
                        __dlm_unlink_mle(dlm, tmp);
 
3145
                        __dlm_mle_detach_hb_events(dlm, tmp);
 
3146
                        ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
 
3147
                        mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
 
3148
                            "telling master to get ref for cleared out mle "
 
3149
                            "during migration\n", dlm->name, namelen, name,
 
3150
                            master, new_master);
 
3151
                }
 
3152
                spin_unlock(&tmp->spinlock);
 
3153
        }
 
3154
 
 
3155
        /* now add a migration mle to the tail of the list */
 
3156
        dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
 
3157
        mle->new_master = new_master;
 
3158
        /* the new master will be sending an assert master for this.
 
3159
         * at that point we will get the refmap reference */
 
3160
        mle->master = master;
 
3161
        /* do this for consistency with other mle types */
 
3162
        set_bit(new_master, mle->maybe_map);
 
3163
        __dlm_insert_mle(dlm, mle);
 
3164
 
 
3165
        return ret;
 
3166
}
 
3167
 
 
3168
/*
 
3169
 * Sets the owner of the lockres, associated to the mle, to UNKNOWN
 
3170
 */
 
3171
static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
 
3172
                                        struct dlm_master_list_entry *mle)
 
3173
{
 
3174
        struct dlm_lock_resource *res;
 
3175
 
 
3176
        /* Find the lockres associated to the mle and set its owner to UNK */
 
3177
        res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
 
3178
                                   mle->mnamehash);
 
3179
        if (res) {
 
3180
                spin_unlock(&dlm->master_lock);
 
3181
 
 
3182
                /* move lockres onto recovery list */
 
3183
                spin_lock(&res->spinlock);
 
3184
                dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
 
3185
                dlm_move_lockres_to_recovery_list(dlm, res);
 
3186
                spin_unlock(&res->spinlock);
 
3187
                dlm_lockres_put(res);
 
3188
 
 
3189
                /* about to get rid of mle, detach from heartbeat */
 
3190
                __dlm_mle_detach_hb_events(dlm, mle);
 
3191
 
 
3192
                /* dump the mle */
 
3193
                spin_lock(&dlm->master_lock);
 
3194
                __dlm_put_mle(mle);
 
3195
                spin_unlock(&dlm->master_lock);
 
3196
        }
 
3197
 
 
3198
        return res;
 
3199
}
 
3200
 
 
3201
static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
 
3202
                                    struct dlm_master_list_entry *mle)
 
3203
{
 
3204
        __dlm_mle_detach_hb_events(dlm, mle);
 
3205
 
 
3206
        spin_lock(&mle->spinlock);
 
3207
        __dlm_unlink_mle(dlm, mle);
 
3208
        atomic_set(&mle->woken, 1);
 
3209
        spin_unlock(&mle->spinlock);
 
3210
 
 
3211
        wake_up(&mle->wq);
 
3212
}
 
3213
 
 
3214
static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
 
3215
                                struct dlm_master_list_entry *mle, u8 dead_node)
 
3216
{
 
3217
        int bit;
 
3218
 
 
3219
        BUG_ON(mle->type != DLM_MLE_BLOCK);
 
3220
 
 
3221
        spin_lock(&mle->spinlock);
 
3222
        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
 
3223
        if (bit != dead_node) {
 
3224
                mlog(0, "mle found, but dead node %u would not have been "
 
3225
                     "master\n", dead_node);
 
3226
                spin_unlock(&mle->spinlock);
 
3227
        } else {
 
3228
                /* Must drop the refcount by one since the assert_master will
 
3229
                 * never arrive. This may result in the mle being unlinked and
 
3230
                 * freed, but there may still be a process waiting in the
 
3231
                 * dlmlock path which is fine. */
 
3232
                mlog(0, "node %u was expected master\n", dead_node);
 
3233
                atomic_set(&mle->woken, 1);
 
3234
                spin_unlock(&mle->spinlock);
 
3235
                wake_up(&mle->wq);
 
3236
 
 
3237
                /* Do not need events any longer, so detach from heartbeat */
 
3238
                __dlm_mle_detach_hb_events(dlm, mle);
 
3239
                __dlm_put_mle(mle);
 
3240
        }
 
3241
}
 
3242
 
 
3243
void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
 
3244
{
 
3245
        struct dlm_master_list_entry *mle;
 
3246
        struct dlm_lock_resource *res;
 
3247
        struct hlist_head *bucket;
 
3248
        struct hlist_node *list;
 
3249
        unsigned int i;
 
3250
 
 
3251
        mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
 
3252
top:
 
3253
        assert_spin_locked(&dlm->spinlock);
 
3254
 
 
3255
        /* clean the master list */
 
3256
        spin_lock(&dlm->master_lock);
 
3257
        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
 
3258
                bucket = dlm_master_hash(dlm, i);
 
3259
                hlist_for_each(list, bucket) {
 
3260
                        mle = hlist_entry(list, struct dlm_master_list_entry,
 
3261
                                          master_hash_node);
 
3262
 
 
3263
                        BUG_ON(mle->type != DLM_MLE_BLOCK &&
 
3264
                               mle->type != DLM_MLE_MASTER &&
 
3265
                               mle->type != DLM_MLE_MIGRATION);
 
3266
 
 
3267
                        /* MASTER mles are initiated locally. The waiting
 
3268
                         * process will notice the node map change shortly.
 
3269
                         * Let that happen as normal. */
 
3270
                        if (mle->type == DLM_MLE_MASTER)
 
3271
                                continue;
 
3272
 
 
3273
                        /* BLOCK mles are initiated by other nodes. Need to
 
3274
                         * clean up if the dead node would have been the
 
3275
                         * master. */
 
3276
                        if (mle->type == DLM_MLE_BLOCK) {
 
3277
                                dlm_clean_block_mle(dlm, mle, dead_node);
 
3278
                                continue;
 
3279
                        }
 
3280
 
 
3281
                        /* Everything else is a MIGRATION mle */
 
3282
 
 
3283
                        /* The rule for MIGRATION mles is that the master
 
3284
                         * becomes UNKNOWN if *either* the original or the new
 
3285
                         * master dies. All UNKNOWN lockres' are sent to
 
3286
                         * whichever node becomes the recovery master. The new
 
3287
                         * master is responsible for determining if there is
 
3288
                         * still a master for this lockres, or if he needs to
 
3289
                         * take over mastery. Either way, this node should
 
3290
                         * expect another message to resolve this. */
 
3291
 
 
3292
                        if (mle->master != dead_node &&
 
3293
                            mle->new_master != dead_node)
 
3294
                                continue;
 
3295
 
 
3296
                        /* If we have reached this point, this mle needs to be
 
3297
                         * removed from the list and freed. */
 
3298
                        dlm_clean_migration_mle(dlm, mle);
 
3299
 
 
3300
                        mlog(0, "%s: node %u died during migration from "
 
3301
                             "%u to %u!\n", dlm->name, dead_node, mle->master,
 
3302
                             mle->new_master);
 
3303
 
 
3304
                        /* If we find a lockres associated with the mle, we've
 
3305
                         * hit this rare case that messes up our lock ordering.
 
3306
                         * If so, we need to drop the master lock so that we can
 
3307
                         * take the lockres lock, meaning that we will have to
 
3308
                         * restart from the head of list. */
 
3309
                        res = dlm_reset_mleres_owner(dlm, mle);
 
3310
                        if (res)
 
3311
                                /* restart */
 
3312
                                goto top;
 
3313
 
 
3314
                        /* This may be the last reference */
 
3315
                        __dlm_put_mle(mle);
 
3316
                }
 
3317
        }
 
3318
        spin_unlock(&dlm->master_lock);
 
3319
}
 
3320
 
 
3321
int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 
3322
                         u8 old_master)
 
3323
{
 
3324
        struct dlm_node_iter iter;
 
3325
        int ret = 0;
 
3326
 
 
3327
        spin_lock(&dlm->spinlock);
 
3328
        dlm_node_iter_init(dlm->domain_map, &iter);
 
3329
        clear_bit(old_master, iter.node_map);
 
3330
        clear_bit(dlm->node_num, iter.node_map);
 
3331
        spin_unlock(&dlm->spinlock);
 
3332
 
 
3333
        /* ownership of the lockres is changing.  account for the
 
3334
         * mastery reference here since old_master will briefly have
 
3335
         * a reference after the migration completes */
 
3336
        spin_lock(&res->spinlock);
 
3337
        dlm_lockres_set_refmap_bit(old_master, res);
 
3338
        spin_unlock(&res->spinlock);
 
3339
 
 
3340
        mlog(0, "now time to do a migrate request to other nodes\n");
 
3341
        ret = dlm_do_migrate_request(dlm, res, old_master,
 
3342
                                     dlm->node_num, &iter);
 
3343
        if (ret < 0) {
 
3344
                mlog_errno(ret);
 
3345
                goto leave;
 
3346
        }
 
3347
 
 
3348
        mlog(0, "doing assert master of %.*s to all except the original node\n",
 
3349
             res->lockname.len, res->lockname.name);
 
3350
        /* this call now finishes out the nodemap
 
3351
         * even if one or more nodes die */
 
3352
        ret = dlm_do_assert_master(dlm, res, iter.node_map,
 
3353
                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
 
3354
        if (ret < 0) {
 
3355
                /* no longer need to retry.  all living nodes contacted. */
 
3356
                mlog_errno(ret);
 
3357
                ret = 0;
 
3358
        }
 
3359
 
 
3360
        memset(iter.node_map, 0, sizeof(iter.node_map));
 
3361
        set_bit(old_master, iter.node_map);
 
3362
        mlog(0, "doing assert master of %.*s back to %u\n",
 
3363
             res->lockname.len, res->lockname.name, old_master);
 
3364
        ret = dlm_do_assert_master(dlm, res, iter.node_map,
 
3365
                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
 
3366
        if (ret < 0) {
 
3367
                mlog(0, "assert master to original master failed "
 
3368
                     "with %d.\n", ret);
 
3369
                /* the only nonzero status here would be because of
 
3370
                 * a dead original node.  we're done. */
 
3371
                ret = 0;
 
3372
        }
 
3373
 
 
3374
        /* all done, set the owner, clear the flag */
 
3375
        spin_lock(&res->spinlock);
 
3376
        dlm_set_lockres_owner(dlm, res, dlm->node_num);
 
3377
        res->state &= ~DLM_LOCK_RES_MIGRATING;
 
3378
        spin_unlock(&res->spinlock);
 
3379
        /* re-dirty it on the new master */
 
3380
        dlm_kick_thread(dlm, res);
 
3381
        wake_up(&res->wq);
 
3382
leave:
 
3383
        return ret;
 
3384
}
 
3385
 
 
3386
/*
 
3387
 * LOCKRES AST REFCOUNT
 
3388
 * this is integral to migration
 
3389
 */
 
3390
 
 
3391
/* for future intent to call an ast, reserve one ahead of time.
 
3392
 * this should be called only after waiting on the lockres
 
3393
 * with dlm_wait_on_lockres, and while still holding the
 
3394
 * spinlock after the call. */
 
3395
void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
 
3396
{
 
3397
        assert_spin_locked(&res->spinlock);
 
3398
        if (res->state & DLM_LOCK_RES_MIGRATING) {
 
3399
                __dlm_print_one_lock_resource(res);
 
3400
        }
 
3401
        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 
3402
 
 
3403
        atomic_inc(&res->asts_reserved);
 
3404
}
 
3405
 
 
3406
/*
 
3407
 * used to drop the reserved ast, either because it went unused,
 
3408
 * or because the ast/bast was actually called.
 
3409
 *
 
3410
 * also, if there is a pending migration on this lockres,
 
3411
 * and this was the last pending ast on the lockres,
 
3412
 * atomically set the MIGRATING flag before we drop the lock.
 
3413
 * this is how we ensure that migration can proceed with no
 
3414
 * asts in progress.  note that it is ok if the state of the
 
3415
 * queues is such that a lock should be granted in the future
 
3416
 * or that a bast should be fired, because the new master will
 
3417
 * shuffle the lists on this lockres as soon as it is migrated.
 
3418
 */
 
3419
void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
 
3420
                             struct dlm_lock_resource *res)
 
3421
{
 
3422
        if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
 
3423
                return;
 
3424
 
 
3425
        if (!res->migration_pending) {
 
3426
                spin_unlock(&res->spinlock);
 
3427
                return;
 
3428
        }
 
3429
 
 
3430
        BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 
3431
        res->migration_pending = 0;
 
3432
        res->state |= DLM_LOCK_RES_MIGRATING;
 
3433
        spin_unlock(&res->spinlock);
 
3434
        wake_up(&res->wq);
 
3435
        wake_up(&dlm->migration_wq);
 
3436
}