2
* Copyright 2010-2011 Red Hat, Inc.
4
* This copyrighted material is made available to anyone wishing to use,
5
* modify, copy, or redistribute it subject to the terms and conditions
6
* of the GNU General Public License v2 or (at your option) any later version.
23
#include <sys/types.h>
26
#include "sanlock_internal.h"
29
#include "paxos_lease.h"
30
#include "lockspace.h"
34
/* Handle of the background resource thread created by setup_token_manager()
 * and joined by close_token_manager(). */
static pthread_t resource_pt;
35
/* Set to 1 (under resource_mutex) by close_token_manager() to ask
 * resource_thread to exit its main loop. */
static int resource_thread_stop;
36
/* Nonzero when resources flagged R_EXAMINE are waiting for resource_thread
 * to process them; cleared by the thread.  NOTE(review): presumably set by
 * set_resource_examine() — the setting line is not visible in this
 * fragmentary extraction; confirm against the full source. */
static int resource_examine;
38
/*
 * Flag resources for examination by the resource thread.
 *
 * Marks every resource in lockspace space_name with R_EXAMINE — or only the
 * resource named res_name when res_name is non-NULL — and signals
 * resource_cond so resource_thread wakes up and examines them.
 *
 * NOTE(review): extraction is fragmentary; the interleaved bare numbers are
 * original line numbers and the code between them is missing (e.g. the
 * variable declarations, continue statements, and return value).
 */
