~chenryhabana/gearmand/Round-Robin

« back to all changes in this revision

Viewing changes to worker.c

  • Committer: Carlos Henry
  • Date: 2009-08-17 14:29:56 UTC
  • Revision ID: chenryhabana@yahoo.es-20090817142956-0va48xo7trh5041j
initial

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2008 Brian Aker, Eric Day
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
/**
 
10
 * @file
 
11
 * @brief Worker definitions
 
12
 */
 
13
 
 
14
#include "common.h"
 
15
 
 
16
/*
 
17
 * Private declarations
 
18
 */
 
19
 
 
20
/**
 
21
 * @addtogroup gearman_worker_private Private Worker Functions
 
22
 * @ingroup gearman_worker
 
23
 * @{
 
24
 */
 
25
 
 
26
/**
 
27
 * Allocate a worker structure.
 
28
 */
 
29
static gearman_worker_st *_worker_allocate(gearman_worker_st *worker);
 
30
 
 
31
/**
 
32
 * Initialize common packets for later use.
 
33
 */
 
34
static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
 
35
 
 
36
/**
 
37
 * Callback function used when parsing server lists.
 
38
 */
 
39
static gearman_return_t _worker_add_server(const char *host, in_port_t port,
 
40
                                           void *data);
 
41
 
 
42
/**
 
43
 * Allocate and add a function to the register list.
 
44
 */
 
45
static gearman_return_t _worker_function_add(gearman_worker_st *worker,
 
46
                                             const char *function_name,
 
47
                                             uint32_t timeout,
 
48
                                             gearman_worker_fn *worker_fn,
 
49
                                             const void *fn_arg);
 
50
 
 
51
/**
 
52
 * Free a function.
 
53
 */
 
54
static void _worker_function_free(gearman_worker_st *worker,
 
55
                                  gearman_worker_function_st *function);
 
56
 
 
57
/** @} */
 
58
 
 
59
/*
 
60
 * Public definitions
 
61
 */
 
62
 
 
63
gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
 
64
{
 
65
  worker= _worker_allocate(worker);
 
66
  if (worker == NULL)
 
67
    return NULL;
 
68
 
 
69
  worker->gearman= gearman_create(&(worker->gearman_static));
 
70
  if (worker->gearman == NULL)
 
71
  { 
 
72
    gearman_worker_free(worker);
 
73
    return NULL;
 
74
  }
 
75
 
 
76
  if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
 
77
  {
 
78
    gearman_worker_free(worker);
 
79
    return NULL;
 
80
  }
 
81
 
 
82
  return worker;
 
83
}
 
84
 
 
85
gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
 
86
                                        gearman_worker_st *from)
 
87
{
 
88
  if (from == NULL)
 
89
    return NULL;
 
90
 
 
91
  worker= _worker_allocate(worker);
 
92
  if (worker == NULL)
 
93
    return NULL;
 
94
 
 
95
  worker->options|= (from->options &
 
96
                     (gearman_worker_options_t)~GEARMAN_WORKER_ALLOCATED);
 
97
 
 
98
  worker->gearman= gearman_clone(&(worker->gearman_static), from->gearman);
 
99
  if (worker->gearman == NULL)
 
100
  { 
 
101
    gearman_worker_free(worker);
 
102
    return NULL;
 
103
  }
 
104
 
 
105
  if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
 
106
  {
 
107
    gearman_worker_free(worker);
 
108
    return NULL;
 
109
  }
 
110
 
 
111
  return worker;
 
112
}
 
113
 
 
114
void gearman_worker_free(gearman_worker_st *worker)
 
115
{
 
116
  if (worker->options & GEARMAN_WORKER_PACKET_INIT)
 
117
  {
 
118
    gearman_packet_free(&(worker->grab_job));
 
119
    gearman_packet_free(&(worker->pre_sleep));
 
120
  }
 
121
 
 
122
  if (worker->job != NULL)
 
123
    gearman_job_free(worker->job);
 
124
 
 
125
  if (worker->options & GEARMAN_WORKER_WORK_JOB_IN_USE)
 
126
    gearman_job_free(&(worker->work_job));
 
127
 
 
128
  if (worker->work_result != NULL)
 
129
  {
 
130
    if (worker->gearman->workload_free == NULL)
 
131
      free(worker->work_result);
 
132
    else
 
133
    {
 
134
      worker->gearman->workload_free(worker->work_result,
 
135
                                  (void *)(worker->gearman->workload_free_arg));
 
136
    }
 
137
  }
 
138
 
 
139
  while (worker->function_list != NULL)
 
140
    _worker_function_free(worker, worker->function_list);
 
141
 
 
142
  if (worker->gearman != NULL)
 
143
    gearman_free(worker->gearman);
 
144
 
 
145
  if (worker->options & GEARMAN_WORKER_ALLOCATED)
 
146
    free(worker);
 
147
}
 
