~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman-server/gearmand_thread.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2009-08-11 10:06:22 UTC
  • mto: (1.2.3 upstream) (6.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20090811100622-6ig4iknanc73olum
ImportĀ upstreamĀ versionĀ 0.9

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Gearman server and library
2
 
 * Copyright (C) 2008 Brian Aker, Eric Day
3
 
 * All rights reserved.
4
 
 *
5
 
 * Use and distribution licensed under the BSD license.  See
6
 
 * the COPYING file in the parent directory for full text.
7
 
 */
8
 
 
9
 
/**
10
 
 * @file
11
 
 * @brief Gearmand Thread Definitions
12
 
 */
13
 
 
14
 
#include <libgearman-server/common.h>
15
 
#include <libgearman-server/gearmand.h>
16
 
 
17
 
#include <assert.h>
18
 
#include <errno.h>
19
 
 
20
 
#include <libgearman-server/list.h>
21
 
 
22
 
/*
23
 
 * Private declarations
24
 
 */
25
 
 
26
 
/**
27
 
 * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
28
 
 * @ingroup gearmand_thread
29
 
 * @{
30
 
 */
31
 
 
32
 
static void *_thread(void *data);
33
 
static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread);
34
 
static void _run(gearman_server_thread_st *thread, void *fn_arg);
35
 
 
36
 
static gearmand_error_t _wakeup_init(gearmand_thread_st *thread);
37
 
static void _wakeup_close(gearmand_thread_st *thread);
38
 
static void _wakeup_clear(gearmand_thread_st *thread);
39
 
static void _wakeup_event(int fd, short events, void *arg);
40
 
static void _clear_events(gearmand_thread_st *thread);
41
 
 
42
 
/** @} */
43
 
 
44
 
/*
45
 
 * Public definitions
46
 
 */
47
 
 
48
 
gearmand_error_t gearmand_thread_create(gearmand_st *gearmand)
49
 
{
50
 
  gearmand_thread_st *thread;
51
 
  gearmand_error_t ret;
52
 
 
53
 
  thread= static_cast<gearmand_thread_st *>(malloc(sizeof(gearmand_thread_st)));
54
 
  if (not thread)
55
 
  {
56
 
    return gearmand_merror("malloc", gearmand_thread_st, 1);
57
 
  }
58
 
 
59
 
  if (! gearman_server_thread_init(gearmand_server(gearmand), &(thread->server_thread),
60
 
                                   _log, thread, gearmand_connection_watch))
61
 
  {
62
 
    free(thread);
63
 
    gearmand_fatal("gearman_server_thread_init(NULL)");
64
 
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
65
 
  }
66
 
 
67
 
  thread->is_thread_lock= false;
68
 
  thread->is_wakeup_event= false;
69
 
  thread->count= 0;
70
 
  thread->dcon_count= 0;
71
 
  thread->dcon_add_count= 0;
72
 
  thread->free_dcon_count= 0;
73
 
  thread->wakeup_fd[0]= -1;
74
 
  thread->wakeup_fd[1]= -1;
75
 
 
76
 
  gearmand_thread_list_add(thread);
77
 
 
78
 
  thread->dcon_list= NULL;
79
 
  thread->dcon_add_list= NULL;
80
 
  thread->free_dcon_list= NULL;
81
 
 
82
 
  /* If we have no threads, we still create a fake thread that uses the main
83
 
     libevent instance. Otherwise create a libevent instance for each thread. */
84
 
  if (gearmand->threads == 0)
85
 
  {
86
 
    thread->base= gearmand->base;
87
 
  }
88
 
  else
89
 
  {
90
 
    gearmand_info("Initializing libevent for IO thread");
91
 
 
92
 
    thread->base= static_cast<struct event_base *>(event_base_new());
93
 
    if (thread->base == NULL)
94
 
    {
95
 
      gearmand_thread_free(thread);
96
 
      gearmand_fatal("event_base_new(NULL)");
97
 
      return GEARMAN_EVENT;
98
 
    }
99
 
  }
100
 
 
101
 
  ret= _wakeup_init(thread);
102
 
  if (ret != GEARMAN_SUCCESS)
103
 
  {
104
 
    gearmand_thread_free(thread);
105
 
    return ret;
106
 
  }
107
 
 
108
 
  /* If we are not running multi-threaded, just return the thread context. */
109
 
  if (gearmand->threads == 0)
110
 
    return GEARMAN_SUCCESS;
111
 
 
112
 
  thread->count= gearmand->thread_count;
113
 
 
114
 
  int pthread_ret;
115
 
  pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
116
 
  if (pthread_ret != 0)
117
 
  {
118
 
    thread->count= 0;
119
 
    gearmand_thread_free(thread);
120
 
 
121
 
    errno= pthread_ret;
122
 
    gearmand_fatal_perror("pthread_mutex_init");
123
 
    return GEARMAN_ERRNO;
124
 
  }
125
 
 
126
 
  thread->is_thread_lock= true;
127
 
 
128
 
  gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
129
 
 
130
 
  pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
131
 
  if (pthread_ret != 0)
132
 
  {
133
 
    thread->count= 0;
134
 
    gearmand_thread_free(thread);
135
 
 
136
 
    errno= pthread_ret;
137
 
    gearmand_perror("pthread_create");
138
 
 
139
 
    return GEARMAN_ERRNO;
140
 
  }
141
 
 
142
 
  gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u created", thread->count);
143
 
 
144
 
  return GEARMAN_SUCCESS;
145
 
}
146
 
 
147
 
