~ubuntu-branches/ubuntu/precise/linux-lowlatency/precise

« back to all changes in this revision

Viewing changes to fs/ocfs2/cluster/heartbeat.c

  • Committer: Package Import Robot
  • Author(s): Alessio Igor Bogani
  • Date: 2011-10-26 11:13:05 UTC
  • Revision ID: package-import@ubuntu.com-20111026111305-tz023xykf0i6eosh
Tags: upstream-3.2.0
ImportĀ upstreamĀ versionĀ 3.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c; c-basic-offset: 8; -*-
 
2
 * vim: noexpandtab sw=8 ts=8 sts=0:
 
3
 *
 
4
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or
 
7
 * modify it under the terms of the GNU General Public
 
8
 * License as published by the Free Software Foundation; either
 
9
 * version 2 of the License, or (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
14
 * General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public
 
17
 * License along with this program; if not, write to the
 
18
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 
19
 * Boston, MA 021110-1307, USA.
 
20
 */
 
21
 
 
22
#include <linux/kernel.h>
 
23
#include <linux/sched.h>
 
24
#include <linux/jiffies.h>
 
25
#include <linux/module.h>
 
26
#include <linux/fs.h>
 
27
#include <linux/bio.h>
 
28
#include <linux/blkdev.h>
 
29
#include <linux/delay.h>
 
30
#include <linux/file.h>
 
31
#include <linux/kthread.h>
 
32
#include <linux/configfs.h>
 
33
#include <linux/random.h>
 
34
#include <linux/crc32.h>
 
35
#include <linux/time.h>
 
36
#include <linux/debugfs.h>
 
37
#include <linux/slab.h>
 
38
 
 
39
#include "heartbeat.h"
 
40
#include "tcp.h"
 
41
#include "nodemanager.h"
 
42
#include "quorum.h"
 
43
 
 
44
#include "masklog.h"
 
45
 
 
46
 
 
47
/*
 
48
 * The first heartbeat pass had one global thread that would serialize all hb
 
49
 * callback calls.  This global serializing sem should only be removed once
 
50
 * we've made sure that all callees can deal with being called concurrently
 
51
 * from multiple hb region threads.
 
52
 */
 
53
static DECLARE_RWSEM(o2hb_callback_sem);
 
54
 
 
55
/*
 
56
 * multiple hb threads are watching multiple regions.  A node is live
 
57
 * whenever any of the threads sees activity from the node in its region.
 
58
 */
 
59
static DEFINE_SPINLOCK(o2hb_live_lock);
 
60
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
 
61
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
62
static LIST_HEAD(o2hb_node_events);
 
63
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 
64
 
 
65
/*
 
66
 * In global heartbeat, we maintain a series of region bitmaps.
 
67
 *      - o2hb_region_bitmap allows us to limit the region number to max region.
 
68
 *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
 
69
 *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
 
70
 *              heartbeat on it.
 
71
 *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
 
72
 */
 
73
static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 
74
static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 
75
static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 
76
static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
 
77
 
 
78
#define O2HB_DB_TYPE_LIVENODES          0
 
79
#define O2HB_DB_TYPE_LIVEREGIONS        1
 
80
#define O2HB_DB_TYPE_QUORUMREGIONS      2
 
81
#define O2HB_DB_TYPE_FAILEDREGIONS      3
 
82
#define O2HB_DB_TYPE_REGION_LIVENODES   4
 
83
#define O2HB_DB_TYPE_REGION_NUMBER      5
 
84
#define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
 
85
#define O2HB_DB_TYPE_REGION_PINNED      7
 
86
struct o2hb_debug_buf {
 
87
        int db_type;
 
88
        int db_size;
 
89
        int db_len;
 
90
        void *db_data;
 
91
};
 
92
 
 
93
static struct o2hb_debug_buf *o2hb_db_livenodes;
 
94
static struct o2hb_debug_buf *o2hb_db_liveregions;
 
95
static struct o2hb_debug_buf *o2hb_db_quorumregions;
 
96
static struct o2hb_debug_buf *o2hb_db_failedregions;
 
97
 
 
98
#define O2HB_DEBUG_DIR                  "o2hb"
 
99
#define O2HB_DEBUG_LIVENODES            "livenodes"
 
100
#define O2HB_DEBUG_LIVEREGIONS          "live_regions"
 
101
#define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
 
102
#define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
 
103
#define O2HB_DEBUG_REGION_NUMBER        "num"
 
104
#define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
 
105
#define O2HB_DEBUG_REGION_PINNED        "pinned"
 
106
 
 
107
static struct dentry *o2hb_debug_dir;
 
108
static struct dentry *o2hb_debug_livenodes;
 
109
static struct dentry *o2hb_debug_liveregions;
 
110
static struct dentry *o2hb_debug_quorumregions;
 
111
static struct dentry *o2hb_debug_failedregions;
 
112
 
 
113
static LIST_HEAD(o2hb_all_regions);
 
114
 
 
115
static struct o2hb_callback {
 
116
        struct list_head list;
 
117
} o2hb_callbacks[O2HB_NUM_CB];
 
118
 
 
119
static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
 
120
 
 
121
#define O2HB_DEFAULT_BLOCK_BITS       9
 
122
 
 
123
enum o2hb_heartbeat_modes {
 
124
        O2HB_HEARTBEAT_LOCAL            = 0,
 
125
        O2HB_HEARTBEAT_GLOBAL,
 
126
        O2HB_HEARTBEAT_NUM_MODES,
 
127
};
 
128
 
 
129
char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
 
130
                "local",        /* O2HB_HEARTBEAT_LOCAL */
 
131
                "global",       /* O2HB_HEARTBEAT_GLOBAL */
 
132
};
 
133
 
 
134
unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
 
135
unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
 
136
 
 
137
/*
 
138
 * o2hb_dependent_users tracks the number of registered callbacks that depend
 
139
 * on heartbeat. o2net and o2dlm are two entities that register this callback.
 
140
 * However only o2dlm depends on the heartbeat. It does not want the heartbeat
 
141
 * to stop while a dlm domain is still active.
 
142
 */
 
143
unsigned int o2hb_dependent_users;
 
144
 
 
145
/*
 
146
 * In global heartbeat mode, all regions are pinned if there are one or more
 
147
 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
 
148
 * regions are unpinned if the region count exceeds the cut off or the number
 
149
 * of dependent users falls to zero.
 
150
 */
 
151
#define O2HB_PIN_CUT_OFF                3
 
152
 
 
153
/*
 
154
 * In local heartbeat mode, we assume the dlm domain name to be the same as
 
155
 * region uuid. This is true for domains created for the file system but not
 
156
 * necessarily true for userdlm domains. This is a known limitation.
 
157
 *
 
158
 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
 
159
 * works for both file system and userdlm domains.
 
160
 */
 
161
static int o2hb_region_pin(const char *region_uuid);
 
162
static void o2hb_region_unpin(const char *region_uuid);
 
163
 
 
164
/* Only sets a new threshold if there are no active regions.
 
165
 *
 
166
 * No locking or otherwise interesting code is required for reading
 
167
 * o2hb_dead_threshold as it can't change once regions are active and
 
168
 * it's not interesting to anyone until then anyway. */
 
169
static void o2hb_dead_threshold_set(unsigned int threshold)
 
170
{
 
171
        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
 
172
                spin_lock(&o2hb_live_lock);
 
173
                if (list_empty(&o2hb_all_regions))
 
174
                        o2hb_dead_threshold = threshold;
 
175
                spin_unlock(&o2hb_live_lock);
 
176
        }
 
177
}
 
178
 
 
179
static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode)
 
180
{
 
181
        int ret = -1;
 
182
 
 
183
        if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
 
184
                spin_lock(&o2hb_live_lock);
 
185
                if (list_empty(&o2hb_all_regions)) {
 
186
                        o2hb_heartbeat_mode = hb_mode;
 
187
                        ret = 0;
 
188
                }
 
189
                spin_unlock(&o2hb_live_lock);
 
190
        }
 
191
 
 
192
        return ret;
 
193
}
 
194
 
 
195
struct o2hb_node_event {
 
196
        struct list_head        hn_item;
 
197
        enum o2hb_callback_type hn_event_type;
 
198
        struct o2nm_node        *hn_node;
 
199
        int                     hn_node_num;
 
200
};
 
201
 
 
202
struct o2hb_disk_slot {
 
203
        struct o2hb_disk_heartbeat_block *ds_raw_block;
 
204
        u8                      ds_node_num;
 
205
        u64                     ds_last_time;
 
206
        u64                     ds_last_generation;
 
207
        u16                     ds_equal_samples;
 
208
        u16                     ds_changed_samples;
 
209
        struct list_head        ds_live_item;
 
210
};
 
211
 
 
212
/* each thread owns a region.. when we're asked to tear down the region
 
213
 * we ask the thread to stop, who cleans up the region */
 
214
struct o2hb_region {
 
215
        struct config_item      hr_item;
 
216
 
 
217
        struct list_head        hr_all_item;
 
218
        unsigned                hr_unclean_stop:1,
 
219
                                hr_aborted_start:1,
 
220
                                hr_item_pinned:1,
 
221
                                hr_item_dropped:1;
 
222
 
 
223
        /* protected by the hr_callback_sem */
 
224
        struct task_struct      *hr_task;
 
225
 
 
226
        unsigned int            hr_blocks;
 
227
        unsigned long long      hr_start_block;
 
228
 
 
229
        unsigned int            hr_block_bits;
 
230
        unsigned int            hr_block_bytes;
 
231
 
 
232
        unsigned int            hr_slots_per_page;
 
233
        unsigned int            hr_num_pages;
 
234
 
 
235
        struct page             **hr_slot_data;
 
236
        struct block_device     *hr_bdev;
 
237
        struct o2hb_disk_slot   *hr_slots;
 
238
 
 
239
        /* live node map of this region */
 
240
        unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
241
        unsigned int            hr_region_num;
 
242
 
 
243
        struct dentry           *hr_debug_dir;
 
244
        struct dentry           *hr_debug_livenodes;
 
245
        struct dentry           *hr_debug_regnum;
 
246
        struct dentry           *hr_debug_elapsed_time;
 
247
        struct dentry           *hr_debug_pinned;
 
248
        struct o2hb_debug_buf   *hr_db_livenodes;
 
249
        struct o2hb_debug_buf   *hr_db_regnum;
 
250
        struct o2hb_debug_buf   *hr_db_elapsed_time;
 
251
        struct o2hb_debug_buf   *hr_db_pinned;
 
252
 
 
253
        /* let the person setting up hb wait for it to return until it
 
254
         * has reached a 'steady' state.  This will be fixed when we have
 
255
         * a more complete api that doesn't lead to this sort of fragility. */
 
256
        atomic_t                hr_steady_iterations;
 
257
 
 
258
        /* terminate o2hb thread if it does not reach steady state
 
259
         * (hr_steady_iterations == 0) within hr_unsteady_iterations */
 
260
        atomic_t                hr_unsteady_iterations;
 
261
 
 
262
        char                    hr_dev_name[BDEVNAME_SIZE];
 
263
 
 
264
        unsigned int            hr_timeout_ms;
 
265
 
 
266
        /* randomized as the region goes up and down so that a node
 
267
         * recognizes a node going up and down in one iteration */
 
268
        u64                     hr_generation;
 
269
 
 
270
        struct delayed_work     hr_write_timeout_work;
 
271
        unsigned long           hr_last_timeout_start;
 
272
 
 
273
        /* Used during o2hb_check_slot to hold a copy of the block
 
274
         * being checked because we temporarily have to zero out the
 
275
         * crc field. */
 
276
        struct o2hb_disk_heartbeat_block *hr_tmp_block;
 
277
};
 
278
 
 
279
struct o2hb_bio_wait_ctxt {
 
280
        atomic_t          wc_num_reqs;
 
281
        struct completion wc_io_complete;
 
282
        int               wc_error;
 
283
};
 
284
 
 
285
static int o2hb_pop_count(void *map, int count)
 
286
{
 
287
        int i = -1, pop = 0;
 
288
 
 
289
        while ((i = find_next_bit(map, count, i + 1)) < count)
 
290
                pop++;
 
291
        return pop;
 
292
}
 
293
 
 
294
static void o2hb_write_timeout(struct work_struct *work)
 
295
{
 
296
        int failed, quorum;
 
297
        unsigned long flags;
 
298
        struct o2hb_region *reg =
 
299
                container_of(work, struct o2hb_region,
 
300
                             hr_write_timeout_work.work);
 
301
 
 
302
        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 
303
             "milliseconds\n", reg->hr_dev_name,
 
304
             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
 
305
 
 
306
        if (o2hb_global_heartbeat_active()) {
 
307
                spin_lock_irqsave(&o2hb_live_lock, flags);
 
308
                if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 
309
                        set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 
310
                failed = o2hb_pop_count(&o2hb_failed_region_bitmap,
 
311
                                        O2NM_MAX_REGIONS);
 
312
                quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap,
 
313
                                        O2NM_MAX_REGIONS);
 
314
                spin_unlock_irqrestore(&o2hb_live_lock, flags);
 
315
 
 
316
                mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
 
317
                     quorum, failed);
 
