~wb-munzinger/+junk/sanlock

« back to all changes in this revision

Viewing changes to resource.c

  • Committer: David Weber
  • Date: 2012-01-18 13:00:36 UTC
  • Revision ID: wb@munzinger.de-20120118130036-9a7wvhhmfuip7zx5
Tags: upstream-1.9
Import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright 2010-2011 Red Hat, Inc.
 
3
 *
 
4
 * This copyrighted material is made available to anyone wishing to use,
 
5
 * modify, copy, or redistribute it subject to the terms and conditions
 
6
 * of the GNU General Public License v2 or (at your option) any later version.
 
7
 */
 
8
 
 
9
#include <inttypes.h>
 
10
#include <unistd.h>
 
11
#include <stdio.h>
 
12
#include <stdlib.h>
 
13
#include <stdint.h>
 
14
#include <stddef.h>
 
15
#include <fcntl.h>
 
16
#include <string.h>
 
17
#include <errno.h>
 
18
#include <limits.h>
 
19
#include <pthread.h>
 
20
#include <time.h>
 
21
#include <syslog.h>
 
22
#include <signal.h>
 
23
#include <sys/types.h>
 
24
#include <sys/time.h>
 
25
 
 
26
#include "sanlock_internal.h"
 
27
#include "diskio.h"
 
28
#include "log.h"
 
29
#include "paxos_lease.h"
 
30
#include "lockspace.h"
 
31
#include "resource.h"
 
32
#include "task.h"
 
33
 
 
34
static pthread_t resource_pt;
 
35
static int resource_thread_stop;
 
36
static int resource_examine;
 
37
 
 
38
int set_resource_examine(char *space_name, char *res_name)
 
39
{
 
40
        struct resource *r;
 
41
        int count = 0;
 
42
 
 
43
        pthread_mutex_lock(&resource_mutex);
 
44
        list_for_each_entry(r, &resources, list) {
 
45
                if (strncmp(r->r.lockspace_name, space_name, NAME_ID_SIZE))
 
46
                        continue;
 
47
                if (res_name && strncmp(r->r.name, res_name, NAME_ID_SIZE))
 
48
                        continue;
 
49
                r->flags |= R_EXAMINE;
 
50
                resource_examine = 1;
 
51
                count++;
 
52
        }
 
53
        if (count)
 
54
                pthread_cond_signal(&resource_cond);
 
55
        pthread_mutex_unlock(&resource_mutex);
 
56
 
 
57
        return count;
 
58
}
 
59
 
 
60
static struct resource *find_resource_examine(void)
 
61
{
 
62
        struct resource *r;
 
63
 
 
64
        list_for_each_entry(r, &resources, list) {
 
65
                if (r->flags & R_EXAMINE)
 
66
                        return r;
 
67
        }
 
68
        return NULL;
 
69
}
 
70
 
 
71
static struct resource *find_resource(struct token *token,
 
72
                                      struct list_head *head)
 
73
{
 
74
        struct resource *r;
 
75
 
 
76
        list_for_each_entry(r, head, list) {
 
77
                if (strncmp(r->r.lockspace_name, token->r.lockspace_name, NAME_ID_SIZE))
 
78
                        continue;
 
79
                if (strncmp(r->r.name, token->r.name, NAME_ID_SIZE))
 
80
                        continue;
 
81
                return r;
 
82
        }
 
83
        return NULL;
 
84
}
 
85
 
 
86
static void save_resource_lver(struct token *token, uint64_t lver)
 
87
{
 
88
        struct resource *r;
 
89
 
 
90
        pthread_mutex_lock(&resource_mutex);
 
91
        r = find_resource(token, &resources);
 
92
        if (r)
 
93
                r->lver = lver;
 
94
        pthread_mutex_unlock(&resource_mutex);
 
95
 
 
96
        if (!r)
 
97
                log_errot(token, "save_resource_lver no r");
 
98
 
 
99
}
 
100
 
 
101
int add_resource(struct token *token, int pid, uint32_t cl_restrict)
 
