~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman-server/gearmand_thread.c

  • Committer: Package Import Robot
  • Author(s): Stig Sandbeck Mathisen, Michael Fladischer, Stig Sandbeck Mathisen
  • Date: 2012-01-23 11:31:08 UTC
  • mfrom: (6.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20120123113108-wl1yhiba13q9jusb
Tags: 0.27-1
[Michael Fladischer]
* Patch: fix spelling
* Patch: remove dependency on googleanalytics
* Patch: fix tests
* Use non-authenticating URL for Vcs-Git.
* Add "status" action to init script.

[Stig Sandbeck Mathisen]
* New upstream release (Closes: #621486) (LP: #682680)
* Remove build dependency on drizzle
  (until it reaches testing again)
* Build with support for tokyocabinet
* Remove backported ipv6 patch
* Patch: disable hostile build tests, they take hours...
* Patch: workaround duplicate address issue for tests
* Do not build API documentation, the sources are not shipped in
  upstream tarball
* Update debian/copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Gearman server and library
2
 
 * Copyright (C) 2008 Brian Aker, Eric Day
3
 
 * All rights reserved.
4
 
 *
5
 
 * Use and distribution licensed under the BSD license.  See
6
 
 * the COPYING file in the parent directory for full text.
7
 
 */
8
 
 
9
 
/**
10
 
 * @file
11
 
 * @brief Gearmand Thread Definitions
12
 
 */
13
 
 
14
 
#include "common.h"
15
 
#include "gearmand.h"
16
 
 
17
 
/*
18
 
 * Private declarations
19
 
 */
20
 
 
21
 
/**
22
 
 * @addtogroup gearmand_thread_private Private Gearmand Thread Functions
23
 
 * @ingroup gearmand_thread
24
 
 * @{
25
 
 */
26
 
 
27
 
static void *_thread(void *data);
28
 
static void _log(const char *line, gearman_verbose_t verbose, void *context);
29
 
static void _run(gearman_server_thread_st *thread, void *fn_arg);
30
 
 
31
 
static gearman_return_t _wakeup_init(gearmand_thread_st *thread);
32
 
static void _wakeup_close(gearmand_thread_st *thread);
33
 
static void _wakeup_clear(gearmand_thread_st *thread);
34
 
static void _wakeup_event(int fd, short events, void *arg);
35
 
static void _clear_events(gearmand_thread_st *thread);
36
 
 
37
 
/** @} */
38
 
 
39
 
/*
40
 
 * Public definitions
41
 
 */
42
 
 
43
 
gearman_return_t gearmand_thread_create(gearmand_st *gearmand)
44
 
{
45
 
  gearmand_thread_st *thread;
46
 
  gearman_return_t ret;
47
 
  int pthread_ret;
48
 
 
49
 
  thread= (gearmand_thread_st *)malloc(sizeof(gearmand_thread_st));
50
 
  if (thread == NULL)
51
 
  {
52
 
    gearmand_log_fatal(gearmand, "gearmand_thread_create:malloc");
53
 
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
54
 
  }
55
 
 
56
 
  if (gearman_server_thread_create(&(gearmand->server),
57
 
                                   &(thread->server_thread)) == NULL)
58
 
  {
59
 
    free(thread);
60
 
    gearmand_log_fatal(gearmand, "gearmand_thread_create:gearman_server_thread_create:NULL");
61
 
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
62
 
  }
63
 
 
64
 
  gearman_server_thread_set_log_fn(&(thread->server_thread), _log, thread,
65
 
                                   gearmand->verbose);
66
 
  gearman_server_thread_set_event_watch(&(thread->server_thread),
67
 
                                        gearmand_connection_watch, NULL);
68
 
 
69
 
  thread->is_thread_lock= false;
70
 
  thread->is_wakeup_event= false;
71
 
  thread->count= 0;
72
 
  thread->dcon_count= 0;
73
 
  thread->dcon_add_count= 0;
74
 
  thread->free_dcon_count= 0;
75
 
  thread->wakeup_fd[0]= -1;
76
 
  thread->wakeup_fd[1]= -1;
77
 
  GEARMAN_LIST_ADD(gearmand->thread, thread,)
78
 
  thread->gearmand= gearmand;
79
 
  thread->dcon_list= NULL;
80
 
  thread->dcon_add_list= NULL;
81
 
  thread->free_dcon_list= NULL;
82
 
 
83
 
  /* If we have no threads, we still create a fake thread that uses the main
84
 
     libevent instance. Otherwise create a libevent instance for each thread. */
85
 
  if (gearmand->threads == 0)
86
 
    thread->base= gearmand->base;
87
 
  else
88
 
  {
89
 
    gearmand_log_info(gearmand, "Initializing libevent for IO thread");
90
 
 
91
 
    thread->base= event_base_new();
92
 
    if (thread->base == NULL)
93
 
    {
94
 
      gearmand_thread_free(thread);
95
 
      gearmand_log_fatal(gearmand, "gearmand_thread_create:event_base_new:NULL");
96
 
      return GEARMAN_EVENT;
97
 
    }
98
 
  }
99
 
 
100
 
  ret= _wakeup_init(thread);
101
 
  if (ret != GEARMAN_SUCCESS)
102
 
  {
103
 
    gearmand_thread_free(thread);
104
 
    return ret;
105
 
  }
106
 
 
107
 
  /* If we are not running multi-threaded, just return the thread context. */
108
 
  if (gearmand->threads == 0)
109
 
    return GEARMAN_SUCCESS;
110
 
 
111
 
  thread->count= gearmand->thread_count;
112
 
 
113
 
  pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
114
 
  if (pthread_ret != 0)
115
 
  {
116
 
    thread->count= 0;
117
 
    gearmand_thread_free(thread);
118
 
    gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_mutex_init:%d", pthread_ret);
119
 
    return GEARMAN_PTHREAD;
120
 
  }
121
 
 
122
 
  thread->is_thread_lock= true;
123
 
 
124
 
  gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
125
 
 
126
 
  pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
127
 
  if (pthread_ret != 0)
128
 
  {
129
 
    thread->count= 0;
130
 
    gearmand_thread_free(thread);
131
 
    gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_create:%d", pthread_ret);
132
 
 
133
 
    return GEARMAN_PTHREAD;
134
 
  }
135
 
 
136
 
  gearmand_log_info(gearmand, "Thread %u created", thread->count);
137
 
 
138
 
  return GEARMAN_SUCCESS;
139
 
}
140
 
 
141
 
void gearmand_thread_free(gearmand_thread_st *thread)
142
 
{
143
 
  gearmand_con_st *dcon;
144
 
 
145
 
  if (thread->gearmand->threads && thread->count > 0)
146
 
  {
147
 
    gearmand_log_info(thread->gearmand, "Shutting down thread %u", thread->count);
148
 
 
149
 
    gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
150
 
    (void) pthread_join(thread->id, NULL);
151
 
  }
152
 
 
153
 
  if (thread->is_thread_lock)
154
 
    (void) pthread_mutex_destroy(&(thread->lock));
155
 
 
156
 
  _wakeup_close(thread);
157
 
 
158
 
  while (thread->dcon_list != NULL)
159
 
    gearmand_con_free(thread->dcon_list);
160
 
 
161
 
  while (thread->dcon_add_list != NULL)
162
 
  {
163
 
    dcon= thread->dcon_add_list;
164
 
    thread->dcon_add_list= dcon->next;
165
 
    close(dcon->fd);
166
 
    free(dcon);
167
 
  }
168
 
 
169
 
  while (thread->free_dcon_list != NULL)
170
 
  {
171
 
    dcon= thread->free_dcon_list;
172
 
    thread->free_dcon_list= dcon->next;
173
 
    free(dcon);
174
 
  }
175
 
 
176
 
  gearman_server_thread_free(&(thread->server_thread));
177
 
 
178
 
  GEARMAN_LIST_DEL(thread->gearmand->thread, thread,)
179
 
 
180
 
  if (thread->gearmand->threads > 0)
181
 
  {
182
 
    if (thread->base != NULL)
183
 
      event_base_free(thread->base);
184
 
 
185
 
    gearmand_log_info(thread->gearmand, "Thread %u shutdown complete", thread->count);
186
 
  }
187
 
 
188
 
  free(thread);
189
 
}
190
 
 
191
 
void gearmand_thread_wakeup(gearmand_thread_st *thread,
192
 
                            gearmand_wakeup_t wakeup)
193
 
{
194
 
  uint8_t buffer= wakeup;
195
 
 
196
 
  /* If this fails, there is not much we can really do. This should never fail
197
 
     though if the thread is still active. */
198
 
  if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
199
 
    gearmand_log_error(thread->gearmand, "gearmand_thread_wakeup:write:%d", errno);
200
 
}
201
 
 
202
 
void gearmand_thread_run(gearmand_thread_st *thread)
203
 
{
204
 
  gearman_server_con_st *server_con;
205
 
  gearman_return_t ret;
206
 
  gearmand_con_st *dcon;
207
 
 
208
 
  while (1)
209
 
  {
210
 
    server_con= gearman_server_thread_run(&(thread->server_thread), &ret);
211
 
    if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
212
 
        ret == GEARMAN_SHUTDOWN_GRACEFUL)
213
 
    {
214
 
      return;
215
 
    }
216
 
 
217
 
    if (server_con == NULL)
218
 
    {
219
 
      /* We either got a GEARMAN_SHUTDOWN or some other fatal internal error.
220
 
         Either way, we want to shut the server down. */
221
 
      gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
222
 
      return;
223
 
    }
224
 
 
225
 
    dcon= (gearmand_con_st *)gearman_server_con_data(server_con);
226
 
 
227
 
    gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Disconnected", thread->count, dcon->host, dcon->port);
228
 
 
229
 
    gearmand_con_free(dcon);
230
 
  }
231
 
}
232
 
 
233
 
/*
234
 
 * Private definitions
235
 
 */
236
 
 
237
 
static void *_thread(void *data)
238
 
{
239
 
  gearmand_thread_st *thread= (gearmand_thread_st *)data;
240
 
 
241
 
  gearmand_log_info(thread->gearmand, "[%4u] Entering thread event loop", thread->count);
242
 
 
243
 
  if (event_base_loop(thread->base, 0) == -1)
244
 
  {
245
 
    gearmand_log_fatal(thread->gearmand, "_io_thread:event_base_loop:-1");
246
 
    thread->gearmand->ret= GEARMAN_EVENT;
247
 
  }
248
 
 
249
 
  gearmand_log_info(thread->gearmand, "[%4u] Exiting thread event loop", thread->count);
250
 
 
251
 
  return NULL;
252
 
}
253
 
 
254
 
static void _log(const char *line, gearman_verbose_t verbose, void *context)
255
 
{
256
 
  gearmand_thread_st *dthread= (gearmand_thread_st *)context;
257
 
  char buffer[GEARMAN_MAX_ERROR_SIZE];
258
 
 
259
 
  snprintf(buffer, GEARMAN_MAX_ERROR_SIZE, "[%4u] %s", dthread->count, line);
260
 
  (*dthread->gearmand->log_fn)(buffer, verbose,
261
 
                               (void *)dthread->gearmand->log_context);
262
 
}
263
 
 
264
 
static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
265
 
                 void *fn_arg)
