~mm-yuhu/gearmand/server-funcs

« back to all changes in this revision

Viewing changes to server/main.c

  • Committer: Brian Aker
  • Date: 2008-09-15 18:17:17 UTC
  • Revision ID: brian@gir.lan-20080915181717-mb6tl0n8kr8q76gx
Import from HG

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
  Main GearmanDC server
 
3
 */
 
4
 
 
5
#define SERVER_PORT "7003"
 
6
#define MAX_MSG 100
 
7
 
 
8
#define SUCCESS 0
 
9
#define ERROR   1
 
10
 
 
11
#include <signal.h>
 
12
#include "server_common.h"
 
13
 
 
14
static struct event_base *main_base;
 
15
 
 
16
/* Prototypes */
 
17
void server_read(int fd, short event, void *arg);
 
18
bool server_response(gearman_connection_st *gear_conn);
 
19
 
 
20
int add_listener(int fd, int client)
 
21
{
 
22
  gearman_connection_st *gear_conn;
 
23
 
 
24
  gear_conn= gearman_connection_create(NULL);
 
25
  assert(gear_conn);
 
26
 
 
27
  (void)gearman_connection_add_fd(gear_conn, fd);
 
28
 
 
29
  WATCHPOINT_NUMBER(client);
 
30
  if (client)
 
31
    gear_conn->state= GEARMAN_CONNECTION_STATE_READ;
 
32
 
 
33
  /* Initalize one event */
 
34
  WATCHPOINT_NUMBER(fd);
 
35
  WATCHPOINT_ASSERT(fd);
 
36
  event_set(&gear_conn->evfifo, fd, EV_READ | EV_PERSIST, server_read, (void *)gear_conn);
 
37
 
 
38
  event_base_set(main_base, &gear_conn->evfifo);
 
39
 
 
40
  /* Add it to the active events, without a timeout */
 
41
  if (event_add(&gear_conn->evfifo, NULL) == -1)
 
42
  {
 
43
    WATCHPOINT;
 
44
    perror("event_add");
 
45
    WATCHPOINT;
 
46
    exit(1);
 
47
  }
 
48
 
 
49
  return 0;
 
50
}
 
51
 
 
52
int startup(void)
 
53
{
 
54
  int fd;
 
55
  struct addrinfo *ai;
 
56
  struct addrinfo *next;
 
57
  struct addrinfo hints;
 
58
  int arg;
 
59
 
 
60
  memset(&hints, 0, sizeof (hints));
 
61
 
 
62
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
 
63
  hints.ai_socktype = SOCK_STREAM;
 
64
 
 
65
  int e= getaddrinfo(NULL, SERVER_PORT, &hints, &ai);
 
66
 
 
67
  if (e != 0)
 
68
    exit(1);
 
69
 
 
70
  if (ai == NULL)
 
71
  {
 
72
    fprintf(stderr, "no hosts found\n");
 
73
    exit(1);
 
74
  }
 
75
 
 
76
  next= ai;
 
77
  do 
 
78
  {
 
79
    fd= socket(next->ai_family, next->ai_socktype,
 
80
               next->ai_protocol);
 
81
    
 
82
    arg= 1;
 
83
    (void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, sizeof(int));
 
84
 
 
85
    if (bind(fd, next->ai_addr, next->ai_addrlen) != 0)
 
86
    {
 
87
      if (errno != EADDRINUSE) 
 
88
      {
 
89
        perror("bind()");
 
90
        exit(1);
 
91
      }
 
92
      close(fd);
 
93
    }
 
94
    else if (listen(fd, 100) != 0)
 
95
    {
 
96
      exit(1);
 
97
    }
 
98
    else
 
99
    {
 
100
      add_listener(fd, 0);
 
101
      WATCHPOINT_NUMBER(fd);
 
102
    }
 
103
 
 
104
  } while ((next= next->ai_next) != NULL);
 
105
 
 
106
  freeaddrinfo(ai);
 
107
 
 
108
  return 0;
 
109
}
 
110
 
 
111
void server_read(int fd, short event, void *arg)
 