102
{
 
103
        struct resource *r;
 
104
        int rv, disks_len, r_len;
 
105
 
 
106
        pthread_mutex_lock(&resource_mutex);
 
107
 
 
108
        r = find_resource(token, &resources);
 
109
        if (r) {
 
110
                if (!com.quiet_fail)
 
111
                        log_errot(token, "add_resource name exists");
 
112
                rv = -EEXIST;
 
113
                goto out;
 
114
        }
 
115
 
 
116
        r = find_resource(token, &dispose_resources);
 
117
        if (r) {
 
118
                if (!com.quiet_fail)
 
119
                        log_errot(token, "add_resource disposed");
 
120
                rv = -EAGAIN;
 
121
                goto out;
 
122
        }
 
123
 
 
124
        disks_len = token->r.num_disks * sizeof(struct sync_disk);
 
125
        r_len = sizeof(struct resource) + disks_len;
 
126
 
 
127
        r = malloc(r_len);
 
128
        if (!r) {
 
129
                rv = -ENOMEM;
 
130
                goto out;
 
131
        }
 
132
        memset(r, 0, r_len);
 
133
        memcpy(&r->r, &token->r, sizeof(struct sanlk_resource));
 
134
        memcpy(&r->r.disks, &token->r.disks, disks_len);
 
135
        r->token_id = token->token_id;
 
136
        r->token = token;
 
137
        r->pid = pid;
 
138
        if (cl_restrict & SANLK_RESTRICT_SIGKILL)
 
139
                r->flags |= R_NO_SIGKILL;
 
140
        list_add_tail(&r->list, &resources);
 
141
        rv = 0;
 
142
 out:
 
143
        pthread_mutex_unlock(&resource_mutex);
 
144
        return rv;
 
145
}
 
146
 
 
147
/* resource_mutex must be held */
 
148
 
 
149
static void _del_resource(struct resource *r)
 
150
{
 
151
        list_del(&r->list);
 
152
        free(r);
 
153
}
 
154
 
 
155
void del_resource(struct token *token)
 
156
{
 
157
        struct resource *r;
 
158
 
 
159
        pthread_mutex_lock(&resource_mutex);
 
160
        r = find_resource(token, &resources);
 
161
        if (r)
 
162
                _del_resource(r);
 
163
        pthread_mutex_unlock(&resource_mutex);
 
164
}
 
165
 
 
166
/* return < 0 on error, 1 on success */
 
167
 
 
168
int acquire_token(struct task *task, struct token *token,
 
169
                  uint64_t acquire_lver, int new_num_hosts)
 
170
{
 
171
        struct leader_record leader_ret;
 
172
        int rv;
 
173
        uint32_t flags = 0;
 
174
 
 
175
        if (com.quiet_fail)
 
176
                flags |= PAXOS_ACQUIRE_QUIET_FAIL;
 
177
 
 
178
        rv = open_disks(token->disks, token->r.num_disks);
 
179
        if (!majority_disks(token, rv)) {
 
180
                log_errot(token, "acquire open_disk error %s", token->disks[0].path);
 
181
                return -ENODEV;
 
182
        }
 
183
 
 
184
        rv = paxos_lease_acquire(task, token, flags, &leader_ret, acquire_lver,
 
185
                                 new_num_hosts);
 
186
 
 
187
        token->acquire_result = rv;
 
188
 
 
189
        /* we could leave this open so release does not have to reopen */
 
190
        close_disks(token->disks, token->r.num_disks);
 
191
 
 
192
        log_token(token, "acquire rv %d lver %llu at %llu", rv,
 
193
                  (unsigned long long)token->leader.lver,
 
194
                  (unsigned long long)token->leader.timestamp);
 
195
 
 
196
        if (rv < 0)
 
197
                return rv;
 
198
 
 
199
        memcpy(&token->leader, &leader_ret, sizeof(struct leader_record));
 
200
        token->r.lver = token->leader.lver;
 
201
        save_resource_lver(token, token->leader.lver);
 
202
        return rv; /* SANLK_OK */
 
203
}
 
204
 
 
205
/* return < 0 on error, 1 on success */
 
206
 
 
207
int release_token(struct task *task, struct token *token)
 
208
{
 
209
        struct leader_record leader_ret;
 
210
        int rv;
 
211
 
 
212
        rv = open_disks_fd(token->disks, token->r.num_disks);
 
213
        if (!majority_disks(token, rv)) {
 
214
                log_errot(token, "release open_disk error %s", token->disks[0].path);
 
215
                return -ENODEV;
 
216
        }
 
217
 
 
218
        rv = paxos_lease_release(task, token, &token->leader, &leader_ret);
 
219
 
 
220
        token->release_result = rv;
 
221
 
 
222
        close_disks(token->disks, token->r.num_disks);
 
223
 
 
224
        log_token(token, "release rv %d", rv);
 
225
 
 
226
        if (rv < 0)
 
227
                return rv;
 
228
 
 
229
        memcpy(&token->leader, &leader_ret, sizeof(struct leader_record));
 
230
        return rv; /* SANLK_OK */
 
231
}
 