266
 
{
267
 
  gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
268
 
  gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
269
 
}
270
 
 
271
 
static gearman_return_t _wakeup_init(gearmand_thread_st *thread)
272
 
{
273
 
  int ret;
274
 
 
275
 
  gearmand_log_info(thread->gearmand, "Creating IO thread wakeup pipe");
276
 
 
277
 
  ret= pipe(thread->wakeup_fd);
278
 
  if (ret == -1)
279
 
  {
280
 
    gearmand_log_fatal(thread->gearmand, "_wakeup_init:pipe:%d", errno);
281
 
    return GEARMAN_ERRNO;
282
 
  }
283
 
 
284
 
  ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
285
 
  if (ret == -1)
286
 
  {
287
 
    gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
288
 
    return GEARMAN_ERRNO;
289
 
  }
290
 
 
291
 
  ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
292
 
  if (ret == -1)
293
 
  {
294
 
    gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
295
 
    return GEARMAN_ERRNO;
296
 
  }
297
 
 
298
 
  event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
299
 
            _wakeup_event, thread);
300
 
  event_base_set(thread->base, &(thread->wakeup_event));
301
 
 
302
 
  if (event_add(&(thread->wakeup_event), NULL) == -1)
303
 
  {
304
 
    gearmand_log_fatal(thread->gearmand, "_wakeup_init:event_add:-1");
305
 
    return GEARMAN_EVENT;
306
 
  }
