~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman-server/gearmand.c

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2009-09-28 21:43:31 UTC
  • mto: (1.2.3 upstream) (6.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 5.
  • Revision ID: james.westby@ubuntu.com-20090928214331-9bku0d3v1b1ypgp4
ImportĀ upstreamĀ versionĀ 0.10

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2008 Brian Aker, Eric Day
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
/**
 
10
 * @file
 
11
 * @brief Gearmand Definitions
 
12
 */
 
13
 
 
14
#include "common.h"
 
15
#include "gearmand.h"
 
16
 
 
17
/*
 
18
 * Private declarations
 
19
 */
 
20
 
 
21
/**
 
22
 * @addtogroup gearmand_private Private Gearman Daemon Functions
 
23
 * @ingroup gearmand
 
24
 * @{
 
25
 */
 
26
 
 
27
static void _log(const char *line, gearman_verbose_t verbose, void *context);
 
28
 
 
29
static gearman_return_t _listen_init(gearmand_st *gearmand);
 
30
static void _listen_close(gearmand_st *gearmand);
 
31
static gearman_return_t _listen_watch(gearmand_st *gearmand);
 
32
static void _listen_clear(gearmand_st *gearmand);
 
33
static void _listen_event(int fd, short events, void *arg);
 
34
 
 
35
static gearman_return_t _wakeup_init(gearmand_st *gearmand);
 
36
static void _wakeup_close(gearmand_st *gearmand);
 
37
static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
 
38
static void _wakeup_clear(gearmand_st *gearmand);
 
39
static void _wakeup_event(int fd, short events, void *arg);
 
40
 
 
41
static gearman_return_t _watch_events(gearmand_st *gearmand);
 
42
static void _clear_events(gearmand_st *gearmand);
 
43
static void _close_events(gearmand_st *gearmand);
 
44
 
 
45
/** @} */
 
46
 
 
47
/*
 
48
 * Public definitions
 
49
 */
 
50
 
 
51
gearmand_st *gearmand_create(const char *host, in_port_t port)
 
52
{
 
53
  gearmand_st *gearmand;
 
54
 
 
55
  gearmand= malloc(sizeof(gearmand_st));
 
56
  if (gearmand == NULL)
 
57
    return NULL;
 
58
 
 
59
  if (gearman_server_create(&(gearmand->server)) == NULL)
 
60
  {
 
61
    free(gearmand);
 
62
    return NULL;
 
63
  }
 
64
 
 
65
  gearmand->options= 0;
 
66
  gearmand->verbose= 0;
 
67
  gearmand->ret= 0;
 
68
  gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
 
69
  gearmand->threads= 0;
 
70
  gearmand->port_count= 0;
 
71
  gearmand->thread_count= 0;
 
72
  gearmand->free_dcon_count= 0;
 
73
  gearmand->max_thread_free_dcon_count= 0;
 
74
  gearmand->wakeup_fd[0]= -1;
 
75
  gearmand->wakeup_fd[1]= -1;
 
76
  gearmand->host= host;
 
77
  gearmand->log_fn= NULL;
 
78
  gearmand->log_context= NULL;
 
79
  gearmand->base= NULL;
 
80
  gearmand->port_list= NULL;
 
81
  gearmand->thread_list= NULL;
 
82
  gearmand->thread_add_next= NULL;
 
83
  gearmand->free_dcon_list= NULL;
 
84
 
 
85
  if (port == 0)
 
86
    port= GEARMAN_DEFAULT_TCP_PORT;
 
87
 
 
88
  if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
 
89
  {
 
90
    gearmand_free(gearmand);
 
91
    return NULL;
 
92
  }
 
93
 
 
94
  return gearmand;
 
95
}
 
96
 
 
97
void gearmand_free(gearmand_st *gearmand)
 
98
{
 
99
  gearmand_con_st *dcon;
 
100
  uint32_t x;
 
101
 
 
102
  _close_events(gearmand);
 
103
 
 
104
  if (gearmand->threads > 0)
 
105
    GEARMAN_INFO(gearmand, "Shutting down all threads")
 
106
 
 
107
  while (gearmand->thread_list != NULL)
 
108
    gearmand_thread_free(gearmand->thread_list);
 
109
 
 
110
  while (gearmand->free_dcon_list != NULL)
 
111
  {
 
112
    dcon= gearmand->free_dcon_list;
 
113
    gearmand->free_dcon_list= dcon->next;
 
114
    free(dcon);
 
115
  }
 
116
 
 
117
  if (gearmand->base != NULL)
 
118
    event_base_free(gearmand->base);
 
119
 
 
120
  gearman_server_free(&(gearmand->server));
 
121
 
 
122
  for (x= 0; x < gearmand->port_count; x++)
 
123
  {
 
124
    if (gearmand->port_list[x].listen_fd != NULL)
 
125
      free(gearmand->port_list[x].listen_fd);
 
126
 
 
127
    if (gearmand->port_list[x].listen_event != NULL)
 
128
      free(gearmand->port_list[x].listen_event);
 
129
  }
 
130
 
 
131
  if (gearmand->port_list != NULL)
 
132
    free(gearmand->port_list);
 
133
 
 
134
  GEARMAN_INFO(gearmand, "Shutdown complete")
 
135
 
 
136
  free(gearmand);
 
137
}
 
138
 
 
139
void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
 
140
{
 
141
  gearmand->backlog= backlog;
 
142
}
 
143
 
 
144
void gearmand_set_job_retries(gearmand_st *gearmand, uint8_t job_retries)
 
145
{
 
146
  gearman_server_set_job_retries(&(gearmand->server), job_retries);
 
147
}
 
148
 
 
149
void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
 
150
{
 
151
  gearmand->threads= threads;
 
152
}
 
153
 
 
154
void gearmand_set_log_fn(gearmand_st *gearmand, gearman_log_fn *function,
 
155
                         const void *context, gearman_verbose_t verbose)
 
156
{
 
157
  gearman_server_set_log_fn(&(gearmand->server), _log, gearmand, verbose);
 
158
  gearmand->log_fn= function;
 
159
  gearmand->log_context= context;
 
160
  gearmand->verbose= verbose;
 
161
}
 
162
 
 
163
gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
 
164
                                   gearman_con_add_fn *function)
 