232
 
 
233
int request_token(struct task *task, struct token *token, uint32_t force_mode,
 
234
                  uint64_t *owner_id)
 
235
{
 
236
        struct leader_record leader;
 
237
        struct request_record req;
 
238
        int rv;
 
239
 
 
240
        memset(&req, 0, sizeof(req));
 
241
 
 
242
        rv = open_disks(token->disks, token->r.num_disks);
 
243
        if (!majority_disks(token, rv)) {
 
244
                log_debug("request open_disk error %s", token->disks[0].path);
 
245
                return -ENODEV;
 
246
        }
 
247
 
 
248
        if (!token->acquire_lver && !force_mode)
 
249
                goto req_read;
 
250
 
 
251
        rv = paxos_lease_leader_read(task, token, &leader, "request");
 
252
        if (rv < 0)
 
253
                goto out;
 
254
 
 
255
        if (leader.timestamp == LEASE_FREE) {
 
256
                *owner_id = 0;
 
257
                rv = SANLK_OK;
 
258
                goto out;
 
259
        }
 
260
 
 
261
        *owner_id = leader.owner_id;
 
262
 
 
263
        if (leader.lver >= token->acquire_lver) {
 
264
                rv = SANLK_REQUEST_OLD;
 
265
                goto out;
 
266
        }
 
267
 
 
268
 req_read:
 
269
        rv = paxos_lease_request_read(task, token, &req);
 
270
        if (rv < 0)
 
271
                goto out;
 
272
 
 
273
        if (req.magic != REQ_DISK_MAGIC) {
 
274
                rv = SANLK_REQUEST_MAGIC;
 
275
                goto out;
 
276
        }
 
277
 
 
278
        if ((req.version & 0xFFFF0000) != REQ_DISK_VERSION_MAJOR) {
 
279
                rv = SANLK_REQUEST_VERSION;
 
280
                goto out;
 
281
        }
 
282
 
 
283
        if (!token->acquire_lver && !force_mode)
 
284
                goto req_write;
 
285
 
 
286
        /* > instead of >= so multiple hosts can request the same
 
287
           version at once and all succeed */
 
288
 
 
289
        if (req.lver > token->acquire_lver) {
 
290
                rv = SANLK_REQUEST_LVER;
 
291
                goto out;
 
292
        }
 
293
 
 
294
 req_write:
 
295
        req.version = REQ_DISK_VERSION_MAJOR | REQ_DISK_VERSION_MINOR;
 
296
        req.lver = token->acquire_lver;
 
297
        req.force_mode = force_mode;
 
298
 
 
299
        rv = paxos_lease_request_write(task, token, &req);
 
300
 out:
 
301
        close_disks(token->disks, token->r.num_disks);
 
302
 
 
303
        log_debug("request rv %d owner %llu lver %llu mode %u",
 
304
                  rv, (unsigned long long)*owner_id,
 
305
                  (unsigned long long)req.lver, req.force_mode);
 
306
 
 
307
        return rv;
 
308
}
 
309
 
 
310
static int examine_token(struct task *task, struct token *token,
 
311
                         struct request_record *req_out)
 
312
{
 
313
        struct request_record req;
 
314
        int rv;
 
315
 
 
316
        memset(&req, 0, sizeof(req));
 
317
 
 
318
        rv = open_disks(token->disks, token->r.num_disks);
 
319
        if (!majority_disks(token, rv)) {
 
320
                log_debug("request open_disk error %s", token->disks[0].path);
 
321
                return -ENODEV;
 
322
        }
 
323
 
 
324
        rv = paxos_lease_request_read(task, token, &req);
 
325
        if (rv < 0)
 
326
                goto out;
 
327
 
 
328
        if (req.magic != REQ_DISK_MAGIC) {
 
329
                rv = SANLK_REQUEST_MAGIC;
 
330
                goto out;
 
331
        }
 
332
 
 
333
        if ((req.version & 0xFFFF0000) != REQ_DISK_VERSION_MAJOR) {
 
334
                rv = SANLK_REQUEST_VERSION;
 
335
                goto out;
 
336
        }
 
337
 
 
338
        memcpy(req_out, &req, sizeof(struct request_record));
 
339
 out:
 
340
        close_disks(token->disks, token->r.num_disks);
 
341
 
 
342
        log_debug("examine rv %d lver %llu mode %u",
 
343
                  rv, (unsigned long long)req.lver, req.force_mode);
 
344
 
 
345
        return rv;
 
346
}
 