void gearmand_thread_free(gearmand_thread_st *thread)
148
 
{
149
 
  gearmand_con_st *dcon;
150
 
 
151
 
  if (Gearmand()->threads && thread->count > 0)
152
 
  {
153
 
    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Shutting down thread %u", thread->count);
154
 
 
155
 
    gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
156
 
    (void) pthread_join(thread->id, NULL);
157
 
  }
158
 
 
159
 
  if (thread->is_thread_lock)
160
 
    (void) pthread_mutex_destroy(&(thread->lock));
161
 
 
162
 
  _wakeup_close(thread);
163
 
 
164
 
  while (thread->dcon_list != NULL)
165
 
  {
166
 
    gearmand_con_free(thread->dcon_list);
167
 
  }
168
 
 
169
 
  while (thread->dcon_add_list != NULL)
170
 
  {
171
 
    dcon= thread->dcon_add_list;
172
 
    thread->dcon_add_list= dcon->next;
173
 
    gearmand_sockfd_close(dcon->fd);
174
 
    free(dcon);
175
 
  }
176
 
 
177
 
  while (thread->free_dcon_list != NULL)
178
 
  {
179
 
    dcon= thread->free_dcon_list;
180
 
    thread->free_dcon_list= dcon->next;
181
 
    free(dcon);
182
 
  }
183
 
 
184
 
  gearman_server_thread_free(&(thread->server_thread));
185
 
 
186
 
  gearmand_thread_list_free(thread);
187
 
 
188
 
  if (Gearmand()->threads > 0)
189
 
  {
190
 
    if (thread->base != NULL)
191
 
      event_base_free(thread->base);
192
 
 
193
 
    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Thread %u shutdown complete", thread->count);
194
 
  }
195
 
 
196
 
  free(thread);
197
 
}
198
 
 
199
 
void gearmand_thread_wakeup(gearmand_thread_st *thread,
200
 
                            gearmand_wakeup_t wakeup)
201
 
{
202
 
  uint8_t buffer= wakeup;
203
 
 
204
 
  /* If this fails, there is not much we can really do. This should never fail
205
 
     though if the thread is still active. */
206
 
  if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
207
 
  {
208
 
    gearmand_perror("write");
209
 
  }
210
 
}
211
 
 
212
 
void gearmand_thread_run(gearmand_thread_st *thread)
213
 
