~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman/run.cc

  • Committer: Package Import Robot
  • Author(s): Stig Sandbeck Mathisen, Michael Fladischer, Stig Sandbeck Mathisen
  • Date: 2012-01-23 11:31:08 UTC
  • mfrom: (6.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20120123113108-wl1yhiba13q9jusb
Tags: 0.27-1
[Michael Fladischer]
* Patch: fix spelling
* Patch: remove dependency on googleanalytics
* Patch: fix tests
* Use non-authenticating URL for Vcs-Git.
* Add "status" action to init script.

[Stig Sandbeck Mathisen]
* New upstream release (Closes: #621486) (LP: #682680)
* Remove build dependency on drizzle
  (until it reaches testing again)
* Build with support for tokyocabinet
* Remove backported ipv6 patch
* Patch: disable hostile build tests, they take hours...
* Patch: workaround duplicate address issue for tests
* Do not build API documentation, the sources are not shipped in
  upstream tarball
* Update debian/copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
2
 * 
 
3
 *  Gearmand client and server library.
 
4
 *
 
5
 *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
 
6
 *  Copyright (C) 2008 Brian Aker, Eric Day
 
7
 *  All rights reserved.
 
8
 *
 
9
 *  Redistribution and use in source and binary forms, with or without
 
10
 *  modification, are permitted provided that the following conditions are
 
11
 *  met:
 
12
 *
 
13
 *      * Redistributions of source code must retain the above copyright
 
14
 *  notice, this list of conditions and the following disclaimer.
 
15
 *
 
16
 *      * Redistributions in binary form must reproduce the above
 
17
 *  copyright notice, this list of conditions and the following disclaimer
 
18
 *  in the documentation and/or other materials provided with the
 
19
 *  distribution.
 
20
 *
 
21
 *      * The names of its contributors may not be used to endorse or
 
22
 *  promote products derived from this software without specific prior
 
23
 *  written permission.
 
24
 *
 
25
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
26
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
27
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
28
 *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
29
 *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
30
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
31
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
32
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
33
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
34
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
35
 *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
36
 *
 
37
 */
 
38
 
 
39
#include <libgearman/common.h>
 
40
 
 
41
#include <cassert>
 
42
#include <cstdio>
 
43
#include <cstdlib>
 
44
 
 
45
gearman_return_t _client_run_task(gearman_client_st *client, gearman_task_st *task)
 
46
{
 
47
  switch(task->state)
 
48
  {
 
49
  case GEARMAN_TASK_STATE_NEW:
 
50
    if (not task->client->universal.con_list)
 
51
    {
 
52
      client->new_tasks--;
 
53
      client->running_tasks--;
 
54
      gearman_universal_set_error(client->universal, GEARMAN_NO_SERVERS, __func__, AT, "no servers added");
 
55
      return GEARMAN_NO_SERVERS;
 
56
    }
 
57
 
 
58
    for (task->con= task->client->universal.con_list; task->con;
 
59
         task->con= task->con->next)
 
60
    {
 
61
      if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
 
62
      {
 
63
        break;
 
64
      }
 
65
    }
 
66
 
 
67
    if (not task->con)
 
68
    {
 
69
      client->options.no_new= true;
 
70
      gearman_gerror(client->universal, GEARMAN_IO_WAIT);
 
71
      return GEARMAN_IO_WAIT;
 
72
    }
 
73
 
 
74
    client->new_tasks--;
 
75
 
 
76
    if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
 
77
    {
 
78
      task->created_id= task->con->created_id_next;
 
79
      task->con->created_id_next++;
 
80
    }
 
81
 
 
82
  case GEARMAN_TASK_STATE_SUBMIT:
 
83
    while (1)
 
84
    {
 
85
      assert(task->con);
 
86
      gearman_return_t ret= task->con->send_packet(task->send, client->new_tasks == 0 ? true : false);
 
87
 
 
88
      if (gearman_success(ret))
 
89
      {
 
90
        break;
 
91
      }
 
92
      else if (ret == GEARMAN_IO_WAIT)
 
93
      {
 
94
        task->state= GEARMAN_TASK_STATE_SUBMIT;
 
95
        return ret;
 
96
      }
 
97
      else if (gearman_failed(ret))
 
98
      {
 
99
        /* Increment this since the job submission failed. */
 
100
        task->con->created_id++;
 
101
 
 
102
        if (ret == GEARMAN_COULD_NOT_CONNECT)
 
103
        {
 
104
          for (task->con= task->con->next; 
 
105
               task->con;
 
106
               task->con= task->con->next)
 
107
          {
 
108
            if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
 
109
            {
 
110
              break;
 
111
            }
 
112
          }
 
113
        }
 
114
        else
 
115
        {
 
116
          task->con= NULL;
 
117
        }
 
118
 
 
119
        if (not task->con)
 
120
        {
 
121
          task->result_rc= ret;
 
122
 
 
123
          if (ret == GEARMAN_COULD_NOT_CONNECT) // If no connection is found, we will let the user try again
 
124
          {
 
125
            task->state= GEARMAN_TASK_STATE_NEW;
 
126
            client->new_tasks++;
 
127
          }
 
128
          else
 
129
          {
 
130
            task->state= GEARMAN_TASK_STATE_FAIL;
 
131
            client->running_tasks--;
 
132
          }
 
133
          return ret;
 
134
        }
 
135
 
 
136
        if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
 
137
        {
 
138
          task->created_id= task->con->created_id_next;
 
139
          task->con->created_id_next++;
 
140
        }
 
141
      }
 
142
    }
 
143
 
 
144
    if (task->send.data_size > 0 and not task->send.data)
 
145
    {
 
146
      if (not task->func.workload_fn)
 
147
      {
 
148
        gearman_error(client->universal, GEARMAN_NEED_WORKLOAD_FN,
 
149
                      "workload size > 0, but no data pointer or workload_fn was given");
 
150
        return GEARMAN_NEED_WORKLOAD_FN;
 
151
      }
 
152
 
 
153
  case GEARMAN_TASK_STATE_WORKLOAD:
 
154
      gearman_return_t ret= task->func.workload_fn(task);
 
155
      if (gearman_failed(ret))
 
156
      {
 
157
        task->state= GEARMAN_TASK_STATE_WORKLOAD;
 
158
        return ret;
 
159
      }
 
160
    }
 
161
 
 
162
    client->options.no_new= false;
 
163
    task->state= GEARMAN_TASK_STATE_WORK;
 
164
    task->con->set_events(POLLIN);
 
165
    return GEARMAN_SUCCESS;
 
166
 
 
167
  case GEARMAN_TASK_STATE_WORK:
 
168
    if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
 
169
    {
 
170
      task->options.is_known= true;
 
171
      snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
 
172
               int(task->recv->arg_size[0]),
 
173
               static_cast<char *>(task->recv->arg[0]));
 
174
 
 
175
  case GEARMAN_TASK_STATE_CREATED:
 
176
      if (task->func.created_fn)
 
177
      {
 
178
        gearman_return_t ret= task->func.created_fn(task);
 
179
        if (gearman_failed(ret))
 
180
        {
 
181
          task->state= GEARMAN_TASK_STATE_CREATED;
 
182
          return ret;
 
183
        }
 
184
      }
 
185
 
 
186
      if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
 
187
          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
 
188
          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
 
189
          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
 
190
          task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
 
191
      {
 
192
        break;
 
193
      }
 
194
    }
 
195
    else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
 
196
    {
 
197
      task->options.is_known= true;
 
198
      task->options.is_running= true;
 
199
 
 
200
  case GEARMAN_TASK_STATE_DATA:
 
201
      if (task->func.data_fn)
 
202
      {
 
203
        gearman_return_t ret= task->func.data_fn(task);
 
204
        if (gearman_failed(ret))
 
205
        {
 
206
          task->state= GEARMAN_TASK_STATE_DATA;
 
207
          return ret;
 
208
        }
 
209
      }
 
210
    }
 
211
    else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
 
212
    {
 
213
  case GEARMAN_TASK_STATE_WARNING:
 
214
      if (task->func.warning_fn)
 
215
      {
 
216
        gearman_return_t ret= task->func.warning_fn(task);
 
217
        if (gearman_failed(ret))
 
218
        {
 
219
          task->state= GEARMAN_TASK_STATE_WARNING;
 
220
          return ret;
 
221
        }
 
222
      }
 
223
    }
 
224
    else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
 
225
             task->recv->command == GEARMAN_COMMAND_STATUS_RES)
 
226
    {
 
227
      uint8_t x;
 
228
 
 
229
      if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
 
230
      {
 
231
        if (atoi(static_cast<char *>(task->recv->arg[1])) == 0)
 
232
          task->options.is_known= false;
 
233
        else
 
234
          task->options.is_known= true;
 
235
 
 
236
        if (atoi(static_cast<char *>(task->recv->arg[2])) == 0)
 
237
          task->options.is_running= false;
 
238
        else
 
239
          task->options.is_running= true;
 
240
 
 
241
        x= 3;
 
242
      }
 
243
      else
 
244
      {
 
245
        x= 1;
 
246
      }
 
247
 
 
248
      task->numerator= uint32_t(atoi(static_cast<char *>(task->recv->arg[x])));
 
249
      char status_buffer[11]; /* Max string size to hold a uint32_t. */
 
250
      snprintf(status_buffer, 11, "%.*s",
 
251
               int(task->recv->arg_size[x + 1]),
 
252
               static_cast<char *>(task->recv->arg[x + 1]));
 
253
      task->denominator= uint32_t(atoi(status_buffer));
 
254
 
 
255
  case GEARMAN_TASK_STATE_STATUS:
 
256
      if (task->func.status_fn)
 
257
      {
 
258
        gearman_return_t ret= task->func.status_fn(task);
 
259
        if (gearman_failed(ret))
 
260
        {
 
261
          task->state= GEARMAN_TASK_STATE_STATUS;
 
262
          return ret;
 
263
        }
 
264
      }
 
265
 
 
266
      if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
 
267
      {
 
268
        break;
 
269
      }
 
270
    }
 
271
    else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
 
272
    {
 
273
      task->options.is_known= false;
 
274
      task->options.is_running= false;
 
275
      task->result_rc= GEARMAN_SUCCESS;
 
276
 
 
277
  case GEARMAN_TASK_STATE_COMPLETE:
 
278
      if (task->func.complete_fn)
 
279
      {
 
280
        gearman_return_t ret= task->func.complete_fn(task);
 
281
        if (gearman_failed(ret))
 
282
        {
 
283
          task->state= GEARMAN_TASK_STATE_COMPLETE;
 
284
          return ret;
 
285
        }
 
286
      }
 
287
 
 
288
      break;
 
289
    }
 
290
    else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
 
291
    {
 
292
  case GEARMAN_TASK_STATE_EXCEPTION:
 
293
      if (task->func.exception_fn)
 
294
      {
 
295
        gearman_return_t ret= task->func.exception_fn(task);
 
296
        if (gearman_failed(ret))
 
297
        {
 
298
          task->state= GEARMAN_TASK_STATE_EXCEPTION;
 
299
          return ret;
 
300
        }
 
301
      }
 
302
    }
 
303
    else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
 
304
    {
 
305
      // If things fail we need to delete the result, and set the result_rc
 
306
      // correctly.
 
307
      task->options.is_known= false;
 
308
      task->options.is_running= false;
 
309
      delete task->result_ptr;
 
310
      task->result_ptr= NULL;
 
311
      task->result_rc= GEARMAN_WORK_FAIL;
 
312
 
 
313
  case GEARMAN_TASK_STATE_FAIL:
 
314
      if (task->func.fail_fn)
 
315
      {
 
316
        gearman_return_t ret= task->func.fail_fn(task);
 
317
        if (gearman_failed(ret))
 
318
        {
 
319
          task->state= GEARMAN_TASK_STATE_FAIL;
 
320
          return ret;
 
321
        }
 
322
      }
 
323
 
 
324
      break;
 
325
    }
 
326
 
 
327
    task->state= GEARMAN_TASK_STATE_WORK;
 
328
    return GEARMAN_SUCCESS;
 
329
 
 
330
  case GEARMAN_TASK_STATE_FINISHED:
 
331
    break;
 
332
  }
 
333
 
 
334
  client->running_tasks--;
 
335
  task->state= GEARMAN_TASK_STATE_FINISHED;
 
336
 
 
337
  if (client->options.free_tasks and task->type == GEARMAN_TASK_KIND_ADD_TASK)
 
338
  {
 
339
    gearman_task_free(task);
 
340
  }
 
341
 
 
342
  return GEARMAN_SUCCESS;
 
343
}