~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman/gearmand.c

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2009-09-28 21:43:31 UTC
  • mto: (1.2.3 upstream) (6.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 5.
  • Revision ID: james.westby@ubuntu.com-20090928214331-9bku0d3v1b1ypgp4
ImportĀ upstreamĀ versionĀ 0.10

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Gearman server and library
2
 
 * Copyright (C) 2008 Brian Aker, Eric Day
3
 
 * All rights reserved.
4
 
 *
5
 
 * Use and distribution licensed under the BSD license.  See
6
 
 * the COPYING file in the parent directory for full text.
7
 
 */
8
 
 
9
 
/**
10
 
 * @file
11
 
 * @brief Gearmand Definitions
12
 
 */
13
 
 
14
 
#include "common.h"
15
 
 
16
 
/*
17
 
 * Private declarations
18
 
 */
19
 
 
20
 
/**
21
 
 * @addtogroup gearmand_private Private Gearman Daemon Functions
22
 
 * @ingroup gearmand
23
 
 * @{
24
 
 */
25
 
 
26
 
static void _log(gearman_server_st *server, gearman_verbose_t verbose,
27
 
                 const char *line, void *arg);
28
 
 
29
 
static gearman_return_t _listen_init(gearmand_st *gearmand);
30
 
static void _listen_close(gearmand_st *gearmand);
31
 
static gearman_return_t _listen_watch(gearmand_st *gearmand);
32
 
static void _listen_clear(gearmand_st *gearmand);
33
 
static void _listen_event(int fd, short events, void *arg);
34
 
 
35
 
static gearman_return_t _wakeup_init(gearmand_st *gearmand);
36
 
static void _wakeup_close(gearmand_st *gearmand);
37
 
static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
38
 
static void _wakeup_clear(gearmand_st *gearmand);
39
 
static void _wakeup_event(int fd, short events, void *arg);
40
 
 
41
 
static gearman_return_t _watch_events(gearmand_st *gearmand);
42
 
static void _clear_events(gearmand_st *gearmand);
43
 
static void _close_events(gearmand_st *gearmand);
44
 
 
45
 
/** @} */
46
 
 
47
 
/*
48
 
 * Public definitions
49
 
 */
50
 
 
51
 
gearmand_st *gearmand_create(const char *host, in_port_t port)
52
 
{
53
 
  gearmand_st *gearmand;
54
 
 
55
 
  gearmand= malloc(sizeof(gearmand_st));
56
 
  if (gearmand == NULL)
57
 
    return NULL;
58
 
 
59
 
  if (gearman_server_create(&(gearmand->server)) == NULL)
60
 
  {
61
 
    free(gearmand);
62
 
    return NULL;
63
 
  }
64
 
 
65
 
  gearmand->options= 0;
66
 
  gearmand->verbose= 0;
67
 
  gearmand->ret= 0;
68
 
  gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
69
 
  gearmand->threads= 0;
70
 
  gearmand->port_count= 0;
71
 
  gearmand->thread_count= 0;
72
 
  gearmand->free_dcon_count= 0;
73
 
  gearmand->max_thread_free_dcon_count= 0;
74
 
  gearmand->wakeup_fd[0]= -1;
75
 
  gearmand->wakeup_fd[1]= -1;
76
 
  gearmand->host= host;
77
 
  gearmand->log_fn= NULL;
78
 
  gearmand->log_fn_arg= NULL;
79
 
  gearmand->base= NULL;
80
 
  gearmand->port_list= NULL;
81
 
  gearmand->thread_list= NULL;
82
 
  gearmand->thread_add_next= NULL;
83
 
  gearmand->free_dcon_list= NULL;
84
 
 
85
 
  if (port == 0)
86
 
    port= GEARMAN_DEFAULT_TCP_PORT;
87
 
 
88
 
  if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
89
 
  {
90
 
    gearmand_free(gearmand);
91
 
    return NULL;
92
 
  }
93
 
 
94
 
  return gearmand;
95
 
}
96
 
 
97
 
void gearmand_free(gearmand_st *gearmand)
98
 
{
99
 
  gearmand_con_st *dcon;
100
 
  uint32_t x;
101
 
 
102
 
  _close_events(gearmand);
103
 
 
104
 
  if (gearmand->threads > 0)
105
 
    GEARMAN_INFO(gearmand, "Shutting down all threads")
106
 
 
107
 
  while (gearmand->thread_list != NULL)
108
 
    gearmand_thread_free(gearmand->thread_list);
109
 
 
110
 
  while (gearmand->free_dcon_list != NULL)
111
 
  {
112
 
    dcon= gearmand->free_dcon_list;
113
 
    gearmand->free_dcon_list= dcon->next;
114
 
    free(dcon);
115
 
  }
116
 
 
117
 
  if (gearmand->base != NULL)
118
 
    event_base_free(gearmand->base);
119
 
 
120
 
  gearman_server_free(&(gearmand->server));
121
 
 
122
 
  for (x= 0; x < gearmand->port_count; x++)
123
 
  {
124
 
    if (gearmand->port_list[x].listen_fd != NULL)
125
 
      free(gearmand->port_list[x].listen_fd);
126
 
 
127
 
    if (gearmand->port_list[x].listen_event != NULL)
128
 
      free(gearmand->port_list[x].listen_event);
129
 
  }
130
 
 
131
 
  if (gearmand->port_list != NULL)
132
 
    free(gearmand->port_list);
133
 
 
134
 
  GEARMAN_INFO(gearmand, "Shutdown complete")
135
 
 
136
 
  free(gearmand);
137
 
}
138
 
 
139
 
void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
140
 
{
141
 
  gearmand->backlog= backlog;
142
 
}
143
 
 
144
 
void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
145
 
{
146
 
  gearmand->threads= threads;
147
 
}
148
 
 
149
 
void gearmand_set_log(gearmand_st *gearmand, gearmand_log_fn log_fn,
150
 
                      void *log_fn_arg, gearman_verbose_t verbose)
151
 
{
152
 
  gearman_server_set_log(&(gearmand->server), _log, gearmand, verbose);
153
 
  gearmand->log_fn= log_fn;
154
 
  gearmand->log_fn_arg= log_fn_arg;
155
 
  gearmand->verbose= verbose;
156
 
}
157
 
 
158
 
gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
159
 
                                   gearman_con_add_fn *add_fn)