165
{
 
166
  gearmand_port_st *port_list;
 
167
 
 
168
  port_list= realloc(gearmand->port_list,
 
169
                     sizeof(gearmand_port_st) * (gearmand->port_count + 1));
 
170
  if (port_list == NULL)
 
171
  {
 
172
    GEARMAN_FATAL(gearmand, "gearmand_port_add:realloc:NULL");
 
173
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
174
  }
 
175
 
 
176
  port_list[gearmand->port_count].port= port;
 
177
  port_list[gearmand->port_count].listen_count= 0;
 
178
  port_list[gearmand->port_count].gearmand= gearmand;
 
179
  port_list[gearmand->port_count].add_fn= function;
 
180
  port_list[gearmand->port_count].listen_fd= NULL;
 
181
  port_list[gearmand->port_count].listen_event= NULL;
 
182
 
 
183
  gearmand->port_list= port_list;
 
184
  gearmand->port_count++;
 
185
 
 
186
  return GEARMAN_SUCCESS;
 
187
}
 
188
 
 
189
gearman_return_t gearmand_run(gearmand_st *gearmand)
 
190
{
 
191
  uint32_t x;
 
192
 
 
193
  /* Initialize server components. */
 
194
  if (gearmand->base == NULL)
 
195
  {
 
196
    GEARMAN_INFO(gearmand, "Starting up")
 
197
 
 
198
    if (gearmand->threads > 0)
 
199
    {
 
200
#ifndef HAVE_EVENT_BASE_NEW
 
201
      GEARMAN_FATAL(gearmand, "Multi-threaded gearmand requires libevent 1.4 "                                "or later, libevent 1.3 does not provided a "
 
202
                              "thread-safe interface.");
 
203
      return GEARMAN_EVENT;
 
204
#else
 
205
      /* Set the number of free connection structures each thread should keep
 
206
         around before the main thread is forced to take them. We compute this
 
207
         here so we don't need to on every new connection. */
 
208
      gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
 
209
                                              gearmand->threads) / 2);
 
210
#endif
 
211
    }
 
