1
/* Gearman server and library
 * Copyright (C) 2008 Brian Aker, Eric Day
 *
 * Use and distribution licensed under the BSD license. See
 * the COPYING file in the parent directory for full text.
 */

/**
 * @brief Gearmand Thread Definitions
 */
14
#include <libgearman-server/common.h>
15
#include <libgearman-server/gearmand.h>
20
#include <libgearman-server/list.h>
23
/*
 * Private declarations
 */

/**
 * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
 * @ingroup gearmand_thread
 */
32
/* Forward declarations of the file-local (static) helpers that are defined
   further down in this file. */

/* Start routine handed to pthread_create() by gearmand_thread_create(). */
static void *_thread(void *data);
33
/* Log callback handed to gearman_server_thread_init(). */
static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread);
34
/* Run callback registered through gearman_server_thread_set_run(). */
static void _run(gearman_server_thread_st *thread, void *fn_arg);
36
/* Creates the wakeup pipe and registers its read event with libevent. */
static gearmand_error_t _wakeup_init(gearmand_thread_st *thread);
37
/* Removes the wakeup event and closes both ends of the wakeup pipe. */
static void _wakeup_close(gearmand_thread_st *thread);
38
/* Removes the wakeup event from libevent if it is currently registered. */
static void _wakeup_clear(gearmand_thread_st *thread);
39
/* libevent callback invoked when the read end of the wakeup pipe is readable. */
static void _wakeup_event(int fd, short events, void *arg);
40
/* Removes the wakeup event and frees every connection on dcon_list. */
static void _clear_events(gearmand_thread_st *thread);
48
/*
 * Allocate and set up one gearmand_thread_st for the given gearmand instance.
 *
 * Visible steps, in order: malloc() the structure, initialise its embedded
 * server thread (with _log as the log callback), reset the flags, counters,
 * wakeup fds and connection lists, add it to the thread list, choose a
 * libevent base, set up the wakeup pipe and, when gearmand->threads is
 * non-zero, create the mutex and start a pthread that runs _thread().
 *
 * Returns GEARMAN_SUCCESS on success. The visible failure paths return the
 * result of gearmand_merror(), GEARMAN_MEMORY_ALLOCATION_FAILURE or
 * GEARMAN_ERRNO.
 *
 * NOTE(review): the brace lines, the declarations of ret and pthread_ret, and
 * some guard/else/return lines are not visible in this copy of the file; the
 * comments below describe only the statements that are visible.
 */
gearmand_error_t gearmand_thread_create(gearmand_st *gearmand)
50
gearmand_thread_st *thread;
53
/* malloc() does not zero the structure; fields are set one by one below. */
thread= static_cast<gearmand_thread_st *>(malloc(sizeof(gearmand_thread_st)));
56
/* Allocation-failure report; presumably guarded by a NULL check on thread
   that is not visible here -- TODO confirm. */
return gearmand_merror("malloc", gearmand_thread_st, 1);
59
if (! gearman_server_thread_init(gearmand_server(gearmand), &(thread->server_thread),
60
_log, thread, gearmand_connection_watch))
63
gearmand_fatal("gearman_server_thread_init(NULL)");
64
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
67
/* Nothing has been created yet: gearmand_thread_free() destroys the mutex
   only when is_thread_lock is set, and _wakeup_clear() removes the event only
   when is_wakeup_event is set. */
thread->is_thread_lock= false;
68
thread->is_wakeup_event= false;
70
thread->dcon_count= 0;
71
thread->dcon_add_count= 0;
72
thread->free_dcon_count= 0;
73
/* -1 marks the pipe as not open; _wakeup_close() closes the descriptors only
   when wakeup_fd[0] >= 0. */
thread->wakeup_fd[0]= -1;
74
thread->wakeup_fd[1]= -1;
76
gearmand_thread_list_add(thread);
78
thread->dcon_list= NULL;
79
thread->dcon_add_list= NULL;
80
thread->free_dcon_list= NULL;
82
/* If we have no threads, we still create a fake thread that uses the main
83
libevent instance. Otherwise create a libevent instance for each thread. */
84
if (gearmand->threads == 0)
86
thread->base= gearmand->base;
90
gearmand_info("Initializing libevent for IO thread");
92
thread->base= static_cast<struct event_base *>(event_base_new());
93
if (thread->base == NULL)
95
gearmand_thread_free(thread);
96
gearmand_fatal("event_base_new(NULL)");
101
/* Create the wakeup pipe and register its read event on thread->base. */
ret= _wakeup_init(thread);
102
if (ret != GEARMAN_SUCCESS)
104
gearmand_thread_free(thread);
108
/* If we are not running multi-threaded, just return the thread context. */
109
if (gearmand->threads == 0)
110
return GEARMAN_SUCCESS;
112
/* NOTE(review): count is assigned only here, after the earlier failure paths
   have already called gearmand_thread_free(), which reads thread->count. No
   visible line initialises it before this point -- confirm those paths do not
   read it uninitialised. */