160
 
{
161
 
  gearmand_port_st *port_list;
162
 
 
163
 
  port_list= realloc(gearmand->port_list,
164
 
                     sizeof(gearmand_port_st) * (gearmand->port_count + 1));
165
 
  if (port_list == NULL)
166
 
  {
167
 
    GEARMAN_FATAL(gearmand, "gearmand_port_add:realloc:NULL");
168
 
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
169
 
  }
170
 
 
171
 
  port_list[gearmand->port_count].port= port;
172
 
  port_list[gearmand->port_count].listen_count= 0;
173
 
  port_list[gearmand->port_count].gearmand= gearmand;
174
 
  port_list[gearmand->port_count].add_fn= add_fn;
175
 
  port_list[gearmand->port_count].listen_fd= NULL;
176
 
  port_list[gearmand->port_count].listen_event= NULL;
177
 
 
178
 
  gearmand->port_list= port_list;
179
 
  gearmand->port_count++;
180
 
 
181
 
  return GEARMAN_SUCCESS;
182
 
}
183
 
 
184
 
gearman_return_t gearmand_run(gearmand_st *gearmand)
185
 
{
186
 
  uint32_t x;
187
 
 
188
 
  /* Initialize server components. */
189
 
  if (gearmand->base == NULL)
190
 
  {
191
 
    GEARMAN_INFO(gearmand, "Starting up")
192
 
 
193
 
    if (gearmand->threads > 0)
194
 
    {
195
 
#ifndef HAVE_EVENT_BASE_NEW
196
 
      GEARMAN_FATAL(gearmand, "Multi-threaded gearmand requires libevent 1.4 "                                "or later, libevent 1.3 does not provided a "
197
 
                              "thread-safe interface.");
198
 
      return GEARMAN_EVENT;
199
 
#else
200
 
      /* Set the number of free connection structures each thread should keep
201
 
         around before the main thread is forced to take them. We compute this
202
 
         here so we don't need to on every new connection. */
203
 
      gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
204
 
                                              gearmand->threads) / 2);
205
 
#endif
206
 
    }