212
 
 
213
    GEARMAN_DEBUG(gearmand, "Initializing libevent for main thread")
 
214
 
 
215
    gearmand->base= event_base_new();
 
216
    if (gearmand->base == NULL)
 
217
    {
 
218
      GEARMAN_FATAL(gearmand, "gearmand_run:event_base_new:NULL")
 
219
      return GEARMAN_EVENT;
 
220
    }
 
221
 
 
222
    GEARMAN_DEBUG(gearmand, "Method for libevent: %s",
 
223
                  event_base_get_method(gearmand->base));
 
224
 
 
225
    gearmand->ret= _listen_init(gearmand);
 
226
    if (gearmand->ret != GEARMAN_SUCCESS)
 
227
      return gearmand->ret;
 
228
 
 
229
    gearmand->ret= _wakeup_init(gearmand);
 
230
    if (gearmand->ret != GEARMAN_SUCCESS)
 
231
      return gearmand->ret;
 
232
 
 
233
    GEARMAN_DEBUG(gearmand, "Creating %u threads", gearmand->threads)
 
234
 
 
235
    /* If we have 0 threads we still need to create a fake one for context. */
 
236
    x= 0;
 
237
    do
 
238
    {
 
239
      gearmand->ret= gearmand_thread_create(gearmand);
 
240
      if (gearmand->ret != GEARMAN_SUCCESS)
 
241
        return gearmand->ret;
 
242
      x++;
 
243
    }
 
244
    while (x < gearmand->threads);
 
245
 
 
246
    gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
 
247
    if (gearmand->ret != GEARMAN_SUCCESS)
 
248
      return gearmand->ret;
 
249
  }
 
250
 
 
251
  gearmand->ret= _watch_events(gearmand);
 
252
  if (gearmand->ret != GEARMAN_SUCCESS)
 
253
    return gearmand->ret;
 
254
 
 
255
  GEARMAN_INFO(gearmand, "Entering main event loop")
 
256
 
 
257
  if (event_base_loop(gearmand->base, 0) == -1)
 
258
  {
 
259
    GEARMAN_FATAL(gearmand, "gearmand_run:event_base_loop:-1")
 
260
    return GEARMAN_EVENT;
 
261
  }
 
262
 
 
263
  GEARMAN_INFO(gearmand, "Exited main event loop")
 
264
 
 
265
  return gearmand->ret;
 
266
}
 
267
 
 
268
void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
 
269
{
 
270
  uint8_t buffer= wakeup;
 
271
 
 
272
  /* If this fails, there is not much we can really do. This should never fail
 
273
     though if the main gearmand thread is still active. */
 
274
  if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
 
275
    GEARMAN_ERROR(gearmand, "gearmand_wakeup:write:%d", errno)
 
276
}
 
277
 
 
278
/*
 
279
 * Private definitions
 
280
 */
 
281
 
 
282
static void _log(const char *line, gearman_verbose_t verbose, void *context)
 
283
{
 
284
  gearmand_st *gearmand= (gearmand_st *)context;
 
285
  (*gearmand->log_fn)(line, verbose, (void *)gearmand->log_context);
 
286
}
 
287
 
 
288
static gearman_return_t _listen_init(gearmand_st *gearmand)
 
