1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Gearmand Definitions
17
* Private declarations
21
* @addtogroup gearmand_private Private Gearman Daemon Functions
26
static void _log(gearman_server_st *server, gearman_verbose_t verbose,
27
const char *line, void *arg);
29
static gearman_return_t _listen_init(gearmand_st *gearmand);
30
static void _listen_close(gearmand_st *gearmand);
31
static gearman_return_t _listen_watch(gearmand_st *gearmand);
32
static void _listen_clear(gearmand_st *gearmand);
33
static void _listen_event(int fd, short events, void *arg);
35
static gearman_return_t _wakeup_init(gearmand_st *gearmand);
36
static void _wakeup_close(gearmand_st *gearmand);
37
static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
38
static void _wakeup_clear(gearmand_st *gearmand);
39
static void _wakeup_event(int fd, short events, void *arg);
41
static gearman_return_t _watch_events(gearmand_st *gearmand);
42
static void _clear_events(gearmand_st *gearmand);
43
static void _close_events(gearmand_st *gearmand);
51
gearmand_st *gearmand_create(const char *host, in_port_t port)
53
gearmand_st *gearmand;
55
gearmand= malloc(sizeof(gearmand_st));
59
if (gearman_server_create(&(gearmand->server)) == NULL)
68
gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
70
gearmand->port_count= 0;
71
gearmand->thread_count= 0;
72
gearmand->free_dcon_count= 0;
73
gearmand->max_thread_free_dcon_count= 0;
74
gearmand->wakeup_fd[0]= -1;
75
gearmand->wakeup_fd[1]= -1;
77
gearmand->log_fn= NULL;
78
gearmand->log_fn_arg= NULL;
80
gearmand->port_list= NULL;
81
gearmand->thread_list= NULL;
82
gearmand->thread_add_next= NULL;
83
gearmand->free_dcon_list= NULL;
86
port= GEARMAN_DEFAULT_TCP_PORT;
88
if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
90
gearmand_free(gearmand);
97
void gearmand_free(gearmand_st *gearmand)
99
gearmand_con_st *dcon;
102
_close_events(gearmand);
104
if (gearmand->threads > 0)
105
GEARMAN_INFO(gearmand, "Shutting down all threads")
107
while (gearmand->thread_list != NULL)
108
gearmand_thread_free(gearmand->thread_list);
110
while (gearmand->free_dcon_list != NULL)
112
dcon= gearmand->free_dcon_list;
113
gearmand->free_dcon_list= dcon->next;
117
if (gearmand->base != NULL)
118
event_base_free(gearmand->base);
120
gearman_server_free(&(gearmand->server));
122
for (x= 0; x < gearmand->port_count; x++)
124
if (gearmand->port_list[x].listen_fd != NULL)
125
free(gearmand->port_list[x].listen_fd);
127
if (gearmand->port_list[x].listen_event != NULL)
128
free(gearmand->port_list[x].listen_event);
131
if (gearmand->port_list != NULL)
132
free(gearmand->port_list);
134
GEARMAN_INFO(gearmand, "Shutdown complete")
139
void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
141
gearmand->backlog= backlog;
144
void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
146
gearmand->threads= threads;
149
void gearmand_set_log(gearmand_st *gearmand, gearmand_log_fn log_fn,
150
void *log_fn_arg, gearman_verbose_t verbose)
152
gearman_server_set_log(&(gearmand->server), _log, gearmand, verbose);
153
gearmand->log_fn= log_fn;
154
gearmand->log_fn_arg= log_fn_arg;
155
gearmand->verbose= verbose;
158
gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
159
gearman_con_add_fn *add_fn)
161
gearmand_port_st *port_list;
163
port_list= realloc(gearmand->port_list,
164
sizeof(gearmand_port_st) * (gearmand->port_count + 1));
165
if (port_list == NULL)
167
GEARMAN_FATAL(gearmand, "gearmand_port_add:realloc:NULL");
168
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
171
port_list[gearmand->port_count].port= port;
172
port_list[gearmand->port_count].listen_count= 0;
173
port_list[gearmand->port_count].gearmand= gearmand;
174
port_list[gearmand->port_count].add_fn= add_fn;
175
port_list[gearmand->port_count].listen_fd= NULL;
176
port_list[gearmand->port_count].listen_event= NULL;
178
gearmand->port_list= port_list;
179
gearmand->port_count++;
181
return GEARMAN_SUCCESS;
184
gearman_return_t gearmand_run(gearmand_st *gearmand)
188
/* Initialize server components. */
189
if (gearmand->base == NULL)
191
GEARMAN_INFO(gearmand, "Starting up")
193
if (gearmand->threads > 0)
195
#ifndef HAVE_EVENT_BASE_NEW
196
GEARMAN_FATAL(gearmand, "Multi-threaded gearmand requires libevent 1.4 " "or later, libevent 1.3 does not provided a "
197
"thread-safe interface.");
198
return GEARMAN_EVENT;
200
/* Set the number of free connection structures each thread should keep
201
around before the main thread is forced to take them. We compute this
202
here so we don't need to on every new connection. */
203
gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
204
gearmand->threads) / 2);
208
GEARMAN_DEBUG(gearmand, "Initializing libevent for main thread")
210
gearmand->base= event_base_new();
211
if (gearmand->base == NULL)
213
GEARMAN_FATAL(gearmand, "gearmand_run:event_base_new:NULL")
214
return GEARMAN_EVENT;
217
GEARMAN_DEBUG(gearmand, "Method for libevent: %s",
218
event_base_get_method(gearmand->base));
220
gearmand->ret= _listen_init(gearmand);
221
if (gearmand->ret != GEARMAN_SUCCESS)
222
return gearmand->ret;
224
gearmand->ret= _wakeup_init(gearmand);
225
if (gearmand->ret != GEARMAN_SUCCESS)
226
return gearmand->ret;
228
GEARMAN_DEBUG(gearmand, "Creating %u threads", gearmand->threads)
230
/* If we have 0 threads we still need to create a fake one for context. */
234
gearmand->ret= gearmand_thread_create(gearmand);
235
if (gearmand->ret != GEARMAN_SUCCESS)
236
return gearmand->ret;
239
while (x < gearmand->threads);
241
gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
242
if (gearmand->ret != GEARMAN_SUCCESS)
243
return gearmand->ret;
246
gearmand->ret= _watch_events(gearmand);
247
if (gearmand->ret != GEARMAN_SUCCESS)
248
return gearmand->ret;
250
GEARMAN_INFO(gearmand, "Entering main event loop")
252
if (event_base_loop(gearmand->base, 0) == -1)
254
GEARMAN_FATAL(gearmand, "gearmand_run:event_base_loop:-1")
255
return GEARMAN_EVENT;
258
GEARMAN_INFO(gearmand, "Exited main event loop")
260
return gearmand->ret;
263
void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
265
uint8_t buffer= wakeup;
267
/* If this fails, there is not much we can really do. This should never fail
268
though if the main gearmand thread is still active. */
269
if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
270
GEARMAN_ERROR(gearmand, "gearmand_wakeup:write:%d", errno)
274
* Private definitions
277
static void _log(gearman_server_st *server __attribute__ ((unused)),
278
gearman_verbose_t verbose, const char *line, void *fn_arg)
280
gearmand_st *gearmand= (gearmand_st *)fn_arg;
281
(*gearmand->log_fn)(gearmand, verbose, line, gearmand->log_fn_arg);
284
static gearman_return_t _listen_init(gearmand_st *gearmand)
286
struct gearmand_port_st *port;
287
struct addrinfo *addrinfo;
288
struct addrinfo *addrinfo_next;
292
char host[NI_MAXHOST];
293
char port_str[NI_MAXSERV];
299
for (x= 0; x < gearmand->port_count; x++)
301
port= &gearmand->port_list[x];
303
snprintf(port_str, NI_MAXSERV, "%u", port->port);
305
memset(&ai, 0, sizeof(struct addrinfo));
306
ai.ai_flags = AI_PASSIVE;
307
ai.ai_family = AF_UNSPEC;
308
ai.ai_socktype = SOCK_STREAM;
309
ai.ai_protocol= IPPROTO_TCP;
311
ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
314
GEARMAN_FATAL(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret))
315
return GEARMAN_ERRNO;
318
for (addrinfo_next= addrinfo; addrinfo_next != NULL;
319
addrinfo_next= addrinfo_next->ai_next)
321
ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
322
NI_MAXHOST, port_str, NI_MAXSERV,
323
NI_NUMERICHOST | NI_NUMERICSERV);
326
GEARMAN_ERROR(gearmand, "_listen_init:getnameinfo:%s",
329
strcpy(port_str, "-");
332
GEARMAN_DEBUG(gearmand, "Trying to listen on %s:%s", host, port_str)
334
/* Call to socket() can fail for some getaddrinfo results, try another. */
335
fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
336
addrinfo_next->ai_protocol);
339
GEARMAN_ERROR(gearmand, "Failed to listen on %s:%s", host, port_str)
344
ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
348
GEARMAN_FATAL(gearmand, "_listen_init:setsockopt:%d", errno)
349
return GEARMAN_ERRNO;
352
ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
356
if (errno == EADDRINUSE)
358
if (port->listen_fd == NULL)
360
GEARMAN_ERROR(gearmand, "Address already in use %s:%s", host,
367
GEARMAN_FATAL(gearmand, "_listen_init:bind:%d", errno)
368
return GEARMAN_ERRNO;
371
if (listen(fd, gearmand->backlog) == -1)
374
GEARMAN_FATAL(gearmand, "_listen_init:listen:%d", errno)
375
return GEARMAN_ERRNO;
378
fd_list= realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
382
GEARMAN_FATAL(gearmand, "_listen_init:realloc:%d", errno)
383
return GEARMAN_ERRNO;
386
port->listen_fd= fd_list;
387
port->listen_fd[port->listen_count]= fd;
388
port->listen_count++;
390
GEARMAN_INFO(gearmand, "Listening on %s:%s (%d)", host, port_str, fd)
393
freeaddrinfo(addrinfo);
395
/* Report last socket() error if we couldn't find an address to bind. */
396
if (port->listen_fd == NULL)
398
GEARMAN_FATAL(gearmand,
399
"_listen_init:Could not bind/listen to any addresses")
400
return GEARMAN_ERRNO;
403
port->listen_event= malloc(sizeof(struct event) * port->listen_count);
404
if (port->listen_event == NULL)
406
GEARMAN_FATAL(gearmand, "_listen_init:malloc:%d", errno)
407
return GEARMAN_ERRNO;
410
for (y= 0; y < port->listen_count; y++)
412
event_set(&(port->listen_event[y]), port->listen_fd[y],
413
EV_READ | EV_PERSIST, _listen_event, port);
414
event_base_set(gearmand->base, &(port->listen_event[y]));
418
return GEARMAN_SUCCESS;
421
static void _listen_close(gearmand_st *gearmand)
426
_listen_clear(gearmand);
428
for (x= 0; x < gearmand->port_count; x++)
430
for (y= 0; y < gearmand->port_list[x].listen_count; y++)
432
if (gearmand->port_list[x].listen_fd[y] >= 0)
434
GEARMAN_INFO(gearmand, "Closing listening socket (%d)",
435
gearmand->port_list[x].listen_fd[y])
436
close(gearmand->port_list[x].listen_fd[y]);
437
gearmand->port_list[x].listen_fd[y]= -1;
443
static gearman_return_t _listen_watch(gearmand_st *gearmand)
448
if (gearmand->options & GEARMAND_LISTEN_EVENT)
449
return GEARMAN_SUCCESS;
451
for (x= 0; x < gearmand->port_count; x++)
453
for (y= 0; y < gearmand->port_list[x].listen_count; y++)
455
GEARMAN_INFO(gearmand, "Adding event for listening socket (%d)",
456
gearmand->port_list[x].listen_fd[y])
458
if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
460
GEARMAN_FATAL(gearmand, "_listen_watch:event_add:-1")
461
return GEARMAN_EVENT;
466
gearmand->options|= GEARMAND_LISTEN_EVENT;
467
return GEARMAN_SUCCESS;
470
static void _listen_clear(gearmand_st *gearmand)
475
if (!(gearmand->options & GEARMAND_LISTEN_EVENT))
478
for (x= 0; x < gearmand->port_count; x++)
480
for (y= 0; y < gearmand->port_list[x].listen_count; y++)
482
GEARMAN_INFO(gearmand, "Clearing event for listening socket (%d)",
483
gearmand->port_list[x].listen_fd[y])
484
assert(event_del(&(gearmand->port_list[x].listen_event[y])) == 0);
488
gearmand->options&= (gearmand_options_t)~GEARMAND_LISTEN_EVENT;
491
static void _listen_event(int fd, short events __attribute__ ((unused)),
494
gearmand_port_st *port= (gearmand_port_st *)arg;
497
char host[NI_MAXHOST];
498
char port_str[NI_MAXSERV];
502
fd= accept(fd, &sa, &sa_len);
507
else if (errno == EMFILE)
509
GEARMAN_ERROR(port->gearmand, "_listen_event:accept:too many open files")
513
_clear_events(port->gearmand);
514
GEARMAN_FATAL(port->gearmand, "_listen_event:accept:%d", errno)
515
port->gearmand->ret= GEARMAN_ERRNO;
519
/* Since this is numeric, it should never fail. Even if it did we don't want
520
to really error from it. */
521
ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
522
NI_NUMERICHOST | NI_NUMERICSERV);
525
GEARMAN_ERROR(port->gearmand, "_listen_event:getnameinfo:%s",
528
strcpy(port_str, "-");
531
GEARMAN_INFO(port->gearmand, "Accepted connection from %s:%s", host, port_str)
533
port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
535
if (port->gearmand->ret != GEARMAN_SUCCESS)
536
_clear_events(port->gearmand);
539
static gearman_return_t _wakeup_init(gearmand_st *gearmand)
543
GEARMAN_INFO(gearmand, "Creating wakeup pipe")
545
ret= pipe(gearmand->wakeup_fd);
548
GEARMAN_FATAL(gearmand, "_wakeup_init:pipe:%d", errno)
549
return GEARMAN_ERRNO;
552
ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
555
GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno)
556
return GEARMAN_ERRNO;
559
ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
562
GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno)
563
return GEARMAN_ERRNO;
566
event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
567
EV_READ | EV_PERSIST, _wakeup_event, gearmand);
568
event_base_set(gearmand->base, &(gearmand->wakeup_event));
570
return GEARMAN_SUCCESS;
573
static void _wakeup_close(gearmand_st *gearmand)
575
_wakeup_clear(gearmand);
577
if (gearmand->wakeup_fd[0] >= 0)
579
GEARMAN_INFO(gearmand, "Closing wakeup pipe")
580
close(gearmand->wakeup_fd[0]);
581
gearmand->wakeup_fd[0]= -1;
582
close(gearmand->wakeup_fd[1]);
583
gearmand->wakeup_fd[1]= -1;
587
static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
589
if (gearmand->options & GEARMAND_WAKEUP_EVENT)
590
return GEARMAN_SUCCESS;
592
GEARMAN_INFO(gearmand, "Adding event for wakeup pipe")
594
if (event_add(&(gearmand->wakeup_event), NULL) == -1)
596
GEARMAN_FATAL(gearmand, "_wakeup_watch:event_add:-1")
597
return GEARMAN_EVENT;
600
gearmand->options|= GEARMAND_WAKEUP_EVENT;
601
return GEARMAN_SUCCESS;
604
static void _wakeup_clear(gearmand_st *gearmand)
606
if (gearmand->options & GEARMAND_WAKEUP_EVENT)
608
GEARMAN_INFO(gearmand, "Clearing event for wakeup pipe")
609
assert(event_del(&(gearmand->wakeup_event)) == 0);
610
gearmand->options&= (gearmand_options_t)~GEARMAND_WAKEUP_EVENT;
614
static void _wakeup_event(int fd, short events __attribute__ ((unused)),
617
gearmand_st *gearmand= (gearmand_st *)arg;
618
uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
621
gearmand_thread_st *thread;
625
ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
628
_clear_events(gearmand);
629
GEARMAN_FATAL(gearmand, "_wakeup_event:read:EOF")
630
gearmand->ret= GEARMAN_PIPE_EOF;
641
_clear_events(gearmand);
642
GEARMAN_FATAL(gearmand, "_wakeup_event:read:%d", errno)
643
gearmand->ret= GEARMAN_ERRNO;
647
for (x= 0; x < ret; x++)
649
switch ((gearmand_wakeup_t)buffer[x])
651
case GEARMAND_WAKEUP_PAUSE:
652
GEARMAN_INFO(gearmand, "Received PAUSE wakeup event")
653
_clear_events(gearmand);
654
gearmand->ret= GEARMAN_PAUSE;
657
case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
658
GEARMAN_INFO(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event")
659
_listen_close(gearmand);
661
for (thread= gearmand->thread_list; thread != NULL;
662
thread= thread->next)
664
gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
667
gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
670
case GEARMAND_WAKEUP_SHUTDOWN:
671
GEARMAN_INFO(gearmand, "Received SHUTDOWN wakeup event")
672
_clear_events(gearmand);
673
gearmand->ret= GEARMAN_SHUTDOWN;
676
case GEARMAND_WAKEUP_CON:
677
case GEARMAND_WAKEUP_RUN:
679
GEARMAN_FATAL(gearmand, "Received unknown wakeup event (%u)",
681
_clear_events(gearmand);
682
gearmand->ret= GEARMAN_UNKNOWN_STATE;
689
static gearman_return_t _watch_events(gearmand_st *gearmand)
691
gearman_return_t ret;
693
ret= _listen_watch(gearmand);
694
if (ret != GEARMAN_SUCCESS)
697
ret= _wakeup_watch(gearmand);
698
if (ret != GEARMAN_SUCCESS)
701
return GEARMAN_SUCCESS;
704
static void _clear_events(gearmand_st *gearmand)
706
_listen_clear(gearmand);
707
_wakeup_clear(gearmand);
709
/* If we are not threaded, tell the fake thread to shutdown now to clear
710
connections. Otherwise we will never exit the libevent loop. */
711
if (gearmand->threads == 0 && gearmand->thread_list != NULL)
712
gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
715
static void _close_events(gearmand_st *gearmand)
717
_listen_close(gearmand);
718
_wakeup_close(gearmand);