1
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
* Gearmand client and server library.
5
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
6
* Copyright (C) 2008 Brian Aker, Eric Day
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are
13
* * Redistributions of source code must retain the above copyright
14
* notice, this list of conditions and the following disclaimer.
16
* * Redistributions in binary form must reproduce the above
17
* copyright notice, this list of conditions and the following disclaimer
18
* in the documentation and/or other materials provided with the
21
* * The names of its contributors may not be used to endorse or
22
* promote products derived from this software without specific prior
25
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39
#include <libgearman/common.h>
41
#include <arpa/inet.h>
49
#include <netinet/in.h>
50
#include <sys/socket.h>
54
Allocate a client structure.
56
/*
 * Prepare a gearman_client_st for use.
 *
 * The visible statements show two sources for the structure: storage handed
 * in by the caller (options.allocated is set to false) and storage obtained
 * from new (std::nothrow) (options.allocated is set to true). The conditions
 * that choose between the two, the handling of a failed allocation and the
 * return statement are not part of this listing.
 *
 * client:   caller-provided storage; presumably NULL requests a heap
 *           allocation -- TODO confirm against the elided branch.
 * is_clone: not referenced by any statement visible here; presumably it
 *           decides whether gearman_universal_initialize() runs -- verify.
 */
static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
60
client->options.allocated= false;
64
client= new (std::nothrow) gearman_client_st;
68
client->options.allocated= true;
71
// Every remaining option flag starts out cleared.
client->options.non_blocking= false;
72
client->options.unbuffered_result= false;
73
client->options.no_new= false;
74
client->options.free_tasks= false;
76
// Reset the run state, the task bookkeeping and the user context.
client->state= GEARMAN_CLIENT_STATE_IDLE;
78
client->running_tasks= 0;
79
client->task_count= 0;
80
client->context= NULL;
83
client->task_list= NULL;
84
client->task_context_free_fn= NULL;
85
// Install the default action callbacks (gearman_actions_default()).
gearman_client_clear_fn(client);
89
gearman_universal_initialize(client->universal);
96
* Callback function used when parsing server lists.
98
static gearman_return_t _client_add_server(const char *host, in_port_t port,
101
return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
108
/*
 * Common implementation behind gearman_client_do(), gearman_client_do_high()
 * and gearman_client_do_low(), which pass GEARMAN_COMMAND_SUBMIT_JOB,
 * GEARMAN_COMMAND_SUBMIT_JOB_HIGH and GEARMAN_COMMAND_SUBMIT_JOB_LOW as
 * command.
 *
 * One task is queued on the stack-allocated do_task through add_task(), and
 * gearman_client_run_tasks() is called repeatedly while gearman_continue(ret)
 * holds. The outcome is reported through *ret_ptr and the length of the
 * result through *result_size.
 *
 * result_size may be NULL; it is then pointed at a local placeholder.
 * The result buffer is collected in returnable, which is assigned only on
 * the path where both the run and the task succeeded; presumably it is the
 * value returned -- the return statement is not part of this listing.
 */
static void *_client_do(gearman_client_st *client, gearman_command_t command,
109
const char *function_name,
111
const void *workload_str, size_t workload_size,
112
size_t *result_size, gearman_return_t *ret_ptr)
114
gearman_return_t unused;
121
if (result_size == NULL)
123
result_size= &unused_size;
129
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
133
gearman_string_t function= { gearman_string_param_cstr(function_name) };
134
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
135
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
138
gearman_task_st do_task;
139
gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
144
gearman_actions_do_default());
145
// add_task() failed: surface the error code recorded on the universal object.
if (do_task_ptr == NULL)
147
*ret_ptr= gearman_universal_error_code(client->universal);
150
do_task_ptr->type= GEARMAN_TASK_KIND_DO;
152
gearman_return_t ret;
154
ret= gearman_client_run_tasks(client);
155
} while (gearman_continue(ret));
157
// gearman_client_run_tasks failed
158
assert(client->task_list); // Programmer error, we should always have the task that we used for do
160
char *returnable= NULL;
161
// Three outcomes are handled below: the run itself failed, the run and the
// task both succeeded, or the run succeeded while the task did not.
if (gearman_failed(ret))
163
if (ret == GEARMAN_COULD_NOT_CONNECT)
167
gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
173
else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
175
*ret_ptr= do_task_ptr->result_rc;
176
if (do_task_ptr->result_ptr)
178
// When gearman_has_allocator() reports true the result is copied into a
// buffer from gearman_malloc(), sized for the payload plus one zero byte.
if (gearman_has_allocator(client->universal))
180
gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
181
returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
184
gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
187
else // NULL terminate
189
memcpy(returnable, gearman_c_str(result), gearman_size(result));
190
returnable[gearman_size(result)]= 0;
191
*result_size= gearman_size(result);
196
// Otherwise the string is taken from the result object and its buffer is
// handed out directly. NOTE(review): presumably ownership of that buffer
// moves to the caller -- confirm in gearman_result_take_string().
gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
197
*result_size= gearman_size(result);
198
returnable= const_cast<char *>(gearman_c_str(result));
206
else // gearman_client_run_tasks() was successful, but the task was not
208
gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
210
*ret_ptr= do_task_ptr->result_rc;
214
// Release the temporary task and zero the pending and running counters.
gearman_task_free(&do_task);
215
client->new_tasks= 0;
216
client->running_tasks= 0;
222
Real background do function.
224
/*
 * Common implementation behind the gearman_client_do*_background() entry
 * points, which pass GEARMAN_COMMAND_SUBMIT_JOB_BG, _HIGH_BG or _LOW_BG as
 * command.
 *
 * An empty function name is rejected with GEARMAN_INVALID_ARGUMENT. A task
 * is queued on the stack-allocated do_task, gearman_client_run_tasks() is
 * called repeatedly while gearman_continue(ret) holds, and the job handle of
 * the task is then copied both to the caller buffer job_handle and to
 * client->_do_handle.
 *
 * The first return of GEARMAN_INVALID_ARGUMENT belongs to a guard whose
 * condition is not part of this listing (presumably client == NULL).
 */