{
214
 
  while (1)
215
 
  {
216
 
    gearmand_error_t ret;
217
 
    gearmand_con_st *dcon= gearman_server_thread_run(&(thread->server_thread), &ret);
218
 
 
219
 
    if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
220
 
        ret == GEARMAN_SHUTDOWN_GRACEFUL)
221
 
    {
222
 
      return;
223
 
    }
224
 
 
225
 
    if (not dcon)
226
 
    {
227
 
      /* We either got a GEARMAN_SHUTDOWN or some other fatal internal error.
228
 
         Either way, we want to shut the server down. */
229
 
      gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
230
 
      return;
231
 
    }
232
 
 
233
 
    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Disconnected %s:%s", dcon->host, dcon->port);
234
 
 
235
 
    gearmand_con_free(dcon);
236
 
  }
237
 
}
238
 
 
239
 
#ifndef __INTEL_COMPILER
240
 
#pragma GCC diagnostic ignored "-Wold-style-cast"
241
 
#endif
242
 
 
243
 
/*
244
 
 * Private definitions
245
 
 */
246
 
 
247
 
static void *_thread(void *data)
248
 
{
249
 
  gearmand_thread_st *thread= (gearmand_thread_st *)data;
250
 
  char buffer[BUFSIZ];
251
 
 
252
 
  snprintf(buffer, sizeof(buffer), "[%6u ]", thread->count);
253
 
 
254
 
  gearmand_initialize_thread_logging(buffer);
255
 
 
256
 
  gearmand_info("Entering thread event loop");
257
 
 
258
 
  if (event_base_loop(thread->base, 0) == -1)
259
 
  {
260
 
    gearmand_fatal("event_base_loop(-1)");
261
 
    Gearmand()->ret= GEARMAN_EVENT;
262
 
  }
263
 
 
264
 
  gearmand_info("Exiting thread event loop");
265
 
 
266
 
  return NULL;
267
 
}
268
 
 
269
 
static void _log(const char *line, gearmand_verbose_t verbose, gearmand_thread_st *dthread)
270
 
{
271
 
  (void)dthread;
272
 
  (*Gearmand()->log_fn)(line, verbose, (void *)Gearmand()->log_context);
273
 
}
274
 
 
275
 
static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
276
 
                 void *fn_arg)
277
 
{
278
 
  gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
279
 
  gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
280
 
}
281
 
 
282
 
static gearmand_error_t _wakeup_init(gearmand_thread_st *thread)
283
 
{
284
 
  int ret;
285
 
 
286
 
  gearmand_info("Creating IO thread wakeup pipe");
287
 
 
288
 
  ret= pipe(thread->wakeup_fd);
289
 
  if (ret == -1)
290
 
  {
291
 
    gearmand_perror("pipe");
292
 
    return GEARMAN_ERRNO;
293
 
  }
294
 
 
295
 
  ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
296
 
  if (ret == -1)
297
 
  {
298
 
    gearmand_perror("fcntl(F_GETFL)");
299
 
    return GEARMAN_ERRNO;
300
 
  }
301
 
 
302
 
  ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
303
 
  if (ret == -1)
304
 
  {
305
 
    gearmand_perror("fcntl(F_SETFL)");
306
 
    return GEARMAN_ERRNO;
307
 
  }
308
 
 
309
 
  event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
310
 
            _wakeup_event, thread);
311
 
  event_base_set(thread->base, &(thread->wakeup_event));
312
 
 
313
 
  if (event_add(&(thread->wakeup_event), NULL) < 0)
314
 
  {
315
 
    gearmand_perror("event_add");
316
 
    return GEARMAN_EVENT;
317
 
  }
318
 
 
319
 
  thread->is_wakeup_event= true;
320
 
 
321
 
  return GEARMAN_SUCCESS;
322
 
}
323
 
 
324
 
static void _wakeup_close(gearmand_thread_st *thread)
325
 