307
 
 
308
 
  thread->is_wakeup_event= true;
309
 
 
310
 
  return GEARMAN_SUCCESS;
311
 
}
312
 
 
313
 
static void _wakeup_close(gearmand_thread_st *thread)
314
 
{
315
 
  _wakeup_clear(thread);
316
 
 
317
 
  if (thread->wakeup_fd[0] >= 0)
318
 
  {
319
 
    gearmand_log_info(thread->gearmand, "Closing IO thread wakeup pipe");
320
 
    close(thread->wakeup_fd[0]);
321
 
    thread->wakeup_fd[0]= -1;
322
 
    close(thread->wakeup_fd[1]);
323
 
    thread->wakeup_fd[1]= -1;
324
 
  }
325
 
}
326
 
 
327
 
static void _wakeup_clear(gearmand_thread_st *thread)
328
 
{
329
 
  if (thread->is_wakeup_event)
330
 
  {
331
 
    gearmand_log_info(thread->gearmand, "[%4u] Clearing event for IO thread wakeup pipe", thread->count);
332
 
    int del_ret= event_del(&(thread->wakeup_event));
333
 
    assert(del_ret == 0);
334
 
    thread->is_wakeup_event= false;
335
 
  }
336
 
}
337
 
 
338
 
static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
339
 