int set_resource_examine(char *space_name, char *res_name)
43
pthread_mutex_lock(&resource_mutex);
44
list_for_each_entry(r, &resources, list) {
45
/* skip resources belonging to other lockspaces */
if (strncmp(r->r.lockspace_name, space_name, NAME_ID_SIZE))
47
/* res_name == NULL means "every resource in the lockspace" */
if (res_name && strncmp(r->r.name, res_name, NAME_ID_SIZE))
49
r->flags |= R_EXAMINE;
54
/* wake resource_thread to process the newly flagged entries */
pthread_cond_signal(&resource_cond);
55
pthread_mutex_unlock(&resource_mutex);
60
/*
 * Return the first resource on the resources list carrying R_EXAMINE,
 * or (presumably) NULL when none is flagged — the return statements are
 * not visible in this extraction; confirm against the full source.
 * Caller must hold resource_mutex (the visible caller, resource_thread,
 * does so).
 */
static struct resource *find_resource_examine(void)
64
list_for_each_entry(r, &resources, list) {
65
if (r->flags & R_EXAMINE)
71
/*
 * Look up a resource on list 'head' whose lockspace name and resource name
 * both match token's (bounded comparison with strncmp/NAME_ID_SIZE).
 * Callers visible in this file hold resource_mutex around the call.
 * NOTE(review): the match/return lines are missing from this extraction.
 */
static struct resource *find_resource(struct token *token,
72
struct list_head *head)
76
list_for_each_entry(r, head, list) {
77
/* lockspace name must match first */
if (strncmp(r->r.lockspace_name, token->r.lockspace_name, NAME_ID_SIZE))
79
/* then the resource name */
if (strncmp(r->r.name, token->r.name, NAME_ID_SIZE))
86
/*
 * Record lease version 'lver' on the in-memory resource matching 'token'
 * (the assignment line is missing from this extraction).  Logs an error
 * if no matching resource is found on the resources list.
 */
static void save_resource_lver(struct token *token, uint64_t lver)
90
pthread_mutex_lock(&resource_mutex);
91
r = find_resource(token, &resources);
94
pthread_mutex_unlock(&resource_mutex);
97
/* reached when find_resource() returned no match */
log_errot(token, "save_resource_lver no r");
101
/*
 * Register a new in-memory resource for 'token' owned by 'pid'.
 *
 * Fails (with an error log) if a resource with the same name already exists
 * on the active list or is still awaiting disposal; otherwise allocates a
 * resource (allocation lines missing from this extraction), copies the
 * sanlk_resource plus its trailing disk array from the token, records the
 * token id, applies R_NO_SIGKILL when the client restriction asks for it,
 * and appends the resource to the global resources list.
 *
 * NOTE(review): interleaved bare numbers are original line numbers; the
 * code between them (error returns, malloc, pid assignment) is missing.
 */
int add_resource(struct token *token, int pid, uint32_t cl_restrict)
104
int rv, disks_len, r_len;
106
pthread_mutex_lock(&resource_mutex);
108
/* reject duplicate active resource */
r = find_resource(token, &resources);
111
log_errot(token, "add_resource name exists");
116
/* reject a resource still queued for disposal (lease not yet released) */
r = find_resource(token, &dispose_resources);
119
log_errot(token, "add_resource disposed");
124
/* resource struct carries its disk array inline after the fixed part */
disks_len = token->r.num_disks * sizeof(struct sync_disk);
125
r_len = sizeof(struct resource) + disks_len;
133
memcpy(&r->r, &token->r, sizeof(struct sanlk_resource));
134
memcpy(&r->r.disks, &token->r.disks, disks_len);
135
r->token_id = token->token_id;
138
/* client asked that this pid never receive SIGKILL */
if (cl_restrict & SANLK_RESTRICT_SIGKILL)
139
r->flags |= R_NO_SIGKILL;
140
list_add_tail(&r->list, &resources);
143
pthread_mutex_unlock(&resource_mutex);
147
/* resource_mutex must be held */
149
/* Remove and free a resource entry; body not visible in this extraction. */
static void _del_resource(struct resource *r)
155
/*
 * Remove the in-memory resource matching 'token' from the resources list.
 * Locks resource_mutex, looks the resource up, and (presumably, via the
 * missing lines) calls _del_resource() on a match — confirm against the
 * full source.
 */
void del_resource(struct token *token)
159
pthread_mutex_lock(&resource_mutex);
160
r = find_resource(token, &resources);
163
pthread_mutex_unlock(&resource_mutex);
166
/* return < 0 on error, 1 on success */
168
/*
 * Acquire the paxos lease for 'token'.
 *
 * Opens the token's disks (a majority must open), runs the paxos lease
 * acquire algorithm, records the result in token->acquire_result, and on
 * SANLK_OK copies the winning leader record into token->leader, mirrors
 * the lease version into token->r.lver, and saves it on the in-memory
 * resource via save_resource_lver().
 *
 * NOTE(review): extraction is fragmentary — error-path returns and some
 * declarations between the numbered markers are missing.
 */
int acquire_token(struct task *task, struct token *token,
169
uint64_t acquire_lver, int new_num_hosts)
171
struct leader_record leader_ret;
176
/* suppress noisy failure logging inside paxos_lease_acquire */
flags |= PAXOS_ACQUIRE_QUIET_FAIL;
178
rv = open_disks(token->disks, token->r.num_disks);
179
/* need a majority of lease disks open to proceed */
if (!majority_disks(token, rv)) {
180
log_errot(token, "acquire open_disk error %s", token->disks[0].path);
184
rv = paxos_lease_acquire(task, token, flags, &leader_ret, acquire_lver,
187
token->acquire_result = rv;
189
/* we could leave this open so release does not have to reopen */
190
close_disks(token->disks, token->r.num_disks);
192
log_token(token, "acquire rv %d lver %llu at %llu", rv,
193
(unsigned long long)token->leader.lver,
194
(unsigned long long)token->leader.timestamp);
199
/* success: remember the leader record we now own */
memcpy(&token->leader, &leader_ret, sizeof(struct leader_record));
200
token->r.lver = token->leader.lver;
201
save_resource_lver(token, token->leader.lver);
202
return rv; /* SANLK_OK */
205
/* return < 0 on error, 1 on success */
207
/*
 * Release the paxos lease held by 'token'.
 *
 * Reopens the token's disks (majority required), writes the release via
 * paxos_lease_release(), records the result in token->release_result,
 * closes the disks, and on success copies the updated leader record back
 * into token->leader.
 *
 * NOTE(review): fragmentary extraction — error-path returns between the
 * numbered markers are missing.
 */
int release_token(struct task *task, struct token *token)
209
struct leader_record leader_ret;
212
rv = open_disks_fd(token->disks, token->r.num_disks);
213
if (!majority_disks(token, rv)) {
214
log_errot(token, "release open_disk error %s", token->disks[0].path);
218
rv = paxos_lease_release(task, token, &token->leader, &leader_ret);
220
token->release_result = rv;
222
close_disks(token->disks, token->r.num_disks);
224
log_token(token, "release rv %d", rv);
229
memcpy(&token->leader, &leader_ret, sizeof(struct leader_record));
230
return rv; /* SANLK_OK */
233
/*
 * Write a request record asking the current lease owner to act
 * (e.g. force_mode SANLK_REQ_KILL_PID) or query the owner.
 *
 * With no acquire_lver and no force_mode this only reads the leader record
 * and reports the current owner.  Otherwise it validates the on-disk
 * request block (magic, major version), refuses stale requests
 * (SANLK_REQUEST_OLD / SANLK_REQUEST_LVER), and writes the new request.
 *
 * NOTE(review): the parameter list is truncated in this extraction; an
 * owner_id output pointer is clearly used below (*owner_id) — confirm the
 * full signature against the original source.  Interleaved bare numbers
 * are original line numbers; code between them is missing.
 */
int request_token(struct task *task, struct token *token, uint32_t force_mode,
236
struct leader_record leader;
237
struct request_record req;
240
memset(&req, 0, sizeof(req));
242
rv = open_disks(token->disks, token->r.num_disks);
243
/* majority of lease disks must be reachable */
if (!majority_disks(token, rv)) {
244
log_debug("request open_disk error %s", token->disks[0].path);
248
/* no lver and no force_mode: caller only wants the current owner */
if (!token->acquire_lver && !force_mode)
251
rv = paxos_lease_leader_read(task, token, &leader, "request");
255
if (leader.timestamp == LEASE_FREE) {
261
/* report who currently owns the lease */
*owner_id = leader.owner_id;
263
/* the lease has already moved past the version being requested */
if (leader.lver >= token->acquire_lver) {
264
rv = SANLK_REQUEST_OLD;
269
rv = paxos_lease_request_read(task, token, &req);
273
/* validate the existing on-disk request block */
if (req.magic != REQ_DISK_MAGIC) {
274
rv = SANLK_REQUEST_MAGIC;
278
if ((req.version & 0xFFFF0000) != REQ_DISK_VERSION_MAJOR) {
279
rv = SANLK_REQUEST_VERSION;
283
if (!token->acquire_lver && !force_mode)
286
/* > instead of >= so multiple hosts can request the same
287
version at once and all succeed */
289
if (req.lver > token->acquire_lver) {
290
rv = SANLK_REQUEST_LVER;
295
/* compose and write the new request record */
req.version = REQ_DISK_VERSION_MAJOR | REQ_DISK_VERSION_MINOR;
296
req.lver = token->acquire_lver;
297
req.force_mode = force_mode;
299
rv = paxos_lease_request_write(task, token, &req);
301
close_disks(token->disks, token->r.num_disks);
303
log_debug("request rv %d owner %llu lver %llu mode %u",
304
rv, (unsigned long long)*owner_id,
305
(unsigned long long)req.lver, req.force_mode);
310
/*
 * Read and validate the on-disk request record for 'token'.
 *
 * Opens the token's disks (majority required), reads the request block,
 * rejects it on bad magic (SANLK_REQUEST_MAGIC) or wrong major version
 * (SANLK_REQUEST_VERSION), and on success copies it into *req_out.
 *
 * NOTE(review): fragmentary extraction — interleaved bare numbers are
 * original line numbers; error-path returns between them are missing.
 */
static int examine_token(struct task *task, struct token *token,
311
struct request_record *req_out)
313
struct request_record req;
316
memset(&req, 0, sizeof(req));
318
rv = open_disks(token->disks, token->r.num_disks);
319
if (!majority_disks(token, rv)) {
320
log_debug("request open_disk error %s", token->disks[0].path);
324
rv = paxos_lease_request_read(task, token, &req);
328
if (req.magic != REQ_DISK_MAGIC) {
329
rv = SANLK_REQUEST_MAGIC;
333
if ((req.version & 0xFFFF0000) != REQ_DISK_VERSION_MAJOR) {
334
rv = SANLK_REQUEST_VERSION;
338
/* hand the validated request back to the caller */
memcpy(req_out, &req, sizeof(struct request_record));
340
close_disks(token->disks, token->r.num_disks);
342
log_debug("examine rv %d lver %llu mode %u",
343
rv, (unsigned long long)req.lver, req.force_mode);
348
/*
 * Act on a SANLK_REQ_KILL_PID request: kill the local pid that holds the
 * lease described by the temporary token 'tt'.
 *
 * Looks up the resource under resource_mutex and verifies it is still held
 * by 'pid'; logs an error and bails if not found.  The actual signalling
 * lines are missing from this extraction; the visible R_NO_SIGKILL check
 * suggests SIGKILL is skipped for restricted pids — confirm against the
 * full source.  NOTE(review): 'flags' is used below but its capture from
 * r->flags is among the missing lines.
 */
static void do_req_kill_pid(struct token *tt, int pid)
354
pthread_mutex_lock(&resource_mutex);
355
r = find_resource(tt, &resources);
356
/* the resource must still be held by the requested pid */
if (r && r->pid == pid) {
360
pthread_mutex_unlock(&resource_mutex);
363
log_error("req pid %d %.48s:%.48s not found",
364
pid, tt->r.lockspace_name, tt->r.name);
368
log_debug("do_req_kill_pid %d flags %x %.48s:%.48s",
369
pid, flags, tt->r.lockspace_name, tt->r.name);
371
/* TODO: share code with kill_pids() to gradually
372
* escalate from killscript, SIGTERM, SIGKILL */
376
/* client registered with SANLK_RESTRICT_SIGKILL; see add_resource() */
if (flags & R_NO_SIGKILL)
384
* TODO? add force_mode SANLK_REQ_KILL_PID_OR_RESET
385
* which would attempt to kill the pid like KILL_PID,
386
* but if the pid doesn't exit will block watchdog
387
* updates to reset the host.
388
* Here set r->block_wd_time = now + pid_exit_time,
389
* In renewal check for any r in resources with
390
* block_wd_time <= now, and if found will not
391
* update the watchdog. If the pid continues to
392
* not exit, the wd will fire and reset the machine.
393
* If the pid exits before pid_exit_time, no wd
394
* updates will be skipped.
398
* - releases tokens of pid's that die
399
* - examines request blocks of resources
402
/*
 * Background worker started by setup_token_manager().
 *
 * Sets up its own struct task (timeouts/aio copied from main_task), builds
 * a scratch token 'tt' large enough for SANLK_MAX_DISKS, then loops:
 * sleep on resource_cond until either dispose_resources is non-empty,
 * resource_examine is set, or resource_thread_stop asks it to exit.
 *
 * Dispose path: take the first disposed resource, drop the mutex, release
 * its lease on disk, then re-lock and remove it (ordering rationale in the
 * original comment below).  Examine path: copy the flagged resource into
 * 'tt' under the mutex, read its request block from disk, and dispatch
 * SANLK_REQ_KILL_PID requests whose lver is newer than ours.
 *
 * NOTE(review): fragmentary extraction — interleaved bare numbers are
 * original line numbers; allocation checks, 'token'/'pid'/'lver' capture,
 * goto labels and loop braces are among the missing lines.
 */
static void *resource_thread(void *arg GNUC_UNUSED)
406
struct token *token, *tt = NULL;
407
struct request_record req;
409
int rv, j, pid, tt_len;
411
/* private task context so disk I/O here doesn't share main_task state */
memset(&task, 0, sizeof(struct task));
412
setup_task_timeouts(&task, main_task.io_timeout_seconds);
413
setup_task_aio(&task, main_task.use_aio, RESOURCE_AIO_CB_SIZE);
414
sprintf(task.name, "%s", "resource");
416
/* a fake/tmp token struct we copy necessary res info into,
417
because other functions take a token struct arg */
419
tt_len = sizeof(struct token) + (SANLK_MAX_DISKS * sizeof(struct sync_disk));
422
log_error("resource_thread tt malloc error");
425
memset(tt, 0, tt_len);
426
/* disks array lives inline right after the sanlk_resource */
tt->disks = (struct sync_disk *)&tt->r.disks[0];
429
pthread_mutex_lock(&resource_mutex);
430
/* sleep until there is dispose or examine work, or we are told to stop */
while (list_empty(&dispose_resources) && !resource_examine) {
431
if (resource_thread_stop) {
432
pthread_mutex_unlock(&resource_mutex);
435
pthread_cond_wait(&resource_cond, &resource_mutex);
438
if (!list_empty(&dispose_resources)) {
439
r = list_first_entry(&dispose_resources, struct resource, list);
440
/* release the on-disk lease without holding the mutex */
pthread_mutex_unlock(&resource_mutex);
443
release_token(&task, token);
445
/* we don't want to remove r from dispose_list until after the
446
lease is released because we don't want a new token for
447
the same resource to be added and attempt to acquire
448
the lease until after it's been released */
450
pthread_mutex_lock(&resource_mutex);
452
pthread_mutex_unlock(&resource_mutex);
455
} else if (resource_examine) {
456
r = find_resource_examine();
458
resource_examine = 0;
459
pthread_mutex_unlock(&resource_mutex);
462
r->flags &= ~R_EXAMINE;
464
/* we can't safely access r->token here, and
465
r may be freed after we release mutex, so copy
466
everything we need before unlocking mutex */
470
memcpy(&tt->r, &r->r, sizeof(struct sanlk_resource));
471
memcpy(&tt->r.disks, &r->r.disks, r->r.num_disks * sizeof(struct sync_disk));
472
pthread_mutex_unlock(&resource_mutex);
474
/* force examine_token()'s open_disks to re-probe each disk */
for (j = 0; j < tt->r.num_disks; j++) {
475
tt->disks[j].sector_size = 0;
476
tt->disks[j].fd = -1;
479
rv = examine_token(&task, tt, &req);
484
/* empty request block: nothing to act on */
if (!req.force_mode || !req.lver)
487
/* stale request: refers to a version we have already passed */
if (req.lver <= lver) {
488
log_debug("examine req lver %llu our lver %llu",
489
(unsigned long long)req.lver,
490
(unsigned long long)lver);
494
if (req.force_mode == SANLK_REQ_KILL_PID) {
495
do_req_kill_pid(tt, pid);
497
log_error("req force_mode %u unknown", req.force_mode);
504
/* thread exit: tear down the private aio context */
close_task_aio(&task);
508
/*
 * Queue a token's lease release to the resource thread instead of
 * releasing it inline.
 *
 * Moves the matching resource onto dispose_resources and signals
 * resource_cond so resource_thread performs the on-disk release.
 * NOTE(review): the branch bodies are missing from this extraction — the
 * visible condition (lockspace dead, or the acquire never succeeded)
 * likely takes a different path than the list_move below; confirm which
 * branch each visible line belongs to against the full source.
 */
void release_token_async(struct token *token)
512
pthread_mutex_lock(&resource_mutex);
513
r = find_resource(token, &resources);
515
/* assert r->token == token ? */
517
if (token->space_dead || (token->acquire_result != SANLK_OK)) {
521
list_move(&r->list, &dispose_resources);
522
pthread_cond_signal(&resource_cond);
525
pthread_mutex_unlock(&resource_mutex);
528
/*
 * Start the background resource thread; the return-value handling is
 * missing from this extraction (presumably returns an error when
 * pthread_create fails — confirm against the full source).
 */
int setup_token_manager(void)
532
rv = pthread_create(&resource_pt, NULL, resource_thread, NULL);
538
/*
 * Stop and reap the resource thread.
 * Sets resource_thread_stop under resource_mutex, signals resource_cond
 * so a sleeping resource_thread wakes and observes the flag, then joins it.
 */
void close_token_manager(void)
540
pthread_mutex_lock(&resource_mutex);
541
resource_thread_stop = 1;
542
pthread_cond_signal(&resource_cond);
543
pthread_mutex_unlock(&resource_mutex);
544
pthread_join(resource_pt, NULL);