~mm-yuhu/gearmand/server-funcs

« back to all changes in this revision

Viewing changes to libgearman/gearmand.c

  • Committer: Eric Day
  • Date: 2008-12-25 02:00:12 UTC
  • Revision ID: eday@oddments.org-20081225020012-ko1ypndkphx9pemi
First pass at libevent interface, server interface, and gearmand.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2008 Brian Aker, Eric Day
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
/**
 
10
 * @file
 
11
 * @brief Basic Server definitions
 
12
 */
 
13
 
 
14
#include "common.h"
 
15
 
 
16
/*
 
17
 * Private declarations
 
18
 */
 
19
 
 
20
/**
 
21
 * @addtogroup gearmand_private Private Basic Server Functions
 
22
 * @ingroup gearmand
 
23
 * @{
 
24
 */
 
25
 
 
26
#ifdef HAVE_EVENT_H
 
27
static gearman_return_t _listen_init(in_port_t port, int backlog,
 
28
                                     struct event_base *base,
 
29
                                     struct event *event, void *arg);
 
30
 
 
31
static void _listen_accept(int fd, short events __attribute__ ((unused)),
 
32
                           void *arg);
 
33
 
 
34
static gearman_return_t _con_watch(gearman_con_st *con, short events,
 
35
                                   void *arg __attribute__ ((unused)));
 
36
 
 
37
static void _con_ready(int fd __attribute__ ((unused)), short events,
 
38
                       void *arg);
 
39
 
 
40
static gearman_return_t _con_close(gearman_con_st *con, gearman_return_t ret,
 
41
                                   void *arg __attribute__ ((unused)));
 
42
#endif
 
43
 
 
44
/** @} */
 
45
 
 
46
/*
 
47
 * Public definitions
 
48
 */
 
49
 
 
50
gearmand_st *gearmand_init(in_port_t port, int backlog)
 
51
{
 
52
#ifdef HAVE_EVENT_H
 
53
  gearmand_st *gearmand;
 
54
 
 
55
  gearmand= malloc(sizeof(gearmand_st));
 
56
  if (gearmand == NULL)
 
57
    return NULL;
 
58
 
 
59
  memset(gearmand, 0, sizeof(gearmand_st));
 
60
  
 
61
  gearmand->base= event_init();
 
62
  if (gearmand->base == NULL)
 
63
  {
 
64
    free(gearmand);
 
65
    printf("event_init:NULL\n");
 
66
    return NULL;
 
67
  }
 
68
 
 
69
  if (gearman_server_create(&(gearmand->server)) == NULL)
 
70
  {
 
71
    free(gearmand);
 
72
    printf("gearman_server_create:NULL\n");
 
73
    return NULL;
 
74
  }
 
75
 
 
76
  gearman_server_set_event_cb(&(gearmand->server), _con_watch, _con_close,
 
77
                              NULL);
 
78
 
 
79
  if (_listen_init(port, backlog, gearmand->base, &(gearmand->listen_event),
 
80
                   gearmand) != GEARMAN_SUCCESS)
 
81
  {
 
82
    gearmand_destroy(gearmand);
 
83
    return NULL;
 
84
  }
 
85
 
 
86
  return gearmand;
 
87
#else
 
88
  (void) port;
 
89
  (void) backlog;
 
90
  printf("Library not built with libevent support!\n");
 
91
  return NULL;
 
92
#endif
 
93
}
 
94
 
 
95
void gearmand_destroy(gearmand_st *gearmand)
 
96
{
 
97
#ifdef HAVE_EVENT_H
 
98
  gearman_server_free(&(gearmand->server));
 
99
  event_base_free(gearmand->base);
 
100
  free(gearmand);
 
101
#else
 
102
  (void) gearmand;
 
103
#endif
 
104
}
 
105
 
 
106
void gearmand_run(gearmand_st *gearmand)
 
107
{
 
108
#ifdef HAVE_EVENT_H
 
109
  if (event_base_loop(gearmand->base, 0) == -1)
 
110
    printf("event_base_loop:-1\n");
 
111
#else
 
112
  (void) gearmand;
 
113
#endif
 
114
}
 
115
 
 
116
/*
 
117
 * Private definitions
 
118
 */
 
119
 
 
120
#ifdef HAVE_EVENT_H
 
121
static gearman_return_t _listen_init(in_port_t port, int backlog,
 
122
                                     struct event_base *base,
 
123
                                     struct event *event, void *arg)
 
124
{
 
125
  struct sockaddr_in sa;
 
126
  int fd;
 
127
  int opt= 1;
 
128
 
 
129
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
 
130
  {
 
131
    printf("signal:%d\n", errno);
 
132
    return GEARMAN_ERRNO;
 
133
  }
 
134
 
 
135
  memset(&sa, 0, sizeof(sa));
 
136
  sa.sin_family = AF_INET;
 
137
  sa.sin_port= htons(port);
 
138
  sa.sin_addr.s_addr = INADDR_ANY;
 
139
 
 
140
  fd= socket(sa.sin_family, SOCK_STREAM, 0);
 
141
  if (fd == -1)
 
142
  {
 
143
    printf("socket:%d\n", errno);
 
144
    return GEARMAN_ERRNO;
 
145
  }
 
146
 
 
147
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
 
148
  {
 
149
    printf("setsockopt:%d\n", errno);
 
150
    return GEARMAN_ERRNO;
 
151
  }
 
152
 
 
153
  if (bind(fd, (struct sockaddr *)(&sa), sizeof(sa)) == -1)
 
154
  {
 
155
    printf("bind:%d\n", errno);
 
156
    return GEARMAN_ERRNO;
 
157
  }
 
158
 
 
159
  if (listen(fd, backlog) == -1)
 
160
  {
 
161
    printf("listen:%d\n", errno);
 
162
    return GEARMAN_ERRNO;
 
163
  }
 
164
 
 
165
  event_set(event, fd, EV_READ | EV_PERSIST, _listen_accept, arg);
 
166
  event_base_set(base, event);
 
167
 
 
168
  if (event_add(event, NULL) == -1)
 
169
  {
 
170
    printf("event_add\n");
 
171
    return GEARMAN_ERRNO;
 
172
  }
 
173
 
 
174
  return GEARMAN_SUCCESS;
 
175
}
 