static gearman_return_t _client_do_background(gearman_client_st *client,
225
gearman_command_t command,
226
gearman_string_t &function,
227
gearman_unique_t &unique,
228
gearman_string_t &workload,
229
gearman_job_handle_t job_handle)
233
return GEARMAN_INVALID_ARGUMENT;
236
if (gearman_size(function) == 0)
238
return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty");
241
client->_do_handle[0]= 0; // Reset the job_handle we store in client
243
gearman_task_st do_task, *do_task_ptr;
244
do_task_ptr= add_task(*client, &do_task,
251
gearman_actions_do_default());
254
return gearman_universal_error_code(client->universal);
256
do_task_ptr->type= GEARMAN_TASK_KIND_DO;
258
gearman_return_t ret;
260
ret= gearman_client_run_tasks(client);
262
// If either of the following is ever true, we will end up in an
264
// (the sentence above is cut off in this listing; presumably it refers to
// the surrounding loop never terminating -- TODO confirm)
assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
266
} while (gearman_continue(ret));
270
// NOTE(review): strncpy() leaves the destination unterminated when the
// source holds no zero byte within GEARMAN_JOB_HANDLE_SIZE bytes -- confirm
// that do_task.job_handle is always terminated.
strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
272
strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
273
// Zero the pending and running counters and release the temporary task.
client->new_tasks= 0;
274
client->running_tasks= 0;
275
gearman_task_free(&do_task);
285
gearman_client_st *gearman_client_create(gearman_client_st *client)
287
return _client_allocate(client, false);
290
/*
 * Set up client as a copy of from: the option flags non_blocking,
 * unbuffered_result, no_new and free_tasks, the action callbacks and the
 * universal state are taken from the source client.
 *
 * The first return, which falls back to _client_allocate(client, false),
 * belongs to a guard whose condition is not part of this listing
 * (presumably from == NULL -- TODO confirm).
 */
gearman_client_st *gearman_client_clone(gearman_client_st *client,
291
const gearman_client_st *from)
295
return _client_allocate(client, false);
298
client= _client_allocate(client, true);
305
client->options.non_blocking= from->options.non_blocking;
306
client->options.unbuffered_result= from->options.unbuffered_result;
307
client->options.no_new= from->options.no_new;
308
client->options.free_tasks= from->options.free_tasks;
309
client->actions= from->actions;
310
// The stored job handle is not copied; the clone starts with an empty one.
client->_do_handle[0]= 0;
312
gearman_universal_clone(client->universal, from->universal);
317
void gearman_client_free(gearman_client_st *client)
324
gearman_client_task_free_all(client);
326
gearman_universal_free(client->universal);
328
if (client->options.allocated)
334
const char *gearman_client_error(const gearman_client_st *client)
341
return gearman_universal_error(client->universal);
344
int gearman_client_errno(const gearman_client_st *client)
351
return gearman_universal_errno(client->universal);
354
/*
 * Pack the boolean option fields of client into a bit mask and return it as
 * a gearman_client_options_t. One GEARMAN_CLIENT_* bit is OR-ed in for each
 * of allocated, non_blocking, unbuffered_result, no_new and free_tasks that
 * is set.
 */