347
 
 
348
static void do_req_kill_pid(struct token *tt, int pid)
 
349
{
 
350
        struct resource *r;
 
351
        uint32_t flags;
 
352
        int found = 0;
 
353
 
 
354
        pthread_mutex_lock(&resource_mutex);
 
355
        r = find_resource(tt, &resources);
 
356
        if (r && r->pid == pid) {
 
357
                found = 1;
 
358
                flags = r->flags;
 
359
        }
 
360
        pthread_mutex_unlock(&resource_mutex);
 
361
 
 
362
        if (!found) {
 
363
                log_error("req pid %d %.48s:%.48s not found",
 
364
                           pid, tt->r.lockspace_name, tt->r.name);
 
365
                return;
 
366
        }
 
367
 
 
368
        log_debug("do_req_kill_pid %d flags %x %.48s:%.48s",
 
369
                  pid, flags, tt->r.lockspace_name, tt->r.name);
 
370
 
 
371
        /* TODO: share code with kill_pids() to gradually
 
372
         * escalate from killscript, SIGTERM, SIGKILL */
 
373
 
 
374
        kill(pid, SIGTERM);
 
375
 
 
376
        if (flags & R_NO_SIGKILL)
 
377
                return;
 
378
 
 
379
        sleep(1);
 
380
        kill(pid, SIGKILL);
 
381
}
 
382
 
 
383
/*
 
384
 * TODO? add force_mode SANLK_REQ_KILL_PID_OR_RESET
 
385
 * which would attempt to kill the pid like KILL_PID,
 
386
 * but if the pid doesn't exit will block watchdog
 
387
 * updates to reset the host.
 
388
 * Here set r->block_wd_time = now + pid_exit_time,
 
389
 * In renewal check for any r in resources with
 
390
 * block_wd_time <= now, and if found will not
 
391
 * update the watchdog.  If the pid continues to
 
392
 * not exit, the wd will fire and reset the machine.
 
393
 * If the pid exits before pid_exit_time, no wd
 
394
 * updates will be skipped.
 
395
 */
 
396
 
 
397
/*
 
398
 * - releases tokens of pid's that die
 
399
 * - examines request blocks of resources
 
400
 */
 
401
 
 
402
static void *resource_thread(void *arg GNUC_UNUSED)
 