148
 
 
149
const char *gearman_worker_error(gearman_worker_st *worker)
 
150
{
 
151
  return gearman_error(worker->gearman);
 
152
}
 
153
 
 
154
int gearman_worker_errno(gearman_worker_st *worker)
 
155
{
 
156
  return gearman_errno(worker->gearman);
 
157
}
 
158
 
 
159
void gearman_worker_set_options(gearman_worker_st *worker,
 
160
                                gearman_worker_options_t options,
 
161
                                uint32_t data)
 
162
{
 
163
  if (options & GEARMAN_WORKER_ROUND_ROBIN)
 
164
    gearman_set_options(worker->gearman, GEARMAN_ROUND_ROBIN, data);
 
165
 
 
166
  if (options & GEARMAN_WORKER_NON_BLOCKING)
 
167
    gearman_set_options(worker->gearman, GEARMAN_NON_BLOCKING, data);
 
168
 
 
169
  if (options & GEARMAN_WORKER_GRAB_UNIQ)
 
170
  {
 
171
    if (data)
 
172
      worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
 
173
    else
 
174
      worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
 
175
 
 
176
    (void)gearman_packet_pack_header(&(worker->grab_job));
 
177
  }
 
178
 
 
179
  if (data)
 
180
    worker->options |= options;
 
181
  else
 
182
    worker->options &= ~options;
 
183
}
 
184
 
 
185
void gearman_worker_set_workload_malloc(gearman_worker_st *worker,
 
186
                                        gearman_malloc_fn *workload_malloc,
 
187
                                        const void *workload_malloc_arg)
 
188
{
 
189
  gearman_set_workload_malloc(worker->gearman, workload_malloc,
 
190
                              workload_malloc_arg);
 
191
}
 
192
 
 
193
void gearman_worker_set_workload_free(gearman_worker_st *worker,
 
194
                                      gearman_free_fn *workload_free,
 
195
                                      const void *workload_free_arg)
 
196
{
 
197
  gearman_set_workload_free(worker->gearman, workload_free, workload_free_arg);
 
198
}
 
199
 
 
200
gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
 
201
                                           const char *host, in_port_t port)
 
202
{
 
203
  if (gearman_con_add(worker->gearman, NULL, host, port) == NULL)
 
204
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
205
 
 
206
  return GEARMAN_SUCCESS;
 
207
}
 
208
 
 
209
gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
 
210
                                            const char *servers)
 
211
{
 
212
  return gearman_parse_servers(servers, worker, _worker_add_server);
 
213
}
 
214
 
 
215
gearman_return_t gearman_worker_register(gearman_worker_st *worker,
 
216
                                         const char *function_name,
 
217
                                         uint32_t timeout)
 
218
{
 
219
  return _worker_function_add(worker, function_name, timeout, NULL, NULL);
 
220
}
 
221
 
 
222
gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
 
223
                                           const char *function_name)
 