318
 
 
319
                /*
 
320
                 * Fence if the number of failed regions >= half the number
 
321
                 * of  quorum regions
 
322
                 */
 
323
                if ((failed << 1) < quorum)
 
324
                        return;
 
325
        }
 
326
 
 
327
        o2quo_disk_timeout();
 
328
}
 
329
 
 
330
static void o2hb_arm_write_timeout(struct o2hb_region *reg)
 
331
{
 
332
        /* Arm writeout only after thread reaches steady state */
 
333
        if (atomic_read(&reg->hr_steady_iterations) != 0)
 
334
                return;
 
335
 
 
336
        mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
 
337
             O2HB_MAX_WRITE_TIMEOUT_MS);
 
338
 
 
339
        if (o2hb_global_heartbeat_active()) {
 
340
                spin_lock(&o2hb_live_lock);
 
341
                clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 
342
                spin_unlock(&o2hb_live_lock);
 
343
        }
 
344
        cancel_delayed_work(&reg->hr_write_timeout_work);
 
345
        reg->hr_last_timeout_start = jiffies;
 
346
        schedule_delayed_work(&reg->hr_write_timeout_work,
 
347
                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
 
348
}
 
349
 
 
350
static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
 
351
{
 
352
        cancel_delayed_work_sync(&reg->hr_write_timeout_work);
 
353
}
 
354
 
 
355
static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
 
356
{
 
357
        atomic_set(&wc->wc_num_reqs, 1);
 
358
        init_completion(&wc->wc_io_complete);
 
359
        wc->wc_error = 0;
 
360
}
 
361
 
 
362
/* Used in error paths too */
 
363
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
 
364
                                     unsigned int num)
 
365
{
 
366
        /* sadly atomic_sub_and_test() isn't available on all platforms.  The
 
367
         * good news is that the fast path only completes one at a time */
 
368
        while(num--) {
 
369
                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
 
370
                        BUG_ON(num > 0);
 
371
                        complete(&wc->wc_io_complete);
 
372
                }
 
373
        }
 
374
}
 
375
 
 
376
static void o2hb_wait_on_io(struct o2hb_region *reg,
 
377
                            struct o2hb_bio_wait_ctxt *wc)
 
378
{
 
379
        o2hb_bio_wait_dec(wc, 1);
 
380
        wait_for_completion(&wc->wc_io_complete);
 
381
}
 
382
 
 
383
static void o2hb_bio_end_io(struct bio *bio,
 
384
                           int error)
 
385
{
 
386
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
 
387
 
 
388
        if (error) {
 
389
                mlog(ML_ERROR, "IO Error %d\n", error);
 
390
                wc->wc_error = error;
 
391
        }
 
392
 
 
393
        o2hb_bio_wait_dec(wc, 1);
 
394
        bio_put(bio);
 
395
}
 
396
 
 
397
/* Setup a Bio to cover I/O against num_slots slots starting at
 
398
 * start_slot. */
 
399
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
 
400
                                      struct o2hb_bio_wait_ctxt *wc,
 
401
                                      unsigned int *current_slot,
 
402
                                      unsigned int max_slots)
 
403
{
 
404
        int len, current_page;
 
405
        unsigned int vec_len, vec_start;
 
406
        unsigned int bits = reg->hr_block_bits;
 
407
        unsigned int spp = reg->hr_slots_per_page;
 
408
        unsigned int cs = *current_slot;
 
409
        struct bio *bio;
 
410
        struct page *page;
 
411
 
 
412
        /* Testing has shown this allocation to take long enough under
 
413
         * GFP_KERNEL that the local node can get fenced. It would be
 
414
         * nicest if we could pre-allocate these bios and avoid this
 
415
         * all together. */
 
416
        bio = bio_alloc(GFP_ATOMIC, 16);
 
417
        if (!bio) {
 
418
                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
 
419
                bio = ERR_PTR(-ENOMEM);
 
420
                goto bail;
 
421
        }
 
422
 
 
423
        /* Must put everything in 512 byte sectors for the bio... */
 
424
        bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
 
425
        bio->bi_bdev = reg->hr_bdev;
 
426
        bio->bi_private = wc;
 
427
        bio->bi_end_io = o2hb_bio_end_io;
 
428
 
 
429
        vec_start = (cs << bits) % PAGE_CACHE_SIZE;
 
430
        while(cs < max_slots) {
 
431
                current_page = cs / spp;
 
432
                page = reg->hr_slot_data[current_page];
 
433
 
 
434
                vec_len = min(PAGE_CACHE_SIZE - vec_start,
 
435
                              (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
 
436
 
 
437
                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
 
438
                     current_page, vec_len, vec_start);
 
439
 
 
440
                len = bio_add_page(bio, page, vec_len, vec_start);
 
441
                if (len != vec_len) break;
 
442
 
 
443
                cs += vec_len / (PAGE_CACHE_SIZE/spp);
 
444
                vec_start = 0;
 
445
        }
 
446
 
 
447
bail:
 
448
        *current_slot = cs;
 
449
        return bio;
 
450
}
 
451
 
 
452
static int o2hb_read_slots(struct o2hb_region *reg,
 
453
                           unsigned int max_slots)
 
454
{
 
455
        unsigned int current_slot=0;
 
456
        int status;
 
457
        struct o2hb_bio_wait_ctxt wc;
 
458
        struct bio *bio;
 
459
 
 
460
        o2hb_bio_wait_init(&wc);
 
461
 
 
462
        while(current_slot < max_slots) {
 
463
                bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
 
464
                if (IS_ERR(bio)) {
 
465
                        status = PTR_ERR(bio);
 
466
                        mlog_errno(status);
 
467
                        goto bail_and_wait;
 
468
                }
 
469
 
 
470
                atomic_inc(&wc.wc_num_reqs);
 
471
                submit_bio(READ, bio);
 
472
        }
 
473
 
 
474
        status = 0;
 
475
 
 
476
bail_and_wait:
 
477
        o2hb_wait_on_io(reg, &wc);
 
478
        if (wc.wc_error && !status)
 
479
                status = wc.wc_error;
 
480
 
 
481
        return status;
 
482
}
 
483
 
 
484
static int o2hb_issue_node_write(struct o2hb_region *reg,
 
485
                                 struct o2hb_bio_wait_ctxt *write_wc)
 
486
{
 
487
        int status;
 
488
        unsigned int slot;
 
489
        struct bio *bio;
 
490
 
 
491
        o2hb_bio_wait_init(write_wc);
 
492
 
 
493
        slot = o2nm_this_node();
 
494
 
 
495
        bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
 
496
        if (IS_ERR(bio)) {
 
497
                status = PTR_ERR(bio);
 
498
                mlog_errno(status);
 
499
                goto bail;
 
500
        }
 
501
 
 
502
        atomic_inc(&write_wc->wc_num_reqs);
 
503
        submit_bio(WRITE, bio);
 
504
 
 
505
        status = 0;
 
506
bail:
 
507
        return status;
 
508
}
 
509
 
 
510
static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
 
511
                                     struct o2hb_disk_heartbeat_block *hb_block)
 
512
{
 
513
        __le32 old_cksum;
 
514
        u32 ret;
 
515
 
 
516
        /* We want to compute the block crc with a 0 value in the
 
517
         * hb_cksum field. Save it off here and replace after the
 
518
         * crc. */
 
519
        old_cksum = hb_block->hb_cksum;
 
520
        hb_block->hb_cksum = 0;
 
521
 
 
522
        ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
 
523
 
 
524
        hb_block->hb_cksum = old_cksum;
 
525
 
 
526
        return ret;
 
527
}
 
528
 
 
529
static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
 
530
{
 
531
        mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
 
532
             "cksum = 0x%x, generation 0x%llx\n",
 
533
             (long long)le64_to_cpu(hb_block->hb_seq),
 
534
             hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
 
535
             (long long)le64_to_cpu(hb_block->hb_generation));
 
536
}
 
537
 
 
538
static int o2hb_verify_crc(struct o2hb_region *reg,
 
539
                           struct o2hb_disk_heartbeat_block *hb_block)
 
540
{
 
541
        u32 read, computed;
 
542
 
 
543
        read = le32_to_cpu(hb_block->hb_cksum);
 
544
        computed = o2hb_compute_block_crc_le(reg, hb_block);
 
545
 
 
546
        return read == computed;
 
547
}
 
548
 
 
549
/*
 
550
 * Compare the slot data with what we wrote in the last iteration.
 
551
 * If the match fails, print an appropriate error message. This is to
 
552
 * detect errors like... another node hearting on the same slot,
 
553
 * flaky device that is losing writes, etc.
 
554
 * Returns 1 if check succeeds, 0 otherwise.
 
555
 */
 
556
static int o2hb_check_own_slot(struct o2hb_region *reg)
 
557
{
 
558
        struct o2hb_disk_slot *slot;
 
559
        struct o2hb_disk_heartbeat_block *hb_block;
 
560
        char *errstr;
 
561
 
 
562
        slot = &reg->hr_slots[o2nm_this_node()];
 
563
        /* Don't check on our 1st timestamp */
 
564
        if (!slot->ds_last_time)
 
565
                return 0;
 
566
 
 
567
        hb_block = slot->ds_raw_block;
 
568
        if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
 
569
            le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
 
570
            hb_block->hb_node == slot->ds_node_num)
 
571
                return 1;
 
572
 
 
573
#define ERRSTR1         "Another node is heartbeating on device"
 
574
#define ERRSTR2         "Heartbeat generation mismatch on device"
 
575
#define ERRSTR3         "Heartbeat sequence mismatch on device"
 
576
 
 
577
        if (hb_block->hb_node != slot->ds_node_num)
 
578
                errstr = ERRSTR1;
 
579
        else if (le64_to_cpu(hb_block->hb_generation) !=
 
580
                 slot->ds_last_generation)
 
581
                errstr = ERRSTR2;
 
582
        else
 
583
                errstr = ERRSTR3;
 
584
 
 
585
        mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
 
586
             "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
 
587
             slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
 
588
             (unsigned long long)slot->ds_last_time, hb_block->hb_node,
 
589
             (unsigned long long)le64_to_cpu(hb_block->hb_generation),
 
590
             (unsigned long long)le64_to_cpu(hb_block->hb_seq));
 
591
 
 
592
        return 0;
 
593
}
 
594
 
 
595
static inline void o2hb_prepare_block(struct o2hb_region *reg,
 
596
                                      u64 generation)
 
597
{
 
598
        int node_num;
 
599
        u64 cputime;
 
600
        struct o2hb_disk_slot *slot;
 
601
        struct o2hb_disk_heartbeat_block *hb_block;
 
602
 
 
603
        node_num = o2nm_this_node();
 
604
        slot = &reg->hr_slots[node_num];
 
605
 
 
606
        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
 
607
        memset(hb_block, 0, reg->hr_block_bytes);
 
608
        /* TODO: time stuff */
 
609
        cputime = CURRENT_TIME.tv_sec;
 
610
        if (!cputime)
 
611
                cputime = 1;
 
612
 
 
613
        hb_block->hb_seq = cpu_to_le64(cputime);
 
614
        hb_block->hb_node = node_num;
 
615
        hb_block->hb_generation = cpu_to_le64(generation);
 
616
        hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
 
617
 
 
618
        /* This step must always happen last! */
 
619
        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
 
620
                                                                   hb_block));
 
621
 
 
622
        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
 
623
             (long long)generation,
 
624
             le32_to_cpu(hb_block->hb_cksum));
 
625
}
 
626
 
 
627
static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
 
628
                                struct o2nm_node *node,
 
629
                                int idx)
 
630
{
 
631
        struct list_head *iter;
 
632
        struct o2hb_callback_func *f;
 
633
 
 
634
        list_for_each(iter, &hbcall->list) {
 
635
                f = list_entry(iter, struct o2hb_callback_func, hc_item);
 
636
                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
 
637
                (f->hc_func)(node, idx, f->hc_data);
 
638
        }
 
639
}
 
640
 
 
641
/* Will run the list in order until we process the passed event */
 
642
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
 