gearman_client_options_t gearman_client_options(const gearman_client_st *client)
357
// Zero the accumulator. The declaration of options is not part of this
// listing; the sizeof(int32_t) suggests a 32-bit integer -- TODO confirm.
memset(&options, 0, sizeof(int32_t));
359
if (client->options.allocated)
360
options|= int(GEARMAN_CLIENT_ALLOCATED);
362
if (client->options.non_blocking)
363
options|= int(GEARMAN_CLIENT_NON_BLOCKING);
365
if (client->options.unbuffered_result)
366
options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
368
if (client->options.no_new)
369
options|= int(GEARMAN_CLIENT_NO_NEW);
371
if (client->options.free_tasks)
372
options|= int(GEARMAN_CLIENT_FREE_TASKS);
374
return gearman_client_options_t(options);
377
/*
 * Report whether the single option named by option is set on client.
 *
 * Each of the five option flags maps to the matching boolean field.
 * GEARMAN_CLIENT_TASK_IN_USE and GEARMAN_CLIENT_MAX share a case whose body
 * is not part of this listing; neither is the switch header.
 */
bool gearman_client_has_option(gearman_client_st *client,
378
gearman_client_options_t option)
385
case GEARMAN_CLIENT_ALLOCATED:
386
return client->options.allocated;
388
case GEARMAN_CLIENT_NON_BLOCKING:
389
return client->options.non_blocking;
391
case GEARMAN_CLIENT_UNBUFFERED_RESULT:
392
return client->options.unbuffered_result;
394
case GEARMAN_CLIENT_NO_NEW:
395
return client->options.no_new;
397
case GEARMAN_CLIENT_FREE_TASKS:
398
return client->options.free_tasks;
401
case GEARMAN_CLIENT_TASK_IN_USE:
402
case GEARMAN_CLIENT_MAX:
407
/*
 * Bring the caller-settable options of client in line with options.
 *
 * The function walks usable_options and, for each entry, calls either
 * gearman_client_add_options() or gearman_client_remove_options(). The test
 * that picks between the two is not part of this listing; presumably it
 * checks whether the entry is present in options -- TODO confirm.
 *
 * The loop stops at GEARMAN_CLIENT_MAX, so the array presumably ends with
 * that value on an elided line. GEARMAN_CLIENT_NO_NEW does not appear among
 * the visible entries.
 */
