5
#define SERVER_PORT "7003"
12
#include "server_common.h"
14
/* Event base that add_listener() attaches connection events to and that main() runs with event_base_loop(). */
static struct event_base *main_base;
17
/* Event callback registered through event_set() in add_listener(); arg carries the gearman_connection_st pointer. */
void server_read(int fd, short event, void *arg);
18
/* Processes one response on gear_conn; server_read() stores the returned flag in its `run` variable. */
bool server_response(gearman_connection_st *gear_conn);
20
/*
 * Wrap fd in a newly created gearman connection and register a read event
 * for it on main_base, with server_read() as the callback.
 *
 * fd     - descriptor given to gearman_connection_add_fd() and event_set().
 * client - only passed to WATCHPOINT_NUMBER in the lines visible here.
 *
 * NOTE(review): no NULL check on the gearman_connection_create() result is
 * visible in this excerpt, and the gearman_connection_add_fd() result is
 * explicitly discarded; confirm both are intentional.
 */
int add_listener(int fd, int client)
22
gearman_connection_st *gear_conn;
24
gear_conn= gearman_connection_create(NULL);
27
(void)gearman_connection_add_fd(gear_conn, fd);
29
WATCHPOINT_NUMBER(client);
31
gear_conn->state= GEARMAN_CONNECTION_STATE_READ;
33
/* Initialize one event */
34
WATCHPOINT_NUMBER(fd);
35
WATCHPOINT_ASSERT(fd);
36
/* Read event on fd, flagged EV_PERSIST; gear_conn is what server_read() receives as arg. */
event_set(&gear_conn->evfifo, fd, EV_READ | EV_PERSIST, server_read, (void *)gear_conn);
38
event_base_set(main_base, &gear_conn->evfifo);
40
/* Add it to the active events, without a timeout */
41
if (event_add(&gear_conn->evfifo, NULL) == -1)
56
struct addrinfo *next;
57
struct addrinfo hints;
60
memset(&hints, 0, sizeof (hints));
62
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
63
hints.ai_socktype = SOCK_STREAM;
65
int e= getaddrinfo(NULL, SERVER_PORT, &hints, &ai);
72
fprintf(stderr, "no hosts found\n");
79
fd= socket(next->ai_family, next->ai_socktype,
83
(void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, sizeof(int));
85
if (bind(fd, next->ai_addr, next->ai_addrlen) != 0)
87
if (errno != EADDRINUSE)
94
else if (listen(fd, 100) != 0)
101
WATCHPOINT_NUMBER(fd);
104
} while ((next= next->ai_next) != NULL);
111
/*
 * Event callback for a connection's descriptor; dispatches on
 * gear_conn->state.
 *
 * fd    - descriptor the event fired on (used for accept() when listening).
 * event - event flags; only logged in the lines visible here.
 * arg   - the gearman_connection_st pointer that was registered with
 *         event_set().
 */
void server_read(int fd, short event, void *arg)
113
gearman_connection_st *gear_conn;
114
gear_conn= (gearman_connection_st *)arg;
118
fprintf(stderr, "server_read called with fd: %d, event: %d, arg: %p\n", fd, event, arg);
122
WATCHPOINT_NUMBER(gear_conn->state);
123
switch (gear_conn->state)
125
/* Listening socket: accept a client and register it through add_listener(). */
case GEARMAN_CONNECTION_STATE_LISTENING:
127
struct sockaddr_storage addr;
131
addrlen = sizeof(addr);
132
if ((client_fd= accept(fd, (struct sockaddr *)&addr, &addrlen)) == -1)
139
/* NOTE(review): the return value of add_listener() is ignored here; confirm that is acceptable. */
add_listener(client_fd, 1);
143
/* Readable connection: handle one response and keep the result in `run`. */
case GEARMAN_CONNECTION_STATE_READ:
145
run= server_response(gear_conn);
148
case GEARMAN_CONNECTION_STATE_WRITE:
151
case GEARMAN_CONNECTION_STATE_CLOSE:
153
WATCHPOINT_STRING("CLOSING DOWN BABY");
154
/*
 * NOTE(review): gear_conn is freed here, while the code after the switch
 * still reads gear_conn; confirm this case leaves the function before
 * reaching it (the intervening lines are not visible in this excerpt).
 */
gearman_connection_free(gear_conn);
161
/* Only re-arm when server_response() reported false and the connection still has buffered data. */
if (run == false && gearman_connection_buffered(gear_conn))
165
/* Reschedule this event */
166
WATCHPOINT_STRING("rescheduling");
167
event_add(&gear_conn->evfifo, NULL);
170
/*
 * Read one response for gear_conn with gearman_response() and act on
 * gear_conn->result.action.
 *
 * When gearman_response() yields GEARMAN_FAILURE, GEARMAN_PROTOCOL_ERROR or
 * GEARMAN_READ_FAILURE, the connection state is set to
 * GEARMAN_CONNECTION_STATE_CLOSE. The return statements are not visible in
 * this excerpt, so the meaning of the bool result is not documented here.
 */