643
{
 
644
        int empty;
 
645
        struct o2hb_callback *hbcall;
 
646
        struct o2hb_node_event *event;
 
647
 
 
648
        spin_lock(&o2hb_live_lock);
 
649
        empty = list_empty(&queued_event->hn_item);
 
650
        spin_unlock(&o2hb_live_lock);
 
651
        if (empty)
 
652
                return;
 
653
 
 
654
        /* Holding callback sem assures we don't alter the callback
 
655
         * lists when doing this, and serializes ourselves with other
 
656
         * processes wanting callbacks. */
 
657
        down_write(&o2hb_callback_sem);
 
658
 
 
659
        spin_lock(&o2hb_live_lock);
 
660
        while (!list_empty(&o2hb_node_events)
 
661
               && !list_empty(&queued_event->hn_item)) {
 
662
                event = list_entry(o2hb_node_events.next,
 
663
                                   struct o2hb_node_event,
 
664
                                   hn_item);
 
665
                list_del_init(&event->hn_item);
 
666
                spin_unlock(&o2hb_live_lock);
 
667
 
 
668
                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
 
669
                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
 
670
                     event->hn_node_num);
 
671
 
 
672
                hbcall = hbcall_from_type(event->hn_event_type);
 
673
 
 
674
                /* We should *never* have gotten on to the list with a
 
675
                 * bad type... This isn't something that we should try
 
676
                 * to recover from. */
 
677
                BUG_ON(IS_ERR(hbcall));
 
678
 
 
679
                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
 
680
 
 
681
                spin_lock(&o2hb_live_lock);
 
682
        }
 
683
        spin_unlock(&o2hb_live_lock);
 
684
 
 
685
        up_write(&o2hb_callback_sem);
 
686
}
 
687
 
 
688
static void o2hb_queue_node_event(struct o2hb_node_event *event,
 
689
                                  enum o2hb_callback_type type,
 
690
                                  struct o2nm_node *node,
 
691
                                  int node_num)
 
692
{
 
693
        assert_spin_locked(&o2hb_live_lock);
 
694
 
 
695
        BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
 
696
 
 
697
        event->hn_event_type = type;
 
698
        event->hn_node = node;
 
699
        event->hn_node_num = node_num;
 
700
 
 
701
        mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
 
702
             type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
 
703
 
 
704
        list_add_tail(&event->hn_item, &o2hb_node_events);
 
705
}
 
706
 
 
707
static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
 
708
{
 
709
        struct o2hb_node_event event =
 
710
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
 
711
        struct o2nm_node *node;
 
712
 
 
713
        node = o2nm_get_node_by_num(slot->ds_node_num);
 
714
        if (!node)
 
715
                return;
 
716
 
 
717
        spin_lock(&o2hb_live_lock);
 
718
        if (!list_empty(&slot->ds_live_item)) {
 
719
                mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
 
720
                     slot->ds_node_num);
 
721
 
 
722
                list_del_init(&slot->ds_live_item);
 
723
 
 
724
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 
725
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
726
 
 
727
                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
 
728
                                              slot->ds_node_num);
 
729
                }
 
730
        }
 
731
        spin_unlock(&o2hb_live_lock);
 
732
 
 
733
        o2hb_run_event_list(&event);
 
734
 
 
735
        o2nm_node_put(node);
 
736
}
 
737
 
 
738
static void o2hb_set_quorum_device(struct o2hb_region *reg)
 
739
{
 
740
        if (!o2hb_global_heartbeat_active())
 
741
                return;
 
742
 
 
743
        /* Prevent race with o2hb_heartbeat_group_drop_item() */
 
744
        if (kthread_should_stop())
 
745
                return;
 
746
 
 
747
        /* Tag region as quorum only after thread reaches steady state */
 
748
        if (atomic_read(&reg->hr_steady_iterations) != 0)
 
749
                return;
 
750
 
 
751
        spin_lock(&o2hb_live_lock);
 
752
 
 
753
        if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 
754
                goto unlock;
 
755
 
 
756
        /*
 
757
         * A region can be added to the quorum only when it sees all
 
758
         * live nodes heartbeat on it. In other words, the region has been
 
759
         * added to all nodes.
 
760
         */
 
761
        if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
 
762
                   sizeof(o2hb_live_node_bitmap)))
 
763
                goto unlock;
 
764
 
 
765
        printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
 
766
               config_item_name(&reg->hr_item), reg->hr_dev_name);
 
767
 
 
768
        set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
 
769
 
 
770
        /*
 
771
         * If global heartbeat active, unpin all regions if the
 
772
         * region count > CUT_OFF
 
773
         */
 
774
        if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
 
775
                           O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
 
776
                o2hb_region_unpin(NULL);
 
777
unlock:
 
778
        spin_unlock(&o2hb_live_lock);
 
779
}
 
780
 
 
781
static int o2hb_check_slot(struct o2hb_region *reg,
 
782
                           struct o2hb_disk_slot *slot)
 
783
{
 
784
        int changed = 0, gen_changed = 0;
 
785
        struct o2hb_node_event event =
 
786
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
 
787
        struct o2nm_node *node;
 
788
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
 
789
        u64 cputime;
 
790
        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
 
791
        unsigned int slot_dead_ms;
 
792
        int tmp;
 
793
 
 
794
        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
 
795
 
 
796
        /*
 
797
         * If a node is no longer configured but is still in the livemap, we
 
798
         * may need to clear that bit from the livemap.
 
799
         */
 
800
        node = o2nm_get_node_by_num(slot->ds_node_num);
 
801
        if (!node) {
 
802
                spin_lock(&o2hb_live_lock);
 
803
                tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
804
                spin_unlock(&o2hb_live_lock);
 
805
                if (!tmp)
 
806
                        return 0;
 
807
        }
 
808
 
 
809
        if (!o2hb_verify_crc(reg, hb_block)) {
 
810
                /* all paths from here will drop o2hb_live_lock for
 
811
                 * us. */
 
812
                spin_lock(&o2hb_live_lock);
 
813
 
 
814
                /* Don't print an error on the console in this case -
 
815
                 * a freshly formatted heartbeat area will not have a
 
816
                 * crc set on it. */
 
817
                if (list_empty(&slot->ds_live_item))
 
818
                        goto out;
 
819
 
 
820
                /* The node is live but pushed out a bad crc. We
 
821
                 * consider it a transient miss but don't populate any
 
822
                 * other values as they may be junk. */
 
823
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
 
824
                     slot->ds_node_num, reg->hr_dev_name);
 
825
                o2hb_dump_slot(hb_block);
 
826
 
 
827
                slot->ds_equal_samples++;
 
828
                goto fire_callbacks;
 
829
        }
 
830
 
 
831
        /* we don't care if these wrap.. the state transitions below
 
832
         * clear at the right places */
 
833
        cputime = le64_to_cpu(hb_block->hb_seq);
 
834
        if (slot->ds_last_time != cputime)
 
835
                slot->ds_changed_samples++;
 
836
        else
 
837
                slot->ds_equal_samples++;
 
838
        slot->ds_last_time = cputime;
 
839
 
 
840
        /* The node changed heartbeat generations. We assume this to
 
841
         * mean it dropped off but came back before we timed out. We
 
842
         * want to consider it down for the time being but don't want
 
843
         * to lose any changed_samples state we might build up to
 
844
         * considering it live again. */
 
845
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
 
846
                gen_changed = 1;
 
847
                slot->ds_equal_samples = 0;
 
848
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
 
849
                     "to 0x%llx)\n", slot->ds_node_num,
 
850
                     (long long)slot->ds_last_generation,
 
851
                     (long long)le64_to_cpu(hb_block->hb_generation));
 
852
        }
 
853
 
 
854
        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
 
855
 
 
856
        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
 
857
             "seq %llu last %llu changed %u equal %u\n",
 
858
             slot->ds_node_num, (long long)slot->ds_last_generation,
 
859
             le32_to_cpu(hb_block->hb_cksum),
 
860
             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
 
861
             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
 
862
             slot->ds_equal_samples);
 
863
 
 
864
        spin_lock(&o2hb_live_lock);
 
865
 
 
866
fire_callbacks:
 
867
        /* dead nodes only come to life after some number of
 
868
         * changes at any time during their dead time */
 
869
        if (list_empty(&slot->ds_live_item) &&
 
870
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
 
871
                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
 
872
                     slot->ds_node_num, (long long)slot->ds_last_generation);
 
873
 
 
874
                set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
 
875
 
 
876
                /* first on the list generates a callback */
 
877
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 
878
                        mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
 
879
                             "bitmap\n", slot->ds_node_num);
 
880
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
881
 
 
882
                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
 
883
                                              slot->ds_node_num);
 
884
 
 
885
                        changed = 1;
 
886
                }
 
887
 
 
888
                list_add_tail(&slot->ds_live_item,
 
889
                              &o2hb_live_slots[slot->ds_node_num]);
 
890
 
 
891
                slot->ds_equal_samples = 0;
 
892
 
 
893
                /* We want to be sure that all nodes agree on the
 
894
                 * number of milliseconds before a node will be
 
895
                 * considered dead. The self-fencing timeout is
 
896
                 * computed from this value, and a discrepancy might
 
897
                 * result in heartbeat calling a node dead when it
 
898
                 * hasn't self-fenced yet. */
 
899
                slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
 
900
                if (slot_dead_ms && slot_dead_ms != dead_ms) {
 
901
                        /* TODO: Perhaps we can fail the region here. */
 
902
                        mlog(ML_ERROR, "Node %d on device %s has a dead count "
 
903
                             "of %u ms, but our count is %u ms.\n"
 
904
                             "Please double check your configuration values "
 
905
                             "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
 
906
                             slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
 
907
                             dead_ms);
 
908
                }
 
909
                goto out;
 
910
        }
 
911
 
 
912
        /* if the list is dead, we're done.. */
 
913
        if (list_empty(&slot->ds_live_item))
 
914
                goto out;
 
915
 
 
916
        /* live nodes only go dead after enough consequtive missed
 
917
         * samples..  reset the missed counter whenever we see
 
918
         * activity */
 
919
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
 
920
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
 
921
                     slot->ds_node_num);
 
922
 
 
923
                clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
 
924
 
 
925
                /* last off the live_slot generates a callback */
 
926
                list_del_init(&slot->ds_live_item);
 
927
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 
928
                        mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
 
929
                             "nodes bitmap\n", slot->ds_node_num);
 
930
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
931
 
 
932
                        /* node can be null */
 
933
                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
 
934
                                              node, slot->ds_node_num);
 
935
 
 
936
                        changed = 1;
 
937
                }
 
938
 
 
939
                /* We don't clear this because the node is still
 
940
                 * actually writing new blocks. */
 
941
                if (!gen_changed)
 
942
                        slot->ds_changed_samples = 0;
 
943
                goto out;
 
944
        }
 
945
        if (slot->ds_changed_samples) {
 
946
                slot->ds_changed_samples = 0;
 
947
                slot->ds_equal_samples = 0;
 
948
        }
 
949
out:
 
950
        spin_unlock(&o2hb_live_lock);
 
951
 
 
952
        o2hb_run_event_list(&event);
 
953
 
 
954
        if (node)
 
955
                o2nm_node_put(node);
 
956
        return changed;
 
957
}
 
958
 
 
959
/* This could be faster if we just implmented a find_last_bit, but I
 
960
 * don't think the circumstances warrant it. */
 
961
static int o2hb_highest_node(unsigned long *nodes,
 
962
                             int numbits)
 
963
{
 
964
        int highest, node;
 
965
 
 
966
        highest = numbits;
 
967
        node = -1;
 
968
        while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
 
969
                if (node >= numbits)
 
970
                        break;
 
971
 
 
972
                highest = node;
 
973
        }
 
974
 
 
975
        return highest;
 
976
}
 