207
 
 
208
 
    GEARMAN_DEBUG(gearmand, "Initializing libevent for main thread")
209
 
 
210
 
    gearmand->base= event_base_new();
211
 
    if (gearmand->base == NULL)
212
 
    {
213
 
      GEARMAN_FATAL(gearmand, "gearmand_run:event_base_new:NULL")
214
 
      return GEARMAN_EVENT;
215
 
    }
216
 
 
217
 
    GEARMAN_DEBUG(gearmand, "Method for libevent: %s",
218
 
                  event_base_get_method(gearmand->base));
219
 
 
220
 
    gearmand->ret= _listen_init(gearmand);
221
 
    if (gearmand->ret != GEARMAN_SUCCESS)
222
 
      return gearmand->ret;
223
 
 
224
 
    gearmand->ret= _wakeup_init(gearmand);
225
 
    if (gearmand->ret != GEARMAN_SUCCESS)
226
 
      return gearmand->ret;
227
 
 
228
 
    GEARMAN_DEBUG(gearmand, "Creating %u threads", gearmand->threads)
229
 
 
230
 
    /* If we have 0 threads we still need to create a fake one for context. */
231
 
    x= 0;
232
 
    do
233
 
    {
234
 
      gearmand->ret= gearmand_thread_create(gearmand);
235
 
      if (gearmand->ret != GEARMAN_SUCCESS)
236
 
        return gearmand->ret;
237
 
      x++;
238
 
    }
239
 
    while (x < gearmand->threads);
240
 
 
241
 
    gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
242
 
    if (gearmand->ret != GEARMAN_SUCCESS)
243
 
      return gearmand->ret;
244
 
  }
245
 
 
246
 
  gearmand->ret= _watch_events(gearmand);
247
 
  if (gearmand->ret != GEARMAN_SUCCESS)
248
 
    return gearmand->ret;
249
 
 
250
 
  GEARMAN_INFO(gearmand, "Entering main event loop")
251
 
 
252
 
  if (event_base_loop(gearmand->base, 0) == -1)
253
 
  {
254
 
    GEARMAN_FATAL(gearmand, "gearmand_run:event_base_loop:-1")
255
 
    return GEARMAN_EVENT;
256
 
  }
257
 
 
258
 
  GEARMAN_INFO(gearmand, "Exited main event loop")
259
 
 
260
 
  return gearmand->ret;
261
 
}
262
 
 
263
 
void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
264
 
{
265
 
  uint8_t buffer= wakeup;
266
 
 
267
 
  /* If this fails, there is not much we can really do. This should never fail
268
 
     though if the main gearmand thread is still active. */
269
 
  if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
270
 
    GEARMAN_ERROR(gearmand, "gearmand_wakeup:write:%d", errno)
271
 
}
272
 
 
273
 
/*
274
 
 * Private definitions
275
 
 */
276
 
 
277
 
static void _log(gearman_server_st *server __attribute__ ((unused)),
278
 
                 gearman_verbose_t verbose, const char *line, void *fn_arg)
279
 
{
280
 
  gearmand_st *gearmand= (gearmand_st *)fn_arg;
281
 
  (*gearmand->log_fn)(gearmand, verbose, line, gearmand->log_fn_arg);
282
 
}
283
 
 
284
 
static gearman_return_t _listen_init(gearmand_st *gearmand)
285
 