176
 
 
177
static void _listen_accept(int fd, short events __attribute__ ((unused)),
 
178
                           void *arg)
 
179
{
 
180
  gearmand_st *gearmand= (gearmand_st *)arg;
 
181
  gearmand_con_st *con;
 
182
  socklen_t sa_len;
 
183
  gearman_return_t ret;
 
184
 
 
185
  con= malloc(sizeof(gearmand_con_st));
 
186
  if (con == NULL)
 
187
  {
 
188
    printf("malloc:%d\n", errno);
 
189
    exit(1);
 
190
  }
 
191
 
 
192
  sa_len= sizeof(con->sa);
 
193
  con->fd= accept(fd, (struct sockaddr *)(&(con->sa)), &sa_len);
 
194
  if (con->fd == -1)
 
195
  {
 
196
    free(con);
 
197
    printf("accept:%d\n", errno);
 
198
    exit(1);
 
199
  }
 
200
 
 
201
  printf("Connect: %s:%u\n", inet_ntoa(con->sa.sin_addr),
 
202
         ntohs(con->sa.sin_port));
 
203
 
 
204
  con->gearmand= gearmand;
 
205
 
 
206
  if (gearman_server_add_con(&(gearmand->server), &(con->server_con),
 
207
      con->fd) == NULL)
 
208
  {
 
209
    close(con->fd);
 
210
    free(con);
 
211
    printf("gearman_server_add_con:%s\n",
 
212
           gearman_server_error(&(gearmand->server)));
 
213
    exit(1);
 
214
  }
 
215
 
 
216
  gearman_server_con_set_data(&(con->server_con), con);
 
217
 
 
218
  ret= gearman_server_run(&(gearmand->server));
 
219
  if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
 
220
  {
 
221
    printf("gearman_server_run:%s\n",
 
222
           gearman_server_error(&(gearmand->server)));
 
223
    exit(1);
 
224
  }
 
225
}
 
226
 
 
227
static gearman_return_t _con_watch(gearman_con_st *con, short events,
 
228
                                   void *arg __attribute__ ((unused)))
 
229
{
 
230
  gearmand_con_st *gcon;
 
231
  short set_events= 0;
 
232
 
 
233
  gcon= (gearmand_con_st *)gearman_con_data(con);
 
234
  gcon->con= con;
 
235
 
 
236
  if (events & POLLIN)
 
237
    set_events|= EV_READ;
 
238
  if (events & POLLOUT)
 
239
    set_events|= EV_WRITE;
 
240
 
 
241
  event_set(&(gcon->event), gcon->fd, set_events, _con_ready, gcon);
 
242
  event_base_set(gcon->gearmand->base, &(gcon->event));
 
243
 
 
244
  if (event_add(&(gcon->event), NULL) == -1)
 
245
    return GEARMAN_EVENT;
 
246
 
 
247
  return GEARMAN_SUCCESS;
 
248
}
 
249
 
 
250
static void _con_ready(int fd __attribute__ ((unused)), short events,
 
251
                       void *arg)
 
252
{
 
253
  gearmand_con_st *gcon= (gearmand_con_st *)arg;
 
254
  short revents= 0;
 
255
  gearman_return_t ret;
 
256
 
 
257
  if (events & EV_READ)
 
258
    revents|= POLLIN;
 
259
  if (events & EV_WRITE)
 
260
    revents|= POLLOUT;
 
261
 
 
262
  gearman_con_set_revents(gcon->con, revents);
 
263
 
 
264
  ret= gearman_server_run(&(gcon->gearmand->server));
 
265
  if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
 
266
  {
 
267
    printf("gearman_server_run:%s\n",
 
268
           gearman_server_error(&(gcon->gearmand->server)));
 
269
    exit(1);
 
270
  }
 
271
}
 
272
 
 
273
static gearman_return_t _con_close(gearman_con_st *con, gearman_return_t ret,
 
274
                                   void *arg __attribute__ ((unused)))
 
275
{
 
276
  gearmand_con_st *gcon= (gearmand_con_st *)gearman_con_data(con);
 
277
 
 
278
  if (ret != GEARMAN_SUCCESS)
 
279
    printf("_con_close:%s\n", gearman_server_error(&(gcon->gearmand->server)));
 
280
 
 
281
  if (event_del(&(gcon->event)) == -1)
 
282
    return GEARMAN_EVENT;
 
283
 
 
284
  gearman_con_free(con);
 
285
  free(gcon);
 
286
 
 
287
  return GEARMAN_SUCCESS;
 
288
}
 
289
#endif