bool server_response(gearman_connection_st *gear_conn)
175
rc= gearman_response(&gear_conn->server, NULL, &gear_conn->result);
176
WATCHPOINT_ERROR(rc);
178
if (rc == GEARMAN_FAILURE || rc == GEARMAN_PROTOCOL_ERROR || rc == GEARMAN_READ_FAILURE)
180
gear_conn->state= GEARMAN_CONNECTION_STATE_CLOSE;
183
else if (rc != GEARMAN_SUCCESS)
185
WATCHPOINT_ERROR(rc);
189
WATCHPOINT_ERROR(rc);
190
WATCHPOINT_NUMBER(gearman_result_length(&gear_conn->result));
192
WATCHPOINT_ACTION(gear_conn->result.action);
194
switch(gear_conn->result.action)
196
/* Echo: send the received value and length back as GEARMAN_ECHO_RES. */
case GEARMAN_ECHO_REQ:
201
giov.arg= gearman_result_value(&gear_conn->result);
202
giov.arg_length= gearman_result_length(&gear_conn->result);
204
rc= gearman_dispatch(&gear_conn->server, GEARMAN_ECHO_RES, &giov, 1);
205
WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
211
case GEARMAN_CAN_DO_TIMEOUT:
213
/* We should be storing this + client into a hash table for lookup */
217
case GEARMAN_SET_CLIENT_ID:
219
/* Update hash structure storage with CLIENT ID */
223
case GEARMAN_RESET_ABILITIES:
225
/* We should remove the worker from the queue at this point */
229
case GEARMAN_GRAB_JOB:
233
/* Find a job that is available or return no_job */
/*
 * NOTE(review): the giov pointer is NULL but the last argument is 1;
 * presumably that argument is the number of giov entries -- confirm against
 * gearman_dispatch().
 */
234
rc= gearman_dispatch(&gear_conn->server, GEARMAN_NO_JOB, NULL, 1);
235
WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
237
/* Once we send the job, we will wait for the worker to tell us when it is done */
242
/* The submit variants visible here reply with GEARMAN_JOB_CREATED carrying the fixed string "foomanchoo". */
case GEARMAN_SUBMIT_JOB:
243
case GEARMAN_SUBMIT_JOB_HIGH:
244
case GEARMAN_SUBMIT_JOB_BJ:
249
giov.arg= "foomanchoo";
250
giov.arg_length= strlen("foomanchoo");
251
rc= gearman_dispatch(&gear_conn->server, GEARMAN_JOB_CREATED, &giov, 1);
252
WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
257
case GEARMAN_PRE_SLEEP:
260
case GEARMAN_WORK_FAIL:
261
case GEARMAN_GET_STATUS:
265
/* We should look up work here, and dispatch as appropriate */
270
giov[0].arg= "foomanchoo";
271
giov[0].arg_length= strlen("foomanchoo");
273
giov[1].arg= gearman_result_value(&gear_conn->result);
274
giov[1].arg_length= gearman_result_length(&gear_conn->result);
276
/*
 * NOTE(review): giov[0] and giov[1] are both filled in, yet the last
 * argument is 1; if it is an element count, giov[1] is never sent -- confirm
 * against gearman_dispatch().
 */
rc= gearman_dispatch(&gear_conn->server, GEARMAN_WORK_COMPLETE, giov, 1);
277
WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
281
rc= gearman_dispatch(&gear_conn->server, GEARMAN_WORK_FAIL, NULL, 1);
282
WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
288
/* The action values listed below lead to a logged action followed by WATCHPOINT_ASSERT(0). */
case GEARMAN_ECHO_RES:
289
case GEARMAN_JOB_CREATED:
290
case GEARMAN_CANT_DO:
291
case GEARMAN_WORK_COMPLETE:
293
case GEARMAN_WORK_STATUS:
294
case GEARMAN_JOB_ASSIGN:
295
case GEARMAN_STATUS_RES:
296
case GEARMAN_SUBMIT_JOB_SCHEDULUED:
297
case GEARMAN_SUBMIT_JOB_FUTURE:
298
WATCHPOINT_ACTION(gear_conn->result.action);
299
WATCHPOINT_ASSERT(0);
306
/*
 * Program entry point: installs SIG_IGN for SIGPIPE, initializes the event
 * library into main_base and runs the event loop on it. argc and argv are
 * not referenced in the lines visible in this excerpt.
 */
int main (int argc, char **argv)
311
* ignore SIGPIPE signals; we can use errno==EPIPE if we
312
* need that information
314
sa.sa_handler = SIG_IGN;
316
if (sigemptyset(&sa.sa_mask) == -1 ||
317
sigaction(SIGPIPE, &sa, 0) == -1)
319
perror("failed to ignore SIGPIPE; sigaction");
324
/* Initialize the event library */
/* NOTE(review): event_init() is deprecated in libevent 2.x in favour of event_base_new(); confirm which libevent version this targets. */
325
main_base= event_init();
331
/* Run the event loop on main_base with flags set to 0. */
event_base_loop(main_base, 0);