thread->count= gearmand->thread_count;
115
/* pthread_mutex_init() reports failure through its return value.
   NOTE(review): the perror-style helper below presumably formats errno --
   confirm it reports the right error, given that gearmand_thread_free() runs
   before it. */
pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
116
if (pthread_ret != 0)
119
gearmand_thread_free(thread);
122
gearmand_fatal_perror("pthread_mutex_init");
123
return GEARMAN_ERRNO;
126
/* From here on gearmand_thread_free() must destroy the mutex. */
thread->is_thread_lock= true;
128
/* Register _run() as the server thread's run callback; it receives this
   gearmand_thread_st as its fn_arg. */
gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
130
/* Start the IO thread; it executes _thread() with this structure as its
   argument. */
pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
131
if (pthread_ret != 0)
134
gearmand_thread_free(thread);
137
gearmand_perror("pthread_create");
139
return GEARMAN_ERRNO;
142
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u created", thread->count);
144
return GEARMAN_SUCCESS;
147
/*
 * Tear down a gearmand_thread_st.
 *
 * Visible steps, in order: when running threaded and thread->count > 0, send
 * the thread a SHUTDOWN wakeup and join it; destroy the mutex if it was
 * created; close the wakeup pipe; empty dcon_list, dcon_add_list and
 * free_dcon_list; free the embedded server thread; remove the structure from
 * the thread list; and free the event base when running threaded.
 *
 * NOTE(review): brace lines and possibly further statements (for example the
 * release of the list entries and of the structure itself) are not visible in
 * this copy of the file.
 */
void gearmand_thread_free(gearmand_thread_st *thread)
149
gearmand_con_st *dcon;
151
if (Gearmand()->threads && thread->count > 0)
153
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Shutting down thread %u", thread->count);
155
/* Ask the IO thread to stop, then wait for it to exit.
   NOTE(review): gearmand_thread_create() also calls this function when
   pthread_create() failed, in which case thread->id was never filled in --
   confirm the join is safe on that path. */
gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
156
(void) pthread_join(thread->id, NULL);
159
/* The flag is set by gearmand_thread_create() once pthread_mutex_init()
   succeeded. */
if (thread->is_thread_lock)
160
(void) pthread_mutex_destroy(&(thread->lock));
162
_wakeup_close(thread);
164
/* The list head is re-read on every pass; gearmand_con_free() is presumably
   what unlinks the connection from dcon_list -- TODO confirm. */
while (thread->dcon_list != NULL)
166
gearmand_con_free(thread->dcon_list);
169
/* Pop each entry off dcon_add_list and close its socket. */
while (thread->dcon_add_list != NULL)
171
dcon= thread->dcon_add_list;
172
thread->dcon_add_list= dcon->next;
173
gearmand_sockfd_close(dcon->fd);
177
/* Pop each entry off free_dcon_list.
   NOTE(review): no visible line releases the popped entries of this list or
   of dcon_add_list -- confirm where they are freed. */
while (thread->free_dcon_list != NULL)
179
dcon= thread->free_dcon_list;
180
thread->free_dcon_list= dcon->next;
184
gearman_server_thread_free(&(thread->server_thread));
186
gearmand_thread_list_free(thread);
188
/* With threads == 0 gearmand_thread_create() stores gearmand->base in
   thread->base, so the base is freed here only in the threaded case. */
if (Gearmand()->threads > 0)
190
if (thread->base != NULL)
191
event_base_free(thread->base);
193
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u shutdown complete", thread->count);
199
/*
 * Send a wakeup code to a thread by writing it, as one byte, to the write end
 * of the thread's wakeup pipe (wakeup_fd[1]). A failed or short write is only
 * reported through gearmand_perror(); nothing is returned to the caller.
 */
void gearmand_thread_wakeup(gearmand_thread_st *thread,
200
gearmand_wakeup_t wakeup)
202
/* The code travels as a single byte; _wakeup_event() reads these bytes back
   and casts each one to gearmand_wakeup_t. */
uint8_t buffer= wakeup;
204
/* If this fails, there is not much we can really do. This should never fail
205
though if the thread is still active. */
206
if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
208
gearmand_perror("write");
212
/*
 * Run the embedded server thread through gearman_server_thread_run() and act
 * on the outcome: the visible statements request a server-wide shutdown via
 * gearmand_wakeup(), and log and free the connection that was returned.
 *
 * NOTE(review): the brace, loop and return lines are not visible in this copy
 * of the file, so which of the statements below run for which outcome cannot
 * be read off here.
 */
