1
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
* Gearmand client and server library.
5
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
6
* Copyright (C) 2008 Brian Aker, Eric Day
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are
13
* * Redistributions of source code must retain the above copyright
14
* notice, this list of conditions and the following disclaimer.
16
* * Redistributions in binary form must reproduce the above
17
* copyright notice, this list of conditions and the following disclaimer
18
* in the documentation and/or other materials provided with the
21
* * The names of its contributors may not be used to endorse or
22
* promote products derived from this software without specific prior
25
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39
#include <libgearman/common.h>
45
gearman_return_t _client_run_task(gearman_client_st *client, gearman_task_st *task)
49
case GEARMAN_TASK_STATE_NEW:
50
if (not task->client->universal.con_list)
53
client->running_tasks--;
54
gearman_universal_set_error(client->universal, GEARMAN_NO_SERVERS, __func__, AT, "no servers added");
55
return GEARMAN_NO_SERVERS;
58
for (task->con= task->client->universal.con_list; task->con;
59
task->con= task->con->next)
61
if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
69
client->options.no_new= true;
70
gearman_gerror(client->universal, GEARMAN_IO_WAIT);
71
return GEARMAN_IO_WAIT;
76
if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
78
task->created_id= task->con->created_id_next;
79
task->con->created_id_next++;
82
case GEARMAN_TASK_STATE_SUBMIT:
86
gearman_return_t ret= task->con->send_packet(task->send, client->new_tasks == 0 ? true : false);
88
if (gearman_success(ret))
92
else if (ret == GEARMAN_IO_WAIT)
94
task->state= GEARMAN_TASK_STATE_SUBMIT;
97
else if (gearman_failed(ret))
99
/* Increment this since the job submission failed. */
100
task->con->created_id++;
102
if (ret == GEARMAN_COULD_NOT_CONNECT)
104
for (task->con= task->con->next;
106
task->con= task->con->next)
108
if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
121
task->result_rc= ret;
123
if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again
125
task->state= GEARMAN_TASK_STATE_NEW;
130
task->state= GEARMAN_TASK_STATE_FAIL;
131
client->running_tasks--;
136
if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
138
task->created_id= task->con->created_id_next;
139
task->con->created_id_next++;
144
if (task->send.data_size > 0 and not task->send.data)
146
if (not task->func.workload_fn)
148
gearman_error(client->universal, GEARMAN_NEED_WORKLOAD_FN,
149
"workload size > 0, but no data pointer or workload_fn was given");
150
return GEARMAN_NEED_WORKLOAD_FN;
153
case GEARMAN_TASK_STATE_WORKLOAD:
154
gearman_return_t ret= task->func.workload_fn(task);
155
if (gearman_failed(ret))
157
task->state= GEARMAN_TASK_STATE_WORKLOAD;
162
client->options.no_new= false;
163
task->state= GEARMAN_TASK_STATE_WORK;
164
task->con->set_events(POLLIN);
165
return GEARMAN_SUCCESS;
167
case GEARMAN_TASK_STATE_WORK:
168
if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
170
task->options.is_known= true;
171
snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
172
int(task->recv->arg_size[0]),
173
static_cast<char *>(task->recv->arg[0]));
175
case GEARMAN_TASK_STATE_CREATED:
176
if (task->func.created_fn)
178
gearman_return_t ret= task->func.created_fn(task);
179
if (gearman_failed(ret))
181
task->state= GEARMAN_TASK_STATE_CREATED;
186
if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
187
task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
188
task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
189
task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
190
task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
195
else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
197
task->options.is_known= true;
198
task->options.is_running= true;
200
case GEARMAN_TASK_STATE_DATA:
201
if (task->func.data_fn)
203
gearman_return_t ret= task->func.data_fn(task);
204
if (gearman_failed(ret))
206
task->state= GEARMAN_TASK_STATE_DATA;
211
else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
213
case GEARMAN_TASK_STATE_WARNING:
214
if (task->func.warning_fn)
216
gearman_return_t ret= task->func.warning_fn(task);
217
if (gearman_failed(ret))
219
task->state= GEARMAN_TASK_STATE_WARNING;
224
else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
225
task->recv->command == GEARMAN_COMMAND_STATUS_RES)
229
if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
231
if (atoi(static_cast<char *>(task->recv->arg[1])) == 0)
232
task->options.is_known= false;
234
task->options.is_known= true;
236
if (atoi(static_cast<char *>(task->recv->arg[2])) == 0)
237
task->options.is_running= false;
239
task->options.is_running= true;
248
task->numerator= uint32_t(atoi(static_cast<char *>(task->recv->arg[x])));
249
char status_buffer[11]; /* Max string size to hold a uint32_t. */
250
snprintf(status_buffer, 11, "%.*s",
251
int(task->recv->arg_size[x + 1]),
252
static_cast<char *>(task->recv->arg[x + 1]));
253
task->denominator= uint32_t(atoi(status_buffer));
255
case GEARMAN_TASK_STATE_STATUS:
256
if (task->func.status_fn)
258
gearman_return_t ret= task->func.status_fn(task);
259
if (gearman_failed(ret))
261
task->state= GEARMAN_TASK_STATE_STATUS;
266
if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
271
else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
273
task->options.is_known= false;
274
task->options.is_running= false;
275
task->result_rc= GEARMAN_SUCCESS;
277
case GEARMAN_TASK_STATE_COMPLETE:
278
if (task->func.complete_fn)
280
gearman_return_t ret= task->func.complete_fn(task);
281
if (gearman_failed(ret))
283
task->state= GEARMAN_TASK_STATE_COMPLETE;
290
else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
292
case GEARMAN_TASK_STATE_EXCEPTION:
293
if (task->func.exception_fn)
295
gearman_return_t ret= task->func.exception_fn(task);
296
if (gearman_failed(ret))
298
task->state= GEARMAN_TASK_STATE_EXCEPTION;
303
else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
305
// If things fail we need to delete the result, and set the result_rc
307
task->options.is_known= false;
308
task->options.is_running= false;
309
delete task->result_ptr;
310
task->result_ptr= NULL;
311
task->result_rc= GEARMAN_WORK_FAIL;
313
case GEARMAN_TASK_STATE_FAIL:
314
if (task->func.fail_fn)
316
gearman_return_t ret= task->func.fail_fn(task);
317
if (gearman_failed(ret))
319
task->state= GEARMAN_TASK_STATE_FAIL;
327
task->state= GEARMAN_TASK_STATE_WORK;
328
return GEARMAN_SUCCESS;
330
case GEARMAN_TASK_STATE_FINISHED:
334
client->running_tasks--;
335
task->state= GEARMAN_TASK_STATE_FINISHED;
337
if (client->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
339
gearman_task_free(task);
342
return GEARMAN_SUCCESS;