224
{
 
225
  gearman_worker_function_st *function;
 
226
  gearman_return_t ret;
 
227
 
 
228
  for (function= worker->function_list; function != NULL;
 
229
       function= function->next)
 
230
  {
 
231
    if (!strcmp(function_name, function->function_name))
 
232
      break;
 
233
  }
 
234
 
 
235
  if (function == NULL)
 
236
    return GEARMAN_SUCCESS;
 
237
 
 
238
  gearman_packet_free(&(function->packet));
 
239
 
 
240
  ret= gearman_packet_add(worker->gearman, &(function->packet),
 
241
                          GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
 
242
                          (uint8_t *)function_name, strlen(function_name),
 
243
                          NULL);
 
244
  if (ret != GEARMAN_SUCCESS)
 
245
  {
 
246
    function->options&=
 
247
      (gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_PACKET_IN_USE;
 
248
    return ret;
 
249
  }
 
250
 
 
251
  function->options|= (GEARMAN_WORKER_FUNCTION_CHANGE |
 
252
                       GEARMAN_WORKER_FUNCTION_REMOVE);
 
253
 
 
254
  worker->options|= GEARMAN_WORKER_CHANGE;
 
255
 
 
256
  return GEARMAN_SUCCESS;
 
257
}
 
258
 
 
259
gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
 
260
{
 
261
  gearman_return_t ret;
 
262
 
 
263
  if (worker->function_list == NULL)
 
264
    return GEARMAN_SUCCESS;
 
265
 
 
266
  while (worker->function_list->next != NULL)
 
267
    _worker_function_free(worker, worker->function_list->next);
 
268
 
 
269
  gearman_packet_free(&(worker->function_list->packet));
 
270
 
 
271
  ret= gearman_packet_add(worker->gearman, &(worker->function_list->packet),
 
272
                          GEARMAN_MAGIC_REQUEST,
 
273
                          GEARMAN_COMMAND_RESET_ABILITIES, NULL);
 
274
  if (ret != GEARMAN_SUCCESS)
 
275
  {
 
276
    worker->function_list->options&=
 
277
      (gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_PACKET_IN_USE;
 
278
    return ret;
 
279
  }
 
280
 
 
281
  worker->function_list->options|= (GEARMAN_WORKER_FUNCTION_CHANGE |
 
282
                                    GEARMAN_WORKER_FUNCTION_REMOVE);
 
283
 
 
284
  worker->options|= GEARMAN_WORKER_CHANGE;
 
285
 
 
286
  return GEARMAN_SUCCESS;
 
287
}
 
288
 
 
289
gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
 
290
                                        gearman_job_st *job,
 
291
                                        gearman_return_t *ret_ptr)
 
292
{
 
293
  gearman_worker_function_st *function;
 
294
  uint32_t active;
 
295
  uint32_t i;
 
296
 
 
297
//  int waitjob = 0;
 
298
//if (worker->options & GEARMAN_WORKER_ROUND_ROBIN)
 
299
//      worker->gearman->con_list = worker->gearman->con_list->next;
 
300
  while (1)
 
301
  {
 
302
    switch (worker->state)
 
303
    {
 
304
    case GEARMAN_WORKER_STATE_START:
 
305
      /* If there are any new functions changes, send them now. */
 
306
      if (worker->options & GEARMAN_WORKER_CHANGE)
 
307
      {
 
308
        worker->function= worker->function_list;
 
309
        while (worker->function != NULL)
 
310
        {
 
311
          if (!(worker->function->options & GEARMAN_WORKER_FUNCTION_CHANGE))
 
312
          {
 
313
            worker->function= worker->function->next;
 
314
            continue;
 
315
          }
 
316
 
 
317
          for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
 
318
               i++, worker->con= worker->con->next)
 
319
          {
 
320
            if (worker->con->fd == -1)
 
321
              continue;
 
322
 
 
323
    case GEARMAN_WORKER_STATE_FUNCTION_SEND:
 
324
            *ret_ptr= gearman_con_send(worker->con, &(worker->function->packet),
 
325
                                       true);
 
326
            if (*ret_ptr != GEARMAN_SUCCESS)
 
327
            {
 
328
              if (*ret_ptr == GEARMAN_IO_WAIT)
 
329
                worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
 
330
              else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
 
331
                continue;
 
332
 
 
333
              return NULL;
 
334
            }
 
335
          }
 
336
 
 
337
          if (worker->function->options & GEARMAN_WORKER_FUNCTION_REMOVE)
 
338
          {
 
339
            function= worker->function->prev;
 
340
            _worker_function_free(worker, worker->function);
 
341
            if (function == NULL)
 
342
              worker->function= worker->function_list;
 
343
            else
 
344
              worker->function= function;
 
345
          }
 
346
          else
 
347
          {
 
348
            worker->function->options&=
 
349
             (gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_CHANGE;
 
350
            worker->function= worker->function->next;
 
351
          }
 
352
        }
 
353
 
 
354
        worker->options&= (gearman_worker_options_t)~GEARMAN_WORKER_CHANGE;
 
355
      }
 
356
 
 
357
      if (worker->function_list == NULL)
 
358
      {
 
359
        GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_grab_job",
 
360
                          "no functions have been registered")
 
361
        *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
 
362
        return NULL;
 
363
      }
 
364
 
 
365
      for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
 
366
           i++, worker->con= worker->con->next)
 
367
      {
 
368
        /* If the connection to the job server is not active, start it. */
 
369
        if (worker->con->fd == -1)
 
370
        {
 
371
          for (worker->function= worker->function_list;
 
372
               worker->function != NULL;
 
373
               worker->function= worker->function->next)
 
374
          {
 
375
    case GEARMAN_WORKER_STATE_CONNECT:
 
376
            *ret_ptr= gearman_con_send(worker->con, &(worker->function->packet),
 
377
                                       true);
 
378
            if (*ret_ptr != GEARMAN_SUCCESS)
 
379
            {
 
380
              if (*ret_ptr == GEARMAN_IO_WAIT)
 
381
                worker->state= GEARMAN_WORKER_STATE_CONNECT;
 
382
              else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT ||
 
383
                       *ret_ptr == GEARMAN_LOST_CONNECTION)
 
384
              {
 
385
                break;
 
386
              }
 
387
 
 
388
              return NULL;
 
389
            }
 
390
          }
 
391
 
 
392
          if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
 
393
            continue;
 
394
        }
 
395
 
 
396
    case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
 
397
        if (worker->con->fd == -1)
 
398
          continue;
 
399
        if(worker->state == GEARMAN_WORKER_STATE_GRAB_JOB_SEND )
 
400
          if (worker->options & GEARMAN_WORKER_ROUND_ROBIN)
 
401
            worker->gearman->con_list = worker->gearman->con_list->next;
 
402
          
 
403
        *ret_ptr= gearman_con_send(worker->con, &(worker->grab_job), true);
 
404
        if (*ret_ptr != GEARMAN_SUCCESS)
 
405
        {
 
406
          if (*ret_ptr == GEARMAN_IO_WAIT)
 
407
          {
 
408
            worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
 
409
          }
 
410
         else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
 
411
            continue;
 
412
                
 
413
          return NULL;
 
414
        }
 
415
 
 
416
        if (worker->job == NULL)
 
417
        {
 
418
          worker->job= gearman_job_create(worker->gearman, job);
 
419
          if (worker->job == NULL)
 
420
          {
 
421
            *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
422
            return NULL;
 
423
          }
 
424
        }
 
425
 
 
426
        while (1)
 
427
        {
 
428
    case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
 
429
          (void)gearman_con_recv(worker->con, &(worker->job->assigned), ret_ptr,
 
430
                                 true);
 
431
          if (*ret_ptr != GEARMAN_SUCCESS)
 
432
          {
 
433
            if (*ret_ptr == GEARMAN_IO_WAIT)
 
434
              worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
 
435
            else
 
436
            {
 
437
              gearman_job_free(worker->job);
 
438
              worker->job= NULL;
 
439
 
 
440
              if (*ret_ptr == GEARMAN_LOST_CONNECTION)
 
441
                break;
 
442
            }
 
443
 
 
444
            return NULL;
 
445
          }
 
446
 
 
447
          if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN ||
 
448
              worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
 
449
          {
 
450
            worker->job->options|= GEARMAN_JOB_ASSIGNED_IN_USE;
 
451
            worker->job->con= worker->con;
 
452
            worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
 
453
            job= worker->job;
 
454
            worker->job= NULL;
 
455
            return job;
 
456
          }
 
457
 
 
458
          if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
 
459
          {
 
460
            gearman_packet_free(&(worker->job->assigned));
 
461
            break;
 
462
          }
 
463
 
 
464
          if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
 
465
          {
 
466
            GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_grab_job",
 
467
                              "unexpected packet:%s",
 
468
                 gearman_command_info_list[worker->job->assigned.command].name);
 
469
            gearman_packet_free(&(worker->job->assigned));
 
470
            gearman_job_free(worker->job);
 
471
            worker->job= NULL;
 
472
            *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
 
473
            return NULL;
 
474
          }
 
475
 
 
476
          gearman_packet_free(&(worker->job->assigned));
 
477
        }
 
478
      }
 