void gearmand_thread_run(gearmand_thread_st *thread)
216
gearmand_error_t ret;
217
/* ret receives the status code; dcon is the connection returned by the call. */
gearmand_con_st *dcon= gearman_server_thread_run(&(thread->server_thread), &ret);
219
/* These three codes are singled out from all other results; the statement
   controlled by this test is not visible here. */
if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
220
ret == GEARMAN_SHUTDOWN_GRACEFUL)
227
/* We either got a GEARMAN_SHUTDOWN or some other fatal internal error.
228
Either way, we want to shut the server down. */
229
gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
233
/* dcon is dereferenced here, so this part presumably runs only when a
   connection was returned -- TODO confirm the guard. */
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Disconnected %s:%s", dcon->host, dcon->port);
235
gearmand_con_free(dcon);
239
/* Disable GCC's -Wold-style-cast diagnostic for the rest of the file (the
   private definitions below use C-style casts); skipped when
   __INTEL_COMPILER is defined. */
#ifndef __INTEL_COMPILER
240
#pragma GCC diagnostic ignored "-Wold-style-cast"
244
/*
 * Private definitions
 */
247
/*
 * Start routine of an IO thread (passed to pthread_create() by
 * gearmand_thread_create()). data is the thread's gearmand_thread_st.
 *
 * Formats a "[<count> ]" tag, passes it to
 * gearmand_initialize_thread_logging(), then runs the libevent loop on
 * thread->base. If event_base_loop() returns -1 the failure is logged and
 * GEARMAN_EVENT is stored in Gearmand()->ret.
 *
 * NOTE(review): the declaration of buffer and the return statement are not
 * visible in this copy of the file.
 */
static void *_thread(void *data)
249
gearmand_thread_st *thread= (gearmand_thread_st *)data;
252
snprintf(buffer, sizeof(buffer), "[%6u ]", thread->count);
254
gearmand_initialize_thread_logging(buffer);
256
gearmand_info("Entering thread event loop");
258
/* Flags 0: plain event_base_loop() call with no special loop options. */
if (event_base_loop(thread->base, 0) == -1)
260
gearmand_fatal("event_base_loop(-1)");
261
Gearmand()->ret= GEARMAN_EVENT;
264
gearmand_info("Exiting thread event loop");
269
/*
 * Log callback given to gearman_server_thread_init(): forwards line and
 * verbose to the log function stored on the global Gearmand() instance,
 * together with that instance's log_context. dthread is not used by the
 * visible statement.
 */
static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread)
272
(*Gearmand()->log_fn)(line, verbose, (void *)Gearmand()->log_context);
275
/*
 * Run callback registered through gearman_server_thread_set_run(). The
 * server-thread argument is unused; fn_arg (declared on a continuation line
 * that is not visible in this copy) carries the gearmand_thread_st, which is
 * sent a GEARMAND_WAKEUP_RUN code through its wakeup pipe.
 */
static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
278
gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
279
gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
282
/*
 * Create the thread's wakeup pipe and hook its read end into libevent.
 *
 * Visible steps, in order: pipe() into thread->wakeup_fd, switch the read end
 * (wakeup_fd[0]) to non-blocking mode with fcntl(), set up wakeup_event as a
 * persistent read event that calls _wakeup_event() with the thread as its
 * argument, bind it to thread->base and add it.
 *
 * Returns GEARMAN_SUCCESS on success, GEARMAN_ERRNO on the pipe()/fcntl()
 * failure paths and GEARMAN_EVENT when event_add() fails.
 *
 * NOTE(review): the declaration of ret and the tests that guard the three
 * GEARMAN_ERRNO paths are not visible in this copy of the file.
 */
static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
286
gearmand_info("Creating IO thread wakeup pipe");
288
ret= pipe(thread->wakeup_fd);
291
gearmand_perror("pipe");
292
return GEARMAN_ERRNO;
295
/* Fetch the current file status flags of the read end ... */
ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
298
gearmand_perror("fcntl(F_GETFL)");
299
return GEARMAN_ERRNO;
302
/* ... and write them back with O_NONBLOCK added. */
ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
305
gearmand_perror("fcntl(F_SETFL)");
306
return GEARMAN_ERRNO;
309
/* EV_PERSIST keeps the event registered after each callback. */
event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
310
_wakeup_event, thread);
311
event_base_set(thread->base, &(thread->wakeup_event));
313
if (event_add(&(thread->wakeup_event), NULL) < 0)
315
gearmand_perror("event_add");
316
return GEARMAN_EVENT;
319
/* Lets _wakeup_clear() know there is an event to remove. */
thread->is_wakeup_event= true;
321
return GEARMAN_SUCCESS;
324
/*
 * Remove the wakeup event and, if the pipe is open (wakeup_fd[0] >= 0), close
 * both ends and reset the stored descriptors to -1.
 */