{
326
 
  _wakeup_clear(thread);
327
 
 
328
 
  if (thread->wakeup_fd[0] >= 0)
329
 
  {
330
 
    gearmand_info("Closing IO thread wakeup pipe");
331
 
    gearmand_pipe_close(thread->wakeup_fd[0]);
332
 
    thread->wakeup_fd[0]= -1;
333
 
    gearmand_pipe_close(thread->wakeup_fd[1]);
334
 
    thread->wakeup_fd[1]= -1;
335
 
  }
336
 
}
337
 
 
338
 
static void _wakeup_clear(gearmand_thread_st *thread)
339
 
{
340
 
  if (thread->is_wakeup_event)
341
 
  {
342
 
    gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Clearing event for IO thread wakeup pipe %u", thread->count);
343
 
    if (event_del(&(thread->wakeup_event)) < 0)
344
 
    {
345
 
      gearmand_perror("event_del");
346
 
      assert(! "event_del");
347
 
    }
348
 
    thread->is_wakeup_event= false;
349
 
  }
350
 
}
351
 
 
352
 
static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
353
 
{
354
 
  gearmand_thread_st *thread= (gearmand_thread_st *)arg;
355
 
  uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
356
 
  ssize_t ret;
357
 
 
358
 
  while (1)
359
 
  {
360
 
    ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
361
 
    if (ret == 0)
362
 
    {
363
 
      _clear_events(thread);
364
 
      gearmand_fatal("read(EOF)");
365
 
      Gearmand()->ret= GEARMAN_PIPE_EOF;
366
 
      return;
367
 
    }
368
 
    else if (ret == -1)
369
 
    {
370
 
      if (errno == EINTR)
371
 
        continue;
372
 
 
373
 
      if (errno == EAGAIN)
374
 
        break;
375
 
 
376
 
      _clear_events(thread);
377
 
      gearmand_perror("_wakeup_event:read");
378
 
      Gearmand()->ret= GEARMAN_ERRNO;
379
 
      return;
380
 
    }
381
 
 
382
 
    for (ssize_t x= 0; x < ret; x++)
383
 
    {
384
 
      switch ((gearmand_wakeup_t)buffer[x])
385
 
      {
386
 
      case GEARMAND_WAKEUP_PAUSE:
387
 
        gearmand_debug("Received PAUSE wakeup event");
388
 
        break;
389
 
 
390
 
      case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
391
 
        gearmand_info("Received SHUTDOWN_GRACEFUL wakeup event");
392
 
        if (gearman_server_shutdown_graceful(&(Gearmand()->server)) == GEARMAN_SHUTDOWN)
393
 
        {
394
 
          gearmand_wakeup(Gearmand(), GEARMAND_WAKEUP_SHUTDOWN);
395
 
        }
396
 
        break;
397
 
 
398
 
      case GEARMAND_WAKEUP_SHUTDOWN:
399
 
        gearmand_info("Received SHUTDOWN wakeup event");
400
 
        _clear_events(thread);
401
 
        break;
402
 
 
403
 
      case GEARMAND_WAKEUP_CON:
404
 
        gearmand_debug("Received CON wakeup event");
405
 
        gearmand_con_check_queue(thread);
406
 
        break;
407
 
 
408
 
      case GEARMAND_WAKEUP_RUN:
409
 
        gearmand_debug("Received RUN wakeup event");
410
 
        gearmand_thread_run(thread);
411
 
        break;
412
 
 
413
 
      default:
414
 
        gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "Received unknown wakeup event (%u)", buffer[x]);
415
 
        _clear_events(thread);
416
 
        Gearmand()->ret= GEARMAN_UNKNOWN_STATE;
417
 
        break;
418
 
      }
419
 
    }
420
 
  }
421
 
}
422
 
 
423
 
static void _clear_events(gearmand_thread_st *thread)
424
 
{
425
 
  _wakeup_clear(thread);
426
 
 
427
 
  while (thread->dcon_list != NULL)
428
 
  {
429
 
    gearmand_con_free(thread->dcon_list);
430
 
  }
431
 
}