479
 
 
480
    case GEARMAN_WORKER_STATE_PRE_SLEEP:
 
481
      for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
 
482
           i++, worker->con= worker->con->next)
 
483
      {
 
484
        if (worker->con->fd == -1)
 
485
          continue;
 
486
 
 
487
        *ret_ptr= gearman_con_send(worker->con, &(worker->pre_sleep), true);
 
488
        if (*ret_ptr != GEARMAN_SUCCESS)
 
489
        {
 
490
          if (*ret_ptr == GEARMAN_IO_WAIT)
 
491
            worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
 
492
          else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
 
493
            continue;
 
494
 
 
495
          return NULL;
 
496
        }
 
497
      }
 
498
 
 
499
      worker->state= GEARMAN_WORKER_STATE_START;
 
500
 
 
501
      /* Set a watch on all active connections that we sent a PRE_SLEEP to. */
 
502
      active= 0;
 
503
      for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
 
504
           i++, worker->con= worker->con->next)
 
505
      {
 
506
        if (worker->con->fd == -1)
 
507
          continue;
 
508
 
 
509
        *ret_ptr= gearman_con_set_events(worker->con, POLLIN);
 
510
        if (*ret_ptr != GEARMAN_SUCCESS)
 
511
          return NULL;
 
512
 
 
513
        active++;
 
514
      }
 