977
 
 
978
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 
979
{
 
980
        int i, ret, highest_node;
 
981
        int membership_change = 0, own_slot_ok = 0;
 
982
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
983
        unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
984
        struct o2hb_bio_wait_ctxt write_wc;
 
985
 
 
986
        ret = o2nm_configured_node_map(configured_nodes,
 
987
                                       sizeof(configured_nodes));
 
988
        if (ret) {
 
989
                mlog_errno(ret);
 
990
                goto bail;
 
991
        }
 
992
 
 
993
        /*
 
994
         * If a node is not configured but is in the livemap, we still need
 
995
         * to read the slot so as to be able to remove it from the livemap.
 
996
         */
 
997
        o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
 
998
        i = -1;
 
999
        while ((i = find_next_bit(live_node_bitmap,
 
1000
                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
 
1001
                set_bit(i, configured_nodes);
 
1002
        }
 
1003
 
 
1004
        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
 
1005
        if (highest_node >= O2NM_MAX_NODES) {
 
1006
                mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
 
1007
                ret = -EINVAL;
 
1008
                goto bail;
 
1009
        }
 
1010
 
 
1011
        /* No sense in reading the slots of nodes that don't exist
 
1012
         * yet. Of course, if the node definitions have holes in them
 
1013
         * then we're reading an empty slot anyway... Consider this
 
1014
         * best-effort. */
 
1015
        ret = o2hb_read_slots(reg, highest_node + 1);
 
1016
        if (ret < 0) {
 
1017
                mlog_errno(ret);
 
1018
                goto bail;
 
1019
        }
 
1020
 
 
1021
        /* With an up to date view of the slots, we can check that no
 
1022
         * other node has been improperly configured to heartbeat in
 
1023
         * our slot. */
 
1024
        own_slot_ok = o2hb_check_own_slot(reg);
 
1025
 
 
1026
        /* fill in the proper info for our next heartbeat */
 
1027
        o2hb_prepare_block(reg, reg->hr_generation);
 
1028
 
 
1029
        ret = o2hb_issue_node_write(reg, &write_wc);
 
1030
        if (ret < 0) {
 
1031
                mlog_errno(ret);
 
1032
                goto bail;
 
1033
        }
 
1034
 
 
1035
        i = -1;
 
1036
        while((i = find_next_bit(configured_nodes,
 
1037
                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
 
1038
                membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
 
1039
        }
 
1040
 
 
1041
        /*
 
1042
         * We have to be sure we've advertised ourselves on disk
 
1043
         * before we can go to steady state.  This ensures that
 
1044
         * people we find in our steady state have seen us.
 
1045
         */
 
1046
        o2hb_wait_on_io(reg, &write_wc);
 
1047
        if (write_wc.wc_error) {
 
1048
                /* Do not re-arm the write timeout on I/O error - we
 
1049
                 * can't be sure that the new block ever made it to
 
1050
                 * disk */
 
1051
                mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
 
1052
                     write_wc.wc_error, reg->hr_dev_name);
 
1053
                ret = write_wc.wc_error;
 
1054
                goto bail;
 
1055
        }
 
1056
 
 
1057
        /* Skip disarming the timeout if own slot has stale/bad data */
 
1058
        if (own_slot_ok) {
 
1059
                o2hb_set_quorum_device(reg);
 
1060
                o2hb_arm_write_timeout(reg);
 
1061
        }
 
1062
 
 
1063
bail:
 
1064
        /* let the person who launched us know when things are steady */
 
1065
        if (atomic_read(&reg->hr_steady_iterations) != 0) {
 
1066
                if (!ret && own_slot_ok && !membership_change) {
 
1067
                        if (atomic_dec_and_test(&reg->hr_steady_iterations))
 
1068
                                wake_up(&o2hb_steady_queue);
 
1069
                }
 
1070
        }
 
1071
 
 
1072
        if (atomic_read(&reg->hr_steady_iterations) != 0) {
 
1073
                if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
 
1074
                        printk(KERN_NOTICE "o2hb: Unable to stabilize "
 
1075
                               "heartbeart on region %s (%s)\n",
 
1076
                               config_item_name(&reg->hr_item),
 
1077
                               reg->hr_dev_name);
 
1078
                        atomic_set(&reg->hr_steady_iterations, 0);
 
1079
                        reg->hr_aborted_start = 1;
 
1080
                        wake_up(&o2hb_steady_queue);
 
1081
                        ret = -EIO;
 
1082
                }
 
1083
        }
 
1084
 
 
1085
        return ret;
 
1086
}
 
1087
 
 
1088
/* Subtract b from a, storing the result in a. a *must* have a larger
 
1089
 * value than b. */
 
1090
static void o2hb_tv_subtract(struct timeval *a,
 
1091
                             struct timeval *b)
 
1092
{
 
1093
        /* just return 0 when a is after b */
 
1094
        if (a->tv_sec < b->tv_sec ||
 
1095
            (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
 
1096
                a->tv_sec = 0;
 
1097
                a->tv_usec = 0;
 
1098
                return;
 
1099
        }
 
1100
 
 
1101
        a->tv_sec -= b->tv_sec;
 
1102
        a->tv_usec -= b->tv_usec;
 
1103
        while ( a->tv_usec < 0 ) {
 
1104
                a->tv_sec--;
 
1105
                a->tv_usec += 1000000;
 
1106
        }
 
1107
}
 
1108
 
 
1109
static unsigned int o2hb_elapsed_msecs(struct timeval *start,
 
1110
                                       struct timeval *end)
 
1111
{
 
1112
        struct timeval res = *end;
 
1113
 
 
1114
        o2hb_tv_subtract(&res, start);
 
1115
 
 
1116
        return res.tv_sec * 1000 + res.tv_usec / 1000;
 
1117
}
 
1118
 
 
1119
/*
 
1120
 * we ride the region ref that the region dir holds.  before the region
 
1121
 * dir is removed and drops it ref it will wait to tear down this
 
1122
 * thread.
 
1123
 */
 
1124
static int o2hb_thread(void *data)
 
1125
{
 
1126
        int i, ret;
 
1127
        struct o2hb_region *reg = data;
 
1128
        struct o2hb_bio_wait_ctxt write_wc;
 
1129
        struct timeval before_hb, after_hb;
 
1130
        unsigned int elapsed_msec;
 
1131
 
 
1132
        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
 
1133
 
 
1134
        set_user_nice(current, -20);
 
1135
 
 
1136
        /* Pin node */
 
1137
        o2nm_depend_this_node();
 
1138
 
 
1139
        while (!kthread_should_stop() &&
 
1140
               !reg->hr_unclean_stop && !reg->hr_aborted_start) {
 
1141
                /* We track the time spent inside
 
1142
                 * o2hb_do_disk_heartbeat so that we avoid more than
 
1143
                 * hr_timeout_ms between disk writes. On busy systems
 
1144
                 * this should result in a heartbeat which is less
 
1145
                 * likely to time itself out. */
 
1146
                do_gettimeofday(&before_hb);
 
1147
 
 
1148
                ret = o2hb_do_disk_heartbeat(reg);
 
1149
 
 
1150
                do_gettimeofday(&after_hb);
 
1151
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
 
1152
 
 
1153
                mlog(ML_HEARTBEAT,
 
1154
                     "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
 
1155
                     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
 
1156
                     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
 
1157
                     elapsed_msec);
 
1158
 
 
1159
                if (!kthread_should_stop() &&
 
1160
                    elapsed_msec < reg->hr_timeout_ms) {
 
1161
                        /* the kthread api has blocked signals for us so no
 
1162
                         * need to record the return value. */
 
1163
                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
 
1164
                }
 
1165
        }
 
1166
 
 
1167
        o2hb_disarm_write_timeout(reg);
 
1168
 
 
1169
        /* unclean stop is only used in very bad situation */
 
1170
        for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
 
1171
                o2hb_shutdown_slot(&reg->hr_slots[i]);
 
1172
 
 
1173
        /* Explicit down notification - avoid forcing the other nodes
 
1174
         * to timeout on this region when we could just as easily
 
1175
         * write a clear generation - thus indicating to them that
 
1176
         * this node has left this region.
 
1177
         */
 
1178
        if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
 
1179
                o2hb_prepare_block(reg, 0);
 
1180
                ret = o2hb_issue_node_write(reg, &write_wc);
 
1181
                if (ret == 0)
 
1182
                        o2hb_wait_on_io(reg, &write_wc);
 
1183
                else
 
1184
                        mlog_errno(ret);
 
1185
        }
 
1186
 
 
1187
        /* Unpin node */
 
1188
        o2nm_undepend_this_node();
 
1189
 
 
1190
        mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
 
1191
 
 
1192
        return 0;
 
1193
}
 
1194
 
 
1195
#ifdef CONFIG_DEBUG_FS
 
1196
static int o2hb_debug_open(struct inode *inode, struct file *file)
 
1197
{
 
1198
        struct o2hb_debug_buf *db = inode->i_private;
 
1199
        struct o2hb_region *reg;
 
1200
        unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
1201
        unsigned long lts;
 
1202
        char *buf = NULL;
 
1203
        int i = -1;
 
1204
        int out = 0;
 
1205
 
 
1206
        /* max_nodes should be the largest bitmap we pass here */
 
1207
        BUG_ON(sizeof(map) < db->db_size);
 
1208
 
 
1209
        buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
 
1210
        if (!buf)
 
1211
                goto bail;
 
1212
 
 
1213
        switch (db->db_type) {
 
1214
        case O2HB_DB_TYPE_LIVENODES:
 
1215
        case O2HB_DB_TYPE_LIVEREGIONS:
 
1216
        case O2HB_DB_TYPE_QUORUMREGIONS:
 
1217
        case O2HB_DB_TYPE_FAILEDREGIONS:
 
1218
                spin_lock(&o2hb_live_lock);
 
1219
                memcpy(map, db->db_data, db->db_size);
 
1220
                spin_unlock(&o2hb_live_lock);
 
1221
                break;
 
1222
 
 
1223
        case O2HB_DB_TYPE_REGION_LIVENODES:
 
1224
                spin_lock(&o2hb_live_lock);
 
1225
                reg = (struct o2hb_region *)db->db_data;
 
1226
                memcpy(map, reg->hr_live_node_bitmap, db->db_size);
 
1227
                spin_unlock(&o2hb_live_lock);
 
1228
                break;
 
1229
 
 
1230
        case O2HB_DB_TYPE_REGION_NUMBER:
 
1231
                reg = (struct o2hb_region *)db->db_data;
 
1232
                out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
 
1233
                                reg->hr_region_num);
 
1234
                goto done;
 
1235
 
 
1236
        case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
 
1237
                reg = (struct o2hb_region *)db->db_data;
 
1238
                lts = reg->hr_last_timeout_start;
 
1239
                /* If 0, it has never been set before */
 
1240
                if (lts)
 
1241
                        lts = jiffies_to_msecs(jiffies - lts);
 
1242
                out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
 
1243
                goto done;
 
1244
 
 
1245
        case O2HB_DB_TYPE_REGION_PINNED:
 
1246
                reg = (struct o2hb_region *)db->db_data;
 
1247
                out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
 
1248
                                !!reg->hr_item_pinned);
 
1249
                goto done;
 
1250
 
 
1251
        default:
 
1252
                goto done;
 
1253
        }
 
1254
 
 
1255
        while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
 
1256
                out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
 
1257
        out += snprintf(buf + out, PAGE_SIZE - out, "\n");
 
1258
 
 
1259
done:
 
1260
        i_size_write(inode, out);
 
1261
 
 
1262
        file->private_data = buf;
 
1263
 
 
1264
        return 0;
 
1265
bail:
 
1266
        return -ENOMEM;
 
1267
}
 
1268
 
 
1269
static int o2hb_debug_release(struct inode *inode, struct file *file)
 
1270
{
 
1271
        kfree(file->private_data);
 
1272
        return 0;
 
1273
}
 
1274
 
 
1275
static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
 
1276
                                 size_t nbytes, loff_t *ppos)
 
1277
{
 
1278
        return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
 
1279
                                       i_size_read(file->f_mapping->host));
 
1280
}
 
1281
#else
 
1282
static int o2hb_debug_open(struct inode *inode, struct file *file)
 
1283
{
 
1284
        return 0;
 
1285
}
 
1286
static int o2hb_debug_release(struct inode *inode, struct file *file)
 
1287
{
 
1288
        return 0;
 
1289
}
 
1290
static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
 
1291
                               size_t nbytes, loff_t *ppos)
 
1292
{
 
1293
        return 0;
 
1294
}
 
1295
#endif  /* CONFIG_DEBUG_FS */
 
1296
 
 
1297
static const struct file_operations o2hb_debug_fops = {
 
1298
        .open =         o2hb_debug_open,
 
1299
        .release =      o2hb_debug_release,
 
1300
        .read =         o2hb_debug_read,
 
1301
        .llseek =       generic_file_llseek,
 
1302
};
 
1303
 
 
1304
void o2hb_exit(void)
 
1305
{
 
1306
        kfree(o2hb_db_livenodes);
 
1307
        kfree(o2hb_db_liveregions);
 
1308
        kfree(o2hb_db_quorumregions);
 
1309
        kfree(o2hb_db_failedregions);
 
1310
        debugfs_remove(o2hb_debug_failedregions);
 
1311
        debugfs_remove(o2hb_debug_quorumregions);
 
1312
        debugfs_remove(o2hb_debug_liveregions);
 
1313
        debugfs_remove(o2hb_debug_livenodes);
 
1314
        debugfs_remove(o2hb_debug_dir);
 
1315
}
 
1316
 
 
1317
static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
 
1318
                                        struct o2hb_debug_buf **db, int db_len,
 
1319
                                        int type, int size, int len, void *data)
 
1320
{
 
1321
        *db = kmalloc(db_len, GFP_KERNEL);
 
1322
        if (!*db)
 
1323
                return NULL;
 
1324
 
 
1325
        (*db)->db_type = type;
 
1326
        (*db)->db_size = size;
 
1327
        (*db)->db_len = len;
 
1328
        (*db)->db_data = data;
 
1329
 
 
1330
        return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
 
1331
                                   &o2hb_debug_fops);
 
1332
}
 
1333
 
 
1334
static int o2hb_debug_init(void)
 
1335
{
 
1336
        int ret = -ENOMEM;
 
1337
 
 
1338
        o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
 
1339
        if (!o2hb_debug_dir) {
 
1340
                mlog_errno(ret);
 
1341
                goto bail;
 
1342
        }
 
1343
 
 
1344
        o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
 
1345
                                                 o2hb_debug_dir,
 
1346
                                                 &o2hb_db_livenodes,
 
1347
                                                 sizeof(*o2hb_db_livenodes),
 
1348
                                                 O2HB_DB_TYPE_LIVENODES,
 
1349
                                                 sizeof(o2hb_live_node_bitmap),
 
1350
                                                 O2NM_MAX_NODES,
 
1351
                                                 o2hb_live_node_bitmap);
 