static void _wakeup_close(gearmand_thread_st *thread)
326
/* Unregister the event before its descriptor is closed. */
_wakeup_clear(thread);
328
if (thread->wakeup_fd[0] >= 0)
330
gearmand_info("Closing IO thread wakeup pipe");
331
gearmand_pipe_close(thread->wakeup_fd[0]);
332
thread->wakeup_fd[0]= -1;
333
gearmand_pipe_close(thread->wakeup_fd[1]);
334
thread->wakeup_fd[1]= -1;
338
/*
 * Remove the wakeup event from libevent if is_wakeup_event says it is
 * registered, then clear that flag. Does nothing visible when the flag is
 * already false.
 */
static void _wakeup_clear(gearmand_thread_st *thread)
340
if (thread->is_wakeup_event)
342
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Clearing event for IO thread wakeup pipe %u", thread->count);
343
if (event_del(&(thread->wakeup_event)) < 0)
345
gearmand_perror("event_del");
346
/* A string literal is never null, so this assertion always fails when
   assertions are enabled. */
assert(! "event_del");
348
thread->is_wakeup_event= false;
352
/*
 * libevent callback for the read end of the wakeup pipe (registered by
 * _wakeup_init()). arg is the owning gearmand_thread_st; events is unused.
 *
 * Reads up to GEARMAN_PIPE_BUFFER_SIZE bytes from fd and treats every byte
 * read as one gearmand_wakeup_t code, handled in the switch below. The two
 * error paths clear the thread's events and store GEARMAN_PIPE_EOF or
 * GEARMAN_ERRNO in Gearmand()->ret.
 *
 * NOTE(review): the declaration of ret, the tests that select the EOF and
 * error paths, any enclosing loop, and the break/default lines of the switch
 * are not visible in this copy of the file.
 */
static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
354
gearmand_thread_st *thread= (gearmand_thread_st *)arg;
355
uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
360
ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
363
/* End-of-file path (per the "read(EOF)" message). */
_clear_events(thread);
364
gearmand_fatal("read(EOF)");
365
Gearmand()->ret= GEARMAN_PIPE_EOF;
376
/* Read-error path. */
_clear_events(thread);
377
gearmand_perror("_wakeup_event:read");
378
Gearmand()->ret= GEARMAN_ERRNO;
382
/* One wakeup code per byte received. */
for (ssize_t x= 0; x < ret; x++)
384
switch ((gearmand_wakeup_t)buffer[x])
386
/* PAUSE: only a debug message is visible for this code. */
case GEARMAND_WAKEUP_PAUSE:
387
gearmand_debug("Received PAUSE wakeup event");
390
/* SHUTDOWN_GRACEFUL: ask the server to shut down gracefully; if it answers
   GEARMAN_SHUTDOWN, escalate to a full SHUTDOWN wakeup. */
case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
391
gearmand_info("Received SHUTDOWN_GRACEFUL wakeup event");
392
if (gearman_server_shutdown_graceful(&(Gearmand()->server)) == GEARMAN_SHUTDOWN)
394
gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
398
/* SHUTDOWN: remove the wakeup event and free this thread's connections. */
case GEARMAND_WAKEUP_SHUTDOWN:
399
gearmand_info("Received SHUTDOWN wakeup event");
400
_clear_events(thread);
403
/* CON: let gearmand_con_check_queue() process this thread. */
case GEARMAND_WAKEUP_CON:
404
gearmand_debug("Received CON wakeup event");
405
gearmand_con_check_queue(thread);
408
/* RUN: run the server thread (sent by _run()). */
case GEARMAND_WAKEUP_RUN:
409
gearmand_debug("Received RUN wakeup event");
410
gearmand_thread_run(thread);
414
/* Unknown code: log it, clear the events and record GEARMAN_UNKNOWN_STATE. */
gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
415
_clear_events(thread);
416
Gearmand()->ret= GEARMAN_UNKNOWN_STATE;
423
/*
 * Remove the thread's wakeup event and free every connection on its
 * dcon_list.
 */
static void _clear_events(gearmand_thread_st *thread)
425
_wakeup_clear(thread);
427
/* The list head is re-read on every pass; gearmand_con_free() is presumably
   what unlinks the connection from dcon_list -- TODO confirm. */
while (thread->dcon_list != NULL)
429
gearmand_con_free(thread->dcon_list);