{
340
 
  gearmand_thread_st *thread= (gearmand_thread_st *)arg;
341
 
  uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
342
 
  ssize_t ret;
343
 
  ssize_t x;
344
 
 
345
 
  while (1)
346
 
  {
347
 
    ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
348
 
    if (ret == 0)
349
 
    {
350
 
      _clear_events(thread);
351
 
      gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:EOF");
352
 
      thread->gearmand->ret= GEARMAN_PIPE_EOF;
353
 
      return;
354
 
    }
355
 
    else if (ret == -1)
356
 
    {
357
 
      if (errno == EINTR)
358
 
        continue;
359
 
 
360
 
      if (errno == EAGAIN)
361
 
        break;
362
 
 
363
 
      _clear_events(thread);
364
 
      gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:%d", errno);
365
 
      thread->gearmand->ret= GEARMAN_ERRNO;
366
 
      return;
367
 
    }
368
 
 
369
 
    for (x= 0; x < ret; x++)
370
 
    {
371
 
      switch ((gearmand_wakeup_t)buffer[x])
372
 
      {
373
 
      case GEARMAND_WAKEUP_PAUSE:
374
 
        gearmand_log_info(thread->gearmand, "[%4u] Received PAUSE wakeup event", thread->count);
375
 
        break;
376
 
 
377
 
      case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
378
 
        gearmand_log_info(thread->gearmand,
379
 
                          "[%4u] Received SHUTDOWN_GRACEFUL wakeup event",
380
 
                          thread->count);
381
 
        if (gearman_server_shutdown_graceful(&(thread->gearmand->server)) == GEARMAN_SHUTDOWN)
382
 
        {
383
 
          gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
384
 
        }
385
 
        break;
386
 
 
387
 
      case GEARMAND_WAKEUP_SHUTDOWN:
388
 
        gearmand_log_info(thread->gearmand, "[%4u] Received SHUTDOWN wakeup event", thread->count);
389
 
        _clear_events(thread);
390
 
        break;
391
 
 
392
 
      case GEARMAND_WAKEUP_CON:
393
 
        gearmand_log_info(thread->gearmand, "[%4u] Received CON wakeup event", thread->count);
394
 
        gearmand_con_check_queue(thread);
395
 
        break;
396
 
 
397
 
      case GEARMAND_WAKEUP_RUN:
398
 
        gearmand_log_debug(thread->gearmand, "[%4u] Received RUN wakeup event", thread->count);
399
 
        gearmand_thread_run(thread);
400
 
        break;
401
 
 
402
 
      default:
403
 
        gearmand_log_fatal(thread->gearmand, "[%4u] Received unknown wakeup event (%u)", thread->count, buffer[x]);
404
 
        _clear_events(thread);
405
 
        thread->gearmand->ret= GEARMAN_UNKNOWN_STATE;
406
 
        break;
407
 
      }
408
 
    }
409
 
  }
410
 
}
411
 
 
412
 
static void _clear_events(gearmand_thread_st *thread)
413
 
{
414
 
  _wakeup_clear(thread);
415
 
 
416
 
  while (thread->dcon_list != NULL)
417
 
    gearmand_con_free(thread->dcon_list);
418
 
}