1352
        if (!o2hb_debug_livenodes) {
 
1353
                mlog_errno(ret);
 
1354
                goto bail;
 
1355
        }
 
1356
 
 
1357
        o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
 
1358
                                                   o2hb_debug_dir,
 
1359
                                                   &o2hb_db_liveregions,
 
1360
                                                   sizeof(*o2hb_db_liveregions),
 
1361
                                                   O2HB_DB_TYPE_LIVEREGIONS,
 
1362
                                                   sizeof(o2hb_live_region_bitmap),
 
1363
                                                   O2NM_MAX_REGIONS,
 
1364
                                                   o2hb_live_region_bitmap);
 
1365
        if (!o2hb_debug_liveregions) {
 
1366
                mlog_errno(ret);
 
1367
                goto bail;
 
1368
        }
 
1369
 
 
1370
        o2hb_debug_quorumregions =
 
1371
                        o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
 
1372
                                          o2hb_debug_dir,
 
1373
                                          &o2hb_db_quorumregions,
 
1374
                                          sizeof(*o2hb_db_quorumregions),
 
1375
                                          O2HB_DB_TYPE_QUORUMREGIONS,
 
1376
                                          sizeof(o2hb_quorum_region_bitmap),
 
1377
                                          O2NM_MAX_REGIONS,
 
1378
                                          o2hb_quorum_region_bitmap);
 
1379
        if (!o2hb_debug_quorumregions) {
 
1380
                mlog_errno(ret);
 
1381
                goto bail;
 
1382
        }
 
1383
 
 
1384
        o2hb_debug_failedregions =
 
1385
                        o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
 
1386
                                          o2hb_debug_dir,
 
1387
                                          &o2hb_db_failedregions,
 
1388
                                          sizeof(*o2hb_db_failedregions),
 
1389
                                          O2HB_DB_TYPE_FAILEDREGIONS,
 
1390
                                          sizeof(o2hb_failed_region_bitmap),
 
1391
                                          O2NM_MAX_REGIONS,
 
1392
                                          o2hb_failed_region_bitmap);
 
1393
        if (!o2hb_debug_failedregions) {
 
1394
                mlog_errno(ret);
 
1395
                goto bail;
 
1396
        }
 
1397
 
 
1398
        ret = 0;
 
1399
bail:
 
1400
        if (ret)
 
1401
                o2hb_exit();
 
1402
 
 
1403
        return ret;
 
1404
}
 
1405
 
 
1406
int o2hb_init(void)
 
1407
{
 
1408
        int i;
 
1409
 
 
1410
        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
 
1411
                INIT_LIST_HEAD(&o2hb_callbacks[i].list);
 
1412
 
 
1413
        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
 
1414
                INIT_LIST_HEAD(&o2hb_live_slots[i]);
 
1415
 
 
1416
        INIT_LIST_HEAD(&o2hb_node_events);
 
1417
 
 
1418
        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
 
1419
        memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
 
1420
        memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
 
1421
        memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
 
1422
        memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
 
1423
 
 
1424
        o2hb_dependent_users = 0;
 
1425
 
 
1426
        return o2hb_debug_init();
 
1427
}
 
1428
 
 
1429
/* if we're already in a callback then we're already serialized by the sem */
 
1430
static void o2hb_fill_node_map_from_callback(unsigned long *map,
 
1431
                                             unsigned bytes)
 
1432
{
 
1433
        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
 
1434
 
 
1435
        memcpy(map, &o2hb_live_node_bitmap, bytes);
 
1436
}
 
1437
 
 
1438
/*
 
1439
 * get a map of all nodes that are heartbeating in any regions
 
1440
 */
 
1441
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
 
1442
{
 
1443
        /* callers want to serialize this map and callbacks so that they
 
1444
         * can trust that they don't miss nodes coming to the party */
 
1445
        down_read(&o2hb_callback_sem);
 
1446
        spin_lock(&o2hb_live_lock);
 
1447
        o2hb_fill_node_map_from_callback(map, bytes);
 
1448
        spin_unlock(&o2hb_live_lock);
 
1449
        up_read(&o2hb_callback_sem);
 
1450
}
 
1451
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
 
1452
 
 
1453
/*
 
1454
 * heartbeat configfs bits.  The heartbeat set is a default set under
 
1455
 * the cluster set in nodemanager.c.
 
1456
 */
 
1457
 
 
1458
static struct o2hb_region *to_o2hb_region(struct config_item *item)
 
1459
{
 
1460
        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
 
1461
}
 
1462
 
 
1463
/* drop_item only drops its ref after killing the thread, nothing should
 
1464
 * be using the region anymore.  this has to clean up any state that
 
1465
 * attributes might have built up. */
 
1466
static void o2hb_region_release(struct config_item *item)
 
1467
{
 
1468
        int i;
 
1469
        struct page *page;
 
1470
        struct o2hb_region *reg = to_o2hb_region(item);
 
1471
 
 
1472
        mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
 
1473
 
 
1474
        if (reg->hr_tmp_block)
 
1475
                kfree(reg->hr_tmp_block);
 
1476
 
 
1477
        if (reg->hr_slot_data) {
 
1478
                for (i = 0; i < reg->hr_num_pages; i++) {
 
1479
                        page = reg->hr_slot_data[i];
 
1480
                        if (page)
 
1481
                                __free_page(page);
 
1482
                }
 
1483
                kfree(reg->hr_slot_data);
 
1484
        }
 
1485
 
 
1486
        if (reg->hr_bdev)
 
1487
                blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
 
1488
 
 
1489
        if (reg->hr_slots)
 
1490
                kfree(reg->hr_slots);
 
1491
 
 
1492
        kfree(reg->hr_db_regnum);
 
1493
        kfree(reg->hr_db_livenodes);
 
1494
        debugfs_remove(reg->hr_debug_livenodes);
 
1495
        debugfs_remove(reg->hr_debug_regnum);
 
1496
        debugfs_remove(reg->hr_debug_elapsed_time);
 
1497
        debugfs_remove(reg->hr_debug_pinned);
 
1498
        debugfs_remove(reg->hr_debug_dir);
 
1499
 
 
1500
        spin_lock(&o2hb_live_lock);
 
1501
        list_del(&reg->hr_all_item);
 
1502
        spin_unlock(&o2hb_live_lock);
 
1503
 
 
1504
        kfree(reg);
 
1505
}
 
1506
 
 
1507
static int o2hb_read_block_input(struct o2hb_region *reg,
 
1508
                                 const char *page,
 
1509
                                 size_t count,
 
1510
                                 unsigned long *ret_bytes,
 
1511
                                 unsigned int *ret_bits)
 
1512
{
 
1513
        unsigned long bytes;
 
1514
        char *p = (char *)page;
 
1515
 
 
1516
        bytes = simple_strtoul(p, &p, 0);
 
1517
        if (!p || (*p && (*p != '\n')))
 
1518
                return -EINVAL;
 
1519
 
 
1520
        /* Heartbeat and fs min / max block sizes are the same. */
 
1521
        if (bytes > 4096 || bytes < 512)
 
1522
                return -ERANGE;
 
1523
        if (hweight16(bytes) != 1)
 
1524
                return -EINVAL;
 
1525
 
 
1526
        if (ret_bytes)
 
1527
                *ret_bytes = bytes;
 
1528
        if (ret_bits)
 
1529
                *ret_bits = ffs(bytes) - 1;
 
1530
 
 
1531
        return 0;
 
1532
}
 
1533
 
 
1534
static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
 
1535
                                            char *page)
 
1536
{
 
1537
        return sprintf(page, "%u\n", reg->hr_block_bytes);
 
1538
}
 
1539
 
 
1540
static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
 
1541
                                             const char *page,
 
1542
                                             size_t count)
 
1543
{
 
1544
        int status;
 
1545
        unsigned long block_bytes;
 
1546
        unsigned int block_bits;
 
1547
 
 
1548
        if (reg->hr_bdev)
 
1549
                return -EINVAL;
 
1550
 
 
1551
        status = o2hb_read_block_input(reg, page, count,
 
1552
                                       &block_bytes, &block_bits);
 
1553
        if (status)
 
1554
                return status;
 
1555
 
 
1556
        reg->hr_block_bytes = (unsigned int)block_bytes;
 
1557
        reg->hr_block_bits = block_bits;
 
1558
 
 
1559
        return count;
 
1560
}
 
1561
 
 
1562
static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
 
1563
                                            char *page)
 
1564
{
 
1565
        return sprintf(page, "%llu\n", reg->hr_start_block);
 
1566
}
 
1567
 
 
1568
static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
 
1569
                                             const char *page,
 
1570
                                             size_t count)
 
1571
{
 
1572
        unsigned long long tmp;
 
1573
        char *p = (char *)page;
 
1574
 
 
1575
        if (reg->hr_bdev)
 
1576
                return -EINVAL;
 
1577
 
 
1578
        tmp = simple_strtoull(p, &p, 0);
 
1579
        if (!p || (*p && (*p != '\n')))
 
1580
                return -EINVAL;
 
1581
 
 
1582
        reg->hr_start_block = tmp;
 
1583
 
 
1584
        return count;
 
1585
}
 
1586
 
 
1587
static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
 
1588
                                       char *page)
 
1589
{
 
1590
        return sprintf(page, "%d\n", reg->hr_blocks);
 
1591
}
 
1592
 
 
1593
static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
 
1594
                                        const char *page,
 
1595
                                        size_t count)
 
1596
{
 
1597
        unsigned long tmp;
 
1598
        char *p = (char *)page;
 
1599
 
 
1600
        if (reg->hr_bdev)
 
1601
                return -EINVAL;
 
1602
 
 
1603
        tmp = simple_strtoul(p, &p, 0);
 
1604
        if (!p || (*p && (*p != '\n')))
 
1605
                return -EINVAL;
 
1606
 
 
1607
        if (tmp > O2NM_MAX_NODES || tmp == 0)
 
1608
                return -ERANGE;
 
1609
 
 
1610
        reg->hr_blocks = (unsigned int)tmp;
 
1611
 
 
1612
        return count;
 
1613
}
 
1614
 
 
1615
static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
 
1616
                                    char *page)
 
1617
{
 
1618
        unsigned int ret = 0;
 
1619
 
 
1620
        if (reg->hr_bdev)
 
1621
                ret = sprintf(page, "%s\n", reg->hr_dev_name);
 
1622
 
 
1623
        return ret;
 
1624
}
 
1625
 
 
1626
static void o2hb_init_region_params(struct o2hb_region *reg)
 
1627
{
 
1628
        reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
 
1629
        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
 
1630
 
 
1631
        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
 
1632
             reg->hr_start_block, reg->hr_blocks);
 
1633
        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
 
1634
             reg->hr_block_bytes, reg->hr_block_bits);
 
1635
        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
 
1636
        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
 
1637
}
 
1638
 
 
1639
static int o2hb_map_slot_data(struct o2hb_region *reg)
 
1640
{
 
1641
        int i, j;
 
1642
        unsigned int last_slot;
 
1643
        unsigned int spp = reg->hr_slots_per_page;
 
1644
        struct page *page;
 
1645
        char *raw;
 
1646
        struct o2hb_disk_slot *slot;
 
1647
 
 
1648
        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
 
1649
        if (reg->hr_tmp_block == NULL) {
 
1650
                mlog_errno(-ENOMEM);
 
1651
                return -ENOMEM;
 
1652
        }
 
1653
 
 
1654
        reg->hr_slots = kcalloc(reg->hr_blocks,
 
1655
                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
 
1656
        if (reg->hr_slots == NULL) {
 
1657
                mlog_errno(-ENOMEM);
 
1658
                return -ENOMEM;
 
1659
        }
 
1660
 
 
1661
        for(i = 0; i < reg->hr_blocks; i++) {
 
1662
                slot = &reg->hr_slots[i];
 
1663
                slot->ds_node_num = i;
 
1664
                INIT_LIST_HEAD(&slot->ds_live_item);
 
1665
                slot->ds_raw_block = NULL;
 
1666
        }
 
1667
 
 
1668
        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
 
1669
        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
 
1670
                           "at %u blocks per page\n",
 
1671
             reg->hr_num_pages, reg->hr_blocks, spp);
 
1672
 
 
1673
        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
 
1674
                                    GFP_KERNEL);
 
1675
        if (!reg->hr_slot_data) {
 
1676
                mlog_errno(-ENOMEM);
 
1677
                return -ENOMEM;
 
1678
        }
 