289
{
 
290
  struct gearmand_port_st *port;
 
291
  struct addrinfo *addrinfo;
 
292
  struct addrinfo *addrinfo_next;
 
293
  struct addrinfo ai;
 
294
  int ret;
 
295
  int opt;
 
296
  char host[NI_MAXHOST];
 
297
  char port_str[NI_MAXSERV];
 
298
  int fd;
 
299
  int *fd_list;
 
300
  uint32_t x;
 
301
  uint32_t y;
 
302
 
 
303
  for (x= 0; x < gearmand->port_count; x++)
 
304
  {
 
305
    port= &gearmand->port_list[x];
 
306
 
 
307
    snprintf(port_str, NI_MAXSERV, "%u", port->port);
 
308
 
 
309
    memset(&ai, 0, sizeof(struct addrinfo));
 
310
    ai.ai_flags  = AI_PASSIVE;
 
311
    ai.ai_family = AF_UNSPEC;
 
312
    ai.ai_socktype = SOCK_STREAM;
 
313
    ai.ai_protocol= IPPROTO_TCP;
 
314
 
 
315
    ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
 
316
    if (ret != 0)
 
317
    {
 
318
      GEARMAN_FATAL(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret))
 
319
      return GEARMAN_ERRNO;
 
320
    }
 
321
 
 
322
    for (addrinfo_next= addrinfo; addrinfo_next != NULL;
 
323
         addrinfo_next= addrinfo_next->ai_next)
 
324
    {
 
325
      ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
 
326
                       NI_MAXHOST, port_str, NI_MAXSERV,
 
327
                       NI_NUMERICHOST | NI_NUMERICSERV);
 
328
      if (ret != 0)
 
329
      {
 
330
        GEARMAN_ERROR(gearmand, "_listen_init:getnameinfo:%s",
 
331
                      gai_strerror(ret))
 
332
        strcpy(host, "-");
 
333
        strcpy(port_str, "-");
 
334
      }
 
335
 
 
336
      GEARMAN_DEBUG(gearmand, "Trying to listen on %s:%s", host, port_str)
 
337
 
 
338
      /* Call to socket() can fail for some getaddrinfo results, try another. */
 
339
      fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
 
340
                 addrinfo_next->ai_protocol);
 
341
      if (fd == -1)
 
342
      {
 
343
        GEARMAN_ERROR(gearmand, "Failed to listen on %s:%s", host, port_str)
 
344
        continue;
 
345
      }
 
346
 
 
347
      opt= 1;
 
348
      ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
 
349
      if (ret == -1)
 
350
      {
 
351
        close(fd);
 
352
        GEARMAN_FATAL(gearmand, "_listen_init:setsockopt:%d", errno)
 
353
        return GEARMAN_ERRNO;
 
354
      }
 
355
 
 
356
      ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
 
357
      if (ret == -1)
 
358
      {
 
359
        close(fd);
 
360
        if (errno == EADDRINUSE)
 
361
        {
 
362
          if (port->listen_fd == NULL)
 
363
          {
 
364
            GEARMAN_ERROR(gearmand, "Address already in use %s:%s", host,
 
365
                          port_str)
 
366
          }
 
367
 
 
368
          continue;
 
369
        }
 
370
 
 
371
        GEARMAN_FATAL(gearmand, "_listen_init:bind:%d", errno)
 
372
        return GEARMAN_ERRNO;
 
373
      }
 
374
 
 
375
      if (listen(fd, gearmand->backlog) == -1)
 
376
      {
 
377
        close(fd);
 
378
        GEARMAN_FATAL(gearmand, "_listen_init:listen:%d", errno)
 
379
        return GEARMAN_ERRNO;
 
380
      }
 
381
 
 
382
      fd_list= realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
 
383
      if (fd_list == NULL)
 
384
      {
 
385
        close(fd);
 
386
        GEARMAN_FATAL(gearmand, "_listen_init:realloc:%d", errno)
 
387
        return GEARMAN_ERRNO;
 
388
      }
 
389
 
 
390
      port->listen_fd= fd_list;
 
391
      port->listen_fd[port->listen_count]= fd;
 
392
      port->listen_count++;
 
393
 
 
394
      GEARMAN_INFO(gearmand, "Listening on %s:%s (%d)", host, port_str, fd)
 
395
    }
 
396
 
 
397
    freeaddrinfo(addrinfo);
 
398
 
 
399
    /* Report last socket() error if we couldn't find an address to bind. */
 
400
    if (port->listen_fd == NULL)
 
401
    {
 
402
      GEARMAN_FATAL(gearmand,
 
403
                    "_listen_init:Could not bind/listen to any addresses")
 
404
      return GEARMAN_ERRNO;
 
405
    }
 
406
 
 
407
    port->listen_event= malloc(sizeof(struct event) * port->listen_count);
 
408
    if (port->listen_event == NULL)
 
409
    {
 
410
      GEARMAN_FATAL(gearmand, "_listen_init:malloc:%d", errno)
 
411
      return GEARMAN_ERRNO;
 
412
    }
 