void gearman_client_set_options(gearman_client_st *client,
408
gearman_client_options_t options)
413
gearman_client_options_t usable_options[]= {
414
GEARMAN_CLIENT_NON_BLOCKING,
415
GEARMAN_CLIENT_UNBUFFERED_RESULT,
416
GEARMAN_CLIENT_FREE_TASKS,
420
gearman_client_options_t *ptr;
423
for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
427
gearman_client_add_options(client, *ptr);
431
gearman_client_remove_options(client, *ptr);
436
/*
 * Turn on every option whose bit is set in options. The visible lines
 * handle GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT and
 * GEARMAN_CLIENT_FREE_TASKS.
 */
void gearman_client_add_options(gearman_client_st *client,
437
gearman_client_options_t options)
442
if (options & GEARMAN_CLIENT_NON_BLOCKING)
444
// Non-blocking mode is set on the universal object as well as on the client.
gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
445
client->options.non_blocking= true;
448
if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
450
client->options.unbuffered_result= true;
453
if (options & GEARMAN_CLIENT_FREE_TASKS)
455
client->options.free_tasks= true;
459
/*
 * Turn off every option whose bit is set in options. The visible lines
 * handle GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT and
 * GEARMAN_CLIENT_FREE_TASKS.
 */
void gearman_client_remove_options(gearman_client_st *client,
460
gearman_client_options_t options)
465
if (options & GEARMAN_CLIENT_NON_BLOCKING)
467
// Non-blocking mode is cleared on the universal object as well as on the client.
gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
468
client->options.non_blocking= false;
471
if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
473
client->options.unbuffered_result= false;
476
if (options & GEARMAN_CLIENT_FREE_TASKS)
478
client->options.free_tasks= false;
482
int gearman_client_timeout(gearman_client_st *client)
484
return gearman_universal_timeout(client->universal);
487
void gearman_client_set_timeout(gearman_client_st *client, int timeout)
492
gearman_universal_set_timeout(client->universal, timeout);
495
void *gearman_client_context(const gearman_client_st *client)
500
return const_cast<void *>(client->context);
503
void gearman_client_set_context(gearman_client_st *client, void *context)
508
client->context= context;
511
void gearman_client_set_log_fn(gearman_client_st *client,
512
gearman_log_fn *function, void *context,
513
gearman_verbose_t verbose)
518
gearman_set_log_fn(client->universal, function, context, verbose);
521
void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
522
gearman_malloc_fn *function,
528
gearman_set_workload_malloc_fn(client->universal, function, context);
531
void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
536
gearman_set_workload_free_fn(client->universal, function, context);
539
gearman_return_t gearman_client_add_server(gearman_client_st *client,
540
const char *host, in_port_t port)
544
return GEARMAN_INVALID_ARGUMENT;
547
if (gearman_connection_create_args(client->universal, host, port) == false)
549
assert(client->universal.error.rc != GEARMAN_SUCCESS);
550
return gearman_universal_error_code(client->universal);
553
return GEARMAN_SUCCESS;
556
gearman_return_t gearman_client_add_servers(gearman_client_st *client,
559
return gearman_parse_servers(servers, _client_add_server, client);
562
void gearman_client_remove_servers(gearman_client_st *client)
569
gearman_free_all_cons(client->universal);
572
gearman_return_t gearman_client_wait(gearman_client_st *client)
576
return GEARMAN_INVALID_ARGUMENT;
579
return gearman_wait(client->universal);
582
void *gearman_client_do(gearman_client_st *client,
583
const char *function,
585
const void *workload,
586
size_t workload_size, size_t *result_size,
587
gearman_return_t *ret_ptr)
589
return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
592
workload, workload_size,
593
result_size, ret_ptr);
596
void *gearman_client_do_high(gearman_client_st *client,
597
const char *function,
599
const void *workload, size_t workload_size,
600
size_t *result_size, gearman_return_t *ret_ptr)
602
return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
605
workload, workload_size,
606
result_size, ret_ptr);
609
void *gearman_client_do_low(gearman_client_st *client,
610
const char *function,
612
const void *workload, size_t workload_size,
613
size_t *result_size, gearman_return_t *ret_ptr)
615
return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
618
workload, workload_size,
619
result_size, ret_ptr);
622
/*
 * Count the tasks on client->task_list by following the next pointers.
 * The counter and the return statement are not part of this listing.
 *
 * NOTE(review): the loop condition reads search->next before search has
 * been tested, so an empty task list (task_list == NULL) would be
 * dereferenced unless an elided guard rules it out -- confirm.
 */
size_t gearman_client_count_tasks(gearman_client_st *client)
630
gearman_task_st *search= client->task_list;
632
while ((search= search->next))
641
/*
 * Walk client->task_list and test each task with gearman_task_is_active().
 * The statements that produce the boolean result are not part of this
 * listing; presumably the function reports true as soon as one task is
 * active -- TODO confirm.
 */
static bool _active_tasks(gearman_client_st *client)
644
gearman_task_st *search= client->task_list;
651
if (gearman_task_is_active(search))
655
} while ((search= search->next));
661
const char *gearman_client_do_job_handle(gearman_client_st *self)
669
return self->_do_handle;
672
void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
681
gearman_return_t gearman_client_do_background(gearman_client_st *client,
682
const char *function_name,
684
const void *workload_str,
685
size_t workload_size,
686
gearman_job_handle_t job_handle)
688
gearman_string_t function= { gearman_string_param_cstr(function_name) };
689
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
690
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
692
return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
699
gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
700
const char *function_name,
702
const void *workload_str,
703
size_t workload_size,
704
gearman_job_handle_t job_handle)
706
gearman_string_t function= { gearman_string_param_cstr(function_name) };
707
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
708
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
710
return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
717
gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
718
const char *function_name,
720
const void *workload_str,
721
size_t workload_size,
722
gearman_job_handle_t job_handle)
724
gearman_string_t function= { gearman_string_param_cstr(function_name) };
725
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
726
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
728
return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
735
/*
 * Query the status of the job identified by job_handle.
 *
 * A status task is created on the stack-allocated do_task through
 * gearman_client_add_task_status(), and gearman_client_run_tasks() is called
 * repeatedly while gearman_continue(ret) holds. On success the task fields
 * is_known, is_running, numerator and denominator are written through the
 * output pointers. The guards around those writes are not part of this
 * listing; presumably each pointer may be NULL -- TODO confirm.
 *
 * The first return of GEARMAN_INVALID_ARGUMENT belongs to a guard whose
 * condition is not visible here.
 */