515
 
 
516
      if (worker->gearman->options & GEARMAN_NON_BLOCKING)
 
517
      {
 
518
        *ret_ptr= GEARMAN_NO_JOBS;
 
519
        return NULL;
 
520
      }
 
521
 
 
522
      if (active == 0)
 
523
        sleep(GEARMAN_WORKER_WAIT_TIMEOUT / 1000);
 
524
      else
 
525
      {
 
526
        *ret_ptr= gearman_con_wait(worker->gearman,
 
527
                                   GEARMAN_WORKER_WAIT_TIMEOUT);
 
528
        if (*ret_ptr != GEARMAN_SUCCESS)
 
529
          return NULL;
 
530
      }
 
531
 
 
532
      break;
 
533
 
 
534
    default:
 
535
      GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_grab_job",
 
536
                        "unknown state: %u", worker->state)
 
537
      *ret_ptr= GEARMAN_UNKNOWN_STATE;
 
538
      return NULL;
 
539
    }
 
540
  }
 
541
}
 
542
 
 
543
gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
 
544
                                             const char *function_name,
 
545
                                             uint32_t timeout,
 
546
                                             gearman_worker_fn *worker_fn,
 
547
                                             const void *fn_arg)
 
548
{
 
549
  if (function_name == NULL)
 
550
  {
 
551
    GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_add_function",
 
552
                      "function name not given")
 
553
    return GEARMAN_INVALID_FUNCTION_NAME;
 
554
  }
 
555
 
 
556
  if (worker_fn == NULL)
 
557
  {
 
558
    GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_add_function",
 
559
                      "function not given")
 
560
    return GEARMAN_INVALID_WORKER_FUNCTION;
 
561
  }
 
562
 
 
563
  return _worker_function_add(worker, function_name, timeout, worker_fn,
 
564
                              fn_arg);
 
565
}
 
566
 
 
567
gearman_return_t gearman_worker_work(gearman_worker_st *worker)
 
568
{
 
569
  gearman_return_t ret;
 
570
 
 
571
  switch (worker->work_state)
 
572
  {
 
573
  case GEARMAN_WORKER_WORK_STATE_GRAB_JOB:
 
574
    (void)gearman_worker_grab_job(worker, &(worker->work_job), &ret);
 
575
    if (ret != GEARMAN_SUCCESS)
 
576
      return ret;
 
577
 
 
578
    for (worker->work_function= worker->function_list;
 
579
         worker->work_function != NULL;
 
580
         worker->work_function= worker->work_function->next)
 
581
    {
 
582
      if (!strcmp(gearman_job_function_name(&(worker->work_job)),
 
583
                  worker->work_function->function_name))
 
584
      {
 
585
        break;
 
586
      }
 
587
    }
 
588
 
 
589
    if (worker->work_function == NULL)
 
590
    {
 
591
      gearman_job_free(&(worker->work_job));
 
592
      GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_work",
 
593
                        "function not found")
 
594
      return GEARMAN_INVALID_FUNCTION_NAME;
 
595
    }
 
596
 
 
597
    if (worker->work_function->worker_fn == NULL)
 
598
    {
 
599
      gearman_job_free(&(worker->work_job));
 
600
      GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_work",
 
601
                        "no callback function supplied")
 
602
      return GEARMAN_INVALID_FUNCTION_NAME;
 
603
    }
 
604
 
 
605
    worker->options|= GEARMAN_WORKER_WORK_JOB_IN_USE;
 
606
    worker->work_result_size= 0;
 