413
 
 
414
    for (y= 0; y < port->listen_count; y++)
 
415
    {
 
416
      event_set(&(port->listen_event[y]), port->listen_fd[y],
 
417
                EV_READ | EV_PERSIST, _listen_event, port);
 
418
      event_base_set(gearmand->base, &(port->listen_event[y]));
 
419
    }
 
420
  }
 
421
 
 
422
  return GEARMAN_SUCCESS;
 
423
}
 
424
 
 
425
static void _listen_close(gearmand_st *gearmand)
 
426
{
 
427
  uint32_t x;
 
428
  uint32_t y;
 
429
 
 
430
  _listen_clear(gearmand);
 
431
 
 
432
  for (x= 0; x < gearmand->port_count; x++)
 
433
  {
 
434
    for (y= 0; y < gearmand->port_list[x].listen_count; y++)
 
435
    {
 
436
      if (gearmand->port_list[x].listen_fd[y] >= 0)
 
437
      {
 
438
        GEARMAN_INFO(gearmand, "Closing listening socket (%d)",
 
439
                     gearmand->port_list[x].listen_fd[y])
 
440
        close(gearmand->port_list[x].listen_fd[y]);
 
441
        gearmand->port_list[x].listen_fd[y]= -1;
 
442
      }
 
443
    }
 
444
  }
 
445
}
 
446
 
 
447
static gearman_return_t _listen_watch(gearmand_st *gearmand)
 
448
{
 
449
  uint32_t x;
 
450
  uint32_t y;
 
451
 
 
452
  if (gearmand->options & GEARMAND_LISTEN_EVENT)
 
453
    return GEARMAN_SUCCESS;
 
454
 
 
455
  for (x= 0; x < gearmand->port_count; x++)
 
456
  {
 
457
    for (y= 0; y < gearmand->port_list[x].listen_count; y++)
 
458
    {
 
459
      GEARMAN_INFO(gearmand, "Adding event for listening socket (%d)",
 
460
                   gearmand->port_list[x].listen_fd[y])
 
461
 
 
462
      if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
 
463
      {
 
464
        GEARMAN_FATAL(gearmand, "_listen_watch:event_add:-1")
 
465
        return GEARMAN_EVENT;
 
466
      }
 
467
    }
 
468
  }
 
469
 
 
470
  gearmand->options|= GEARMAND_LISTEN_EVENT;
 
471
  return GEARMAN_SUCCESS;
 
472
}
 
473
 
 
474
static void _listen_clear(gearmand_st *gearmand)
 
475
{
 
476
  uint32_t x;
 
477
  uint32_t y;
 
478
 
 
479
  if (!(gearmand->options & GEARMAND_LISTEN_EVENT))
 
480
    return;
 
481
 
 
482
  for (x= 0; x < gearmand->port_count; x++)
 
483
  {
 
484
    for (y= 0; y < gearmand->port_list[x].listen_count; y++)
 
485
    {
 
486
      GEARMAN_INFO(gearmand, "Clearing event for listening socket (%d)",
 
487
                   gearmand->port_list[x].listen_fd[y])
 
488
      assert(event_del(&(gearmand->port_list[x].listen_event[y])) == 0);
 
489
    }
 
490
  }
 
491
 
 
492
  gearmand->options&= (gearmand_options_t)~GEARMAND_LISTEN_EVENT;
 
493
}
 
494
 
 
495
static void _listen_event(int fd, short events __attribute__ ((unused)),
 
496
                          void *arg)
 
497
{
 
498
  gearmand_port_st *port= (gearmand_port_st *)arg;
 
499
  struct sockaddr sa;
 
500
  socklen_t sa_len;
 
501
  char host[NI_MAXHOST];
 
502
  char port_str[NI_MAXSERV];
 
503
  int ret;
 
504
 
 
505
  sa_len= sizeof(sa);
 
506
  fd= accept(fd, &sa, &sa_len);
 
507
  if (fd == -1)
 
508
  {
 
509
    if (errno == EINTR)
 
510
      return;
 
511
    else if (errno == EMFILE)
 
512
    {
 
513
      GEARMAN_ERROR(port->gearmand, "_listen_event:accept:too many open files")
 
514
      return;
 
515
    }
 
516
 
 
517
    _clear_events(port->gearmand);
 
518
    GEARMAN_FATAL(port->gearmand, "_listen_event:accept:%d", errno)
 
519
    port->gearmand->ret= GEARMAN_ERRNO;
 
520
    return;
 
521
  }
 
522
 
 
523
  /* Since this is numeric, it should never fail. Even if it did we don't want
 
524
     to really error from it. */
 
525
  ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
 
526
                   NI_NUMERICHOST | NI_NUMERICSERV);
 