1679
 
 
1680
        for(i = 0; i < reg->hr_num_pages; i++) {
 
1681
                page = alloc_page(GFP_KERNEL);
 
1682
                if (!page) {
 
1683
                        mlog_errno(-ENOMEM);
 
1684
                        return -ENOMEM;
 
1685
                }
 
1686
 
 
1687
                reg->hr_slot_data[i] = page;
 
1688
 
 
1689
                last_slot = i * spp;
 
1690
                raw = page_address(page);
 
1691
                for (j = 0;
 
1692
                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
 
1693
                     j++) {
 
1694
                        BUG_ON((j + last_slot) >= reg->hr_blocks);
 
1695
 
 
1696
                        slot = &reg->hr_slots[j + last_slot];
 
1697
                        slot->ds_raw_block =
 
1698
                                (struct o2hb_disk_heartbeat_block *) raw;
 
1699
 
 
1700
                        raw += reg->hr_block_bytes;
 
1701
                }
 
1702
        }
 
1703
 
 
1704
        return 0;
 
1705
}
 
1706
 
 
1707
/* Read in all the slots available and populate the tracking
 
1708
 * structures so that we can start with a baseline idea of what's
 
1709
 * there. */
 
1710
static int o2hb_populate_slot_data(struct o2hb_region *reg)
 
1711
{
 
1712
        int ret, i;
 
1713
        struct o2hb_disk_slot *slot;
 
1714
        struct o2hb_disk_heartbeat_block *hb_block;
 
1715
 
 
1716
        ret = o2hb_read_slots(reg, reg->hr_blocks);
 
1717
        if (ret) {
 
1718
                mlog_errno(ret);
 
1719
                goto out;
 
1720
        }
 
1721
 
 
1722
        /* We only want to get an idea of the values initially in each
 
1723
         * slot, so we do no verification - o2hb_check_slot will
 
1724
         * actually determine if each configured slot is valid and
 
1725
         * whether any values have changed. */
 
1726
        for(i = 0; i < reg->hr_blocks; i++) {
 
1727
                slot = &reg->hr_slots[i];
 
1728
                hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
 
1729
 
 
1730
                /* Only fill the values that o2hb_check_slot uses to
 
1731
                 * determine changing slots */
 
1732
                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
 
1733
                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
 
1734
        }
 
1735
 
 
1736
out:
 
1737
        return ret;
 
1738
}
 
1739
 
 
1740
/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
 
1741
static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
 
1742
                                     const char *page,
 
1743
                                     size_t count)
 
1744
{
 
1745
        struct task_struct *hb_task;
 
1746
        long fd;
 
1747
        int sectsize;
 
1748
        char *p = (char *)page;
 
1749
        struct file *filp = NULL;
 
1750
        struct inode *inode = NULL;
 
1751
        ssize_t ret = -EINVAL;
 
1752
        int live_threshold;
 
1753
 
 
1754
        if (reg->hr_bdev)
 
1755
                goto out;
 
1756
 
 
1757
        /* We can't heartbeat without having had our node number
 
1758
         * configured yet. */
 
1759
        if (o2nm_this_node() == O2NM_MAX_NODES)
 
1760
                goto out;
 
1761
 
 
1762
        fd = simple_strtol(p, &p, 0);
 
1763
        if (!p || (*p && (*p != '\n')))
 
1764
                goto out;
 
1765
 
 
1766
        if (fd < 0 || fd >= INT_MAX)
 
1767
                goto out;
 
1768
 
 
1769
        filp = fget(fd);
 
1770
        if (filp == NULL)
 
1771
                goto out;
 
1772
 
 
1773
        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
 
1774
            reg->hr_block_bytes == 0)
 
1775
                goto out;
 
1776
 
 
1777
        inode = igrab(filp->f_mapping->host);
 
1778
        if (inode == NULL)
 
1779
                goto out;
 
1780
 
 
1781
        if (!S_ISBLK(inode->i_mode))
 
1782
                goto out;
 
1783
 
 
1784
        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
 
1785
        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
 
1786
        if (ret) {
 
1787
                reg->hr_bdev = NULL;
 
1788
                goto out;
 
1789
        }
 
1790
        inode = NULL;
 
1791
 
 
1792
        bdevname(reg->hr_bdev, reg->hr_dev_name);
 
1793
 
 
1794
        sectsize = bdev_logical_block_size(reg->hr_bdev);
 
1795
        if (sectsize != reg->hr_block_bytes) {
 
1796
                mlog(ML_ERROR,
 
1797
                     "blocksize %u incorrect for device, expected %d",
 
1798
                     reg->hr_block_bytes, sectsize);
 
1799
                ret = -EINVAL;
 
1800
                goto out;
 
1801
        }
 
1802
 
 
1803
        o2hb_init_region_params(reg);
 
1804
 
 
1805
        /* Generation of zero is invalid */
 
1806
        do {
 
1807
                get_random_bytes(&reg->hr_generation,
 
1808
                                 sizeof(reg->hr_generation));
 
1809
        } while (reg->hr_generation == 0);
 
1810
 
 
1811
        ret = o2hb_map_slot_data(reg);
 
1812
        if (ret) {
 
1813
                mlog_errno(ret);
 
1814
                goto out;
 
1815
        }
 
1816
 
 
1817
        ret = o2hb_populate_slot_data(reg);
 
1818
        if (ret) {
 
1819
                mlog_errno(ret);
 
1820
                goto out;
 
1821
        }
 
1822
 
 
1823
        INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
 
1824
 
 
1825
        /*
 
1826
         * A node is considered live after it has beat LIVE_THRESHOLD
 
1827
         * times.  We're not steady until we've given them a chance
 
1828
         * _after_ our first read.
 
1829
         * The default threshold is bare minimum so as to limit the delay
 
1830
         * during mounts. For global heartbeat, the threshold doubled for the
 
1831
         * first region.
 
1832
         */
 
1833
        live_threshold = O2HB_LIVE_THRESHOLD;
 
1834
        if (o2hb_global_heartbeat_active()) {
 
1835
                spin_lock(&o2hb_live_lock);
 
1836
                if (o2hb_pop_count(&o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
 
1837
                        live_threshold <<= 1;
 
1838
                spin_unlock(&o2hb_live_lock);
 
1839
        }
 
1840
        ++live_threshold;
 
1841
        atomic_set(&reg->hr_steady_iterations, live_threshold);
 
1842
        /* unsteady_iterations is double the steady_iterations */
 
1843
        atomic_set(&reg->hr_unsteady_iterations, (live_threshold << 1));
 
1844
 
 
1845
        hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
 
1846
                              reg->hr_item.ci_name);
 
1847
        if (IS_ERR(hb_task)) {
 
1848
                ret = PTR_ERR(hb_task);
 
1849
                mlog_errno(ret);
 
1850
                goto out;
 
1851
        }
 
1852
 
 
1853
        spin_lock(&o2hb_live_lock);
 
1854
        reg->hr_task = hb_task;
 
1855
        spin_unlock(&o2hb_live_lock);
 
1856
 
 
1857
        ret = wait_event_interruptible(o2hb_steady_queue,
 
1858
                                atomic_read(&reg->hr_steady_iterations) == 0);
 
1859
        if (ret) {
 
1860
                atomic_set(&reg->hr_steady_iterations, 0);
 
1861
                reg->hr_aborted_start = 1;
 
1862
        }
 
1863
 
 
1864
        if (reg->hr_aborted_start) {
 
1865
                ret = -EIO;
 
1866
                goto out;
 
1867
        }
 
1868
 
 
1869
        /* Ok, we were woken.  Make sure it wasn't by drop_item() */
 
1870
        spin_lock(&o2hb_live_lock);
 
1871
        hb_task = reg->hr_task;
 
1872
        if (o2hb_global_heartbeat_active())
 
1873
                set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
 
1874
        spin_unlock(&o2hb_live_lock);
 
1875
 
 
1876
        if (hb_task)
 
1877
                ret = count;
 
1878
        else
 
1879
                ret = -EIO;
 
1880
 
 
1881
        if (hb_task && o2hb_global_heartbeat_active())
 
1882
                printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
 
1883
                       config_item_name(&reg->hr_item), reg->hr_dev_name);
 
1884
 
 
1885
out:
 
1886
        if (filp)
 
1887
                fput(filp);
 
1888
        if (inode)
 
1889
                iput(inode);
 
1890
        if (ret < 0) {
 
1891
                if (reg->hr_bdev) {
 
1892
                        blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
 
1893
                        reg->hr_bdev = NULL;
 
1894
                }
 
1895
        }
 
1896
        return ret;
 
1897
}
 
1898
 
 
1899
static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
 
1900
                                      char *page)
 
1901
{
 
1902
        pid_t pid = 0;
 
1903
 
 
1904
        spin_lock(&o2hb_live_lock);
 
1905
        if (reg->hr_task)
 
1906
                pid = task_pid_nr(reg->hr_task);
 
1907
        spin_unlock(&o2hb_live_lock);
 
1908
 
 
1909
        if (!pid)
 
1910
                return 0;
 
1911
 
 
1912
        return sprintf(page, "%u\n", pid);
 
1913
}
 
1914
 
 
1915
struct o2hb_region_attribute {
 
1916
        struct configfs_attribute attr;
 
1917
        ssize_t (*show)(struct o2hb_region *, char *);
 
1918
        ssize_t (*store)(struct o2hb_region *, const char *, size_t);
 
1919
};
 
1920
 
 
1921
static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
 
1922
        .attr   = { .ca_owner = THIS_MODULE,
 
1923
                    .ca_name = "block_bytes",
 
1924
                    .ca_mode = S_IRUGO | S_IWUSR },
 
1925
        .show   = o2hb_region_block_bytes_read,
 
1926
        .store  = o2hb_region_block_bytes_write,
 
1927
};
 
1928
 
 
1929
static struct o2hb_region_attribute o2hb_region_attr_start_block = {
 
1930
        .attr   = { .ca_owner = THIS_MODULE,
 
1931
                    .ca_name = "start_block",
 
1932
                    .ca_mode = S_IRUGO | S_IWUSR },
 
1933
        .show   = o2hb_region_start_block_read,
 
1934
        .store  = o2hb_region_start_block_write,
 
1935
};
 
1936
 
 
1937
static struct o2hb_region_attribute o2hb_region_attr_blocks = {
 
1938
        .attr   = { .ca_owner = THIS_MODULE,
 
1939
                    .ca_name = "blocks",
 
1940
                    .ca_mode = S_IRUGO | S_IWUSR },
 
1941
        .show   = o2hb_region_blocks_read,
 
1942
        .store  = o2hb_region_blocks_write,
 
1943
};
 
1944
 
 
1945
static struct o2hb_region_attribute o2hb_region_attr_dev = {
 
1946
        .attr   = { .ca_owner = THIS_MODULE,
 
1947
                    .ca_name = "dev",
 
1948
                    .ca_mode = S_IRUGO | S_IWUSR },
 
1949
        .show   = o2hb_region_dev_read,
 
1950
        .store  = o2hb_region_dev_write,
 
1951
};
 
1952
 
 
1953
static struct o2hb_region_attribute o2hb_region_attr_pid = {
 
1954
       .attr   = { .ca_owner = THIS_MODULE,
 
1955
                   .ca_name = "pid",
 
1956
                   .ca_mode = S_IRUGO | S_IRUSR },
 
1957
       .show   = o2hb_region_pid_read,
 
1958
};
 
1959
 
 
1960
static struct configfs_attribute *o2hb_region_attrs[] = {
 
1961
        &o2hb_region_attr_block_bytes.attr,
 
1962
        &o2hb_region_attr_start_block.attr,
 
1963
        &o2hb_region_attr_blocks.attr,
 
1964
        &o2hb_region_attr_dev.attr,
 
1965
        &o2hb_region_attr_pid.attr,
 
1966
        NULL,
 
1967
};
 
1968
 
 
1969
static ssize_t o2hb_region_show(struct config_item *item,
 
1970
                                struct configfs_attribute *attr,
 
1971
                                char *page)
 
1972
{
 
1973
        struct o2hb_region *reg = to_o2hb_region(item);
 
1974
        struct o2hb_region_attribute *o2hb_region_attr =
 
1975
                container_of(attr, struct o2hb_region_attribute, attr);
 
1976
        ssize_t ret = 0;
 
1977
 
 
1978
        if (o2hb_region_attr->show)
 
1979
                ret = o2hb_region_attr->show(reg, page);
 
1980
        return ret;
 
1981
}
 
1982
 
 
1983
static ssize_t o2hb_region_store(struct config_item *item,
 
1984
                                 struct configfs_attribute *attr,
 
1985
                                 const char *page, size_t count)
 
1986
{
 
1987
        struct o2hb_region *reg = to_o2hb_region(item);
 
1988
        struct o2hb_region_attribute *o2hb_region_attr =
 
1989
                container_of(attr, struct o2hb_region_attribute, attr);
 
1990
        ssize_t ret = -EINVAL;
 
1991
 
 
1992
        if (o2hb_region_attr->store)
 
1993
                ret = o2hb_region_attr->store(reg, page, count);
 
1994
        return ret;
 
1995
}
 