gearman_return_t gearman_client_job_status(gearman_client_st *client,
736
const gearman_job_handle_t job_handle,
737
bool *is_known, bool *is_running,
739
uint32_t *denominator)
741
gearman_return_t ret;
745
return GEARMAN_INVALID_ARGUMENT;
748
gearman_task_st do_task;
749
gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
751
if (gearman_failed(ret))
756
do_task_ptr->type= GEARMAN_TASK_KIND_DO;
758
gearman_task_clear_fn(do_task_ptr);
761
ret= gearman_client_run_tasks(client);
763
// If either of the following is ever true, we will end up in an
765
assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
767
} while (gearman_continue(ret));
769
// @note we don't know if our task was run or not, we just know something
772
if (gearman_success(ret))
775
*is_known= do_task.options.is_known;
778
*is_running= do_task.options.is_running;
781
*numerator= do_task.numerator;
784
*denominator= do_task.denominator;
786
// This tests the pointers, not the values they point at: when the caller
// passed neither is_known nor is_running, the status is reported through
// the return code instead (GEARMAN_IN_PROGRESS or GEARMAN_JOB_EXISTS).
if (not is_known and not is_running)
788
if (do_task.options.is_running)
790
ret= GEARMAN_IN_PROGRESS;
792
else if (do_task.options.is_known)
794
ret= GEARMAN_JOB_EXISTS;
814
gearman_task_free(do_task_ptr);
819
gearman_return_t gearman_client_echo(gearman_client_st *client,
820
const void *workload,
821
size_t workload_size)
825
return GEARMAN_INVALID_ARGUMENT;
828
return gearman_echo(client->universal, workload, workload_size);
831
void gearman_client_task_free_all(gearman_client_st *client)
838
while (client->task_list)
840
gearman_task_free(client->task_list);
844
void gearman_client_set_task_context_free_fn(gearman_client_st *client,
845
gearman_task_context_free_fn *function)
852
client->task_context_free_fn= function;
856
gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
857
gearman_malloc_fn *malloc_fn,
858
gearman_free_fn *free_fn,
859
gearman_realloc_fn *realloc_fn,
860
gearman_calloc_fn *calloc_fn,
865
return GEARMAN_INVALID_ARGUMENT;
868
return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
873
gearman_task_st *gearman_client_add_task(gearman_client_st *client,
874
gearman_task_st *task,
876
const char *function,
878
const void *workload, size_t workload_size,
879
gearman_return_t *ret_ptr)
881
gearman_return_t unused;
889
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
893
return add_task(*client, task,
894
context, GEARMAN_COMMAND_SUBMIT_JOB,
897
workload, workload_size,
903
gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
904
gearman_task_st *task,
906
const char *function,
908
const void *workload, size_t workload_size,
909
gearman_return_t *ret_ptr)
911
gearman_return_t unused;
919
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
923
return add_task(*client, task, context,
924
GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
927
workload, workload_size,
933
gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
934
gearman_task_st *task,
936
const char *function,
938
const void *workload, size_t workload_size,
939
gearman_return_t *ret_ptr)
941
gearman_return_t unused;
949
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
953
return add_task(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
956
workload, workload_size,
962
gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
963
gearman_task_st *task,
965
const char *function,
967
const void *workload, size_t workload_size,
968
gearman_return_t *ret_ptr)
970
gearman_return_t unused;
978
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
982
return add_task(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
985
workload, workload_size,
992
gearman_client_add_task_high_background(gearman_client_st *client,
993
gearman_task_st *task,
995
const char *function,
997
const void *workload, size_t workload_size,
998
gearman_return_t *ret_ptr)
1000
gearman_return_t unused;
1001
if (ret_ptr == NULL)
1008
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
1012
return add_task(*client, task, context,
1013
GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
1016
workload, workload_size,
1023
gearman_client_add_task_low_background(gearman_client_st *client,
1024
gearman_task_st *task,
1026
const char *function,
1028
const void *workload, size_t workload_size,
1029
gearman_return_t *ret_ptr)
1031
gearman_return_t unused;
1032
if (ret_ptr == NULL)
1039
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
1043
return add_task(*client, task, context,
1044
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
1047
workload, workload_size,
1054
/*
 * Create a task that requests the status of job_handle from the server.
 *
 * The task is obtained from gearman_task_internal_create(); when that
 * fails, *ret_ptr is set to GEARMAN_MEMORY_ALLOCATION_FAILURE. The job
 * handle is copied into the task and a GEARMAN_COMMAND_GET_STATUS request
 * packet carrying the handle as its only argument is built in task->send.
 *
 * ret_ptr is compared against NULL and a local named unused is declared, so
 * presumably a NULL ret_ptr is redirected to that local -- the assignment is
 * not part of this listing. The same holds for the context parameter and
 * the return statements.
 */
gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
1055
gearman_task_st *task,
1057
const gearman_job_handle_t job_handle,
1058
gearman_return_t *ret_ptr)
1060
const void *args[1];
1061
size_t args_size[1];
1063
gearman_return_t unused;
1064
if (ret_ptr == NULL)
1069
if (not (task= gearman_task_internal_create(client, task)))
1071
*ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1075
task->context= context;
1076
// snprintf() bounds the copy to GEARMAN_JOB_HANDLE_SIZE bytes and always
// writes a terminating zero byte.
snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
1078
// The argument length excludes the terminating zero byte.
args[0]= job_handle;
1079
args_size[0]= strlen(job_handle);
1080
gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1081
GEARMAN_MAGIC_REQUEST,
1082
GEARMAN_COMMAND_GET_STATUS,
1083
args, args_size, 1);
1084
// Only a successfully built packet is counted as a new and running task
// and has its send packet marked as in use.
if (gearman_success(rc))
1086
client->new_tasks++;
1087
client->running_tasks++;
1088
task->options.send_in_use= true;
1095
void gearman_client_set_workload_fn(gearman_client_st *client,
1096
gearman_workload_fn *function)
1103
client->actions.workload_fn= function;
1106
void gearman_client_set_created_fn(gearman_client_st *client,
1107
gearman_created_fn *function)
1114
client->actions.created_fn= function;
1117
void gearman_client_set_data_fn(gearman_client_st *client,
1118
gearman_data_fn *function)
1125
client->actions.data_fn= function;
1128
void gearman_client_set_warning_fn(gearman_client_st *client,
1129
gearman_warning_fn *function)
1136
client->actions.warning_fn= function;
1139
void gearman_client_set_status_fn(gearman_client_st *client,
1140
gearman_universal_status_fn *function)
1147
client->actions.status_fn= function;
1150
void gearman_client_set_complete_fn(gearman_client_st *client,
1151
gearman_complete_fn *function)
1158
client->actions.complete_fn= function;
1161
void gearman_client_set_exception_fn(gearman_client_st *client,
1162
gearman_exception_fn *function)
1169
client->actions.exception_fn= function;
1172
void gearman_client_set_fail_fn(gearman_client_st *client,
1173
gearman_fail_fn *function)
1180
client->actions.fail_fn= function;
1183
void gearman_client_clear_fn(gearman_client_st *client)
1190
client->actions= gearman_actions_default();
1193
/*
 * Save the current non_blocking flag of the universal object in
 * stored_non_blocking, then switch the universal object to non-blocking.
 */
static inline void _push_non_blocking(gearman_client_st *client)
1195
client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
1196
client->universal.options.non_blocking= true;
1199
/*
 * Counterpart of _push_non_blocking().
 *
 * The flag is restored from client->options.non_blocking rather than from
 * stored_non_blocking; the assert checks that the two values agree.
 * NOTE(review): when assert() is compiled out a mismatch would go unnoticed
 * and the saved value would be ignored -- confirm the two flags can never
 * diverge.
 */
static inline void _pop_non_blocking(gearman_client_st *client)
1201
client->universal.options.non_blocking= client->options.non_blocking;
1202
assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
1205
/*
 * Save the current non_blocking flag of the universal object in
 * stored_non_blocking, then switch the universal object to blocking.
 */
static inline void _push_blocking(gearman_client_st *client)
1207
client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
1208
client->universal.options.non_blocking= false;
1211
/*
 * Counterpart of _push_blocking().
 *
 * The flag is restored from client->options.non_blocking rather than from
 * stored_non_blocking; the assert checks that the two values agree.
 * NOTE(review): when assert() is compiled out a mismatch would go unnoticed
 * and the saved value would be ignored -- confirm the two flags can never
 * diverge.
 */
static inline void _pop_blocking(gearman_client_st *client)
1213
client->universal.options.non_blocking= client->options.non_blocking;
1214
assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
1217
/*
 * Drive every task on client->task_list: start new tasks, flush pending
 * output, service connections that are ready for I/O, match received
 * packets to tasks, and wait for activity.
 *
 * The function switches on client->state. The case labels for
 * GEARMAN_CLIENT_STATE_NEW, GEARMAN_CLIENT_STATE_SUBMIT and
 * GEARMAN_CLIENT_STATE_PACKET appear directly before the matching
 * _client_run_task() calls, and each of those states is stored in
 * client->state on the corresponding error path. This looks like a
 * resumable layout in which a later call jumps back to the interrupted
 * step -- TODO confirm against the elided braces and return statements.
 *
 * Visible return values: GEARMAN_SUCCESS once the final statement is
 * reached, GEARMAN_IO_WAIT in non-blocking mode when the caller has to wait,
 * GEARMAN_SERVER_ERROR when the server sent an error packet, and the result
 * of gearman_universal_set_error() for a misconfigured unbuffered client.
 */
static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
1219
// GEARMAN_MAX_RETURN marks ret as not yet written by a receive call; an
// assert further down checks against it.
gearman_return_t ret= GEARMAN_MAX_RETURN;
1221
switch(client->state)
1223
case GEARMAN_CLIENT_STATE_IDLE:
1226
/* Start any new tasks. */
1227
if (client->new_tasks > 0 && ! (client->options.no_new))
1229
for (client->task= client->task_list; client->task;
1230
client->task= client->task->next)
1232
if (client->task->state != GEARMAN_TASK_STATE_NEW)
1237
case GEARMAN_CLIENT_STATE_NEW:
1238
gearman_return_t local_ret= _client_run_task(client, client->task);
1239
// GEARMAN_IO_WAIT is not treated as a failure here.
if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1241
client->state= GEARMAN_CLIENT_STATE_NEW;
1247
// Once no new tasks remain, flush what has been queued on the connections.
if (client->new_tasks == 0)
1249
gearman_return_t local_ret= gearman_flush_all(client->universal);
1250
if (gearman_failed(local_ret))
1257
/* See if there are any connections ready for I/O. */
1258
while ((client->con= gearman_ready(client->universal)))
1260
if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
1262
/* Socket is ready for writing, continue submitting jobs. */
1263
for (client->task= client->task_list; client->task;
1264
client->task= client->task->next)
1266
// Filter on tasks bound to another connection or not in the SUBMIT or
// WORKLOAD state; the statement this condition guards is not part of
// this listing (presumably a continue).
if (client->task->con != client->con or
1267
(client->task->state != GEARMAN_TASK_STATE_SUBMIT and
1268
client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
1273
case GEARMAN_CLIENT_STATE_SUBMIT:
1274
gearman_return_t local_ret= _client_run_task(client, client->task);
1275
if (local_ret == GEARMAN_COULD_NOT_CONNECT)
1277
client->state= GEARMAN_CLIENT_STATE_IDLE;
1280
else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1282
client->state= GEARMAN_CLIENT_STATE_SUBMIT;
1288
if (not (client->con->revents & POLLIN))
1291
/* Socket is ready for reading. */
1294
/* Read packet on connection and find which task it belongs to. */
1295
if (client->options.unbuffered_result)
1297
/* If client is handling the data read, make sure it's complete. */
1298
if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
1300
for (client->task= client->task_list; client->task;
1301
client->task= client->task->next)
1303
if (client->task->con == client->con &&
1304
(client->task->state == GEARMAN_TASK_STATE_DATA or
1305
client->task->state == GEARMAN_TASK_STATE_COMPLETE))
1312
Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
1315
return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, AT,
1316
"client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
1320
/* Read the next packet, without buffering the data part. */
1322
(void)client->con->receiving(client->con->_packet, ret, false);
1327
/* Read the next packet, buffering the data part. */
1329
(void)client->con->receiving(client->con->_packet, ret, true);
1332
if (client->task == NULL)
1334
assert(ret != GEARMAN_MAX_RETURN);
1336
/* Check the return of the gearman_connection_recv() calls above. */
1337
if (gearman_failed(ret))
1339
if (ret == GEARMAN_IO_WAIT)
1342
client->state= GEARMAN_CLIENT_STATE_IDLE;
1346
client->con->options.packet_in_use= true;
1348
/* We have a packet, see which task it belongs to. */
1349
for (client->task= client->task_list; client->task;
1350
client->task= client->task->next)
1352
if (client->task->con != client->con)
1355
if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
1357
if (client->task->created_id != client->con->created_id)
1360
/* New job created, drop through below and notify task. */
1361
client->con->created_id++;
1363
else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
1365
gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, AT,
1367
static_cast<char *>(client->con->_packet.arg[0]),
1368
int(client->con->_packet.arg_size[1]),
1369
static_cast<char *>(client->con->_packet.arg[1]));
1371
return GEARMAN_SERVER_ERROR;
1373
else if (strncmp(client->task->job_handle,
1374
static_cast<char *>(client->con->_packet.arg[0]),
1375
client->con->_packet.arg_size[0]) ||
1376
(client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
1377
strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
1378
(client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
1379
strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
1384
/* Else, we have a matching result packet of some kind. */
1389
if (not client->task)
1391
/* The client has stopped waiting for the response, ignore it. */
1392
client->con->free_private_packet();
1396
client->task->recv= &(client->con->_packet);
1399
case GEARMAN_CLIENT_STATE_PACKET:
1400
/* Let task process job created or result packet. */
1401
gearman_return_t local_ret= _client_run_task(client, client->task);
1403
if (local_ret == GEARMAN_IO_WAIT)
1406
if (gearman_failed(local_ret))
1408
client->state= GEARMAN_CLIENT_STATE_PACKET;
1412
/* Clean up the packet. */
1413
client->con->free_private_packet();
1415
/* If all tasks are done, return. */
1416
if (client->running_tasks == 0)
1421
/* If all tasks are done, return. */
1422
if (client->running_tasks == 0)
1427
if (client->new_tasks > 0 && ! (client->options.no_new))
1430
if (client->options.non_blocking)
1432
/* Let the caller wait for activity. */
1433
client->state= GEARMAN_CLIENT_STATE_IDLE;
1434
gearman_gerror(client->universal, GEARMAN_IO_WAIT);
1436
return GEARMAN_IO_WAIT;
1439
/* Wait for activity on one of the connections. */
1440
gearman_return_t local_ret= gearman_wait(client->universal);
1441
if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1443
client->state= GEARMAN_CLIENT_STATE_IDLE;
1452
client->state= GEARMAN_CLIENT_STATE_IDLE;
1454
return GEARMAN_SUCCESS;
1457
/*
 * Run the queued tasks of client with the universal object temporarily
 * switched to non-blocking mode.
 *
 * An empty task list returns GEARMAN_SUCCESS right away. Otherwise
 * _client_run_tasks() is called between _push_non_blocking() and
 * _pop_non_blocking(). When the result is GEARMAN_COULD_NOT_CONNECT,
 * gearman_reset() is called on the universal object.
 *
 * The first return of GEARMAN_INVALID_ARGUMENT belongs to a guard whose
 * condition is not part of this listing (presumably client == NULL); the
 * final return statement is not visible either.
 */
gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
1461
return GEARMAN_INVALID_ARGUMENT;
1464
if (not client->task_list) // We are immediately successful if all tasks are completed
1466
return GEARMAN_SUCCESS;
1470
_push_non_blocking(client);
1472
gearman_return_t rc= _client_run_tasks(client);
1474
_pop_non_blocking(client);
1476
if (rc == GEARMAN_COULD_NOT_CONNECT)
1478
gearman_reset(client->universal);
1484
/*
 * Run the queued tasks of client with the universal object temporarily
 * switched to blocking mode.
 *
 * An empty task list returns GEARMAN_SUCCESS right away. Otherwise
 * _client_run_tasks() is called between _push_blocking() and
 * _pop_blocking(). On failure, GEARMAN_COULD_NOT_CONNECT triggers
 * gearman_reset() on the universal object, and an assert checks that the
 * error code recorded on the universal object equals rc.
 *
 * The first return of GEARMAN_INVALID_ARGUMENT belongs to a guard whose
 * condition is not part of this listing (presumably client == NULL); the
 * final return statement is not visible either.
 */
gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client)
1488
return GEARMAN_INVALID_ARGUMENT;
1491
if (not client->task_list) // We are immediately successful if all tasks are completed
1493
return GEARMAN_SUCCESS;
1497
_push_blocking(client);
1499
gearman_return_t rc= _client_run_tasks(client);
1501
_pop_blocking(client);
1503
if (gearman_failed(rc))
1505
if (rc == GEARMAN_COULD_NOT_CONNECT)
1507
gearman_reset(client->universal);
1510
assert(gearman_universal_error_code(client->universal) == rc);
1517
* Static Definitions
1520
/*
 * Compare two clients by the host name and the port of the first entry in
 * the con_list of each universal object. The values returned for each
 * outcome are not part of this listing.
 *
 * NOTE(review): universal.con_list is dereferenced without a NULL check in
 * the visible lines, so a client without any server would be a problem
 * unless an elided guard covers it -- confirm.
 */
bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
1522
if (first == NULL or second == NULL)
1527
if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
1532
if (first->universal.con_list->port != second->universal.con_list->port)
1540
bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
1547
gearman_string_t option= { option_arg, option_arg_size };
1549
return gearman_request_option(self->universal, option);
1552
void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
1559
gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);