{
286
 
  struct gearmand_port_st *port;
287
 
  struct addrinfo *addrinfo;
288
 
  struct addrinfo *addrinfo_next;
289
 
  struct addrinfo ai;
290
 
  int ret;
291
 
  int opt;
292
 
  char host[NI_MAXHOST];
293
 
  char port_str[NI_MAXSERV];
294
 
  int fd;
295
 
  int *fd_list;
296
 
  uint32_t x;
297
 
  uint32_t y;
298
 
 
299
 
  for (x= 0; x < gearmand->port_count; x++)
300
 
  {
301
 
    port= &gearmand->port_list[x];
302
 
 
303
 
    snprintf(port_str, NI_MAXSERV, "%u", port->port);
304
 
 
305
 
    memset(&ai, 0, sizeof(struct addrinfo));
306
 
    ai.ai_flags  = AI_PASSIVE;
307
 
    ai.ai_family = AF_UNSPEC;
308
 
    ai.ai_socktype = SOCK_STREAM;
309
 
    ai.ai_protocol= IPPROTO_TCP;
310
 
 
311
 
    ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
312
 
    if (ret != 0)
313
 
    {
314
 
      GEARMAN_FATAL(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret))
315
 
      return GEARMAN_ERRNO;
316
 
    }
317
 
 
318
 
    for (addrinfo_next= addrinfo; addrinfo_next != NULL;
319
 
         addrinfo_next= addrinfo_next->ai_next)
320
 
    {
321
 
      ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
322
 
                       NI_MAXHOST, port_str, NI_MAXSERV,
323
 
                       NI_NUMERICHOST | NI_NUMERICSERV);
324
 
      if (ret != 0)
325
 
      {
326
 
        GEARMAN_ERROR(gearmand, "_listen_init:getnameinfo:%s",
327
 
                      gai_strerror(ret))
328
 
        strcpy(host, "-");
329
 
        strcpy(port_str, "-");
330
 
      }
331
 
 
332
 
      GEARMAN_DEBUG(gearmand, "Trying to listen on %s:%s", host, port_str)
333
 
 
334
 
      /* Call to socket() can fail for some getaddrinfo results, try another. */
335
 
      fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
336
 
                 addrinfo_next->ai_protocol);
337
 
      if (fd == -1)
338
 
      {
339
 
        GEARMAN_ERROR(gearmand, "Failed to listen on %s:%s", host, port_str)
340
 
        continue;
341
 
      }
342
 
 
343
 
      opt= 1;
344
 
      ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
345
 
      if (ret == -1)
346
 
      {
347
 
        close(fd);
348
 
        GEARMAN_FATAL(gearmand, "_listen_init:setsockopt:%d", errno)
349
 
        return GEARMAN_ERRNO;
350
 
      }
351
 
 
352
 
      ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
353
 
      if (ret == -1)
354
 
      {
355
 
        close(fd);
356
 
        if (errno == EADDRINUSE)
357
 
        {
358
 
          if (port->listen_fd == NULL)
359
 
          {
360
 
            GEARMAN_ERROR(gearmand, "Address already in use %s:%s", host,
361
 
                          port_str)
362
 
          }
363
 
 
364
 
          continue;
365
 
        }
366
 
 
367
 
        GEARMAN_FATAL(gearmand, "_listen_init:bind:%d", errno)
368
 
        return GEARMAN_ERRNO;
369
 
      }
370
 
 
371
 
      if (listen(fd, gearmand->backlog) == -1)
372
 
      {
373
 
        close(fd);
374
 
        GEARMAN_FATAL(gearmand, "_listen_init:listen:%d", errno)
375
 
        return GEARMAN_ERRNO;
376
 
      }
377
 
 
378
 
      fd_list= realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
379
 
      if (fd_list == NULL)
380
 
      {
381
 
        close(fd);
382
 
        GEARMAN_FATAL(gearmand, "_listen_init:realloc:%d", errno)
383
 
        return GEARMAN_ERRNO;
384
 
      }
385
 
 
386
 
      port->listen_fd= fd_list;
387
 
      port->listen_fd[port->listen_count]= fd;
388
 
      port->listen_count++;
389
 
 
390
 
      GEARMAN_INFO(gearmand, "Listening on %s:%s (%d)", host, port_str, fd)
391
 
    }
392
 
 
393
 
    freeaddrinfo(addrinfo);
394
 
 
395
 
    /* Report last socket() error if we couldn't find an address to bind. */
396
 
    if (port->listen_fd == NULL)
397
 
    {
398
 
      GEARMAN_FATAL(gearmand,
399
 
                    "_listen_init:Could not bind/listen to any addresses")
400
 
      return GEARMAN_ERRNO;
401
 
    }
402
 
 
403
 
    port->listen_event= malloc(sizeof(struct event) * port->listen_count);
404
 
    if (port->listen_event == NULL)
405
 
    {
406
 
      GEARMAN_FATAL(gearmand, "_listen_init:malloc:%d", errno)
407
 
      return GEARMAN_ERRNO;
408
 
    }
