1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Worker definitions
17
* Private declarations
21
* @addtogroup gearman_worker_private Private Worker Functions
22
* @ingroup gearman_worker
27
* Allocate a worker structure.
29
static gearman_worker_st *_worker_allocate(gearman_worker_st *worker);
32
* Initialize common packets for later use.
34
static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
37
* Callback function used when parsing server lists.
39
static gearman_return_t _worker_add_server(const char *host, in_port_t port,
43
* Allocate and add a function to the register list.
45
static gearman_return_t _worker_function_add(gearman_worker_st *worker,
46
const char *function_name,
48
gearman_worker_fn *worker_fn,
54
static void _worker_function_free(gearman_worker_st *worker,
55
gearman_worker_function_st *function);
63
gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
65
worker= _worker_allocate(worker);
69
worker->gearman= gearman_create(&(worker->gearman_static));
70
if (worker->gearman == NULL)
72
gearman_worker_free(worker);
76
if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
78
gearman_worker_free(worker);
85
gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
86
gearman_worker_st *from)
91
worker= _worker_allocate(worker);
95
worker->options|= (from->options &
96
(gearman_worker_options_t)~GEARMAN_WORKER_ALLOCATED);
98
worker->gearman= gearman_clone(&(worker->gearman_static), from->gearman);
99
if (worker->gearman == NULL)
101
gearman_worker_free(worker);
105
if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
107
gearman_worker_free(worker);
114
void gearman_worker_free(gearman_worker_st *worker)
116
if (worker->options & GEARMAN_WORKER_PACKET_INIT)
118
gearman_packet_free(&(worker->grab_job));
119
gearman_packet_free(&(worker->pre_sleep));
122
if (worker->job != NULL)
123
gearman_job_free(worker->job);
125
if (worker->options & GEARMAN_WORKER_WORK_JOB_IN_USE)
126
gearman_job_free(&(worker->work_job));
128
if (worker->work_result != NULL)
130
if (worker->gearman->workload_free == NULL)
131
free(worker->work_result);
134
worker->gearman->workload_free(worker->work_result,
135
(void *)(worker->gearman->workload_free_arg));
139
while (worker->function_list != NULL)
140
_worker_function_free(worker, worker->function_list);
142
if (worker->gearman != NULL)
143
gearman_free(worker->gearman);
145
if (worker->options & GEARMAN_WORKER_ALLOCATED)
149
const char *gearman_worker_error(gearman_worker_st *worker)
151
return gearman_error(worker->gearman);
154
int gearman_worker_errno(gearman_worker_st *worker)
156
return gearman_errno(worker->gearman);
159
void gearman_worker_set_options(gearman_worker_st *worker,
160
gearman_worker_options_t options,
163
if (options & GEARMAN_WORKER_ROUND_ROBIN)
164
gearman_set_options(worker->gearman, GEARMAN_ROUND_ROBIN, data);
166
if (options & GEARMAN_WORKER_NON_BLOCKING)
167
gearman_set_options(worker->gearman, GEARMAN_NON_BLOCKING, data);
169
if (options & GEARMAN_WORKER_GRAB_UNIQ)
172
worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
174
worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
176
(void)gearman_packet_pack_header(&(worker->grab_job));
180
worker->options |= options;
182
worker->options &= ~options;
185
void gearman_worker_set_workload_malloc(gearman_worker_st *worker,
186
gearman_malloc_fn *workload_malloc,
187
const void *workload_malloc_arg)
189
gearman_set_workload_malloc(worker->gearman, workload_malloc,
190
workload_malloc_arg);
193
void gearman_worker_set_workload_free(gearman_worker_st *worker,
194
gearman_free_fn *workload_free,
195
const void *workload_free_arg)
197
gearman_set_workload_free(worker->gearman, workload_free, workload_free_arg);
200
gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
201
const char *host, in_port_t port)
203
if (gearman_con_add(worker->gearman, NULL, host, port) == NULL)
204
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
206
return GEARMAN_SUCCESS;
209
gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
212
return gearman_parse_servers(servers, worker, _worker_add_server);
215
gearman_return_t gearman_worker_register(gearman_worker_st *worker,
216
const char *function_name,
219
return _worker_function_add(worker, function_name, timeout, NULL, NULL);
222
gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
223
const char *function_name)
225
gearman_worker_function_st *function;
226
gearman_return_t ret;
228
for (function= worker->function_list; function != NULL;
229
function= function->next)
231
if (!strcmp(function_name, function->function_name))
235
if (function == NULL)
236
return GEARMAN_SUCCESS;
238
gearman_packet_free(&(function->packet));
240
ret= gearman_packet_add(worker->gearman, &(function->packet),
241
GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
242
(uint8_t *)function_name, strlen(function_name),
244
if (ret != GEARMAN_SUCCESS)
247
(gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_PACKET_IN_USE;
251
function->options|= (GEARMAN_WORKER_FUNCTION_CHANGE |
252
GEARMAN_WORKER_FUNCTION_REMOVE);
254
worker->options|= GEARMAN_WORKER_CHANGE;
256
return GEARMAN_SUCCESS;
259
gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
261
gearman_return_t ret;
263
if (worker->function_list == NULL)
264
return GEARMAN_SUCCESS;
266
while (worker->function_list->next != NULL)
267
_worker_function_free(worker, worker->function_list->next);
269
gearman_packet_free(&(worker->function_list->packet));
271
ret= gearman_packet_add(worker->gearman, &(worker->function_list->packet),
272
GEARMAN_MAGIC_REQUEST,
273
GEARMAN_COMMAND_RESET_ABILITIES, NULL);
274
if (ret != GEARMAN_SUCCESS)
276
worker->function_list->options&=
277
(gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_PACKET_IN_USE;
281
worker->function_list->options|= (GEARMAN_WORKER_FUNCTION_CHANGE |
282
GEARMAN_WORKER_FUNCTION_REMOVE);
284
worker->options|= GEARMAN_WORKER_CHANGE;
286
return GEARMAN_SUCCESS;
289
gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
291
gearman_return_t *ret_ptr)
293
gearman_worker_function_st *function;
298
//if (worker->options & GEARMAN_WORKER_ROUND_ROBIN)
299
// worker->gearman->con_list = worker->gearman->con_list->next;
302
switch (worker->state)
304
case GEARMAN_WORKER_STATE_START:
305
/* If there are any new functions changes, send them now. */
306
if (worker->options & GEARMAN_WORKER_CHANGE)
308
worker->function= worker->function_list;
309
while (worker->function != NULL)
311
if (!(worker->function->options & GEARMAN_WORKER_FUNCTION_CHANGE))
313
worker->function= worker->function->next;
317
for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
318
i++, worker->con= worker->con->next)
320
if (worker->con->fd == -1)
323
case GEARMAN_WORKER_STATE_FUNCTION_SEND:
324
*ret_ptr= gearman_con_send(worker->con, &(worker->function->packet),
326
if (*ret_ptr != GEARMAN_SUCCESS)
328
if (*ret_ptr == GEARMAN_IO_WAIT)
329
worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
330
else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
337
if (worker->function->options & GEARMAN_WORKER_FUNCTION_REMOVE)
339
function= worker->function->prev;
340
_worker_function_free(worker, worker->function);
341
if (function == NULL)
342
worker->function= worker->function_list;
344
worker->function= function;
348
worker->function->options&=
349
(gearman_worker_function_options_t)~GEARMAN_WORKER_FUNCTION_CHANGE;
350
worker->function= worker->function->next;
354
worker->options&= (gearman_worker_options_t)~GEARMAN_WORKER_CHANGE;
357
if (worker->function_list == NULL)
359
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_grab_job",
360
"no functions have been registered")
361
*ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
365
for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
366
i++, worker->con= worker->con->next)
368
/* If the connection to the job server is not active, start it. */
369
if (worker->con->fd == -1)
371
for (worker->function= worker->function_list;
372
worker->function != NULL;
373
worker->function= worker->function->next)
375
case GEARMAN_WORKER_STATE_CONNECT:
376
*ret_ptr= gearman_con_send(worker->con, &(worker->function->packet),
378
if (*ret_ptr != GEARMAN_SUCCESS)
380
if (*ret_ptr == GEARMAN_IO_WAIT)
381
worker->state= GEARMAN_WORKER_STATE_CONNECT;
382
else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT ||
383
*ret_ptr == GEARMAN_LOST_CONNECTION)
392
if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
396
case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
397
if (worker->con->fd == -1)
399
if(worker->state == GEARMAN_WORKER_STATE_GRAB_JOB_SEND )
400
if (worker->options & GEARMAN_WORKER_ROUND_ROBIN)
401
worker->gearman->con_list = worker->gearman->con_list->next;
403
*ret_ptr= gearman_con_send(worker->con, &(worker->grab_job), true);
404
if (*ret_ptr != GEARMAN_SUCCESS)
406
if (*ret_ptr == GEARMAN_IO_WAIT)
408
worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
410
else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
416
if (worker->job == NULL)
418
worker->job= gearman_job_create(worker->gearman, job);
419
if (worker->job == NULL)
421
*ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
428
case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
429
(void)gearman_con_recv(worker->con, &(worker->job->assigned), ret_ptr,
431
if (*ret_ptr != GEARMAN_SUCCESS)
433
if (*ret_ptr == GEARMAN_IO_WAIT)
434
worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
437
gearman_job_free(worker->job);
440
if (*ret_ptr == GEARMAN_LOST_CONNECTION)
447
if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN ||
448
worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
450
worker->job->options|= GEARMAN_JOB_ASSIGNED_IN_USE;
451
worker->job->con= worker->con;
452
worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
458
if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
460
gearman_packet_free(&(worker->job->assigned));
464
if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
466
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_grab_job",
467
"unexpected packet:%s",
468
gearman_command_info_list[worker->job->assigned.command].name);
469
gearman_packet_free(&(worker->job->assigned));
470
gearman_job_free(worker->job);
472
*ret_ptr= GEARMAN_UNEXPECTED_PACKET;
476
gearman_packet_free(&(worker->job->assigned));
480
case GEARMAN_WORKER_STATE_PRE_SLEEP:
481
for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
482
i++, worker->con= worker->con->next)
484
if (worker->con->fd == -1)
487
*ret_ptr= gearman_con_send(worker->con, &(worker->pre_sleep), true);
488
if (*ret_ptr != GEARMAN_SUCCESS)
490
if (*ret_ptr == GEARMAN_IO_WAIT)
491
worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
492
else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
499
worker->state= GEARMAN_WORKER_STATE_START;
501
/* Set a watch on all active connections that we sent a PRE_SLEEP to. */
503
for (i=0, worker->con= worker->gearman->con_list; i < worker->gearman->con_count;
504
i++, worker->con= worker->con->next)
506
if (worker->con->fd == -1)
509
*ret_ptr= gearman_con_set_events(worker->con, POLLIN);
510
if (*ret_ptr != GEARMAN_SUCCESS)
516
if (worker->gearman->options & GEARMAN_NON_BLOCKING)
518
*ret_ptr= GEARMAN_NO_JOBS;
523
sleep(GEARMAN_WORKER_WAIT_TIMEOUT / 1000);
526
*ret_ptr= gearman_con_wait(worker->gearman,
527
GEARMAN_WORKER_WAIT_TIMEOUT);
528
if (*ret_ptr != GEARMAN_SUCCESS)
535
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_grab_job",
536
"unknown state: %u", worker->state)
537
*ret_ptr= GEARMAN_UNKNOWN_STATE;
543
gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
544
const char *function_name,
546
gearman_worker_fn *worker_fn,
549
if (function_name == NULL)
551
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_add_function",
552
"function name not given")
553
return GEARMAN_INVALID_FUNCTION_NAME;
556
if (worker_fn == NULL)
558
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_add_function",
559
"function not given")
560
return GEARMAN_INVALID_WORKER_FUNCTION;
563
return _worker_function_add(worker, function_name, timeout, worker_fn,
567
gearman_return_t gearman_worker_work(gearman_worker_st *worker)
569
gearman_return_t ret;
571
switch (worker->work_state)
573
case GEARMAN_WORKER_WORK_STATE_GRAB_JOB:
574
(void)gearman_worker_grab_job(worker, &(worker->work_job), &ret);
575
if (ret != GEARMAN_SUCCESS)
578
for (worker->work_function= worker->function_list;
579
worker->work_function != NULL;
580
worker->work_function= worker->work_function->next)
582
if (!strcmp(gearman_job_function_name(&(worker->work_job)),
583
worker->work_function->function_name))
589
if (worker->work_function == NULL)
591
gearman_job_free(&(worker->work_job));
592
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_work",
593
"function not found")
594
return GEARMAN_INVALID_FUNCTION_NAME;
597
if (worker->work_function->worker_fn == NULL)
599
gearman_job_free(&(worker->work_job));
600
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_work",
601
"no callback function supplied")
602
return GEARMAN_INVALID_FUNCTION_NAME;
605
worker->options|= GEARMAN_WORKER_WORK_JOB_IN_USE;
606
worker->work_result_size= 0;
608
case GEARMAN_WORKER_WORK_STATE_FUNCTION:
609
worker->work_result= (*(worker->work_function->worker_fn))(
611
(void *)(worker->work_function->fn_arg),
612
&(worker->work_result_size), &ret);
613
if (ret == GEARMAN_WORK_FAIL)
615
ret= gearman_job_fail(&(worker->work_job));
616
if (ret != GEARMAN_SUCCESS)
618
if (ret == GEARMAN_LOST_CONNECTION)
621
worker->work_state= GEARMAN_WORKER_WORK_STATE_FAIL;
628
if (ret != GEARMAN_SUCCESS)
630
if (ret == GEARMAN_LOST_CONNECTION)
633
worker->work_state= GEARMAN_WORKER_WORK_STATE_FUNCTION;
637
case GEARMAN_WORKER_WORK_STATE_COMPLETE:
638
ret= gearman_job_complete(&(worker->work_job), worker->work_result,
639
worker->work_result_size);
640
if (ret == GEARMAN_IO_WAIT)
642
worker->work_state= GEARMAN_WORKER_WORK_STATE_COMPLETE;
646
if (worker->work_result != NULL)
648
if (worker->gearman->workload_free == NULL)
649
free(worker->work_result);
652
worker->gearman->workload_free(worker->work_result,
653
(void *)(worker->gearman->workload_free_arg));
655
worker->work_result= NULL;
658
if (ret != GEARMAN_SUCCESS)
660
if (ret == GEARMAN_LOST_CONNECTION)
668
case GEARMAN_WORKER_WORK_STATE_FAIL:
669
ret= gearman_job_fail(&(worker->work_job));
670
if (ret != GEARMAN_SUCCESS)
672
if (ret == GEARMAN_LOST_CONNECTION)
681
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_work",
682
"unknown state: %u", worker->work_state)
683
return GEARMAN_UNKNOWN_STATE;
686
gearman_job_free(&(worker->work_job));
687
worker->options&= (gearman_worker_options_t)~GEARMAN_WORKER_WORK_JOB_IN_USE;
688
worker->work_state= GEARMAN_WORKER_WORK_STATE_GRAB_JOB;
689
return GEARMAN_SUCCESS;
692
gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
693
const void *workload,
694
size_t workload_size)
696
return gearman_con_echo(worker->gearman, workload, workload_size);
700
* Private definitions
703
static gearman_worker_st *_worker_allocate(gearman_worker_st *worker)
707
worker= malloc(sizeof(gearman_worker_st));
711
worker->options= GEARMAN_WORKER_ALLOCATED;
717
worker->work_state= 0;
718
worker->function_count= 0;
719
worker->work_result_size= 0;
720
worker->gearman= NULL;
723
worker->function= NULL;
724
worker->function_list= NULL;
725
worker->work_function= NULL;
726
worker->work_result= NULL;
731
static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
733
gearman_return_t ret;
735
ret= gearman_packet_add(worker->gearman, &(worker->grab_job),
736
GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
738
if (ret != GEARMAN_SUCCESS)
741
ret= gearman_packet_add(worker->gearman, &(worker->pre_sleep),
742
GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
744
if (ret != GEARMAN_SUCCESS)
746
gearman_packet_free(&(worker->grab_job));
750
worker->options|= GEARMAN_WORKER_PACKET_INIT;
752
return GEARMAN_SUCCESS;
755
static gearman_return_t _worker_add_server(const char *host, in_port_t port,
758
return gearman_worker_add_server((gearman_worker_st *)data, host, port);
761
static gearman_return_t _worker_function_add(gearman_worker_st *worker,
762
const char *function_name,
764
gearman_worker_fn *worker_fn,
767
gearman_worker_function_st *function;
768
gearman_return_t ret;
769
char timeout_buffer[11];
771
function= malloc(sizeof(gearman_worker_function_st));
772
if (function == NULL)
774
GEARMAN_ERROR_SET(worker->gearman, "_worker_function_add", "malloc")
775
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
778
function->options= (GEARMAN_WORKER_FUNCTION_PACKET_IN_USE |
779
GEARMAN_WORKER_FUNCTION_CHANGE);
781
function->function_name= strdup(function_name);
782
if (function->function_name == NULL)
785
GEARMAN_ERROR_SET(worker->gearman, "gearman_worker_add_function", "strdup")
786
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
789
function->worker_fn= worker_fn;
790
function->fn_arg= fn_arg;
794
snprintf(timeout_buffer, 11, "%u", timeout);
795
ret= gearman_packet_add(worker->gearman, &(function->packet),
796
GEARMAN_MAGIC_REQUEST,
797
GEARMAN_COMMAND_CAN_DO_TIMEOUT,
798
(uint8_t *)function_name,
799
strlen(function_name) + 1,
800
(uint8_t *)timeout_buffer,
801
strlen(timeout_buffer), NULL);
805
ret= gearman_packet_add(worker->gearman, &(function->packet),
806
GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
807
(uint8_t *)function_name, strlen(function_name),
810
if (ret != GEARMAN_SUCCESS)
812
free(function->function_name);
817
GEARMAN_LIST_ADD(worker->function, function,)
819
worker->options|= GEARMAN_WORKER_CHANGE;
821
return GEARMAN_SUCCESS;
824
static void _worker_function_free(gearman_worker_st *worker,
825
gearman_worker_function_st *function)
827
GEARMAN_LIST_DEL(worker->function, function,)
829
if (function->options & GEARMAN_WORKER_FUNCTION_PACKET_IN_USE)
830
gearman_packet_free(&(function->packet));
832
free(function->function_name);