2
* Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
4
* This library is free software; you can redistribute it and/or
5
* modify it under the terms of the GNU Lesser General Public
6
* License as published by the Free Software Foundation; either
7
* version 2.1 of the License, or (at your option) any later version.
9
* This library is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
* Lesser General Public License for more details.
14
* You should have received a copy of the GNU Lesser General Public
15
* License along with this library; if not, write to the Free Software
16
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
#include <crm_internal.h>
32
#include <crm/common/xml.h>
33
#include <crm/common/mainloop.h>
34
#include <crm/common/ipcs.h>
36
struct mainloop_child_s {
44
/* Called when a process dies */
45
void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
58
crm_trigger_prepare(GSource * source, gint * timeout)
60
crm_trigger_t *trig = (crm_trigger_t *) source;
62
/* cluster-glue's FD and IPC related sources make use of
63
* g_source_add_poll() but do not set a timeout in their prepare
66
* This means mainloop's poll() will block until an event for one
67
* of these sources occurs - any /other/ type of source, such as
68
* this one or g_idle_*, that doesn't use g_source_add_poll() is
69
* S-O-L and won't be processed until there is something fd-based
72
* Luckily the timeout we can set here affects all sources and
73
* puts an upper limit on how long poll() can take.
75
* So unconditionally set a small-ish timeout, not so small that
76
* we're in constant motion, which will act as an upper bound on
77
* how long the signal handling might be delayed for.
79
*timeout = 500; /* Timeout in ms */
85
crm_trigger_check(GSource * source)
87
crm_trigger_t *trig = (crm_trigger_t *) source;
93
crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
96
crm_trigger_t *trig = (crm_trigger_t *) source;
99
/* Wait until the existing job is complete before starting the next one */
102
trig->trigger = FALSE;
105
rc = callback(trig->user_data);
107
crm_trace("Trigger handler %p not yet complete", trig);
108
trig->running = TRUE;
116
crm_trigger_finalize(GSource * source)
118
crm_trace("Trigger %p destroyed", source);
124
gpointer callback_data;
125
GSourceCallbackFuncs *callback_funcs;
127
const GSourceFuncs *source_funcs;
130
GMainContext *context;
147
g_source_refcount(GSource * source)
149
/* Duplicating the contents of private header files is a necessary evil */
151
struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
152
return evil->ref_count;
157
static int g_source_refcount(GSource * source)
163
static GSourceFuncs crm_trigger_funcs = {
166
crm_trigger_dispatch,
167
crm_trigger_finalize,
170
static crm_trigger_t *
171
mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
174
crm_trigger_t *trigger = NULL;
176
trigger = (crm_trigger_t *) source;
179
trigger->trigger = FALSE;
180
trigger->user_data = userdata;
183
g_source_set_callback(source, dispatch, trigger, NULL);
186
g_source_set_priority(source, priority);
187
g_source_set_can_recurse(source, FALSE);
189
crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
190
trigger->id = g_source_attach(source, NULL);
191
crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
197
mainloop_trigger_complete(crm_trigger_t * trig)
199
crm_trace("Trigger handler %p complete", trig);
200
trig->running = FALSE;
203
/* If dispatch returns:
204
* -1: Job running but not complete
205
* 0: Remove the trigger from mainloop
206
* 1: Leave the trigger in mainloop
209
mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
211
GSource *source = NULL;
213
CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
214
source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
215
CRM_ASSERT(source != NULL);
217
return mainloop_setup_trigger(source, priority, dispatch, userdata);
221
mainloop_set_trigger(crm_trigger_t * source)
224
source->trigger = TRUE;
229
mainloop_destroy_trigger(crm_trigger_t * source)
237
gs = (GSource *)source;
239
if(g_source_refcount(gs) > 2) {
240
crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
243
g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
244
g_source_unref(gs); /* The caller no longer carries a reference to source
246
* At this point the source should be free'd,
247
* unless we're currently processing said
248
* source, in which case mainloop holds an
249
* additional reference and it will be free'd
250
* once our processing completes
255
typedef struct signal_s {
256
crm_trigger_t trigger; /* must be first */
257
void (*handler) (int sig);
262
static crm_signal_t *crm_signals[NSIG];
265
crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
267
crm_signal_t *sig = (crm_signal_t *) source;
269
if(sig->signal != SIGCHLD) {
270
crm_info("Invoking handler for signal %d: %s", sig->signal, strsignal(sig->signal));
273
sig->trigger.trigger = FALSE;
275
sig->handler(sig->signal);
281
mainloop_signal_handler(int sig)
283
if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
284
mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
288
static GSourceFuncs crm_signal_funcs = {
292
crm_trigger_finalize,
296
crm_signal(int sig, void (*dispatch) (int sig))
300
struct sigaction old;
302
if (sigemptyset(&mask) < 0) {
303
crm_perror(LOG_ERR, "Call to sigemptyset failed");
307
memset(&sa, 0, sizeof(struct sigaction));
308
sa.sa_handler = dispatch;
309
sa.sa_flags = SA_RESTART;
312
if (sigaction(sig, &sa, &old) < 0) {
313
crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
321
mainloop_add_signal(int sig, void (*dispatch) (int sig))
323
GSource *source = NULL;
324
int priority = G_PRIORITY_HIGH - 1;
326
if (sig == SIGTERM) {
327
/* TERM is higher priority than other signals,
328
* signals are higher priority than other ipc.
329
* Yes, minus: smaller is "higher"
334
if (sig >= NSIG || sig < 0) {
335
crm_err("Signal %d is out of range", sig);
338
} else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
339
crm_trace("Signal handler for %d is already installed", sig);
342
} else if (crm_signals[sig] != NULL) {
343
crm_err("Different signal handler for %d is already installed", sig);
347
CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
348
source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
350
crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
351
CRM_ASSERT(crm_signals[sig] != NULL);
353
crm_signals[sig]->handler = dispatch;
354
crm_signals[sig]->signal = sig;
356
if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
357
crm_signal_t *tmp = crm_signals[sig];
359
crm_signals[sig] = NULL;
361
mainloop_destroy_trigger((crm_trigger_t *) tmp);
365
/* If we want signals to interrupt mainloop's poll(), instead of waiting for
366
* the timeout, then we should call siginterrupt() below
368
* For now, just enforce a low timeout
370
if (siginterrupt(sig, 1) < 0) {
371
crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
379
mainloop_destroy_signal(int sig)
381
crm_signal_t *tmp = NULL;
383
if (sig >= NSIG || sig < 0) {
384
crm_err("Signal %d is out of range", sig);
387
} else if (crm_signal(sig, NULL) == FALSE) {
388
crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
391
} else if (crm_signals[sig] == NULL) {
395
crm_trace("Destroying signal %d", sig);
396
tmp = crm_signals[sig];
397
crm_signals[sig] = NULL;
398
mainloop_destroy_trigger((crm_trigger_t *) tmp);
402
static qb_array_t *gio_map = NULL;
405
mainloop_cleanup(void)
408
qb_array_free(gio_map);
415
struct gio_to_qb_poll {
420
qb_ipcs_dispatch_fn_t fn;
421
enum qb_loop_priority p;
425
gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
427
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
428
gint fd = g_io_channel_unix_get_fd(gio);
430
crm_trace("%p.%d %d", data, fd, condition);
432
if (condition & G_IO_NVAL) {
433
crm_trace("Marking failed adaptor %p unused", adaptor);
434
adaptor->is_used = QB_FALSE;
437
return (adaptor->fn(fd, condition, adaptor->data) == 0);
441
gio_poll_destroy(gpointer data)
443
struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
445
adaptor->is_used = QB_FALSE;
450
gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
451
void *data, qb_ipcs_dispatch_fn_t fn)
453
struct gio_to_qb_poll *adaptor;
457
res = qb_array_index(gio_map, fd, (void **)&adaptor);
459
crm_err("Array lookup failed for fd=%d: %d", fd, res);
463
crm_trace("Adding fd=%d to mainloop as adapater %p", fd, adaptor);
464
if (adaptor->is_used) {
465
crm_err("Adapter for descriptor %d is still in-use", fd);
469
/* channel is created with ref_count = 1 */
470
channel = g_io_channel_unix_new(fd);
472
crm_err("No memory left to add fd=%d", fd);
476
/* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
477
evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
480
adaptor->events = evts;
481
adaptor->data = data;
483
adaptor->is_used = QB_TRUE;
485
g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
488
/* Now that mainloop holds a reference to channel,
489
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
491
* This means that channel will be free'd by:
492
* g_main_context_dispatch()
493
* -> g_source_destroy_internal()
494
* -> g_source_callback_unref()
495
* shortly after gio_poll_destroy() completes
497
g_io_channel_unref(channel);
499
crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
500
if (adaptor->source > 0) {
508
gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
509
void *data, qb_ipcs_dispatch_fn_t fn)
515
gio_poll_dispatch_del(int32_t fd)
517
struct gio_to_qb_poll *adaptor;
519
crm_trace("Looking for fd=%d", fd);
520
if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
521
crm_trace("Marking adaptor %p unused", adaptor);
522
if (adaptor->source) {
523
g_source_remove(adaptor->source);
526
adaptor->is_used = QB_FALSE;
531
struct qb_ipcs_poll_handlers gio_poll_funcs = {
533
.dispatch_add = gio_poll_dispatch_add,
534
.dispatch_mod = gio_poll_dispatch_mod,
535
.dispatch_del = gio_poll_dispatch_del,
538
static enum qb_ipc_type
539
pick_ipc_type(enum qb_ipc_type requested)
541
const char *env = getenv("PCMK_ipc_type");
543
if (env && strcmp("shared-mem", env) == 0) {
545
} else if (env && strcmp("socket", env) == 0) {
546
return QB_IPC_SOCKET;
547
} else if (env && strcmp("posix", env) == 0) {
548
return QB_IPC_POSIX_MQ;
549
} else if (env && strcmp("sysv", env) == 0) {
550
return QB_IPC_SYSV_MQ;
551
} else if (requested == QB_IPC_NATIVE) {
552
/* We prefer shared memory because the server never blocks on
553
* send. If part of a message fits into the socket, libqb
554
* needs to block until the remainder can be sent also.
555
* Otherwise the client will wait forever for the remaining
564
mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
565
struct qb_ipcs_service_handlers * callbacks)
568
qb_ipcs_service_t *server = NULL;
570
if (gio_map == NULL) {
571
gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
575
server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
576
qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
578
rc = qb_ipcs_run(server);
580
crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
588
mainloop_del_ipc_server(qb_ipcs_service_t * server)
591
qb_ipcs_destroy(server);
595
struct mainloop_io_s {
604
int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
605
int (*dispatch_fn_io) (gpointer userdata);
606
void (*destroy_fn) (gpointer userdata);
611
mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
613
gboolean keep = TRUE;
614
mainloop_io_t *client = data;
616
CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
618
if (condition & G_IO_IN) {
624
rc = crm_ipc_read(client->ipc);
626
crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
627
client->name, client, pcmk_strerror(rc), rc);
629
} else if (client->dispatch_fn_ipc) {
630
const char *buffer = crm_ipc_buffer(client->ipc);
632
crm_trace("New message from %s[%p] = %d", client->name, client, rc, condition);
633
if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
634
crm_trace("Connection to %s no longer required", client->name);
639
} while (keep && rc > 0 && --max > 0);
642
crm_trace("New message from %s[%p] %u", client->name, client, condition);
643
if (client->dispatch_fn_io) {
644
if (client->dispatch_fn_io(client->userdata) < 0) {
645
crm_trace("Connection to %s no longer required", client->name);
652
if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
653
crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
656
} else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
657
crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
658
client->name, client, condition);
661
} else if ((condition & G_IO_IN) == 0) {
663
#define GLIB_SYSDEF_POLLIN =1
664
#define GLIB_SYSDEF_POLLPRI =2
665
#define GLIB_SYSDEF_POLLOUT =4
666
#define GLIB_SYSDEF_POLLERR =8
667
#define GLIB_SYSDEF_POLLHUP =16
668
#define GLIB_SYSDEF_POLLNVAL =32
672
G_IO_IN GLIB_SYSDEF_POLLIN,
673
G_IO_OUT GLIB_SYSDEF_POLLOUT,
674
G_IO_PRI GLIB_SYSDEF_POLLPRI,
675
G_IO_ERR GLIB_SYSDEF_POLLERR,
676
G_IO_HUP GLIB_SYSDEF_POLLHUP,
677
G_IO_NVAL GLIB_SYSDEF_POLLNVAL
680
A bitwise combination representing a condition to watch for on an event source.
682
G_IO_IN There is data to read.
683
G_IO_OUT Data can be written (without blocking).
684
G_IO_PRI There is urgent data to read.
685
G_IO_ERR Error condition.
686
G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
687
G_IO_NVAL Invalid request. The file descriptor is not open.
689
crm_err("Strange condition: %d", condition);
692
/* keep == FALSE results in mainloop_gio_destroy() being called
693
* just before the source is removed from mainloop
699
mainloop_gio_destroy(gpointer c)
701
mainloop_io_t *client = c;
702
char *c_name = strdup(client->name);
704
/* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
705
* client->channel will still have ref_count > 0... should be == 1
707
crm_trace("Destroying client %s[%p]", c_name, c);
710
crm_ipc_close(client->ipc);
713
if (client->destroy_fn) {
714
void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
716
client->destroy_fn = NULL;
717
destroy_fn(client->userdata);
721
crm_ipc_t *ipc = client->ipc;
724
crm_ipc_destroy(ipc);
727
crm_trace("Destroyed client %s[%p]", c_name, c);
729
free(client->name); client->name = NULL;
736
mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
737
struct ipc_client_callbacks *callbacks)
739
mainloop_io_t *client = NULL;
740
crm_ipc_t *conn = crm_ipc_new(name, max_size);
742
if (conn && crm_ipc_connect(conn)) {
743
int32_t fd = crm_ipc_get_fd(conn);
745
client = mainloop_add_fd(name, priority, fd, userdata, NULL);
747
client->destroy_fn = callbacks->destroy;
748
client->dispatch_fn_ipc = callbacks->dispatch;
751
if (conn && client == NULL) {
752
crm_trace("Connection to %s failed", name);
754
crm_ipc_destroy(conn);
761
mainloop_del_ipc_client(mainloop_io_t * client)
763
mainloop_del_fd(client);
767
mainloop_get_ipc_client(mainloop_io_t * client)
776
mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
777
struct mainloop_fd_callbacks * callbacks)
779
mainloop_io_t *client = NULL;
782
client = calloc(1, sizeof(mainloop_io_t));
783
client->name = strdup(name);
784
client->userdata = userdata;
787
client->destroy_fn = callbacks->destroy;
788
client->dispatch_fn_io = callbacks->dispatch;
792
client->channel = g_io_channel_unix_new(fd);
794
g_io_add_watch_full(client->channel, priority,
795
(G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
796
client, mainloop_gio_destroy);
798
/* Now that mainloop holds a reference to channel,
799
* thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
801
* This means that channel will be free'd by:
802
* g_main_context_dispatch() or g_source_remove()
803
* -> g_source_destroy_internal()
804
* -> g_source_callback_unref()
805
* shortly after mainloop_gio_destroy() completes
807
g_io_channel_unref(client->channel);
808
crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
815
mainloop_del_fd(mainloop_io_t * client)
817
if (client != NULL) {
818
crm_trace("Removing client %s[%p]", client->name, client);
819
if (client->source) {
820
/* Results in mainloop_gio_destroy() being called just
821
* before the source is removed from mainloop
823
g_source_remove(client->source);
829
mainloop_child_pid(mainloop_child_t * child)
835
mainloop_child_name(mainloop_child_t * child)
841
mainloop_child_timeout(mainloop_child_t * child)
843
return child->timeout;
847
mainloop_child_userdata(mainloop_child_t * child)
849
return child->privatedata;
853
mainloop_clear_child_userdata(mainloop_child_t * child)
855
child->privatedata = NULL;
859
child_timeout_callback(gpointer p)
861
mainloop_child_t *child = p;
864
if (child->timeout) {
865
crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
869
child->timeout = TRUE;
870
crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
872
if (kill(child->pid, SIGKILL) < 0) {
873
if (errno == ESRCH) {
874
/* Nothing left to do */
877
crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
880
child->timerid = g_timeout_add(5000, child_timeout_callback, child);
884
static GListPtr child_list = NULL;
887
child_death_dispatch(int signal)
889
GListPtr iter = child_list;
898
GListPtr saved = NULL;
899
mainloop_child_t *child = iter->data;
901
rc = waitpid(child->pid, &status, WNOHANG);
906
} else if(rc != child->pid) {
910
crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);
913
crm_trace("Managed process %d exited: %p", child->pid, child);
915
if (WIFEXITED(status)) {
916
exitcode = WEXITSTATUS(status);
917
crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
919
} else if (WIFSIGNALED(status)) {
920
signo = WTERMSIG(status);
921
crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
924
if (WCOREDUMP(status)) {
926
crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
931
if (child->callback) {
932
child->callback(child, child->pid, core, signo, exitcode);
935
crm_trace("Removing process entry %p for %d", child, child->pid);
940
child_list = g_list_remove_link(child_list, saved);
943
if (child->timerid != 0) {
944
crm_trace("Removing timer %d", child->timerid);
945
g_source_remove(child->timerid);
953
/* Create/Log a new tracked process
954
* To track a process group, use -pid
957
mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
958
void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
960
static bool need_init = TRUE;
961
mainloop_child_t *child = g_new(mainloop_child_t, 1);
965
child->timeout = FALSE;
966
child->privatedata = privatedata;
967
child->callback = callback;
970
child->desc = strdup(desc);
974
child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
977
child_list = g_list_append(child_list, child);
982
/* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
983
mainloop_add_signal(SIGCHLD, child_death_dispatch);
985
/* In case they terminated before the signal handler was installed */
986
child_death_dispatch(SIGCHLD);