1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Basic Server definitions
17
* Private declarations
21
* @addtogroup gearmand_private Private Basic Server Functions
27
static gearman_return_t _listen_init(in_port_t port, int backlog,
28
struct event_base *base,
29
struct event *event, void *arg);
31
static void _listen_accept(int fd, short events __attribute__ ((unused)),
34
static gearman_return_t _con_watch(gearman_con_st *con, short events,
35
void *arg __attribute__ ((unused)));
37
static void _con_ready(int fd __attribute__ ((unused)), short events,
40
static gearman_return_t _con_close(gearman_con_st *con, gearman_return_t ret,
41
void *arg __attribute__ ((unused)));
50
gearmand_st *gearmand_init(in_port_t port, int backlog)
53
gearmand_st *gearmand;
55
gearmand= malloc(sizeof(gearmand_st));
59
memset(gearmand, 0, sizeof(gearmand_st));
61
gearmand->base= event_init();
62
if (gearmand->base == NULL)
65
printf("event_init:NULL\n");
69
if (gearman_server_create(&(gearmand->server)) == NULL)
72
printf("gearman_server_create:NULL\n");
76
gearman_server_set_event_cb(&(gearmand->server), _con_watch, _con_close,
79
if (_listen_init(port, backlog, gearmand->base, &(gearmand->listen_event),
80
gearmand) != GEARMAN_SUCCESS)
82
gearmand_destroy(gearmand);
90
printf("Library not built with libevent support!\n");
95
void gearmand_destroy(gearmand_st *gearmand)
98
gearman_server_free(&(gearmand->server));
99
event_base_free(gearmand->base);
106
void gearmand_run(gearmand_st *gearmand)
109
if (event_base_loop(gearmand->base, 0) == -1)
110
printf("event_base_loop:-1\n");
117
* Private definitions
121
static gearman_return_t _listen_init(in_port_t port, int backlog,
122
struct event_base *base,
123
struct event *event, void *arg)
125
struct sockaddr_in sa;
129
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
131
printf("signal:%d\n", errno);
132
return GEARMAN_ERRNO;
135
memset(&sa, 0, sizeof(sa));
136
sa.sin_family = AF_INET;
137
sa.sin_port= htons(port);
138
sa.sin_addr.s_addr = INADDR_ANY;
140
fd= socket(sa.sin_family, SOCK_STREAM, 0);
143
printf("socket:%d\n", errno);
144
return GEARMAN_ERRNO;
147
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
149
printf("setsockopt:%d\n", errno);
150
return GEARMAN_ERRNO;
153
if (bind(fd, (struct sockaddr *)(&sa), sizeof(sa)) == -1)
155
printf("bind:%d\n", errno);
156
return GEARMAN_ERRNO;
159
if (listen(fd, backlog) == -1)
161
printf("listen:%d\n", errno);
162
return GEARMAN_ERRNO;
165
event_set(event, fd, EV_READ | EV_PERSIST, _listen_accept, arg);
166
event_base_set(base, event);
168
if (event_add(event, NULL) == -1)
170
printf("event_add\n");
171
return GEARMAN_ERRNO;
174
return GEARMAN_SUCCESS;
177
static void _listen_accept(int fd, short events __attribute__ ((unused)),
180
gearmand_st *gearmand= (gearmand_st *)arg;
181
gearmand_con_st *con;
183
gearman_return_t ret;
185
con= malloc(sizeof(gearmand_con_st));
188
printf("malloc:%d\n", errno);
192
sa_len= sizeof(con->sa);
193
con->fd= accept(fd, (struct sockaddr *)(&(con->sa)), &sa_len);
197
printf("accept:%d\n", errno);
201
printf("Connect: %s:%u\n", inet_ntoa(con->sa.sin_addr),
202
ntohs(con->sa.sin_port));
204
con->gearmand= gearmand;
206
if (gearman_server_add_con(&(gearmand->server), &(con->server_con),
211
printf("gearman_server_add_con:%s\n",
212
gearman_server_error(&(gearmand->server)));
216
gearman_server_con_set_data(&(con->server_con), con);
218
ret= gearman_server_run(&(gearmand->server));
219
if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
221
printf("gearman_server_run:%s\n",
222
gearman_server_error(&(gearmand->server)));
227
static gearman_return_t _con_watch(gearman_con_st *con, short events,
228
void *arg __attribute__ ((unused)))
230
gearmand_con_st *gcon;
233
gcon= (gearmand_con_st *)gearman_con_data(con);
237
set_events|= EV_READ;
238
if (events & POLLOUT)
239
set_events|= EV_WRITE;
241
event_set(&(gcon->event), gcon->fd, set_events, _con_ready, gcon);
242
event_base_set(gcon->gearmand->base, &(gcon->event));
244
if (event_add(&(gcon->event), NULL) == -1)
245
return GEARMAN_EVENT;
247
return GEARMAN_SUCCESS;
250
static void _con_ready(int fd __attribute__ ((unused)), short events,
253
gearmand_con_st *gcon= (gearmand_con_st *)arg;
255
gearman_return_t ret;
257
if (events & EV_READ)
259
if (events & EV_WRITE)
262
gearman_con_set_revents(gcon->con, revents);
264
ret= gearman_server_run(&(gcon->gearmand->server));
265
if (ret != GEARMAN_SUCCESS && ret != GEARMAN_IO_WAIT)
267
printf("gearman_server_run:%s\n",
268
gearman_server_error(&(gcon->gearmand->server)));
273
static gearman_return_t _con_close(gearman_con_st *con, gearman_return_t ret,
274
void *arg __attribute__ ((unused)))
276
gearmand_con_st *gcon= (gearmand_con_st *)gearman_con_data(con);
278
if (ret != GEARMAN_SUCCESS)
279
printf("_con_close:%s\n", gearman_server_error(&(gcon->gearmand->server)));
281
if (event_del(&(gcon->event)) == -1)
282
return GEARMAN_EVENT;
284
gearman_con_free(con);
287
return GEARMAN_SUCCESS;