607
 
 
608
  case GEARMAN_WORKER_WORK_STATE_FUNCTION:
 
609
    worker->work_result= (*(worker->work_function->worker_fn))(
 
610
                         &(worker->work_job),
 
611
                         (void *)(worker->work_function->fn_arg),
 
612
                         &(worker->work_result_size), &ret);
 
613
    if (ret == GEARMAN_WORK_FAIL)
 
614
    {
 
615
      ret= gearman_job_fail(&(worker->work_job));
 
616
      if (ret != GEARMAN_SUCCESS)
 
617
      {
 
618
        if (ret == GEARMAN_LOST_CONNECTION)
 
619
          break;
 
620
 
 
621
        worker->work_state= GEARMAN_WORKER_WORK_STATE_FAIL;
 
622
        return ret;
 
623
      }
 
624
 
 
625
      break;
 
626
    }
 
627
 
 
628
    if (ret != GEARMAN_SUCCESS)
 
629
    {
 
630
      if (ret == GEARMAN_LOST_CONNECTION)
 
631
        break;
 
632
 
 
633
      worker->work_state= GEARMAN_WORKER_WORK_STATE_FUNCTION;
 
634
      return ret;
 
635
    }
 
636
 
 
637
  case GEARMAN_WORKER_WORK_STATE_COMPLETE:
 
638
    ret= gearman_job_complete(&(worker->work_job), worker->work_result,
 
639
                              worker->work_result_size);
 
640
    if (ret == GEARMAN_IO_WAIT)
 
641
    {
 
642
      worker->work_state= GEARMAN_WORKER_WORK_STATE_COMPLETE;
 
643
      return ret;
 
644
    }
 
645
 
 
646
    if (worker->work_result != NULL)
 
647
    {
 
648
      if (worker->gearman->workload_free == NULL)
 
649
        free(worker->work_result);
 
650
      else
 
651
      {
 
652
        worker->gearman->workload_free(worker->work_result,
 
653
                                  (void *)(worker->gearman->workload_free_arg));
 
654
      }
 
655
      worker->work_result= NULL;
 
656
    }
 
657
 
 
658
    if (ret != GEARMAN_SUCCESS)
 
659
    {
 
660
      if (ret == GEARMAN_LOST_CONNECTION)
 
661
        break;
 
662
 
 
663
      return ret;
 
664
    }
 
665
 
 
666
    break;
 
667
 
 
668
  case GEARMAN_WORKER_WORK_STATE_FAIL:
 
669
    ret= gearman_job_fail(&(worker->work_job));
 
670
    if (ret != GEARMAN_SUCCESS)
 
671
    {
 
672
      if (ret == GEARMAN_LOST_CONNECTION)
 
673
        break;
 
674
 
 
675
      return ret;
 
676
    }
 
677
 
 
678
   break;
 
679
 
 
680
  default:
 
681
    GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_work",
 
682
                      "unknown state: %u", worker->work_state)
 
683
    return GEARMAN_UNKNOWN_STATE;
 
684
  }
 
685
 
 
686
  gearman_job_free(&(worker->work_job));
 
687
  worker->options&= (gearman_worker_options_t)~GEARMAN_WORKER_WORK_JOB_IN_USE;
 
688
  worker->work_state= GEARMAN_WORKER_WORK_STATE_GRAB_JOB;
 
689
  return GEARMAN_SUCCESS;
 
690
}
 
691
 
 
692
gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
 
693
                                     const void *workload,
 
694
                                     size_t workload_size)
 
695
{
 
696
  return gearman_con_echo(worker->gearman, workload, workload_size);
 
697
}
 
698
 
 
699
/*
 
700
 * Private definitions
 
701
 */
 
702
 
 
703
static gearman_worker_st *_worker_allocate(gearman_worker_st *worker)
 
704
{
 
705
  if (worker == NULL)
 
706
  {
 
707
    worker= malloc(sizeof(gearman_worker_st));
 
708
    if (worker == NULL)
 
709
      return NULL;
 
710
 
 
711
    worker->options= GEARMAN_WORKER_ALLOCATED;
 
712
  }
 
713
  else
 
714
    worker->options= 0;
 
715
 
 
716
  worker->state= 0;
 
717
  worker->work_state= 0;
 
718
  worker->function_count= 0;
 
719
  worker->work_result_size= 0;
 
720
  worker->gearman= NULL;
 
721
  worker->con= NULL;
 
722
  worker->job= NULL;
 
723
  worker->function= NULL;
 
724
  worker->function_list= NULL;
 
725
  worker->work_function= NULL;
 
726
  worker->work_result= NULL;
 
727
 
 
728
  return worker;
 
729
}
 