527
  if (ret != 0)
 
528
  {
 
529
    GEARMAN_ERROR(port->gearmand, "_listen_event:getnameinfo:%s",
 
530
                  gai_strerror(ret))
 
531
    strcpy(host, "-");
 
532
    strcpy(port_str, "-");
 
533
  }
 
534
 
 
535
  GEARMAN_INFO(port->gearmand, "Accepted connection from %s:%s", host, port_str)
 
536
 
 
537
  port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
 
538
                                           port->add_fn);
 
539
  if (port->gearmand->ret != GEARMAN_SUCCESS)
 
540
    _clear_events(port->gearmand);
 
541
}
 
542
 
 
543
static gearman_return_t _wakeup_init(gearmand_st *gearmand)
 
544
{
 
545
  int ret;
 
546
 
 
547
  GEARMAN_INFO(gearmand, "Creating wakeup pipe")
 
548
 
 
549
  ret= pipe(gearmand->wakeup_fd);
 
550
  if (ret == -1)
 
551
  {
 
552
    GEARMAN_FATAL(gearmand, "_wakeup_init:pipe:%d", errno)
 
553
    return GEARMAN_ERRNO;
 
554
  }
 
555
 
 
556
  ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
 
557
  if (ret == -1)
 
558
  {
 
559
    GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno)
 
560
    return GEARMAN_ERRNO;
 
561
  }
 
562
 
 
563
  ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
 
564
  if (ret == -1)
 
565
  {
 
566
    GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno)
 
567
    return GEARMAN_ERRNO;
 
568
  }
 
569
 
 
570
  event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
 
571
            EV_READ | EV_PERSIST, _wakeup_event, gearmand);
 
572
  event_base_set(gearmand->base, &(gearmand->wakeup_event));
 
573
 
 
574
  return GEARMAN_SUCCESS;
 
575
}
 
576
 
 
577
static void _wakeup_close(gearmand_st *gearmand)
 
578
{
 
579
  _wakeup_clear(gearmand);
 
580
 
 
581
  if (gearmand->wakeup_fd[0] >= 0)
 
582
  {
 
583
    GEARMAN_INFO(gearmand, "Closing wakeup pipe")
 
584
    close(gearmand->wakeup_fd[0]);
 
585
    gearmand->wakeup_fd[0]= -1;
 
586
    close(gearmand->wakeup_fd[1]);
 
587
    gearmand->wakeup_fd[1]= -1;
 
588
  }
 
589
}
 
590
 
 
591
static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
 
592
{
 
593
  if (gearmand->options & GEARMAND_WAKEUP_EVENT)
 
594
    return GEARMAN_SUCCESS;
 
595
 
 
596
  GEARMAN_INFO(gearmand, "Adding event for wakeup pipe")
 
597
 
 
598
  if (event_add(&(gearmand->wakeup_event), NULL) == -1)
 
599
  {
 
600
    GEARMAN_FATAL(gearmand, "_wakeup_watch:event_add:-1")
 
601
    return GEARMAN_EVENT;
 
602
  }
 
603
 
 
604
  gearmand->options|= GEARMAND_WAKEUP_EVENT;
 
605
  return GEARMAN_SUCCESS;
 
606
}
 
607
 
 
608
static void _wakeup_clear(gearmand_st *gearmand)
 
609
{
 
610
  if (gearmand->options & GEARMAND_WAKEUP_EVENT)
 
611
  {
 
612
    GEARMAN_INFO(gearmand, "Clearing event for wakeup pipe")
 
613
    assert(event_del(&(gearmand->wakeup_event)) == 0);
 
614
    gearmand->options&= (gearmand_options_t)~GEARMAND_WAKEUP_EVENT;
 
615
  }
 
616
}
 