403
{
 
404
        struct task task;
 
405
        struct resource *r;
 
406
        struct token *token, *tt = NULL;
 
407
        struct request_record req;
 
408
        uint64_t lver;
 
409
        int rv, j, pid, tt_len;
 
410
 
 
411
        memset(&task, 0, sizeof(struct task));
 
412
        setup_task_timeouts(&task, main_task.io_timeout_seconds);
 
413
        setup_task_aio(&task, main_task.use_aio, RESOURCE_AIO_CB_SIZE);
 
414
        sprintf(task.name, "%s", "resource");
 
415
 
 
416
        /* a fake/tmp token struct we copy necessary res info into,
 
417
           because other functions take a token struct arg */
 
418
 
 
419
        tt_len = sizeof(struct token) + (SANLK_MAX_DISKS * sizeof(struct sync_disk));
 
420
        tt = malloc(tt_len);
 
421
        if (!tt) {
 
422
                log_error("resource_thread tt malloc error");
 
423
                goto out;
 
424
        }
 
425
        memset(tt, 0, tt_len);
 
426
        tt->disks = (struct sync_disk *)&tt->r.disks[0];
 
427
 
 
428
        while (1) {
 
429
                pthread_mutex_lock(&resource_mutex);
 
430
                while (list_empty(&dispose_resources) && !resource_examine) {
 
431
                        if (resource_thread_stop) {
 
432
                                pthread_mutex_unlock(&resource_mutex);
 
433
                                goto out;
 
434
                        }
 
435
                        pthread_cond_wait(&resource_cond, &resource_mutex);
 
436
                }
 
437
 
 
438
                if (!list_empty(&dispose_resources)) {
 
439
                        r = list_first_entry(&dispose_resources, struct resource, list);
 
440
                        pthread_mutex_unlock(&resource_mutex);
 
441
 
 
442
                        token = r->token;
 
443
                        release_token(&task, token);
 
444
 
 
445
                        /* we don't want to remove r from dispose_list until after the
 
446
                           lease is released because we don't want a new token for
 
447
                           the same resource to be added and attempt to acquire
 
448
                           the lease until after it's been released */
 
449
 
 
450
                        pthread_mutex_lock(&resource_mutex);
 
451
                        _del_resource(r);
 
452
                        pthread_mutex_unlock(&resource_mutex);
 
453
                        free(token);
 
454
 
 
455
                } else if (resource_examine) {
 
456
                        r = find_resource_examine();
 
457
                        if (!r) {
 
458
                                resource_examine = 0;
 
459
                                pthread_mutex_unlock(&resource_mutex);
 
460
                                continue;
 
461
                        }
 
462
                        r->flags &= ~R_EXAMINE;
 
463
 
 
464
                        /* we can't safely access r->token here, and
 
465
                           r may be freed after we release mutex, so copy
 
466
                           everything we need before unlocking mutex */
 
467
 
 
468
                        pid = r->pid;
 
469
                        lver = r->lver;
 
470
                        memcpy(&tt->r, &r->r, sizeof(struct sanlk_resource));
 
471
                        memcpy(&tt->r.disks, &r->r.disks, r->r.num_disks * sizeof(struct sync_disk));
 
472
                        pthread_mutex_unlock(&resource_mutex);
 
473
 
 
474
                        for (j = 0; j < tt->r.num_disks; j++) {
 
475
                                tt->disks[j].sector_size = 0;
 
476
                                tt->disks[j].fd = -1;
 
477
                        }
 
478
 
 
479
                        rv = examine_token(&task, tt, &req);
 
480
 
 
481
                        if (rv != SANLK_OK)
 
482
                                continue;
 
483
 
 
484
                        if (!req.force_mode || !req.lver)
 
485
                                continue;
 
486
 
 
487
                        if (req.lver <= lver) {
 
488
                                log_debug("examine req lver %llu our lver %llu",
 
489
                                          (unsigned long long)req.lver,
 
490
                                          (unsigned long long)lver);
 
491
                                continue;
 
492
                        }
 
493
 
 
494
                        if (req.force_mode == SANLK_REQ_KILL_PID) {
 
495
                                do_req_kill_pid(tt, pid);
 
496
                        } else {
 
497
                                log_error("req force_mode %u unknown", req.force_mode);
 
498
                        }
 
499
                }
 
500
        }
 
501
 out:
 
502
        if (tt)
 
503
                free(tt);
 
504
        close_task_aio(&task);
 
505
        return NULL;
 
506
}
 
507
 
 
508
void release_token_async(struct token *token)
 
509
{
 
510
        struct resource *r;
 
511
 
 
512
        pthread_mutex_lock(&resource_mutex);
 
513
        r = find_resource(token, &resources);
 
514
        if (r) {
 
515
                /* assert r->token == token ? */
 
516
 
 
517
                if (token->space_dead || (token->acquire_result != SANLK_OK)) {
 
518
                        _del_resource(r);
 
519
                        free(token);
 
520
                } else {
 
521
                        list_move(&r->list, &dispose_resources);
 
522
                        pthread_cond_signal(&resource_cond);
 
523
                }
 
524
        }
 
525
        pthread_mutex_unlock(&resource_mutex);
 
526
}
 
527
 
 
528
int setup_token_manager(void)
 
529
{
 
530
        int rv;
 
531
 
 
532
        rv = pthread_create(&resource_pt, NULL, resource_thread, NULL);
 
533
        if (rv)
 
534
                return -1;
 
535
        return 0;
 
536
}
 
537
 
 
538
void close_token_manager(void)
 
539
{
 
540
        pthread_mutex_lock(&resource_mutex);
 
541
        resource_thread_stop = 1;
 
542
        pthread_cond_signal(&resource_cond);
 
543
        pthread_mutex_unlock(&resource_mutex);
 
544
        pthread_join(resource_pt, NULL);
 
545
}
 
546