409
 
 
410
 
    for (y= 0; y < port->listen_count; y++)
411
 
    {
412
 
      event_set(&(port->listen_event[y]), port->listen_fd[y],
413
 
                EV_READ | EV_PERSIST, _listen_event, port);
414
 
      event_base_set(gearmand->base, &(port->listen_event[y]));
415
 
    }
416
 
  }
417
 
 
418
 
  return GEARMAN_SUCCESS;
419
 
}
420
 
 
421
 
static void _listen_close(gearmand_st *gearmand)
422
 
{
423
 
  uint32_t x;
424
 
  uint32_t y;
425
 
 
426
 
  _listen_clear(gearmand);
427
 
 
428
 
  for (x= 0; x < gearmand->port_count; x++)
429
 
  {
430
 
    for (y= 0; y < gearmand->port_list[x].listen_count; y++)
431
 
    {
432
 
      if (gearmand->port_list[x].listen_fd[y] >= 0)
433
 
      {
434
 
        GEARMAN_INFO(gearmand, "Closing listening socket (%d)",
435
 
                     gearmand->port_list[x].listen_fd[y])
436
 
        close(gearmand->port_list[x].listen_fd[y]);
437
 
        gearmand->port_list[x].listen_fd[y]= -1;
438
 
      }
439
 
    }
440
 
  }
441
 
}
442
 
 
443
 
static gearman_return_t _listen_watch(gearmand_st *gearmand)
444
 
{
445
 
  uint32_t x;
446
 
  uint32_t y;
447
 
 
448
 
  if (gearmand->options & GEARMAND_LISTEN_EVENT)
449
 
    return GEARMAN_SUCCESS;
450
 
 
451
 
  for (x= 0; x < gearmand->port_count; x++)
452
 
  {
453
 
    for (y= 0; y < gearmand->port_list[x].listen_count; y++)
454
 
    {
455
 
      GEARMAN_INFO(gearmand, "Adding event for listening socket (%d)",
456
 
                   gearmand->port_list[x].listen_fd[y])
457
 
 
458
 
      if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
459
 
      {
460
 
        GEARMAN_FATAL(gearmand, "_listen_watch:event_add:-1")
461
 
        return GEARMAN_EVENT;
462
 
      }
463
 
    }
464
 
  }
465
 
 
466
 
  gearmand->options|= GEARMAND_LISTEN_EVENT;
467
 
  return GEARMAN_SUCCESS;
468
 
}
469
 
 
470
 
static void _listen_clear(gearmand_st *gearmand)
471
 
{
472
 
  uint32_t x;
473
 
  uint32_t y;
474
 
 
475
 
  if (!(gearmand->options & GEARMAND_LISTEN_EVENT))
476
 
    return;
477
 
 
478
 
  for (x= 0; x < gearmand->port_count; x++)
479
 
  {
480
 
    for (y= 0; y < gearmand->port_list[x].listen_count; y++)
481
 
    {
482
 
      GEARMAN_INFO(gearmand, "Clearing event for listening socket (%d)",
483
 
                   gearmand->port_list[x].listen_fd[y])
484
 
      assert(event_del(&(gearmand->port_list[x].listen_event[y])) == 0);
485
 
    }
486
 
  }
487
 
 
488
 
  gearmand->options&= (gearmand_options_t)~GEARMAND_LISTEN_EVENT;
489
 
}
490
 
 
491
 
static void _listen_event(int fd, short events __attribute__ ((unused)),
492
 
                          void *arg)
493
 
{
494
 
  gearmand_port_st *port= (gearmand_port_st *)arg;
495
 
  struct sockaddr sa;
496
 
  socklen_t sa_len;
497
 
  char host[NI_MAXHOST];
498
 
  char port_str[NI_MAXSERV];
499
 
  int ret;
500
 
 
501
 
  sa_len= sizeof(sa);
502
 
  fd= accept(fd, &sa, &sa_len);
503
 
  if (fd == -1)
504
 
  {
505
 
    if (errno == EINTR)
506
 
      return;
507
 
    else if (errno == EMFILE)
508
 
    {
509
 
      GEARMAN_ERROR(port->gearmand, "_listen_event:accept:too many open files")
510
 
      return;
511
 
    }
512
 
 
513
 
    _clear_events(port->gearmand);
514
 
    GEARMAN_FATAL(port->gearmand, "_listen_event:accept:%d", errno)
515
 
    port->gearmand->ret= GEARMAN_ERRNO;
516
 
    return;
517
 
  }
518
 
 
519
 
  /* Since this is numeric, it should never fail. Even if it did we don't want
520
 
     to really error from it. */
521
 
  ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
522
 
                   NI_NUMERICHOST | NI_NUMERICSERV);