617
 
 
618
static void _wakeup_event(int fd, short events __attribute__ ((unused)),
 
619
                          void *arg)
 
620
{
 
621
  gearmand_st *gearmand= (gearmand_st *)arg;
 
622
  uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
 
623
  ssize_t ret;
 
624
  ssize_t x;
 
625
  gearmand_thread_st *thread;
 
626
 
 
627
  while (1)
 
628
  {
 
629
    ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
 
630
    if (ret == 0)
 
631
    {
 
632
      _clear_events(gearmand);
 
633
      GEARMAN_FATAL(gearmand, "_wakeup_event:read:EOF")
 
634
      gearmand->ret= GEARMAN_PIPE_EOF;
 
635
      return;
 
636
    }
 
637
    else if (ret == -1)
 
638
    {
 
639
      if (errno == EINTR)
 
640
        continue;
 
641
 
 
642
      if (errno == EAGAIN)
 
643
        break;
 
644
 
 
645
      _clear_events(gearmand);
 
646
      GEARMAN_FATAL(gearmand, "_wakeup_event:read:%d", errno)
 
647
      gearmand->ret= GEARMAN_ERRNO;
 
648
      return;
 
649
    }
 
650
 
 
651
    for (x= 0; x < ret; x++)
 
652
    {
 
653
      switch ((gearmand_wakeup_t)buffer[x])
 
654
      {
 
655
      case GEARMAND_WAKEUP_PAUSE:
 
656
        GEARMAN_INFO(gearmand, "Received PAUSE wakeup event")
 
657
        _clear_events(gearmand);
 
658
        gearmand->ret= GEARMAN_PAUSE;
 
659
        break;
 
660
 
 
661
      case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
 
662
        GEARMAN_INFO(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event")
 
663
        _listen_close(gearmand);
 
664
 
 
665
        for (thread= gearmand->thread_list; thread != NULL;
 
666
             thread= thread->next)
 
667
        {
 
668
          gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
 
669
        }
 
670
 
 
671
        gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
 
672
        break;
 
673
 
 
674
      case GEARMAND_WAKEUP_SHUTDOWN:
 
675
        GEARMAN_INFO(gearmand, "Received SHUTDOWN wakeup event")
 
676
        _clear_events(gearmand);
 
677
        gearmand->ret= GEARMAN_SHUTDOWN;
 
678
        break;
 
679
 
 
680
      case GEARMAND_WAKEUP_CON:
 
681
      case GEARMAND_WAKEUP_RUN:
 
682
      default:
 
683
        GEARMAN_FATAL(gearmand, "Received unknown wakeup event (%u)",
 
684
                      buffer[x])
 
685
        _clear_events(gearmand);
 
686
        gearmand->ret= GEARMAN_UNKNOWN_STATE;
 
687
        break;
 
688
      }
 
689
    }
 
690
  }
 
691
}
 
692
 
 
693
static gearman_return_t _watch_events(gearmand_st *gearmand)
 
694
{
 
695
  gearman_return_t ret;
 
696
 
 
697
  ret= _listen_watch(gearmand);
 
698
  if (ret != GEARMAN_SUCCESS)
 
699
    return ret;
 
700
 
 
701
  ret= _wakeup_watch(gearmand);
 
702
  if (ret != GEARMAN_SUCCESS)
 
703
    return ret;
 
704
 
 
705
  return GEARMAN_SUCCESS;
 
706
}
 
707
 
 
708
static void _clear_events(gearmand_st *gearmand)
 
709
{
 
710
  _listen_clear(gearmand);
 
711
  _wakeup_clear(gearmand);
 
712
 
 
713
  /* If we are not threaded, tell the fake thread to shutdown now to clear
 
714
     connections. Otherwise we will never exit the libevent loop. */
 
715
  if (gearmand->threads == 0 && gearmand->thread_list != NULL)
 
716
    gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
 
717
}
 
718
 
 
719
static void _close_events(gearmand_st *gearmand)
 
720
{
 
721
  _listen_close(gearmand);
 
722
  _wakeup_close(gearmand);
 
723
}