1
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
* Gearmand client and server library.
5
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
6
* Copyright (C) 2008 Brian Aker, Eric Day
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are
13
* * Redistributions of source code must retain the above copyright
14
* notice, this list of conditions and the following disclaimer.
16
* * Redistributions in binary form must reproduce the above
17
* copyright notice, this list of conditions and the following disclaimer
18
* in the documentation and/or other materials provided with the
21
* * The names of its contributors may not be used to endorse or
22
* promote products derived from this software without specific prior
25
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40
#include <libgearman/common.h>
42
#include <arpa/inet.h>
50
#include <netinet/in.h>
51
#include <sys/socket.h>
55
Allocate a client structure.
57
static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
61
client->options.allocated= false;
65
client= new (std::nothrow) gearman_client_st;
69
client->options.allocated= true;
72
client->options.non_blocking= false;
73
client->options.unbuffered_result= false;
74
client->options.no_new= false;
75
client->options.free_tasks= false;
77
client->state= GEARMAN_CLIENT_STATE_IDLE;
79
client->running_tasks= 0;
80
client->task_count= 0;
81
client->context= NULL;
84
client->task_list= NULL;
85
client->task_context_free_fn= NULL;
86
gearman_client_clear_fn(client);
90
gearman_universal_initialize(client->universal);
97
* Callback function used when parsing server lists.
99
static gearman_return_t _client_add_server(const char *host, in_port_t port,
102
return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
109
static void *_client_do(gearman_client_st *client, gearman_command_t command,
110
const char *function_name,
112
const void *workload_str, size_t workload_size,
113
size_t *result_size, gearman_return_t *ret_ptr)
115
gearman_return_t unused;
121
universal_reset_error(client->universal);
124
if (result_size == NULL)
126
result_size= &unused_size;
132
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
136
gearman_string_t function= { gearman_string_param_cstr(function_name) };
137
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
138
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
141
gearman_task_st do_task;
142
gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
147
gearman_actions_do_default());
148
if (do_task_ptr == NULL)
150
*ret_ptr= gearman_universal_error_code(client->universal);
153
do_task_ptr->type= GEARMAN_TASK_KIND_DO;
155
gearman_return_t ret;
157
ret= gearman_client_run_tasks(client);
158
} while (gearman_continue(ret));
160
// gearman_client_run_tasks failed
161
assert(client->task_list); // Programmer error, we should always have the task that we used for do
163
char *returnable= NULL;
164
if (gearman_failed(ret))
166
if (ret == GEARMAN_COULD_NOT_CONNECT)
170
gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
176
else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
178
*ret_ptr= do_task_ptr->result_rc;
179
if (do_task_ptr->result_ptr)
181
if (gearman_has_allocator(client->universal))
183
gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
184
returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
187
gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
190
else // NULL terminate
192
memcpy(returnable, gearman_c_str(result), gearman_size(result));
193
returnable[gearman_size(result)]= 0;
194
*result_size= gearman_size(result);
199
gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
200
*result_size= gearman_size(result);
201
returnable= const_cast<char *>(gearman_c_str(result));
209
else // gearman_client_run_tasks() was successful, but the task was not
211
gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
213
*ret_ptr= do_task_ptr->result_rc;
217
gearman_task_free(&do_task);
218
client->new_tasks= 0;
219
client->running_tasks= 0;
225
Real background do function.
227
static gearman_return_t _client_do_background(gearman_client_st *client,
228
gearman_command_t command,
229
gearman_string_t &function,
230
gearman_unique_t &unique,
231
gearman_string_t &workload,
232
gearman_job_handle_t job_handle)
236
return GEARMAN_INVALID_ARGUMENT;
239
universal_reset_error(client->universal);
241
if (gearman_size(function) == 0)
243
return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function arguement was empty");
246
client->_do_handle[0]= 0; // Reset the job_handle we store in client
248
gearman_task_st do_task, *do_task_ptr;
249
do_task_ptr= add_task(*client, &do_task,
256
gearman_actions_do_default());
259
return gearman_universal_error_code(client->universal);
261
do_task_ptr->type= GEARMAN_TASK_KIND_DO;
263
gearman_return_t ret;
265
ret= gearman_client_run_tasks(client);
267
// If either of the following is ever true, we will end up in an
269
assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
271
} while (gearman_continue(ret));
275
strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
277
strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
278
client->new_tasks= 0;
279
client->running_tasks= 0;
280
gearman_task_free(&do_task);
290
gearman_client_st *gearman_client_create(gearman_client_st *client)
292
return _client_allocate(client, false);
295
gearman_client_st *gearman_client_clone(gearman_client_st *client,
296
const gearman_client_st *from)
300
return _client_allocate(client, false);
303
client= _client_allocate(client, true);
310
client->options.non_blocking= from->options.non_blocking;
311
client->options.unbuffered_result= from->options.unbuffered_result;
312
client->options.no_new= from->options.no_new;
313
client->options.free_tasks= from->options.free_tasks;
314
client->actions= from->actions;
315
client->_do_handle[0]= 0;
317
gearman_universal_clone(client->universal, from->universal);
322
void gearman_client_free(gearman_client_st *client)
329
gearman_client_task_free_all(client);
331
gearman_universal_free(client->universal);
333
if (client->options.allocated)
339
const char *gearman_client_error(const gearman_client_st *client)
346
return gearman_universal_error(client->universal);
349
int gearman_client_errno(const gearman_client_st *client)
356
return gearman_universal_errno(client->universal);
359
gearman_client_options_t gearman_client_options(const gearman_client_st *client)
362
memset(&options, 0, sizeof(int32_t));
364
if (client->options.allocated)
365
options|= int(GEARMAN_CLIENT_ALLOCATED);
367
if (client->options.non_blocking)
368
options|= int(GEARMAN_CLIENT_NON_BLOCKING);
370
if (client->options.unbuffered_result)
371
options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
373
if (client->options.no_new)
374
options|= int(GEARMAN_CLIENT_NO_NEW);
376
if (client->options.free_tasks)
377
options|= int(GEARMAN_CLIENT_FREE_TASKS);
379
return gearman_client_options_t(options);
382
bool gearman_client_has_option(gearman_client_st *client,
383
gearman_client_options_t option)
390
case GEARMAN_CLIENT_ALLOCATED:
391
return client->options.allocated;
393
case GEARMAN_CLIENT_NON_BLOCKING:
394
return client->options.non_blocking;
396
case GEARMAN_CLIENT_UNBUFFERED_RESULT:
397
return client->options.unbuffered_result;
399
case GEARMAN_CLIENT_NO_NEW:
400
return client->options.no_new;
402
case GEARMAN_CLIENT_FREE_TASKS:
403
return client->options.free_tasks;
406
case GEARMAN_CLIENT_TASK_IN_USE:
407
case GEARMAN_CLIENT_MAX:
412
void gearman_client_set_options(gearman_client_st *client,
413
gearman_client_options_t options)
418
gearman_client_options_t usable_options[]= {
419
GEARMAN_CLIENT_NON_BLOCKING,
420
GEARMAN_CLIENT_UNBUFFERED_RESULT,
421
GEARMAN_CLIENT_FREE_TASKS,
425
gearman_client_options_t *ptr;
428
for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
432
gearman_client_add_options(client, *ptr);
436
gearman_client_remove_options(client, *ptr);
441
void gearman_client_add_options(gearman_client_st *client,
442
gearman_client_options_t options)
447
if (options & GEARMAN_CLIENT_NON_BLOCKING)
449
gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
450
client->options.non_blocking= true;
453
if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
455
client->options.unbuffered_result= true;
458
if (options & GEARMAN_CLIENT_FREE_TASKS)
460
client->options.free_tasks= true;
464
void gearman_client_remove_options(gearman_client_st *client,
465
gearman_client_options_t options)
470
if (options & GEARMAN_CLIENT_NON_BLOCKING)
472
gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
473
client->options.non_blocking= false;
476
if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
478
client->options.unbuffered_result= false;
481
if (options & GEARMAN_CLIENT_FREE_TASKS)
483
client->options.free_tasks= false;
487
int gearman_client_timeout(gearman_client_st *client)
489
return gearman_universal_timeout(client->universal);
492
void gearman_client_set_timeout(gearman_client_st *client, int timeout)
497
gearman_universal_set_timeout(client->universal, timeout);
500
void *gearman_client_context(const gearman_client_st *client)
505
return const_cast<void *>(client->context);
508
void gearman_client_set_context(gearman_client_st *client, void *context)
513
client->context= context;
516
void gearman_client_set_log_fn(gearman_client_st *client,
517
gearman_log_fn *function, void *context,
518
gearman_verbose_t verbose)
523
gearman_set_log_fn(client->universal, function, context, verbose);
526
void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
527
gearman_malloc_fn *function,
533
gearman_set_workload_malloc_fn(client->universal, function, context);
536
void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
541
gearman_set_workload_free_fn(client->universal, function, context);
544
gearman_return_t gearman_client_add_server(gearman_client_st *client,
545
const char *host, in_port_t port)
549
return GEARMAN_INVALID_ARGUMENT;
552
if (gearman_connection_create_args(client->universal, host, port) == false)
554
assert(client->universal.error.rc != GEARMAN_SUCCESS);
555
return gearman_universal_error_code(client->universal);
558
return GEARMAN_SUCCESS;
561
gearman_return_t gearman_client_add_servers(gearman_client_st *client,
564
return gearman_parse_servers(servers, _client_add_server, client);
567
void gearman_client_remove_servers(gearman_client_st *client)
574
gearman_free_all_cons(client->universal);
577
gearman_return_t gearman_client_wait(gearman_client_st *client)
581
return GEARMAN_INVALID_ARGUMENT;
584
return gearman_wait(client->universal);
587
void *gearman_client_do(gearman_client_st *client,
588
const char *function,
590
const void *workload,
591
size_t workload_size, size_t *result_size,
592
gearman_return_t *ret_ptr)
594
return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
597
workload, workload_size,
598
result_size, ret_ptr);
601
void *gearman_client_do_high(gearman_client_st *client,
602
const char *function,
604
const void *workload, size_t workload_size,
605
size_t *result_size, gearman_return_t *ret_ptr)
607
return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
610
workload, workload_size,
611
result_size, ret_ptr);
614
void *gearman_client_do_low(gearman_client_st *client,
615
const char *function,
617
const void *workload, size_t workload_size,
618
size_t *result_size, gearman_return_t *ret_ptr)
620
return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
623
workload, workload_size,
624
result_size, ret_ptr);
627
size_t gearman_client_count_tasks(gearman_client_st *client)
635
gearman_task_st *search= client->task_list;
637
while ((search= search->next))
646
static bool _active_tasks(gearman_client_st *client)
649
gearman_task_st *search= client->task_list;
656
if (gearman_task_is_active(search))
660
} while ((search= search->next));
666
const char *gearman_client_do_job_handle(gearman_client_st *self)
674
return self->_do_handle;
677
void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
686
gearman_return_t gearman_client_do_background(gearman_client_st *client,
687
const char *function_name,
689
const void *workload_str,
690
size_t workload_size,
691
gearman_job_handle_t job_handle)
693
gearman_string_t function= { gearman_string_param_cstr(function_name) };
694
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
695
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
697
return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
704
gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
705
const char *function_name,
707
const void *workload_str,
708
size_t workload_size,
709
gearman_job_handle_t job_handle)
711
gearman_string_t function= { gearman_string_param_cstr(function_name) };
712
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
713
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
715
return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
722
gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
723
const char *function_name,
725
const void *workload_str,
726
size_t workload_size,
727
gearman_job_handle_t job_handle)
729
gearman_string_t function= { gearman_string_param_cstr(function_name) };
730
gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
731
gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
733
return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
740
gearman_return_t gearman_client_job_status(gearman_client_st *client,
741
const gearman_job_handle_t job_handle,
742
bool *is_known, bool *is_running,
744
uint32_t *denominator)
746
gearman_return_t ret;
750
return GEARMAN_INVALID_ARGUMENT;
753
universal_reset_error(client->universal);
755
gearman_task_st do_task;
756
gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
758
if (gearman_failed(ret))
763
do_task_ptr->type= GEARMAN_TASK_KIND_DO;
765
gearman_task_clear_fn(do_task_ptr);
768
ret= gearman_client_run_tasks(client);
770
// If either of the following is ever true, we will end up in an
772
assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
774
} while (gearman_continue(ret));
776
// @note we don't know if our task was run or not, we just know something
779
if (gearman_success(ret))
782
*is_known= do_task.options.is_known;
785
*is_running= do_task.options.is_running;
788
*numerator= do_task.numerator;
791
*denominator= do_task.denominator;
793
if (not is_known and not is_running)
795
if (do_task.options.is_running)
797
ret= GEARMAN_IN_PROGRESS;
799
else if (do_task.options.is_known)
801
ret= GEARMAN_JOB_EXISTS;
821
gearman_task_free(do_task_ptr);
826
gearman_return_t gearman_client_echo(gearman_client_st *client,
827
const void *workload,
828
size_t workload_size)
832
return GEARMAN_INVALID_ARGUMENT;
835
return gearman_echo(client->universal, workload, workload_size);
838
void gearman_client_task_free_all(gearman_client_st *client)
845
while (client->task_list)
847
gearman_task_free(client->task_list);
851
void gearman_client_set_task_context_free_fn(gearman_client_st *client,
852
gearman_task_context_free_fn *function)
859
client->task_context_free_fn= function;
863
gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
864
gearman_malloc_fn *malloc_fn,
865
gearman_free_fn *free_fn,
866
gearman_realloc_fn *realloc_fn,
867
gearman_calloc_fn *calloc_fn,
872
return GEARMAN_INVALID_ARGUMENT;
875
return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
880
gearman_task_st *gearman_client_add_task(gearman_client_st *client,
881
gearman_task_st *task,
883
const char *function,
885
const void *workload, size_t workload_size,
886
gearman_return_t *ret_ptr)
888
gearman_return_t unused;
896
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
900
return add_task(*client, task,
901
context, GEARMAN_COMMAND_SUBMIT_JOB,
904
workload, workload_size,
910
gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
911
gearman_task_st *task,
913
const char *function,
915
const void *workload, size_t workload_size,
916
gearman_return_t *ret_ptr)
918
gearman_return_t unused;
926
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
930
return add_task(*client, task, context,
931
GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
934
workload, workload_size,
940
gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
941
gearman_task_st *task,
943
const char *function,
945
const void *workload, size_t workload_size,
946
gearman_return_t *ret_ptr)
948
gearman_return_t unused;
956
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
960
return add_task(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
963
workload, workload_size,
969
gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
970
gearman_task_st *task,
972
const char *function,
974
const void *workload, size_t workload_size,
975
gearman_return_t *ret_ptr)
977
gearman_return_t unused;
985
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
989
return add_task(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
992
workload, workload_size,
999
gearman_client_add_task_high_background(gearman_client_st *client,
1000
gearman_task_st *task,
1002
const char *function,
1004
const void *workload, size_t workload_size,
1005
gearman_return_t *ret_ptr)
1007
gearman_return_t unused;
1008
if (ret_ptr == NULL)
1015
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
1019
return add_task(*client, task, context,
1020
GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
1023
workload, workload_size,
1030
gearman_client_add_task_low_background(gearman_client_st *client,
1031
gearman_task_st *task,
1033
const char *function,
1035
const void *workload, size_t workload_size,
1036
gearman_return_t *ret_ptr)
1038
gearman_return_t unused;
1039
if (ret_ptr == NULL)
1046
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
1050
return add_task(*client, task, context,
1051
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
1054
workload, workload_size,
1061
gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
1062
gearman_task_st *task,
1064
const gearman_job_handle_t job_handle,
1065
gearman_return_t *ret_ptr)
1067
const void *args[1];
1068
size_t args_size[1];
1070
gearman_return_t unused;
1071
if (ret_ptr == NULL)
1078
*ret_ptr= GEARMAN_INVALID_ARGUMENT;
1082
if ((task= gearman_task_internal_create(client, task)) == NULL)
1084
*ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1088
task->context= context;
1089
snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
1091
args[0]= job_handle;
1092
args_size[0]= strlen(job_handle);
1093
gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1094
GEARMAN_MAGIC_REQUEST,
1095
GEARMAN_COMMAND_GET_STATUS,
1096
args, args_size, 1);
1097
if (gearman_success(rc))
1099
client->new_tasks++;
1100
client->running_tasks++;
1101
task->options.send_in_use= true;
1108
void gearman_client_set_workload_fn(gearman_client_st *client,
1109
gearman_workload_fn *function)
1116
client->actions.workload_fn= function;
1119
void gearman_client_set_created_fn(gearman_client_st *client,
1120
gearman_created_fn *function)
1127
client->actions.created_fn= function;
1130
void gearman_client_set_data_fn(gearman_client_st *client,
1131
gearman_data_fn *function)
1138
client->actions.data_fn= function;
1141
void gearman_client_set_warning_fn(gearman_client_st *client,
1142
gearman_warning_fn *function)
1149
client->actions.warning_fn= function;
1152
void gearman_client_set_status_fn(gearman_client_st *client,
1153
gearman_universal_status_fn *function)
1160
client->actions.status_fn= function;
1163
void gearman_client_set_complete_fn(gearman_client_st *client,
1164
gearman_complete_fn *function)
1171
client->actions.complete_fn= function;
1174
void gearman_client_set_exception_fn(gearman_client_st *client,
1175
gearman_exception_fn *function)
1182
client->actions.exception_fn= function;
1185
void gearman_client_set_fail_fn(gearman_client_st *client,
1186
gearman_fail_fn *function)
1193
client->actions.fail_fn= function;
1196
void gearman_client_clear_fn(gearman_client_st *client)
1203
client->actions= gearman_actions_default();
1206
static inline void _push_non_blocking(gearman_client_st *client)
1208
client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
1209
client->universal.options.non_blocking= true;
1212
static inline void _pop_non_blocking(gearman_client_st *client)
1214
client->universal.options.non_blocking= client->options.non_blocking;
1215
assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
1218
static inline void _push_blocking(gearman_client_st *client)
1220
client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
1221
client->universal.options.non_blocking= false;
1224
static inline void _pop_blocking(gearman_client_st *client)
1226
client->universal.options.non_blocking= client->options.non_blocking;
1227
assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
1230
static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
1232
gearman_return_t ret= GEARMAN_MAX_RETURN;
1234
switch(client->state)
1236
case GEARMAN_CLIENT_STATE_IDLE:
1239
/* Start any new tasks. */
1240
if (client->new_tasks > 0 && ! (client->options.no_new))
1242
for (client->task= client->task_list; client->task;
1243
client->task= client->task->next)
1245
if (client->task->state != GEARMAN_TASK_STATE_NEW)
1250
case GEARMAN_CLIENT_STATE_NEW:
1251
assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
1252
gearman_return_t local_ret= _client_run_task(client->task);
1253
if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1255
client->state= GEARMAN_CLIENT_STATE_NEW;
1261
if (client->new_tasks == 0)
1263
gearman_return_t local_ret= gearman_flush_all(client->universal);
1264
if (gearman_failed(local_ret))
1271
/* See if there are any connections ready for I/O. */
1272
while ((client->con= gearman_ready(client->universal)))
1274
if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
1276
/* Socket is ready for writing, continue submitting jobs. */
1277
for (client->task= client->task_list; client->task;
1278
client->task= client->task->next)
1280
if (client->task->con != client->con or
1281
(client->task->state != GEARMAN_TASK_STATE_SUBMIT and
1282
client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
1287
case GEARMAN_CLIENT_STATE_SUBMIT:
1288
assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
1289
gearman_return_t local_ret= _client_run_task(client->task);
1290
if (local_ret == GEARMAN_COULD_NOT_CONNECT)
1292
client->state= GEARMAN_CLIENT_STATE_IDLE;
1295
else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1297
client->state= GEARMAN_CLIENT_STATE_SUBMIT;
1303
if (not (client->con->revents & POLLIN))
1306
/* Socket is ready for reading. */
1309
/* Read packet on connection and find which task it belongs to. */
1310
if (client->options.unbuffered_result)
1312
/* If client is handling the data read, make sure it's complete. */
1313
if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
1315
for (client->task= client->task_list; client->task;
1316
client->task= client->task->next)
1318
if (client->task->con == client->con &&
1319
(client->task->state == GEARMAN_TASK_STATE_DATA or
1320
client->task->state == GEARMAN_TASK_STATE_COMPLETE))
1327
Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't set up the client to fetch data correctly.
1330
return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, GEARMAN_AT,
1331
"client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
1335
/* Read the next packet, without buffering the data part. */
1337
(void)client->con->receiving(client->con->_packet, ret, false);
1342
/* Read the next packet, buffering the data part. */
1344
(void)client->con->receiving(client->con->_packet, ret, true);
1347
if (client->task == NULL)
1349
assert(ret != GEARMAN_MAX_RETURN);
1351
/* Check the return of the gearman_connection_recv() calls above. */
1352
if (gearman_failed(ret))
1354
if (ret == GEARMAN_IO_WAIT)
1357
client->state= GEARMAN_CLIENT_STATE_IDLE;
1361
client->con->options.packet_in_use= true;
1363
/* We have a packet, see which task it belongs to. */
1364
for (client->task= client->task_list; client->task;
1365
client->task= client->task->next)
1367
if (client->task->con != client->con)
1370
if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
1372
if (client->task->created_id != client->con->created_id)
1375
/* New job created, drop through below and notify task. */
1376
client->con->created_id++;
1378
else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
1380
gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, GEARMAN_AT,
1382
static_cast<char *>(client->con->_packet.arg[0]),
1383
int(client->con->_packet.arg_size[1]),
1384
static_cast<char *>(client->con->_packet.arg[1]));
1386
return GEARMAN_SERVER_ERROR;
1388
else if (strncmp(client->task->job_handle,
1389
static_cast<char *>(client->con->_packet.arg[0]),
1390
client->con->_packet.arg_size[0]) ||
1391
(client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
1392
strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
1393
(client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
1394
strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
1399
/* Else, we have a matching result packet of some kind. */
1404
if (not client->task)
1406
/* The client has stopped waiting for the response, ignore it. */
1407
client->con->free_private_packet();
1411
client->task->recv= &(client->con->_packet);
1414
case GEARMAN_CLIENT_STATE_PACKET:
1415
/* Let task process job created or result packet. */
1416
assert_msg(client == client->task->client, "Programmer error, client and task member client are not the same");
1417
gearman_return_t local_ret= _client_run_task(client->task);
1419
if (local_ret == GEARMAN_IO_WAIT)
1422
if (gearman_failed(local_ret))
1424
client->state= GEARMAN_CLIENT_STATE_PACKET;
1428
/* Clean up the packet. */
1429
client->con->free_private_packet();
1431
/* If all tasks are done, return. */
1432
if (client->running_tasks == 0)
1437
/* If all tasks are done, return. */
1438
if (client->running_tasks == 0)
1443
if (client->new_tasks > 0 && ! (client->options.no_new))
1446
if (client->options.non_blocking)
1448
/* Let the caller wait for activity. */
1449
client->state= GEARMAN_CLIENT_STATE_IDLE;
1450
gearman_gerror(client->universal, GEARMAN_IO_WAIT);
1452
return GEARMAN_IO_WAIT;
1455
/* Wait for activity on one of the connections. */
1456
gearman_return_t local_ret= gearman_wait(client->universal);
1457
if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1459
client->state= GEARMAN_CLIENT_STATE_IDLE;
1468
client->state= GEARMAN_CLIENT_STATE_IDLE;
1470
return GEARMAN_SUCCESS;
1473
gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
1477
return GEARMAN_INVALID_ARGUMENT;
1480
if (client->task_list == NULL) // We are immediately successful if all tasks are completed
1482
return GEARMAN_SUCCESS;
1486
_push_non_blocking(client);
1488
gearman_return_t rc= _client_run_tasks(client);
1490
_pop_non_blocking(client);
1492
if (rc == GEARMAN_COULD_NOT_CONNECT)
1494
gearman_reset(client->universal);
1500
gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client)
1504
return GEARMAN_INVALID_ARGUMENT;
1507
if (not client->task_list) // We are immediately successful if all tasks are completed
1509
return GEARMAN_SUCCESS;
1513
_push_blocking(client);
1515
gearman_return_t rc= _client_run_tasks(client);
1517
_pop_blocking(client);
1519
if (gearman_failed(rc))
1521
if (rc == GEARMAN_COULD_NOT_CONNECT)
1523
gearman_reset(client->universal);
1526
assert(gearman_universal_error_code(client->universal) == rc);
1533
* Static Definitions
1536
bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
1538
if (first == NULL or second == NULL)
1543
if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
1548
if (first->universal.con_list->port != second->universal.con_list->port)
1556
bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
1563
gearman_string_t option= { option_arg, option_arg_size };
1565
return gearman_request_option(self->universal, option);
1568
void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
1575
gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
1578
gearman_return_t gearman_client_set_identifier(gearman_client_st *client,
1579
const char *id, size_t id_size)
1581
return gearman_set_identifier(client->universal, id, id_size);