523
 
  if (ret != 0)
524
 
  {
525
 
    GEARMAN_ERROR(port->gearmand, "_listen_event:getnameinfo:%s",
526
 
                  gai_strerror(ret))
527
 
    strcpy(host, "-");
528
 
    strcpy(port_str, "-");
529
 
  }
530
 
 
531
 
  GEARMAN_INFO(port->gearmand, "Accepted connection from %s:%s", host, port_str)
532
 
 
533
 
  port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
534
 
                                           port->add_fn);
535
 
  if (port->gearmand->ret != GEARMAN_SUCCESS)
536
 
    _clear_events(port->gearmand);
537
 
}
538
 
 
539
 
static gearman_return_t _wakeup_init(gearmand_st *gearmand)
540
 
{
541
 
  int ret;
542
 
 
543
 
  GEARMAN_INFO(gearmand, "Creating wakeup pipe")
544
 
 
545
 
  ret= pipe(gearmand->wakeup_fd);
546
 
  if (ret == -1)
547
 
  {
548
 
    GEARMAN_FATAL(gearmand, "_wakeup_init:pipe:%d", errno)
549
 
    return GEARMAN_ERRNO;
550
 
  }
551
 
 
552
 
  ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
553
 
  if (ret == -1)
554
 
  {
555
 
    GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno)
556
 
    return GEARMAN_ERRNO;
557
 
  }
558
 
 
559
 
  ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
560
 
  if (ret == -1)
561
 
  {
562
 
    GEARMAN_FATAL(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno)
563
 
    return GEARMAN_ERRNO;
564
 
  }
565
 
 
566
 
  event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
567
 
            EV_READ | EV_PERSIST, _wakeup_event, gearmand);
568
 
  event_base_set(gearmand->base, &(gearmand->wakeup_event));
569
 
 
570
 
  return GEARMAN_SUCCESS;
571
 
}
572
 
 
573
 
static void _wakeup_close(gearmand_st *gearmand)
574
 
{
575
 
  _wakeup_clear(gearmand);
576
 
 
577
 
  if (gearmand->wakeup_fd[0] >= 0)
578
 
  {
579
 
    GEARMAN_INFO(gearmand, "Closing wakeup pipe")
580
 
    close(gearmand->wakeup_fd[0]);
581
 
    gearmand->wakeup_fd[0]= -1;
582
 
    close(gearmand->wakeup_fd[1]);
583
 
    gearmand->wakeup_fd[1]= -1;
584
 
  }
585
 
}
586
 
 
587
 
static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
588
 
{
589
 
  if (gearmand->options & GEARMAND_WAKEUP_EVENT)
590
 
    return GEARMAN_SUCCESS;
591
 
 
592
 
  GEARMAN_INFO(gearmand, "Adding event for wakeup pipe")
593
 
 
594
 
  if (event_add(&(gearmand->wakeup_event), NULL) == -1)
595
 
  {
596
 
    GEARMAN_FATAL(gearmand, "_wakeup_watch:event_add:-1")
597
 
    return GEARMAN_EVENT;
598
 
  }
599
 
 
600
 
  gearmand->options|= GEARMAND_WAKEUP_EVENT;
601
 
  return GEARMAN_SUCCESS;
602
 
}
603
 
 
604
 
static void _wakeup_clear(gearmand_st *gearmand)
605
 
{
606
 
  if (gearmand->options & GEARMAND_WAKEUP_EVENT)
607
 
  {
608
 
    GEARMAN_INFO(gearmand, "Clearing event for wakeup pipe")
609
 
    assert(event_del(&(gearmand->wakeup_event)) == 0);
610
 
    gearmand->options&= (gearmand_options_t)~GEARMAND_WAKEUP_EVENT;
611
 
  }
612
 
}
613
 
 
614
 