730
 
 
731
static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
 
732
{
 
733
  gearman_return_t ret;
 
734
 
 
735
  ret= gearman_packet_add(worker->gearman, &(worker->grab_job),
 
736
                          GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
 
737
                          NULL);
 
738
  if (ret != GEARMAN_SUCCESS)
 
739
    return ret;
 
740
 
 
741
  ret= gearman_packet_add(worker->gearman, &(worker->pre_sleep),
 
742
                          GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
 
743
                          NULL);
 
744
  if (ret != GEARMAN_SUCCESS)
 
745
  {
 
746
    gearman_packet_free(&(worker->grab_job));
 
747
    return ret;
 
748
  }
 
749
 
 
750
  worker->options|= GEARMAN_WORKER_PACKET_INIT;
 
751
 
 
752
  return GEARMAN_SUCCESS;
 
753
}
 
754
 
 
755
static gearman_return_t _worker_add_server(const char *host, in_port_t port,
 
756
                                           void *data)
 
757
{
 
758
  return gearman_worker_add_server((gearman_worker_st *)data, host, port);
 
759
}
 
760
 
 
761
static gearman_return_t _worker_function_add(gearman_worker_st *worker,
 
762
                                             const char *function_name,
 
763
                                             uint32_t timeout,
 
764
                                             gearman_worker_fn *worker_fn,
 
765
                                             const void *fn_arg)
 
766
{
 
767
  gearman_worker_function_st *function;
 
768
  gearman_return_t ret;
 
769
  char timeout_buffer[11];
 
770
 
 
771
  function= malloc(sizeof(gearman_worker_function_st));
 
772
  if (function == NULL)
 
773
  {
 
774
    GEARMAN_ERROR_SET(worker->gearman, "_worker_function_add", "malloc")
 
775
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
776
  }
 
777
 
 
778
  function->options= (GEARMAN_WORKER_FUNCTION_PACKET_IN_USE |
 
779
                      GEARMAN_WORKER_FUNCTION_CHANGE);
 
780
 
 
781
  function->function_name= strdup(function_name);
 
782
  if (function->function_name == NULL)
 
783
  {
 
784
    free(function);
 
785
    GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_add_function", "strdup")
 
786
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
787
  }
 
788
 
 
789
  function->worker_fn= worker_fn;
 
790
  function->fn_arg= fn_arg;
 
791
 
 
792
  if (timeout > 0)
 
793
  {
 
794
    snprintf(timeout_buffer, 11, "%u", timeout);
 
795
    ret= gearman_packet_add(worker->gearman, &(function->packet),
 
796
                            GEARMAN_MAGIC_REQUEST,
 
797
                            GEARMAN_COMMAND_CAN_DO_TIMEOUT,
 
798
                            (uint8_t *)function_name,
 
799
                            strlen(function_name) + 1,
 
800
                            (uint8_t *)timeout_buffer,
 
801
                            strlen(timeout_buffer), NULL);
 
802
  }
 
803
  else
 
804
  {
 
805
    ret= gearman_packet_add(worker->gearman, &(function->packet),
 
806
                            GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
 
807
                            (uint8_t *)function_name, strlen(function_name),
 
808
                            NULL);
 
809
  }
 
810
  if (ret != GEARMAN_SUCCESS)
 
811
  {
 
812
    free(function->function_name);
 
813
    free(function);
 
814
    return ret;
 
815
  }
 
816
 
 
817
  GEARMAN_LIST_ADD(worker->function, function,)
 
818
 
 
819
  worker->options|= GEARMAN_WORKER_CHANGE;
 
820
 
 
821
  return GEARMAN_SUCCESS;
 
822
}
 
823
 
 
824
static void _worker_function_free(gearman_worker_st *worker,
 
825
                                  gearman_worker_function_st *function)
 
826
{
 
827
  GEARMAN_LIST_DEL(worker->function, function,)
 
828
 
 
829
  if (function->options & GEARMAN_WORKER_FUNCTION_PACKET_IN_USE)
 
830
    gearman_packet_free(&(function->packet));
 
831
 
 
832
  free(function->function_name);
 
833
  free(function);
 
834
}