1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Gearmand Definitions
18
* Private declarations
22
* @addtogroup gearmand_private Private Gearman Daemon Functions
27
static void _log(const char *line, gearman_verbose_t verbose, void *context);
29
static gearman_return_t _listen_init(gearmand_st *gearmand);
30
static void _listen_close(gearmand_st *gearmand);
31
static gearman_return_t _listen_watch(gearmand_st *gearmand);
32
static void _listen_clear(gearmand_st *gearmand);
33
static void _listen_event(int fd, short events, void *arg);
35
static gearman_return_t _wakeup_init(gearmand_st *gearmand);
36
static void _wakeup_close(gearmand_st *gearmand);
37
static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
38
static void _wakeup_clear(gearmand_st *gearmand);
39
static void _wakeup_event(int fd, short events, void *arg);
41
static gearman_return_t _watch_events(gearmand_st *gearmand);
42
static void _clear_events(gearmand_st *gearmand);
43
static void _close_events(gearmand_st *gearmand);
51
gearmand_st *gearmand_create(const char *host, in_port_t port)
53
gearmand_st *gearmand;
55
gearmand= malloc(sizeof(gearmand_st));
59
if (gearman_server_create(&(gearmand->server)) == NULL)
68
gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
70
gearmand->port_count= 0;
71
gearmand->thread_count= 0;
72
gearmand->free_dcon_count= 0;
73
gearmand->max_thread_free_dcon_count= 0;
74
gearmand->wakeup_fd[0]= -1;
75
gearmand->wakeup_fd[1]= -1;
77
gearmand->log_fn= NULL;
78
gearmand->log_context= NULL;
80
gearmand->port_list= NULL;
81
gearmand->thread_list= NULL;
82
gearmand->thread_add_next= NULL;
83
gearmand->free_dcon_list= NULL;
86
port= GEARMAN_DEFAULT_TCP_PORT;
88
if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
90
gearmand_free(gearmand);
97
void gearmand_free(gearmand_st *gearmand)
99
gearmand_con_st *dcon;
102
_close_events(gearmand);
104
if (gearmand->threads > 0)
105
GEARMAN_INFO(gearmand, "Shutting down all threads")
107
while (gearmand->thread_list != NULL)
108
gearmand_thread_free(gearmand->thread_list);
110
while (gearmand->free_dcon_list != NULL)
112
dcon= gearmand->free_dcon_list;
113
gearmand->free_dcon_list= dcon->next;
117
if (gearmand->base != NULL)
118
event_base_free(gearmand->base);
120
gearman_server_free(&(gearmand->server));
122
for (x= 0; x < gearmand->port_count; x++)
124
if (gearmand->port_list[x].listen_fd != NULL)
125
free(gearmand->port_list[x].listen_fd);
127
if (gearmand->port_list[x].listen_event != NULL)
128
free(gearmand->port_list[x].listen_event);
131
if (gearmand->port_list != NULL)
132
free(gearmand->port_list);
134
GEARMAN_INFO(gearmand, "Shutdown complete")
139
void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
141
gearmand->backlog= backlog;
144
void gearmand_set_job_retries(gearmand_st *gearmand, uint8_t job_retries)
146
gearman_server_set_job_retries(&(gearmand->server), job_retries);
149
void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
151
gearmand->threads= threads;
154
void gearmand_set_log_fn(gearmand_st *gearmand, gearman_log_fn *function,
155
const void *context, gearman_verbose_t verbose)
157
gearman_server_set_log_fn(&(gearmand->server), _log, gearmand, verbose);
158
gearmand->log_fn= function;
159
gearmand->log_context= context;
160
gearmand->verbose= verbose;
163
gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
164
gearman_con_add_fn *function)
166
gearmand_port_st *port_list;
168
port_list= realloc(gearmand->port_list,
169
sizeof(gearmand_port_st) * (gearmand->port_count + 1));
170
if (port_list == NULL)
172
GEARMAN_FATAL(gearmand, "gearmand_port_add:realloc:NULL");
173
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
176
port_list[gearmand->port_count].port= port;
177
port_list[gearmand->port_count].listen_count= 0;
178
port_list[gearmand->port_count].gearmand= gearmand;
179
port_list[gearmand->port_count].add_fn= function;
180
port_list[gearmand->port_count].listen_fd= NULL;
181
port_list[gearmand->port_count].listen_event= NULL;
183
gearmand->port_list= port_list;
184
gearmand->port_count++;
186
return GEARMAN_SUCCESS;
189
gearman_return_t gearmand_run(gearmand_st *gearmand)
193
/* Initialize server components. */
194
if (gearmand->base == NULL)
196
GEARMAN_INFO(gearmand, "Starting up")
198
if (gearmand->threads > 0)
200
#ifndef HAVE_EVENT_BASE_NEW
201
GEARMAN_FATAL(gearmand, "Multi-threaded gearmand requires libevent 1.4 " "or later, libevent 1.3 does not provided a "
202
"thread-safe interface.");
203
return GEARMAN_EVENT;
205
/* Set the number of free connection structures each thread should keep
206
around before the main thread is forced to take them. We compute this
207
here so we don't need to on every new connection. */
208
gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
209
gearmand->threads) / 2);
213
GEARMAN_DEBUG(gearmand, "Initializing libevent for main thread")
215
gearmand->base= event_base_new();
216
if (gearmand->base == NULL)
218
GEARMAN_FATAL(gearmand, "gearmand_run:event_base_new:NULL")
219
return GEARMAN_EVENT;
222
GEARMAN_DEBUG(gearmand, "Method for libevent: %s",
223
event_base_get_method(gearmand->base));
225
gearmand->ret= _listen_init(gearmand);
226
if (gearmand->ret != GEARMAN_SUCCESS)
227
return gearmand->ret;
229
gearmand->ret= _wakeup_init(gearmand);
230
if (gearmand->ret != GEARMAN_SUCCESS)
231
return gearmand->ret;
233
GEARMAN_DEBUG(gearmand, "Creating %u threads", gearmand->threads)
235
/* If we have 0 threads we still need to create a fake one for context. */
239
gearmand->ret= gearmand_thread_create(gearmand);
240
if (gearmand->ret != GEARMAN_SUCCESS)
241
return gearmand->ret;
244
while (x < gearmand->threads);
246
gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
247
if (gearmand->ret != GEARMAN_SUCCESS)
248
return gearmand->ret;
251
gearmand->ret= _watch_events(gearmand);
252
if (gearmand->ret != GEARMAN_SUCCESS)
253
return gearmand->ret;
255
GEARMAN_INFO(gearmand, "Entering main event loop")
257
if (event_base_loop(gearmand->base, 0) == -1)
259
GEARMAN_FATAL(gearmand, "gearmand_run:event_base_loop:-1")
260
return GEARMAN_EVENT;
263
GEARMAN_INFO(gearmand, "Exited main event loop")
265
return gearmand->ret;
268
void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
270
uint8_t buffer= wakeup;
272
/* If this fails, there is not much we can really do. This should never fail
273
though if the main gearmand thread is still active. */
274
if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
275
GEARMAN_ERROR(gearmand, "gearmand_wakeup:write:%d", errno)
279
* Private definitions
282
static void _log(const char *line, gearman_verbose_t verbose, void *context)
284
gearmand_st *gearmand= (gearmand_st *)context;
285
(*gearmand->log_fn)(line, verbose, (void *)gearmand->log_context);
288
static gearman_return_t _listen_init(gearmand_st *gearmand)
290
struct gearmand_port_st *port;
291
struct addrinfo *addrinfo;
292
struct addrinfo *addrinfo_next;
296
char host[NI_MAXHOST];
297
char port_str[NI_MAXSERV];
303
for (x= 0; x < gearmand->port_count; x++)
305
port= &gearmand->port_list[x];
307
snprintf(port_str, NI_MAXSERV, "%u", port->port);
309
memset(&ai, 0, sizeof(struct addrinfo));
310
ai.ai_flags = AI_PASSIVE;
311
ai.ai_family = AF_UNSPEC;
312
ai.ai_socktype = SOCK_STREAM;
313
ai.ai_protocol= IPPROTO_TCP;
315
ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
318
GEARMAN_FATAL(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret))
319
return GEARMAN_ERRNO;
322
for (addrinfo_next= addrinfo; addrinfo_next != NULL;
323
addrinfo_next= addrinfo_next->ai_next)
325
ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
326
NI_MAXHOST, port_str, NI_MAXSERV,
327
NI_NUMERICHOST | NI_NUMERICSERV);
330
GEARMAN_ERROR(gearmand, "_listen_init:getnameinfo:%s",
333
strcpy(port_str, "-");
336
GEARMAN_DEBUG(gearmand, "Trying to listen on %s:%s", host, port_str)
338
/* Call to socket() can fail for some getaddrinfo results, try another. */
339
fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
340
addrinfo_next->ai_protocol);
343
GEARMAN_ERROR(gearmand, "Failed to listen on %s:%s", host, port_str)
348
ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
352
GEARMAN_FATAL(gearmand, "_listen_init:setsockopt:%d", errno)
353
return GEARMAN_ERRNO;
356
ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
360
if (errno == EADDRINUSE)
362
if (port->listen_fd == NULL)
364
GEARMAN_ERROR(gearmand, "Address already in use %s:%s", host,
371
GEARMAN_FATAL(gearmand, "_listen_init:bind:%d", errno)
372
return GEARMAN_ERRNO;
375
if (listen(fd, gearmand->backlog) == -1)
378
GEARMAN_FATAL(gearmand, "_listen_init:listen:%d", errno)
379
return GEARMAN_ERRNO;
382
fd_list= realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
386
GEARMAN_FATAL(gearmand, "_listen_init:realloc:%d", errno)
387
return GEARMAN_ERRNO;
390
port->listen_fd= fd_list;
391
port->listen_fd[port->listen_count]= fd;
392
port->listen_count++;
394
GEARMAN_INFO(gearmand, "Listening on %s:%s (%d)", host, port_str, fd)
397
freeaddrinfo(addrinfo);
399
/* Report last socket() error if we couldn't find an address to bind. */
400
if (port->listen_fd == NULL)
402
GEARMAN_FATAL(gearmand,
403
"_listen_init:Could not bind/listen to any addresses")
404
return GEARMAN_ERRNO;
407
port->listen_event= malloc(sizeof(struct event) * port->listen_count);
408
if (port->listen_event == NULL)
410
GEARMAN_FATAL(gearmand, "_listen_init:malloc:%d", errno)
411
return GEARMAN_ERRNO;
414
for (y= 0; y < port->listen_count; y++)
416
event_set(&(port->listen_event[y]), port->listen_fd[y],
417
EV_READ | EV_PERSIST, _listen_event, port);
418
event_base_set(gearmand->base, &(port->listen_event[y]));
422
return GEARMAN_SUCCESS;
425
static void _listen_close(gearmand_st *gearmand)
430
_listen_clear(gearmand);
432
for (x= 0; x < gearmand->port_count; x++)
434
for (y= 0; y < gearmand->port_list[x].listen_count; y++)
436
if (gearmand->port_list[x].listen_fd[y] >= 0)
438
GEARMAN_INFO(gearmand, "Closing listening socket (%d)",
439
gearmand->port_list[x].listen_fd[y])
440
close(gearmand->port_list[x].listen_fd[y]);
441
gearmand->port_list[x].listen_fd[y]= -1;
447
static gearman_return_t _listen_watch(gearmand_st *gearmand)
452
if (gearmand->options & GEARMAND_LISTEN_EVENT)
453
return GEARMAN_SUCCESS;
455
for (x= 0; x < gearmand->port_count; x++)
457
for (y= 0; y < gearmand->port_list[x].listen_count; y++)
459
GEARMAN_INFO(gearmand, "Adding event for listening socket (%d)",
460
gearmand->port_list[x].listen_fd[y])
462
if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
464
GEARMAN_FATAL(gearmand, "_listen_watch:event_add:-1")
465
return GEARMAN_EVENT;
470
gearmand->options|= GEARMAND_LISTEN_EVENT;
471
return GEARMAN_SUCCESS;
474
static void _listen_clear(gearmand_st *gearmand)
479
if (!(gearmand->options & GEARMAND_LISTEN_EVENT))
482
for (x= 0; x < gearmand->port_count; x++)
484
for (y= 0; y < gearmand->port_list[x].listen_count; y++)
486
GEARMAN_INFO(gearmand, "Clearing event for listening socket (%d)",
487
gearmand->port_list[x].listen_fd[y])
488
assert(event_del(&(gearmand->port_list[x].listen_event[y])) == 0);
492
gearmand->options&= (gearmand_options_t)~GEARMAND_LISTEN_EVENT;
495
static void _listen_event(int fd, short events __attribute__ ((unused)),
498
gearmand_port_st *port= (gearmand_port_st *)arg;
501
char host[NI_MAXHOST];
502
char port_str[NI_MAXSERV];
506
fd= accept(fd, &sa, &sa_len);
511
else if (errno == EMFILE)
513
GEARMAN_ERROR(port->gearmand, "_listen_event:accept:too many open files")
517
_clear_events(port->gearmand);
518
GEARMAN_FATAL(port->gearmand, "_listen_event:accept:%d", errno)
519
port->gearmand->ret= GEARMAN_ERRNO;
523
/* Since this is numeric, it should never fail. Even if it did we don't want
524
to really error from it. */
525
ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
526
NI_NUMERICHOST | NI_NUMERICSERV);
529
GEARMAN_ERROR(port->gearmand, "_listen_event:getnameinfo:%s",
532
strcpy(port_str, "-");
535
GEARMAN_INFO(port->gearmand, "Accepted connection from %s:%s", host, port_str)
537
port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
539
if (port->gearmand->ret != GEARMAN_SUCCESS)
540
_clear_events(port->gearmand);
543
static gearman_return_t _wakeup_init(gearmand_st *gearmand)
547
GEARMAN_INFO(gearmand, "Creating wakeup pipe")
549
ret= pipe(gearmand->wakeup_fd);
552
GEARMAN_FATAL(gearmand, "_wakeup_init:pipe:%d", errno)
553
return GEARMAN_ERRNO;
556
ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
559
GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno)
560
return GEARMAN_ERRNO;
563
ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
566
GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno)
567
return GEARMAN_ERRNO;
570
event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
571
EV_READ | EV_PERSIST, _wakeup_event, gearmand);
572
event_base_set(gearmand->base, &(gearmand->wakeup_event));
574
return GEARMAN_SUCCESS;
577
static void _wakeup_close(gearmand_st *gearmand)
579
_wakeup_clear(gearmand);
581
if (gearmand->wakeup_fd[0] >= 0)
583
GEARMAN_INFO(gearmand, "Closing wakeup pipe")
584
close(gearmand->wakeup_fd[0]);
585
gearmand->wakeup_fd[0]= -1;
586
close(gearmand->wakeup_fd[1]);
587
gearmand->wakeup_fd[1]= -1;
591
static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
593
if (gearmand->options & GEARMAND_WAKEUP_EVENT)
594
return GEARMAN_SUCCESS;
596
GEARMAN_INFO(gearmand, "Adding event for wakeup pipe")
598
if (event_add(&(gearmand->wakeup_event), NULL) == -1)
600
GEARMAN_FATAL(gearmand, "_wakeup_watch:event_add:-1")
601
return GEARMAN_EVENT;
604
gearmand->options|= GEARMAND_WAKEUP_EVENT;
605
return GEARMAN_SUCCESS;
608
static void _wakeup_clear(gearmand_st *gearmand)
610
if (gearmand->options & GEARMAND_WAKEUP_EVENT)
612
GEARMAN_INFO(gearmand, "Clearing event for wakeup pipe")
613
assert(event_del(&(gearmand->wakeup_event)) == 0);
614
gearmand->options&= (gearmand_options_t)~GEARMAND_WAKEUP_EVENT;
618
static void _wakeup_event(int fd, short events __attribute__ ((unused)),
621
gearmand_st *gearmand= (gearmand_st *)arg;
622
uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
625
gearmand_thread_st *thread;
629
ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
632
_clear_events(gearmand);
633
GEARMAN_FATAL(gearmand, "_wakeup_event:read:EOF")
634
gearmand->ret= GEARMAN_PIPE_EOF;
645
_clear_events(gearmand);
646
GEARMAN_FATAL(gearmand, "_wakeup_event:read:%d", errno)
647
gearmand->ret= GEARMAN_ERRNO;
651
for (x= 0; x < ret; x++)
653
switch ((gearmand_wakeup_t)buffer[x])
655
case GEARMAND_WAKEUP_PAUSE:
656
GEARMAN_INFO(gearmand, "Received PAUSE wakeup event")
657
_clear_events(gearmand);
658
gearmand->ret= GEARMAN_PAUSE;
661
case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
662
GEARMAN_INFO(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event")
663
_listen_close(gearmand);
665
for (thread= gearmand->thread_list; thread != NULL;
666
thread= thread->next)
668
gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
671
gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
674
case GEARMAND_WAKEUP_SHUTDOWN:
675
GEARMAN_INFO(gearmand, "Received SHUTDOWN wakeup event")
676
_clear_events(gearmand);
677
gearmand->ret= GEARMAN_SHUTDOWN;
680
case GEARMAND_WAKEUP_CON:
681
case GEARMAND_WAKEUP_RUN:
683
GEARMAN_FATAL(gearmand, "Received unknown wakeup event (%u)",
685
_clear_events(gearmand);
686
gearmand->ret= GEARMAN_UNKNOWN_STATE;
693
static gearman_return_t _watch_events(gearmand_st *gearmand)
695
gearman_return_t ret;
697
ret= _listen_watch(gearmand);
698
if (ret != GEARMAN_SUCCESS)
701
ret= _wakeup_watch(gearmand);
702
if (ret != GEARMAN_SUCCESS)
705
return GEARMAN_SUCCESS;
708
static void _clear_events(gearmand_st *gearmand)
710
_listen_clear(gearmand);
711
_wakeup_clear(gearmand);
713
/* If we are not threaded, tell the fake thread to shutdown now to clear
714
connections. Otherwise we will never exit the libevent loop. */
715
if (gearmand->threads == 0 && gearmand->thread_list != NULL)
716
gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
719
static void _close_events(gearmand_st *gearmand)
721
_listen_close(gearmand);
722
_wakeup_close(gearmand);