1996
 
 
1997
static struct configfs_item_operations o2hb_region_item_ops = {
 
1998
        .release                = o2hb_region_release,
 
1999
        .show_attribute         = o2hb_region_show,
 
2000
        .store_attribute        = o2hb_region_store,
 
2001
};
 
2002
 
 
2003
static struct config_item_type o2hb_region_type = {
 
2004
        .ct_item_ops    = &o2hb_region_item_ops,
 
2005
        .ct_attrs       = o2hb_region_attrs,
 
2006
        .ct_owner       = THIS_MODULE,
 
2007
};
 
2008
 
 
2009
/* heartbeat set */
 
2010
 
 
2011
struct o2hb_heartbeat_group {
 
2012
        struct config_group hs_group;
 
2013
        /* some stuff? */
 
2014
};
 
2015
 
 
2016
static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
 
2017
{
 
2018
        return group ?
 
2019
                container_of(group, struct o2hb_heartbeat_group, hs_group)
 
2020
                : NULL;
 
2021
}
 
2022
 
 
2023
static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
 
2024
{
 
2025
        int ret = -ENOMEM;
 
2026
 
 
2027
        reg->hr_debug_dir =
 
2028
                debugfs_create_dir(config_item_name(&reg->hr_item), dir);
 
2029
        if (!reg->hr_debug_dir) {
 
2030
                mlog_errno(ret);
 
2031
                goto bail;
 
2032
        }
 
2033
 
 
2034
        reg->hr_debug_livenodes =
 
2035
                        o2hb_debug_create(O2HB_DEBUG_LIVENODES,
 
2036
                                          reg->hr_debug_dir,
 
2037
                                          &(reg->hr_db_livenodes),
 
2038
                                          sizeof(*(reg->hr_db_livenodes)),
 
2039
                                          O2HB_DB_TYPE_REGION_LIVENODES,
 
2040
                                          sizeof(reg->hr_live_node_bitmap),
 
2041
                                          O2NM_MAX_NODES, reg);
 
2042
        if (!reg->hr_debug_livenodes) {
 
2043
                mlog_errno(ret);
 
2044
                goto bail;
 
2045
        }
 
2046
 
 
2047
        reg->hr_debug_regnum =
 
2048
                        o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
 
2049
                                          reg->hr_debug_dir,
 
2050
                                          &(reg->hr_db_regnum),
 
2051
                                          sizeof(*(reg->hr_db_regnum)),
 
2052
                                          O2HB_DB_TYPE_REGION_NUMBER,
 
2053
                                          0, O2NM_MAX_NODES, reg);
 
2054
        if (!reg->hr_debug_regnum) {
 
2055
                mlog_errno(ret);
 
2056
                goto bail;
 
2057
        }
 
2058
 
 
2059
        reg->hr_debug_elapsed_time =
 
2060
                        o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
 
2061
                                          reg->hr_debug_dir,
 
2062
                                          &(reg->hr_db_elapsed_time),
 
2063
                                          sizeof(*(reg->hr_db_elapsed_time)),
 
2064
                                          O2HB_DB_TYPE_REGION_ELAPSED_TIME,
 
2065
                                          0, 0, reg);
 
2066
        if (!reg->hr_debug_elapsed_time) {
 
2067
                mlog_errno(ret);
 
2068
                goto bail;
 
2069
        }
 
2070
 
 
2071
        reg->hr_debug_pinned =
 
2072
                        o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
 
2073
                                          reg->hr_debug_dir,
 
2074
                                          &(reg->hr_db_pinned),
 
2075
                                          sizeof(*(reg->hr_db_pinned)),
 
2076
                                          O2HB_DB_TYPE_REGION_PINNED,
 
2077
                                          0, 0, reg);
 
2078
        if (!reg->hr_debug_pinned) {
 
2079
                mlog_errno(ret);
 
2080
                goto bail;
 
2081
        }
 
2082
 
 
2083
        ret = 0;
 
2084
bail:
 
2085
        return ret;
 
2086
}
 
2087
 
 
2088
static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
 
2089
                                                          const char *name)
 
2090
{
 
2091
        struct o2hb_region *reg = NULL;
 
2092
        int ret;
 
2093
 
 
2094
        reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
 
2095
        if (reg == NULL)
 
2096
                return ERR_PTR(-ENOMEM);
 
2097
 
 
2098
        if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
 
2099
                ret = -ENAMETOOLONG;
 
2100
                goto free;
 
2101
        }
 
2102
 
 
2103
        spin_lock(&o2hb_live_lock);
 
2104
        reg->hr_region_num = 0;
 
2105
        if (o2hb_global_heartbeat_active()) {
 
2106
                reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
 
2107
                                                         O2NM_MAX_REGIONS);
 
2108
                if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
 
2109
                        spin_unlock(&o2hb_live_lock);
 
2110
                        ret = -EFBIG;
 
2111
                        goto free;
 
2112
                }
 
2113
                set_bit(reg->hr_region_num, o2hb_region_bitmap);
 
2114
        }
 
2115
        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
 
2116
        spin_unlock(&o2hb_live_lock);
 
2117
 
 
2118
        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
 
2119
 
 
2120
        ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
 
2121
        if (ret) {
 
2122
                config_item_put(&reg->hr_item);
 
2123
                goto free;
 
2124
        }
 
2125
 
 
2126
        return &reg->hr_item;
 
2127
free:
 
2128
        kfree(reg);
 
2129
        return ERR_PTR(ret);
 
2130
}
 
2131
 
 
2132
static void o2hb_heartbeat_group_drop_item(struct config_group *group,
 
2133
                                           struct config_item *item)
 
2134
{
 
2135
        struct task_struct *hb_task;
 
2136
        struct o2hb_region *reg = to_o2hb_region(item);
 
2137
        int quorum_region = 0;
 
2138
 
 
2139
        /* stop the thread when the user removes the region dir */
 
2140
        spin_lock(&o2hb_live_lock);
 
2141
        hb_task = reg->hr_task;
 
2142
        reg->hr_task = NULL;
 
2143
        reg->hr_item_dropped = 1;
 
2144
        spin_unlock(&o2hb_live_lock);
 
2145
 
 
2146
        if (hb_task)
 
2147
                kthread_stop(hb_task);
 
2148
 
 
2149
        if (o2hb_global_heartbeat_active()) {
 
2150
                spin_lock(&o2hb_live_lock);
 
2151
                clear_bit(reg->hr_region_num, o2hb_region_bitmap);
 
2152
                clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
 
2153
                if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 
2154
                        quorum_region = 1;
 
2155
                clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
 
2156
                spin_unlock(&o2hb_live_lock);
 
2157
                printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
 
2158
                       ((atomic_read(&reg->hr_steady_iterations) == 0) ?
 
2159
                        "stopped" : "start aborted"), config_item_name(item),
 
2160
                       reg->hr_dev_name);
 
2161
        }
 
2162
 
 
2163
        /*
 
2164
         * If we're racing a dev_write(), we need to wake them.  They will
 
2165
         * check reg->hr_task
 
2166
         */
 
2167
        if (atomic_read(&reg->hr_steady_iterations) != 0) {
 
2168
                reg->hr_aborted_start = 1;
 
2169
                atomic_set(&reg->hr_steady_iterations, 0);
 
2170
                wake_up(&o2hb_steady_queue);
 
2171
        }
 
2172
 
 
2173
        config_item_put(item);
 
2174
 
 
2175
        if (!o2hb_global_heartbeat_active() || !quorum_region)
 
2176
                return;
 
2177
 
 
2178
        /*
 
2179
         * If global heartbeat active and there are dependent users,
 
2180
         * pin all regions if quorum region count <= CUT_OFF
 
2181
         */
 
2182
        spin_lock(&o2hb_live_lock);
 
2183
 
 
2184
        if (!o2hb_dependent_users)
 
2185
                goto unlock;
 
2186
 
 
2187
        if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
 
2188
                           O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
 
2189
                o2hb_region_pin(NULL);
 
2190
 
 
2191
unlock:
 
2192
        spin_unlock(&o2hb_live_lock);
 
2193
}
 
2194
 
 
2195
struct o2hb_heartbeat_group_attribute {
 
2196
        struct configfs_attribute attr;
 
2197
        ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
 
2198
        ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
 
2199
};
 
2200
 
 
2201
static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
 
2202
                                         struct configfs_attribute *attr,
 
2203
                                         char *page)
 
2204
{
 
2205
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
 
2206
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
 
2207
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
 
2208
        ssize_t ret = 0;
 
2209
 
 
2210
        if (o2hb_heartbeat_group_attr->show)
 
2211
                ret = o2hb_heartbeat_group_attr->show(reg, page);
 
2212
        return ret;
 
2213
}
 
2214
 
 
2215
static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
 
2216
                                          struct configfs_attribute *attr,
 
2217
                                          const char *page, size_t count)
 
2218
{
 
2219
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
 
2220
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
 
2221
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
 
2222
        ssize_t ret = -EINVAL;
 
2223
 
 
2224
        if (o2hb_heartbeat_group_attr->store)
 
2225
                ret = o2hb_heartbeat_group_attr->store(reg, page, count);
 
2226
        return ret;
 
2227
}
 
2228
 
 
2229
static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
 
2230
                                                     char *page)
 
2231
{
 
2232
        return sprintf(page, "%u\n", o2hb_dead_threshold);
 
2233
}
 
2234
 
 
2235
static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
 
2236
                                                    const char *page,
 
2237
                                                    size_t count)
 
2238
{
 
2239
        unsigned long tmp;
 
2240
        char *p = (char *)page;
 
2241
 
 
2242
        tmp = simple_strtoul(p, &p, 10);
 
2243
        if (!p || (*p && (*p != '\n')))
 
2244
                return -EINVAL;
 
2245
 
 
2246
        /* this will validate ranges for us. */
 
2247
        o2hb_dead_threshold_set((unsigned int) tmp);
 
2248
 
 
2249
        return count;
 
2250
}
 
2251
 
 
2252
static
 
2253
ssize_t o2hb_heartbeat_group_mode_show(struct o2hb_heartbeat_group *group,
 
2254
                                       char *page)
 
2255
{
 
2256
        return sprintf(page, "%s\n",
 
2257
                       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
 
2258
}
 
2259
 
 
2260
static
 
2261
ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group,
 
2262
                                        const char *page, size_t count)
 
2263
{
 
2264
        unsigned int i;
 
2265
        int ret;
 
2266
        size_t len;
 
2267
 
 
2268
        len = (page[count - 1] == '\n') ? count - 1 : count;
 
2269
        if (!len)
 
2270
                return -EINVAL;
 
2271
 
 
2272
        for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
 
2273
                if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len))
 
2274
                        continue;
 
2275
 
 
2276
                ret = o2hb_global_hearbeat_mode_set(i);
 
2277
                if (!ret)
 
2278
                        printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
 
2279
                               o2hb_heartbeat_mode_desc[i]);
 
2280
                return count;
 
2281
        }
 
2282
 
 
2283
        return -EINVAL;
 
2284
 
 
2285
}
 
2286
 
 
2287
static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
 
2288
        .attr   = { .ca_owner = THIS_MODULE,
 
2289
                    .ca_name = "dead_threshold",
 
2290
                    .ca_mode = S_IRUGO | S_IWUSR },
 
2291
        .show   = o2hb_heartbeat_group_threshold_show,
 
2292
        .store  = o2hb_heartbeat_group_threshold_store,
 
2293
};
 
2294
 
 
2295
static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_mode = {
 
2296
        .attr   = { .ca_owner = THIS_MODULE,
 
2297
                .ca_name = "mode",
 
2298
                .ca_mode = S_IRUGO | S_IWUSR },
 
2299
        .show   = o2hb_heartbeat_group_mode_show,
 
2300
        .store  = o2hb_heartbeat_group_mode_store,
 
2301
};
 
2302
 
 
2303
static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
 
2304
        &o2hb_heartbeat_group_attr_threshold.attr,
 
2305
        &o2hb_heartbeat_group_attr_mode.attr,
 
2306
        NULL,
 
2307
};
 
2308
 
 
2309
static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
 
2310
        .show_attribute         = o2hb_heartbeat_group_show,
 
2311
        .store_attribute        = o2hb_heartbeat_group_store,
 
2312
};
 
2313
 
 
2314
static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
 
2315
        .make_item      = o2hb_heartbeat_group_make_item,
 
2316
        .drop_item      = o2hb_heartbeat_group_drop_item,
 
2317
};
 
2318
 
 
2319
static struct config_item_type o2hb_heartbeat_group_type = {
 
2320
        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
 
2321
        .ct_item_ops    = &o2hb_hearbeat_group_item_ops,
 
2322
        .ct_attrs       = o2hb_heartbeat_group_attrs,
 
2323
        .ct_owner       = THIS_MODULE,
 
2324
};
 
