1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Gearmand Thread Definitions
18
* Private declarations
22
* @addtogroup gearmand_thread_private Private Gearmand Thread Functions
23
* @ingroup gearmand_thread
27
static void *_thread(void *data);
28
static void _log(const char *line, gearman_verbose_t verbose, void *context);
29
static void _run(gearman_server_thread_st *thread, void *fn_arg);
31
static gearman_return_t _wakeup_init(gearmand_thread_st *thread);
32
static void _wakeup_close(gearmand_thread_st *thread);
33
static void _wakeup_clear(gearmand_thread_st *thread);
34
static void _wakeup_event(int fd, short events, void *arg);
35
static void _clear_events(gearmand_thread_st *thread);
43
gearman_return_t gearmand_thread_create(gearmand_st *gearmand)
45
gearmand_thread_st *thread;
49
thread= (gearmand_thread_st *)malloc(sizeof(gearmand_thread_st));
52
gearmand_log_fatal(gearmand, "gearmand_thread_create:malloc");
53
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
56
if (gearman_server_thread_create(&(gearmand->server),
57
&(thread->server_thread)) == NULL)
60
gearmand_log_fatal(gearmand, "gearmand_thread_create:gearman_server_thread_create:NULL");
61
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
64
gearman_server_thread_set_log_fn(&(thread->server_thread), _log, thread,
66
gearman_server_thread_set_event_watch(&(thread->server_thread),
67
gearmand_connection_watch, NULL);
69
thread->is_thread_lock= false;
70
thread->is_wakeup_event= false;
72
thread->dcon_count= 0;
73
thread->dcon_add_count= 0;
74
thread->free_dcon_count= 0;
75
thread->wakeup_fd[0]= -1;
76
thread->wakeup_fd[1]= -1;
77
GEARMAN_LIST_ADD(gearmand->thread, thread,)
78
thread->gearmand= gearmand;
79
thread->dcon_list= NULL;
80
thread->dcon_add_list= NULL;
81
thread->free_dcon_list= NULL;
83
/* If we have no threads, we still create a fake thread that uses the main
84
libevent instance. Otherwise create a libevent instance for each thread. */
85
if (gearmand->threads == 0)
86
thread->base= gearmand->base;
89
gearmand_log_info(gearmand, "Initializing libevent for IO thread");
91
thread->base= event_base_new();
92
if (thread->base == NULL)
94
gearmand_thread_free(thread);
95
gearmand_log_fatal(gearmand, "gearmand_thread_create:event_base_new:NULL");
100
ret= _wakeup_init(thread);
101
if (ret != GEARMAN_SUCCESS)
103
gearmand_thread_free(thread);
107
/* If we are not running multi-threaded, just return the thread context. */
108
if (gearmand->threads == 0)
109
return GEARMAN_SUCCESS;
111
thread->count= gearmand->thread_count;
113
pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
114
if (pthread_ret != 0)
117
gearmand_thread_free(thread);
118
gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_mutex_init:%d", pthread_ret);
119
return GEARMAN_PTHREAD;
122
thread->is_thread_lock= true;
124
gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
126
pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
127
if (pthread_ret != 0)
130
gearmand_thread_free(thread);
131
gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_create:%d", pthread_ret);
133
return GEARMAN_PTHREAD;
136
gearmand_log_info(gearmand, "Thread %u created", thread->count);
138
return GEARMAN_SUCCESS;
141
void gearmand_thread_free(gearmand_thread_st *thread)
143
gearmand_con_st *dcon;
145
if (thread->gearmand->threads && thread->count > 0)
147
gearmand_log_info(thread->gearmand, "Shutting down thread %u", thread->count);
149
gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
150
(void) pthread_join(thread->id, NULL);
153
if (thread->is_thread_lock)
154
(void) pthread_mutex_destroy(&(thread->lock));
156
_wakeup_close(thread);
158
while (thread->dcon_list != NULL)
159
gearmand_con_free(thread->dcon_list);
161
while (thread->dcon_add_list != NULL)
163
dcon= thread->dcon_add_list;
164
thread->dcon_add_list= dcon->next;
169
while (thread->free_dcon_list != NULL)
171
dcon= thread->free_dcon_list;
172
thread->free_dcon_list= dcon->next;
176
gearman_server_thread_free(&(thread->server_thread));
178
GEARMAN_LIST_DEL(thread->gearmand->thread, thread,)
180
if (thread->gearmand->threads > 0)
182
if (thread->base != NULL)
183
event_base_free(thread->base);
185
gearmand_log_info(thread->gearmand, "Thread %u shutdown complete", thread->count);
191
void gearmand_thread_wakeup(gearmand_thread_st *thread,
192
gearmand_wakeup_t wakeup)
194
uint8_t buffer= wakeup;
196
/* If this fails, there is not much we can really do. This should never fail
197
though if the thread is still active. */
198
if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
199
gearmand_log_error(thread->gearmand, "gearmand_thread_wakeup:write:%d", errno);
202
void gearmand_thread_run(gearmand_thread_st *thread)
204
gearman_server_con_st *server_con;
205
gearman_return_t ret;
206
gearmand_con_st *dcon;
210
server_con= gearman_server_thread_run(&(thread->server_thread), &ret);
211
if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
212
ret == GEARMAN_SHUTDOWN_GRACEFUL)
217
if (server_con == NULL)
219
/* We either got a GEARMAN_SHUTDOWN or some other fatal internal error.
220
Either way, we want to shut the server down. */
221
gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
225
dcon= (gearmand_con_st *)gearman_server_con_data(server_con);
227
gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Disconnected", thread->count, dcon->host, dcon->port);
229
gearmand_con_free(dcon);
234
* Private definitions
237
static void *_thread(void *data)
239
gearmand_thread_st *thread= (gearmand_thread_st *)data;
241
gearmand_log_info(thread->gearmand, "[%4u] Entering thread event loop", thread->count);
243
if (event_base_loop(thread->base, 0) == -1)
245
gearmand_log_fatal(thread->gearmand, "_io_thread:event_base_loop:-1");
246
thread->gearmand->ret= GEARMAN_EVENT;
249
gearmand_log_info(thread->gearmand, "[%4u] Exiting thread event loop", thread->count);
254
static void _log(const char *line, gearman_verbose_t verbose, void *context)
256
gearmand_thread_st *dthread= (gearmand_thread_st *)context;
257
char buffer[GEARMAN_MAX_ERROR_SIZE];
259
snprintf(buffer, GEARMAN_MAX_ERROR_SIZE, "[%4u] %s", dthread->count, line);
260
(*dthread->gearmand->log_fn)(buffer, verbose,
261
(void *)dthread->gearmand->log_context);
264
static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
267
gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
268
gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
271
static gearman_return_t _wakeup_init(gearmand_thread_st *thread)
275
gearmand_log_info(thread->gearmand, "Creating IO thread wakeup pipe");
277
ret= pipe(thread->wakeup_fd);
280
gearmand_log_fatal(thread->gearmand, "_wakeup_init:pipe:%d", errno);
281
return GEARMAN_ERRNO;
284
ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
287
gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
288
return GEARMAN_ERRNO;
291
ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
294
gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
295
return GEARMAN_ERRNO;
298
event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
299
_wakeup_event, thread);
300
event_base_set(thread->base, &(thread->wakeup_event));
302
if (event_add(&(thread->wakeup_event), NULL) == -1)
304
gearmand_log_fatal(thread->gearmand, "_wakeup_init:event_add:-1");
305
return GEARMAN_EVENT;
308
thread->is_wakeup_event= true;
310
return GEARMAN_SUCCESS;
313
static void _wakeup_close(gearmand_thread_st *thread)
315
_wakeup_clear(thread);
317
if (thread->wakeup_fd[0] >= 0)
319
gearmand_log_info(thread->gearmand, "Closing IO thread wakeup pipe");
320
close(thread->wakeup_fd[0]);
321
thread->wakeup_fd[0]= -1;
322
close(thread->wakeup_fd[1]);
323
thread->wakeup_fd[1]= -1;
327
static void _wakeup_clear(gearmand_thread_st *thread)
329
if (thread->is_wakeup_event)
331
gearmand_log_info(thread->gearmand, "[%4u] Clearing event for IO thread wakeup pipe", thread->count);
332
int del_ret= event_del(&(thread->wakeup_event));
333
assert(del_ret == 0);
334
thread->is_wakeup_event= false;
338
static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
340
gearmand_thread_st *thread= (gearmand_thread_st *)arg;
341
uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
347
ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
350
_clear_events(thread);
351
gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:EOF");
352
thread->gearmand->ret= GEARMAN_PIPE_EOF;
363
_clear_events(thread);
364
gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:%d", errno);
365
thread->gearmand->ret= GEARMAN_ERRNO;
369
for (x= 0; x < ret; x++)
371
switch ((gearmand_wakeup_t)buffer[x])
373
case GEARMAND_WAKEUP_PAUSE:
374
gearmand_log_info(thread->gearmand, "[%4u] Received PAUSE wakeup event", thread->count);
377
case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
378
gearmand_log_info(thread->gearmand,
379
"[%4u] Received SHUTDOWN_GRACEFUL wakeup event",
381
if (gearman_server_shutdown_graceful(&(thread->gearmand->server)) == GEARMAN_SHUTDOWN)
383
gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
387
case GEARMAND_WAKEUP_SHUTDOWN:
388
gearmand_log_info(thread->gearmand, "[%4u] Received SHUTDOWN wakeup event", thread->count);
389
_clear_events(thread);
392
case GEARMAND_WAKEUP_CON:
393
gearmand_log_info(thread->gearmand, "[%4u] Received CON wakeup event", thread->count);
394
gearmand_con_check_queue(thread);
397
case GEARMAND_WAKEUP_RUN:
398
gearmand_log_debug(thread->gearmand, "[%4u] Received RUN wakeup event", thread->count);
399
gearmand_thread_run(thread);
403
gearmand_log_fatal(thread->gearmand, "[%4u] Received unknown wakeup event (%u)", thread->count, buffer[x]);
404
_clear_events(thread);
405
thread->gearmand->ret= GEARMAN_UNKNOWN_STATE;
412
static void _clear_events(gearmand_thread_st *thread)
414
_wakeup_clear(thread);
416
while (thread->dcon_list != NULL)
417
gearmand_con_free(thread->dcon_list);