static void _wakeup_event(int fd, short events __attribute__ ((unused)),
615
 
                          void *arg)
616
 
{
617
 
  gearmand_st *gearmand= (gearmand_st *)arg;
618
 
  uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
619
 
  ssize_t ret;
620
 
  ssize_t x;
621
 
  gearmand_thread_st *thread;
622
 
 
623
 
  while (1)
624
 
  {
625
 
    ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
626
 
    if (ret == 0)
627
 
    {
628
 
      _clear_events(gearmand);
629
 
      GEARMAN_FATAL(gearmand, "_wakeup_event:read:EOF")
630
 
      gearmand->ret= GEARMAN_PIPE_EOF;
631
 
      return;
632
 
    }
633
 
    else if (ret == -1)
634
 
    {
635
 
      if (errno == EINTR)
636
 
        continue;
637
 
 
638
 
      if (errno == EAGAIN)
639
 
        break;
640
 
 
641
 
      _clear_events(gearmand);
642
 
      GEARMAN_FATAL(gearmand, "_wakeup_event:read:%d", errno)
643
 
      gearmand->ret= GEARMAN_ERRNO;
644
 
      return;
645
 
    }
646
 
 
647
 
    for (x= 0; x < ret; x++)
648
 
    {
649
 
      switch ((gearmand_wakeup_t)buffer[x])
650
 
      {
651
 
      case GEARMAND_WAKEUP_PAUSE:
652
 
        GEARMAN_INFO(gearmand, "Received PAUSE wakeup event")
653
 
        _clear_events(gearmand);
654
 
        gearmand->ret= GEARMAN_PAUSE;
655
 
        break;
656
 
 
657
 
      case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
658
 
        GEARMAN_INFO(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event")
659
 
        _listen_close(gearmand);
660
 
 
661
 
        for (thread= gearmand->thread_list; thread != NULL;
662
 
             thread= thread->next)
663
 
        {
664
 
          gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
665
 
        }
666
 
 
667
 
        gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
668
 
        break;
669
 
 
670
 
      case GEARMAND_WAKEUP_SHUTDOWN:
671
 
        GEARMAN_INFO(gearmand, "Received SHUTDOWN wakeup event")
672
 
        _clear_events(gearmand);
673
 
        gearmand->ret= GEARMAN_SHUTDOWN;
674
 
        break;
675
 
 
676
 
      case GEARMAND_WAKEUP_CON:
677
 
      case GEARMAND_WAKEUP_RUN:
678
 
      default:
679
 
        GEARMAN_FATAL(gearmand, "Received unknown wakeup event (%u)",
680
 
                      buffer[x])
681
 
        _clear_events(gearmand);
682
 
        gearmand->ret= GEARMAN_UNKNOWN_STATE;
683
 
        break;
684
 
      }
685
 
    }
686
 
  }
687
 
}
688
 
 
689
 
static gearman_return_t _watch_events(gearmand_st *gearmand)
690
 
{
691
 
  gearman_return_t ret;
692
 
 
693
 
  ret= _listen_watch(gearmand);
694
 
  if (ret != GEARMAN_SUCCESS)
695
 
    return ret;
696
 
 
697
 
  ret= _wakeup_watch(gearmand);
698
 
  if (ret != GEARMAN_SUCCESS)
699
 
    return ret;
700
 
 
701
 
  return GEARMAN_SUCCESS;
702
 
}
703
 
 
704
 
static void _clear_events(gearmand_st *gearmand)
705
 
{
706
 
  _listen_clear(gearmand);
707
 
  _wakeup_clear(gearmand);
708
 
 
709
 
  /* If we are not threaded, tell the fake thread to shutdown now to clear
710
 
     connections. Otherwise we will never exit the libevent loop. */
711
 
  if (gearmand->threads == 0 && gearmand->thread_list != NULL)
712
 
    gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
713
 
}
714
 
 
715
 
static void _close_events(gearmand_st *gearmand)
716
 
{
717
 
  _listen_close(gearmand);
718
 
  _wakeup_close(gearmand);
719
 
}