2325
 
 
2326
/* this is just here to avoid touching group in heartbeat.h which the
 
2327
 * entire damn world #includes */
 
2328
struct config_group *o2hb_alloc_hb_set(void)
 
2329
{
 
2330
        struct o2hb_heartbeat_group *hs = NULL;
 
2331
        struct config_group *ret = NULL;
 
2332
 
 
2333
        hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
 
2334
        if (hs == NULL)
 
2335
                goto out;
 
2336
 
 
2337
        config_group_init_type_name(&hs->hs_group, "heartbeat",
 
2338
                                    &o2hb_heartbeat_group_type);
 
2339
 
 
2340
        ret = &hs->hs_group;
 
2341
out:
 
2342
        if (ret == NULL)
 
2343
                kfree(hs);
 
2344
        return ret;
 
2345
}
 
2346
 
 
2347
void o2hb_free_hb_set(struct config_group *group)
 
2348
{
 
2349
        struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
 
2350
        kfree(hs);
 
2351
}
 
2352
 
 
2353
/* hb callback registration and issuing */
 
2354
 
 
2355
static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
 
2356
{
 
2357
        if (type == O2HB_NUM_CB)
 
2358
                return ERR_PTR(-EINVAL);
 
2359
 
 
2360
        return &o2hb_callbacks[type];
 
2361
}
 
2362
 
 
2363
void o2hb_setup_callback(struct o2hb_callback_func *hc,
 
2364
                         enum o2hb_callback_type type,
 
2365
                         o2hb_cb_func *func,
 
2366
                         void *data,
 
2367
                         int priority)
 
2368
{
 
2369
        INIT_LIST_HEAD(&hc->hc_item);
 
2370
        hc->hc_func = func;
 
2371
        hc->hc_data = data;
 
2372
        hc->hc_priority = priority;
 
2373
        hc->hc_type = type;
 
2374
        hc->hc_magic = O2HB_CB_MAGIC;
 
2375
}
 
2376
EXPORT_SYMBOL_GPL(o2hb_setup_callback);
 
2377
 
 
2378
/*
 
2379
 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
 
2380
 * In global heartbeat mode, region_uuid passed is NULL.
 
2381
 *
 
2382
 * In local, we only pin the matching region. In global we pin all the active
 
2383
 * regions.
 
2384
 */
 
2385
static int o2hb_region_pin(const char *region_uuid)
 
2386
{
 
2387
        int ret = 0, found = 0;
 
2388
        struct o2hb_region *reg;
 
2389
        char *uuid;
 
2390
 
 
2391
        assert_spin_locked(&o2hb_live_lock);
 
2392
 
 
2393
        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
 
2394
                uuid = config_item_name(&reg->hr_item);
 
2395
 
 
2396
                /* local heartbeat */
 
2397
                if (region_uuid) {
 
2398
                        if (strcmp(region_uuid, uuid))
 
2399
                                continue;
 
2400
                        found = 1;
 
2401
                }
 
2402
 
 
2403
                if (reg->hr_item_pinned || reg->hr_item_dropped)
 
2404
                        goto skip_pin;
 
2405
 
 
2406
                /* Ignore ENOENT only for local hb (userdlm domain) */
 
2407
                ret = o2nm_depend_item(&reg->hr_item);
 
2408
                if (!ret) {
 
2409
                        mlog(ML_CLUSTER, "Pin region %s\n", uuid);
 
2410
                        reg->hr_item_pinned = 1;
 
2411
                } else {
 
2412
                        if (ret == -ENOENT && found)
 
2413
                                ret = 0;
 
2414
                        else {
 
2415
                                mlog(ML_ERROR, "Pin region %s fails with %d\n",
 
2416
                                     uuid, ret);
 
2417
                                break;
 
2418
                        }
 
2419
                }
 
2420
skip_pin:
 
2421
                if (found)
 
2422
                        break;
 
2423
        }
 
2424
 
 
2425
        return ret;
 
2426
}
 
2427
 
 
2428
/*
 
2429
 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
 
2430
 * In global heartbeat mode, region_uuid passed is NULL.
 
2431
 *
 
2432
 * In local, we only unpin the matching region. In global we unpin all the
 
2433
 * active regions.
 
2434
 */
 
2435
static void o2hb_region_unpin(const char *region_uuid)
 
2436
{
 
2437
        struct o2hb_region *reg;
 
2438
        char *uuid;
 
2439
        int found = 0;
 
2440
 
 
2441
        assert_spin_locked(&o2hb_live_lock);
 
2442
 
 
2443
        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
 
2444
                uuid = config_item_name(&reg->hr_item);
 
2445
                if (region_uuid) {
 
2446
                        if (strcmp(region_uuid, uuid))
 
2447
                                continue;
 
2448
                        found = 1;
 
2449
                }
 
2450
 
 
2451
                if (reg->hr_item_pinned) {
 
2452
                        mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
 
2453
                        o2nm_undepend_item(&reg->hr_item);
 
2454
                        reg->hr_item_pinned = 0;
 
2455
                }
 
2456
                if (found)
 
2457
                        break;
 
2458
        }
 
2459
}
 
2460
 
 
2461
static int o2hb_region_inc_user(const char *region_uuid)
 
2462
{
 
2463
        int ret = 0;
 
2464
 
 
2465
        spin_lock(&o2hb_live_lock);
 
2466
 
 
2467
        /* local heartbeat */
 
2468
        if (!o2hb_global_heartbeat_active()) {
 
2469
            ret = o2hb_region_pin(region_uuid);
 
2470
            goto unlock;
 
2471
        }
 
2472
 
 
2473
        /*
 
2474
         * if global heartbeat active and this is the first dependent user,
 
2475
         * pin all regions if quorum region count <= CUT_OFF
 
2476
         */
 
2477
        o2hb_dependent_users++;
 
2478
        if (o2hb_dependent_users > 1)
 
2479
                goto unlock;
 
2480
 
 
2481
        if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
 
2482
                           O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
 
2483
                ret = o2hb_region_pin(NULL);
 
2484
 
 
2485
unlock:
 
2486
        spin_unlock(&o2hb_live_lock);
 
2487
        return ret;
 
2488
}
 
2489
 
 
2490
void o2hb_region_dec_user(const char *region_uuid)
 
2491
{
 
2492
        spin_lock(&o2hb_live_lock);
 
2493
 
 
2494
        /* local heartbeat */
 
2495
        if (!o2hb_global_heartbeat_active()) {
 
2496
            o2hb_region_unpin(region_uuid);
 
2497
            goto unlock;
 
2498
        }
 
2499
 
 
2500
        /*
 
2501
         * if global heartbeat active and there are no dependent users,
 
2502
         * unpin all quorum regions
 
2503
         */
 
2504
        o2hb_dependent_users--;
 
2505
        if (!o2hb_dependent_users)
 
2506
                o2hb_region_unpin(NULL);
 
2507
 
 
2508
unlock:
 
2509
        spin_unlock(&o2hb_live_lock);
 
2510
}
 
2511
 
 
2512
int o2hb_register_callback(const char *region_uuid,
 
2513
                           struct o2hb_callback_func *hc)
 
2514
{
 
2515
        struct o2hb_callback_func *tmp;
 
2516
        struct list_head *iter;
 
2517
        struct o2hb_callback *hbcall;
 
2518
        int ret;
 
2519
 
 
2520
        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
 
2521
        BUG_ON(!list_empty(&hc->hc_item));
 
2522
 
 
2523
        hbcall = hbcall_from_type(hc->hc_type);
 
2524
        if (IS_ERR(hbcall)) {
 
2525
                ret = PTR_ERR(hbcall);
 
2526
                goto out;
 
2527
        }
 
2528
 
 
2529
        if (region_uuid) {
 
2530
                ret = o2hb_region_inc_user(region_uuid);
 
2531
                if (ret) {
 
2532
                        mlog_errno(ret);
 
2533
                        goto out;
 
2534
                }
 
2535
        }
 
2536
 
 
2537
        down_write(&o2hb_callback_sem);
 
2538
 
 
2539
        list_for_each(iter, &hbcall->list) {
 
2540
                tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
 
2541
                if (hc->hc_priority < tmp->hc_priority) {
 
2542
                        list_add_tail(&hc->hc_item, iter);
 
2543
                        break;
 
2544
                }
 
2545
        }
 
2546
        if (list_empty(&hc->hc_item))
 
2547
                list_add_tail(&hc->hc_item, &hbcall->list);
 
2548
 
 
2549
        up_write(&o2hb_callback_sem);
 
2550
        ret = 0;
 
2551
out:
 
2552
        mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
 
2553
             ret, __builtin_return_address(0), hc);
 
2554
        return ret;
 
2555
}
 
2556
EXPORT_SYMBOL_GPL(o2hb_register_callback);
 
2557
 
 
2558
void o2hb_unregister_callback(const char *region_uuid,
 
2559
                              struct o2hb_callback_func *hc)
 
2560
{
 
2561
        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
 
2562
 
 
2563
        mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
 
2564
             __builtin_return_address(0), hc);
 
2565
 
 
2566
        /* XXX Can this happen _with_ a region reference? */
 
2567
        if (list_empty(&hc->hc_item))
 
2568
                return;
 
2569
 
 
2570
        if (region_uuid)
 
2571
                o2hb_region_dec_user(region_uuid);
 
2572
 
 
2573
        down_write(&o2hb_callback_sem);
 
2574
 
 
2575
        list_del_init(&hc->hc_item);
 
2576
 
 
2577
        up_write(&o2hb_callback_sem);
 
2578
}
 
2579
EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
 
2580
 
 
2581
int o2hb_check_node_heartbeating(u8 node_num)
 
2582
{
 
2583
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
2584
 
 
2585
        o2hb_fill_node_map(testing_map, sizeof(testing_map));
 
2586
        if (!test_bit(node_num, testing_map)) {
 
2587
                mlog(ML_HEARTBEAT,
 
2588
                     "node (%u) does not have heartbeating enabled.\n",
 
2589
                     node_num);
 
2590
                return 0;
 
2591
        }
 
2592
 
 
2593
        return 1;
 
2594
}
 
2595
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
 
2596
 
 
2597
int o2hb_check_node_heartbeating_from_callback(u8 node_num)
 
2598
{
 
2599
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 
2600
 
 
2601
        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
 
2602
        if (!test_bit(node_num, testing_map)) {
 
2603
                mlog(ML_HEARTBEAT,
 
2604
                     "node (%u) does not have heartbeating enabled.\n",
 
2605
                     node_num);
 
2606
                return 0;
 
2607
        }
 
2608
 
 
2609
        return 1;
 
2610
}
 
2611
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
 
2612
 
 
2613
/* Makes sure our local node is configured with a node number, and is
 
2614
 * heartbeating. */
 
2615
int o2hb_check_local_node_heartbeating(void)
 
2616
{
 
2617
        u8 node_num;
 
2618
 
 
2619
        /* if this node was set then we have networking */
 
2620
        node_num = o2nm_this_node();
 
2621
        if (node_num == O2NM_MAX_NODES) {
 
2622
                mlog(ML_HEARTBEAT, "this node has not been configured.\n");
 
2623
                return 0;
 
2624
        }
 
2625
 
 
2626
        return o2hb_check_node_heartbeating(node_num);
 
2627
}
 
2628
EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
 
2629
 
 
2630
/*
 
2631
 * this is just a hack until we get the plumbing which flips file systems
 
2632
 * read only and drops the hb ref instead of killing the node dead.
 
2633
 */
 
2634
void o2hb_stop_all_regions(void)
 
2635
{
 
2636
        struct o2hb_region *reg;
 
2637
 
 
2638
        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
 
2639
 
 
2640
        spin_lock(&o2hb_live_lock);
 
2641
 
 
2642
        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
 
2643
                reg->hr_unclean_stop = 1;
 
2644
 
 
2645
        spin_unlock(&o2hb_live_lock);
 
2646
}
 
2647
EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
 
2648
 
 
2649
int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
 
2650
{
 
2651
        struct o2hb_region *reg;
 
2652
        int numregs = 0;
 
2653
        char *p;
 
2654
 
 
2655
        spin_lock(&o2hb_live_lock);
 
2656
 
 
2657
        p = region_uuids;
 
2658
        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
 
2659
                mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
 
2660
                if (numregs < max_regions) {
 
2661
                        memcpy(p, config_item_name(&reg->hr_item),
 
2662
                               O2HB_MAX_REGION_NAME_LEN);
 
2663
                        p += O2HB_MAX_REGION_NAME_LEN;
 
2664
                }
 
2665
                numregs++;
 
2666
        }
 
2667
 
 
2668
        spin_unlock(&o2hb_live_lock);
 
2669
 
 
2670
        return numregs;
 
2671
}
 
2672
EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
 
2673
 
 
2674
int o2hb_global_heartbeat_active(void)
 
2675
{
 
2676
        return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
 
2677
}
 
2678
EXPORT_SYMBOL(o2hb_global_heartbeat_active);