112
{
 
113
  gearman_connection_st *gear_conn;
 
114
  gear_conn= (gearman_connection_st *)arg;
 
115
  bool run= true;
 
116
 
 
117
 
 
118
  fprintf(stderr, "server_read called with fd: %d, event: %d, arg: %p\n", fd, event, arg);
 
119
 
 
120
  while (run)
 
121
  {
 
122
    WATCHPOINT_NUMBER(gear_conn->state);
 
123
    switch (gear_conn->state)
 
124
    {
 
125
    case GEARMAN_CONNECTION_STATE_LISTENING:
 
126
      {
 
127
        struct sockaddr_storage addr;
 
128
        socklen_t addrlen;
 
129
        int client_fd;
 
130
 
 
131
        addrlen = sizeof(addr);
 
132
        if ((client_fd= accept(fd, (struct sockaddr *)&addr, &addrlen)) == -1) 
 
133
        {
 
134
          perror("accept");
 
135
          return;
 
136
        }
 
137
        assert(client_fd);
 
138
 
 
139
        add_listener(client_fd, 1);
 
140
        run= false;
 
141
      }
 
142
      break;
 
143
    case GEARMAN_CONNECTION_STATE_READ:
 
144
      WATCHPOINT;
 
145
      run= server_response(gear_conn);
 
146
      WATCHPOINT;
 
147
      break;
 
148
    case GEARMAN_CONNECTION_STATE_WRITE:
 
149
      assert(0);
 
150
      break;
 
151
    case GEARMAN_CONNECTION_STATE_CLOSE:
 
152
      {
 
153
        WATCHPOINT_STRING("CLOSING DOWN BABY");
 
154
        gearman_connection_free(gear_conn);
 
155
        return;
 
156
      }
 
157
    default:
 
158
      assert(0);
 
159
    }
 
160
    
 
161
    if (run == false && gearman_connection_buffered(gear_conn))
 
162
      run= true;
 
163
  }
 
164
 
 
165
  /* Reschedule this event */
 
166
  WATCHPOINT_STRING("rescheduling");
 
167
  event_add(&gear_conn->evfifo, NULL);
 
168
}
 
169
 
 
170
bool server_response(gearman_connection_st *gear_conn)
 
171
{
 
172
  gearman_return rc;
 
173
  bool run;
 
174
 
 
175
  rc= gearman_response(&gear_conn->server, NULL, &gear_conn->result); 
 
176
  WATCHPOINT_ERROR(rc);
 
177
 
 
178
  if (rc == GEARMAN_FAILURE || rc == GEARMAN_PROTOCOL_ERROR || rc == GEARMAN_READ_FAILURE)
 
179
  {
 
180
    gear_conn->state= GEARMAN_CONNECTION_STATE_CLOSE;
 
181
    return true;
 
182
  } 
 
183
  else if (rc != GEARMAN_SUCCESS)
 
184
  {
 
185
    WATCHPOINT_ERROR(rc);
 
186
    assert(0);
 
187
  }
 
188
 
 
189
  WATCHPOINT_ERROR(rc);
 
190
  WATCHPOINT_NUMBER(gearman_result_length(&gear_conn->result));
 
191
 
 
192
  WATCHPOINT_ACTION(gear_conn->result.action);
 
193
  /* Value! */
 
194
  switch(gear_conn->result.action)
 
195
  {
 
196
  case GEARMAN_ECHO_REQ:
 
197
    {
 
198
      giov_st giov;
 
199
      gearman_return rc;
 
200
 
 
201
      giov.arg= gearman_result_value(&gear_conn->result);
 
202
      giov.arg_length= gearman_result_length(&gear_conn->result);
 
203
      WATCHPOINT;
 
204
      rc= gearman_dispatch(&gear_conn->server, GEARMAN_ECHO_RES, &giov, 1); 
 
205
      WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
 
206
      WATCHPOINT;
 
207
      run= false;
 
208
      break;
 
209
    }
 
210
  case GEARMAN_CAN_DO:
 
211
  case GEARMAN_CAN_DO_TIMEOUT:
 
212
    {
 
213
      /* We should be storing this + client into a hash table for lookup */
 
214
      run= false;
 
215
      break;
 
216
    }
 
217
  case GEARMAN_SET_CLIENT_ID:
 
218
    {
 
219
      /* Update hash structure storage with CLIENT ID */
 
220
      run= false;
 
221
      break;
 
222
    }
 
223
  case GEARMAN_RESET_ABILITIES:
 
224
    {
 
225
      /* We should remove the worker from the queue at this point */ 
 
226
      run= false;
 
227
      break;
 
228
    }
 
229
  case GEARMAN_GRAB_JOB:
 
230
    {
 
231
      gearman_return rc;
 
232
 
 
233
      /* Find a job that is available or return no_job */
 
234
      rc= gearman_dispatch(&gear_conn->server, GEARMAN_NO_JOB, NULL, 1); 
 
235
      WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
 
236
 
 
237
      /* Once send the job, we will wait for the worker to tell us when it is done */
 
238
      run= false;
 
239
 
 
240
      break;
 
241
    }
 
242
  case GEARMAN_SUBMIT_JOB:
 
243
  case GEARMAN_SUBMIT_JOB_HIGH:
 
244
  case GEARMAN_SUBMIT_JOB_BJ:
 
245
    {
 
246
      giov_st giov;
 
247
      gearman_return rc;
 
248
 
 
249
      giov.arg= "foomanchoo";
 
250
      giov.arg_length= strlen("foomanchoo");
 
251
      rc= gearman_dispatch(&gear_conn->server, GEARMAN_JOB_CREATED, &giov, 1); 
 
252
      WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
 
253
 
 
254
      run= false;
 
255
      break;
 
256
    }
 
257
  case GEARMAN_PRE_SLEEP:
 
258
  case GEARMAN_NOOP:
 
259
  case GEARMAN_NO_JOB:
 
260
  case GEARMAN_WORK_FAIL:
 
261
  case GEARMAN_GET_STATUS:
 
262
    {
 
263
      gearman_return rc;
 
264
 
 
265
      /* We should look up work here, and dispatch as appropriate */
 
266
      if (1)
 
267
      {
 
268
        giov_st giov[2];
 
269
 
 
270
        giov[0].arg= "foomanchoo";
 
271
        giov[0].arg_length= strlen("foomanchoo");
 
272
 
 
273
        giov[1].arg= gearman_result_value(&gear_conn->result);
 
274
        giov[1].arg_length= gearman_result_length(&gear_conn->result);
 
275
 
 
276
        rc= gearman_dispatch(&gear_conn->server, GEARMAN_WORK_COMPLETE, giov, 1); 
 
277
        WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
 
278
      }
 
279
      else
 
280
      {
 
281
        rc= gearman_dispatch(&gear_conn->server, GEARMAN_WORK_FAIL, NULL, 1); 
 
282
        WATCHPOINT_ASSERT(rc == GEARMAN_SUCCESS);
 
283
      }
 
284
 
 
285
      run= false;
 
286
      break;
 
287
    }
 
288
  case GEARMAN_ECHO_RES:
 
289
  case GEARMAN_JOB_CREATED:
 
290
  case GEARMAN_CANT_DO:
 
291
  case GEARMAN_WORK_COMPLETE:
 
292
  case GEARMAN_ERROR:
 
293
  case GEARMAN_WORK_STATUS:
 
294
  case GEARMAN_JOB_ASSIGN:
 
295
  case GEARMAN_STATUS_RES:
 
296
  case GEARMAN_SUBMIT_JOB_SCHEDULUED:
 
297
  case GEARMAN_SUBMIT_JOB_FUTURE:
 
298
    WATCHPOINT_ACTION(gear_conn->result.action);
 
299
    WATCHPOINT_ASSERT(0);
 
300
    run= false;
 
301
  }
 
302
 
 
303
  return run;
 
304
}
 
305
 
 
306
int main (int argc, char **argv)
 
307
{
 
308
  struct sigaction sa;
 
309
 
 
310
  /*
 
311
   * ignore SIGPIPE signals; we can use errno==EPIPE if we
 
312
   * need that information
 
313
 */
 
314
  sa.sa_handler = SIG_IGN;
 
315
  sa.sa_flags = 0;
 
316
  if (sigemptyset(&sa.sa_mask) == -1 ||
 
317
      sigaction(SIGPIPE, &sa, 0) == -1) 
 
318
  {
 
319
    perror("failed to ignore SIGPIPE; sigaction");
 
320
    exit(EXIT_FAILURE);
 
321
  }
 
322
 
 
323
 
 
324
  /* Initalize the event library */
 
325
  main_base= event_init();
 
326
 
 
327
  /* create socket */
 
328
  startup();
 
329
 
 
330
  //event_dispatch();
 
331
  event_base_loop(main_base, 0);
 
332
 
 
333
  return (0);
 
334
}