~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman/client.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2009-09-28 21:43:31 UTC
  • mto: (1.2.3 upstream) (6.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 5.
  • Revision ID: james.westby@ubuntu.com-20090928214331-9bku0d3v1b1ypgp4
Import upstream version 0.10

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2
 
 * 
3
 
 *  Gearmand client and server library.
4
 
 *
5
 
 *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
6
 
 *  Copyright (C) 2008 Brian Aker, Eric Day
7
 
 *  All rights reserved.
8
 
 *
9
 
 *  Redistribution and use in source and binary forms, with or without
10
 
 *  modification, are permitted provided that the following conditions are
11
 
 *  met:
12
 
 *
13
 
 *      * Redistributions of source code must retain the above copyright
14
 
 *  notice, this list of conditions and the following disclaimer.
15
 
 *
16
 
 *      * Redistributions in binary form must reproduce the above
17
 
 *  copyright notice, this list of conditions and the following disclaimer
18
 
 *  in the documentation and/or other materials provided with the
19
 
 *  distribution.
20
 
 *
21
 
 *      * The names of its contributors may not be used to endorse or
22
 
 *  promote products derived from this software without specific prior
23
 
 *  written permission.
24
 
 *
25
 
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26
 
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27
 
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28
 
 *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29
 
 *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30
 
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31
 
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32
 
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33
 
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34
 
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35
 
 *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36
 
 *
37
 
 */
38
 
 
39
 
#include <libgearman/common.h>
40
 
 
41
 
#include <arpa/inet.h>
42
 
#include <cassert>
43
 
#include <cerrno>
44
 
#include <cstdio>
45
 
#include <cstdlib>
46
 
#include <cstring>
47
 
#include <memory>
48
 
#include <netdb.h>
49
 
#include <netinet/in.h>
50
 
#include <sys/socket.h>
51
 
#include <poll.h>
52
 
 
53
 
/*
54
 
  Allocate a client structure.
55
 
 */
56
 
static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_clone)
57
 
{
58
 
  if (client)
59
 
  {
60
 
    client->options.allocated= false;
61
 
  }
62
 
  else
63
 
  {
64
 
    client= new (std::nothrow) gearman_client_st;
65
 
    if (client == NULL)
66
 
      return NULL;
67
 
 
68
 
    client->options.allocated= true;
69
 
  }
70
 
 
71
 
  client->options.non_blocking= false;
72
 
  client->options.unbuffered_result= false;
73
 
  client->options.no_new= false;
74
 
  client->options.free_tasks= false;
75
 
 
76
 
  client->state= GEARMAN_CLIENT_STATE_IDLE;
77
 
  client->new_tasks= 0;
78
 
  client->running_tasks= 0;
79
 
  client->task_count= 0;
80
 
  client->context= NULL;
81
 
  client->con= NULL;
82
 
  client->task= NULL;
83
 
  client->task_list= NULL;
84
 
  client->task_context_free_fn= NULL;
85
 
  gearman_client_clear_fn(client);
86
 
 
87
 
  if (not is_clone)
88
 
  {
89
 
    gearman_universal_initialize(client->universal);
90
 
  }
91
 
 
92
 
  return client;
93
 
}
94
 
 
95
 
/**
96
 
 * Callback function used when parsing server lists.
97
 
 */
98
 
static gearman_return_t _client_add_server(const char *host, in_port_t port,
99
 
                                           void *context)
100
 
{
101
 
  return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
102
 
}
103
 
 
104
 
 
105
 
/**
106
 
 * Real do function.
107
 
 */
108
 
static void *_client_do(gearman_client_st *client, gearman_command_t command,
109
 
                        const char *function_name,
110
 
                        const char *unique,
111
 
                        const void *workload_str, size_t workload_size,
112
 
                        size_t *result_size, gearman_return_t *ret_ptr)
113
 
{
114
 
  gearman_return_t unused;
115
 
  if (ret_ptr == NULL)
116
 
  {
117
 
    ret_ptr= &unused;
118
 
  }
119
 
 
120
 
  size_t unused_size;
121
 
  if (result_size == NULL)
122
 
  {
123
 
    result_size= &unused_size;
124
 
  }
125
 
  *result_size= 0;
126
 
 
127
 
  if (client == NULL)
128
 
  {
129
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
130
 
    return NULL;
131
 
  }
132
 
 
133
 
  gearman_string_t function= { gearman_string_param_cstr(function_name) };
134
 
  gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
135
 
  gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
136
 
 
137
 
 
138
 
  gearman_task_st do_task;
139
 
  gearman_task_st *do_task_ptr= add_task(*client, &do_task, NULL, command,
140
 
                                         function,
141
 
                                         local_unique,
142
 
                                         workload,
143
 
                                         time_t(0),
144
 
                                         gearman_actions_do_default());
145
 
  if (do_task_ptr == NULL)
146
 
  {
147
 
    *ret_ptr= gearman_universal_error_code(client->universal);
148
 
    return NULL;
149
 
  }
150
 
  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
151
 
 
152
 
  gearman_return_t ret;
153
 
  do {
154
 
    ret= gearman_client_run_tasks(client);
155
 
  } while (gearman_continue(ret));
156
 
 
157
 
  // gearman_client_run_tasks failed
158
 
  assert(client->task_list); // Programmer error, we should always have the task that we used for do
159
 
 
160
 
  char *returnable= NULL;
161
 
  if (gearman_failed(ret))
162
 
  {
163
 
    if (ret == GEARMAN_COULD_NOT_CONNECT)
164
 
    { }
165
 
    else
166
 
    {
167
 
      gearman_error(client->universal, ret, "occured during gearman_client_run_tasks()");
168
 
    }
169
 
 
170
 
    *ret_ptr= ret;
171
 
    *result_size= 0;
172
 
  }
173
 
  else if (gearman_success(ret) and do_task_ptr->result_rc == GEARMAN_SUCCESS)
174
 
  {
175
 
    *ret_ptr= do_task_ptr->result_rc;
176
 
    if (do_task_ptr->result_ptr)
177
 
    {
178
 
      if (gearman_has_allocator(client->universal))
179
 
      {
180
 
        gearman_string_t result= gearman_result_string(do_task_ptr->result_ptr);
181
 
        returnable= static_cast<char *>(gearman_malloc(client->universal, gearman_size(result) +1));
182
 
        if (not returnable)
183
 
        {
184
 
          gearman_error(client->universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "custom workload_fn failed to allocate memory");
185
 
          *result_size= 0;
186
 
        }
187
 
        else // NULL terminate
188
 
        {
189
 
          memcpy(returnable, gearman_c_str(result), gearman_size(result));
190
 
          returnable[gearman_size(result)]= 0;
191
 
          *result_size= gearman_size(result);
192
 
        }
193
 
      }
194
 
      else
195
 
      {
196
 
        gearman_string_t result= gearman_result_take_string(do_task_ptr->result_ptr);
197
 
        *result_size= gearman_size(result);
198
 
        returnable= const_cast<char *>(gearman_c_str(result));
199
 
      }
200
 
    }
201
 
    else // NULL job
202
 
    {
203
 
      *result_size= 0;
204
 
    }
205
 
  }
206
 
  else // gearman_client_run_tasks() was successful, but the task was not
207
 
  {
208
 
    gearman_error(client->universal, do_task_ptr->result_rc, "occured during gearman_client_run_tasks()");
209
 
 
210
 
    *ret_ptr= do_task_ptr->result_rc;
211
 
    *result_size= 0;
212
 
  }
213
 
 
214
 
  gearman_task_free(&do_task);
215
 
  client->new_tasks= 0;
216
 
  client->running_tasks= 0;
217
 
 
218
 
  return returnable;
219
 
}
220
 
 
221
 
/*
222
 
  Real background do function.
223
 
*/
224
 
static gearman_return_t _client_do_background(gearman_client_st *client,
225
 
                                              gearman_command_t command,
226
 
                                              gearman_string_t &function,
227
 
                                              gearman_unique_t &unique,
228
 
                                              gearman_string_t &workload,
229
 
                                              gearman_job_handle_t job_handle)
230
 
{
231
 
  if (client == NULL)
232
 
  {
233
 
    return GEARMAN_INVALID_ARGUMENT;
234
 
  }
235
 
 
236
 
  if (gearman_size(function) == 0)
237
 
  {
238
 
    return gearman_error(client->universal, GEARMAN_INVALID_ARGUMENT, "function argument was empty");
239
 
  }
240
 
 
241
 
  client->_do_handle[0]= 0; // Reset the job_handle we store in client
242
 
 
243
 
  gearman_task_st do_task, *do_task_ptr;
244
 
  do_task_ptr= add_task(*client, &do_task, 
245
 
                        client, 
246
 
                        command,
247
 
                        function,
248
 
                        unique,
249
 
                        workload,
250
 
                        time_t(0),
251
 
                        gearman_actions_do_default());
252
 
  if (not do_task_ptr)
253
 
  {
254
 
    return gearman_universal_error_code(client->universal);
255
 
  }
256
 
  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
257
 
 
258
 
  gearman_return_t ret;
259
 
  do {
260
 
    ret= gearman_client_run_tasks(client);
261
 
    
262
 
    // If either of the following is ever true, we will end up in an
263
 
    // infinite loop
264
 
    assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
265
 
 
266
 
  } while (gearman_continue(ret));
267
 
 
268
 
  if (job_handle)
269
 
  {
270
 
    strncpy(job_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
271
 
  }
272
 
  strncpy(client->_do_handle, do_task.job_handle, GEARMAN_JOB_HANDLE_SIZE);
273
 
  client->new_tasks= 0;
274
 
  client->running_tasks= 0;
275
 
  gearman_task_free(&do_task);
276
 
 
277
 
  return ret;
278
 
}
279
 
 
280
 
 
281
 
/*
282
 
 * Public Definitions
283
 
 */
284
 
 
285
 
gearman_client_st *gearman_client_create(gearman_client_st *client)
286
 
{
287
 
  return _client_allocate(client, false);
288
 
}
289
 
 
290
 
gearman_client_st *gearman_client_clone(gearman_client_st *client,
291
 
                                        const gearman_client_st *from)
292
 
{
293
 
  if (not from)
294
 
  {
295
 
    return _client_allocate(client, false);
296
 
  }
297
 
 
298
 
  client= _client_allocate(client, true);
299
 
 
300
 
  if (client == NULL)
301
 
  {
302
 
    return client;
303
 
  }
304
 
 
305
 
  client->options.non_blocking= from->options.non_blocking;
306
 
  client->options.unbuffered_result= from->options.unbuffered_result;
307
 
  client->options.no_new= from->options.no_new;
308
 
  client->options.free_tasks= from->options.free_tasks;
309
 
  client->actions= from->actions;
310
 
  client->_do_handle[0]= 0;
311
 
 
312
 
  gearman_universal_clone(client->universal, from->universal);
313
 
 
314
 
  return client;
315
 
}
316
 
 
317
 
void gearman_client_free(gearman_client_st *client)
318
 
{
319
 
  if (client == NULL)
320
 
  {
321
 
    return;
322
 
  }
323
 
 
324
 
  gearman_client_task_free_all(client);
325
 
 
326
 
  gearman_universal_free(client->universal);
327
 
 
328
 
  if (client->options.allocated)
329
 
  {
330
 
    delete client;
331
 
  }
332
 
}
333
 
 
334
 
const char *gearman_client_error(const gearman_client_st *client)
335
 
{
336
 
  if (client == NULL)
337
 
  {
338
 
    return NULL;
339
 
  }
340
 
 
341
 
  return gearman_universal_error(client->universal);
342
 
}
343
 
 
344
 
int gearman_client_errno(const gearman_client_st *client)
345
 
{
346
 
  if (client == NULL)
347
 
  {
348
 
    return 0;
349
 
  }
350
 
 
351
 
  return gearman_universal_errno(client->universal);
352
 
}
353
 
 
354
 
gearman_client_options_t gearman_client_options(const gearman_client_st *client)
355
 
{
356
 
  int32_t options;
357
 
  memset(&options, 0, sizeof(int32_t));
358
 
 
359
 
  if (client->options.allocated)
360
 
    options|= int(GEARMAN_CLIENT_ALLOCATED);
361
 
 
362
 
  if (client->options.non_blocking)
363
 
    options|= int(GEARMAN_CLIENT_NON_BLOCKING);
364
 
 
365
 
  if (client->options.unbuffered_result)
366
 
    options|= int(GEARMAN_CLIENT_UNBUFFERED_RESULT);
367
 
 
368
 
  if (client->options.no_new)
369
 
    options|= int(GEARMAN_CLIENT_NO_NEW);
370
 
 
371
 
  if (client->options.free_tasks)
372
 
    options|= int(GEARMAN_CLIENT_FREE_TASKS);
373
 
 
374
 
  return gearman_client_options_t(options);
375
 
}
376
 
 
377
 
bool gearman_client_has_option(gearman_client_st *client,
378
 
                                gearman_client_options_t option)
379
 
{
380
 
  if (client == NULL)
381
 
    return false;
382
 
 
383
 
  switch (option)
384
 
  {
385
 
  case GEARMAN_CLIENT_ALLOCATED:
386
 
    return client->options.allocated;
387
 
 
388
 
  case GEARMAN_CLIENT_NON_BLOCKING:
389
 
    return client->options.non_blocking;
390
 
 
391
 
  case GEARMAN_CLIENT_UNBUFFERED_RESULT:
392
 
    return client->options.unbuffered_result;
393
 
 
394
 
  case GEARMAN_CLIENT_NO_NEW:
395
 
    return client->options.no_new;
396
 
 
397
 
  case GEARMAN_CLIENT_FREE_TASKS:
398
 
    return client->options.free_tasks;
399
 
 
400
 
  default:
401
 
  case GEARMAN_CLIENT_TASK_IN_USE:
402
 
  case GEARMAN_CLIENT_MAX:
403
 
        return false;
404
 
  }
405
 
}
406
 
 
407
 
void gearman_client_set_options(gearman_client_st *client,
408
 
                                gearman_client_options_t options)
409
 
{
410
 
  if (client == NULL)
411
 
    return;
412
 
 
413
 
  gearman_client_options_t usable_options[]= {
414
 
    GEARMAN_CLIENT_NON_BLOCKING,
415
 
    GEARMAN_CLIENT_UNBUFFERED_RESULT,
416
 
    GEARMAN_CLIENT_FREE_TASKS,
417
 
    GEARMAN_CLIENT_MAX
418
 
  };
419
 
 
420
 
  gearman_client_options_t *ptr;
421
 
 
422
 
 
423
 
  for (ptr= usable_options; *ptr != GEARMAN_CLIENT_MAX ; ptr++)
424
 
  {
425
 
    if (options & *ptr)
426
 
    {
427
 
      gearman_client_add_options(client, *ptr);
428
 
    }
429
 
    else
430
 
    {
431
 
      gearman_client_remove_options(client, *ptr);
432
 
    }
433
 
  }
434
 
}
435
 
 
436
 
void gearman_client_add_options(gearman_client_st *client,
437
 
                                gearman_client_options_t options)
438
 
{
439
 
  if (client == NULL)
440
 
    return;
441
 
 
442
 
  if (options & GEARMAN_CLIENT_NON_BLOCKING)
443
 
  {
444
 
    gearman_universal_add_options(client->universal, GEARMAN_NON_BLOCKING);
445
 
    client->options.non_blocking= true;
446
 
  }
447
 
 
448
 
  if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
449
 
  {
450
 
    client->options.unbuffered_result= true;
451
 
  }
452
 
 
453
 
  if (options & GEARMAN_CLIENT_FREE_TASKS)
454
 
  {
455
 
    client->options.free_tasks= true;
456
 
  }
457
 
}
458
 
 
459
 
void gearman_client_remove_options(gearman_client_st *client,
460
 
                                   gearman_client_options_t options)
461
 
{
462
 
  if (client == NULL)
463
 
    return;
464
 
 
465
 
  if (options & GEARMAN_CLIENT_NON_BLOCKING)
466
 
  {
467
 
    gearman_universal_remove_options(client->universal, GEARMAN_NON_BLOCKING);
468
 
    client->options.non_blocking= false;
469
 
  }
470
 
 
471
 
  if (options & GEARMAN_CLIENT_UNBUFFERED_RESULT)
472
 
  {
473
 
    client->options.unbuffered_result= false;
474
 
  }
475
 
 
476
 
  if (options & GEARMAN_CLIENT_FREE_TASKS)
477
 
  {
478
 
    client->options.free_tasks= false;
479
 
  }
480
 
}
481
 
 
482
 
int gearman_client_timeout(gearman_client_st *client)
483
 
{
484
 
  return gearman_universal_timeout(client->universal);
485
 
}
486
 
 
487
 
void gearman_client_set_timeout(gearman_client_st *client, int timeout)
488
 
{
489
 
  if (client == NULL)
490
 
    return;
491
 
 
492
 
  gearman_universal_set_timeout(client->universal, timeout);
493
 
}
494
 
 
495
 
void *gearman_client_context(const gearman_client_st *client)
496
 
{
497
 
  if (client == NULL)
498
 
    return NULL;
499
 
 
500
 
  return const_cast<void *>(client->context);
501
 
}
502
 
 
503
 
void gearman_client_set_context(gearman_client_st *client, void *context)
504
 
{
505
 
  if (client == NULL)
506
 
    return;
507
 
 
508
 
  client->context= context;
509
 
}
510
 
 
511
 
void gearman_client_set_log_fn(gearman_client_st *client,
512
 
                               gearman_log_fn *function, void *context,
513
 
                               gearman_verbose_t verbose)
514
 
{
515
 
  if (client == NULL)
516
 
    return;
517
 
 
518
 
  gearman_set_log_fn(client->universal, function, context, verbose);
519
 
}
520
 
 
521
 
void gearman_client_set_workload_malloc_fn(gearman_client_st *client,
522
 
                                           gearman_malloc_fn *function,
523
 
                                           void *context)
524
 
{
525
 
  if (client == NULL)
526
 
    return;
527
 
 
528
 
  gearman_set_workload_malloc_fn(client->universal, function, context);
529
 
}
530
 
 
531
 
void gearman_client_set_workload_free_fn(gearman_client_st *client, gearman_free_fn *function, void *context)
532
 
{
533
 
  if (client == NULL)
534
 
    return;
535
 
 
536
 
  gearman_set_workload_free_fn(client->universal, function, context);
537
 
}
538
 
 
539
 
gearman_return_t gearman_client_add_server(gearman_client_st *client,
540
 
                                           const char *host, in_port_t port)
541
 
{
542
 
  if (client == NULL)
543
 
  {
544
 
    return GEARMAN_INVALID_ARGUMENT;
545
 
  }
546
 
 
547
 
  if (gearman_connection_create_args(client->universal, host, port) == false)
548
 
  {
549
 
    assert(client->universal.error.rc != GEARMAN_SUCCESS);
550
 
    return gearman_universal_error_code(client->universal);
551
 
  }
552
 
 
553
 
  return GEARMAN_SUCCESS;
554
 
}
555
 
 
556
 
gearman_return_t gearman_client_add_servers(gearman_client_st *client,
557
 
                                            const char *servers)
558
 
{
559
 
  return gearman_parse_servers(servers, _client_add_server, client);
560
 
}
561
 
 
562
 
void gearman_client_remove_servers(gearman_client_st *client)
563
 
{
564
 
  if (client == NULL)
565
 
  {
566
 
    return;
567
 
  }
568
 
 
569
 
  gearman_free_all_cons(client->universal);
570
 
}
571
 
 
572
 
gearman_return_t gearman_client_wait(gearman_client_st *client)
573
 
{
574
 
  if (client == NULL)
575
 
  {
576
 
    return GEARMAN_INVALID_ARGUMENT;
577
 
  }
578
 
 
579
 
  return gearman_wait(client->universal);
580
 
}
581
 
 
582
 
void *gearman_client_do(gearman_client_st *client,
583
 
                        const char *function,
584
 
                        const char *unique,
585
 
                        const void *workload,
586
 
                        size_t workload_size, size_t *result_size,
587
 
                        gearman_return_t *ret_ptr)
588
 
{
589
 
  return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB,
590
 
                    function,
591
 
                    unique,
592
 
                    workload, workload_size,
593
 
                    result_size, ret_ptr);
594
 
}
595
 
 
596
 
void *gearman_client_do_high(gearman_client_st *client,
597
 
                             const char *function,
598
 
                             const char *unique,
599
 
                             const void *workload, size_t workload_size,
600
 
                             size_t *result_size, gearman_return_t *ret_ptr)
601
 
{
602
 
  return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
603
 
                    function,
604
 
                    unique,
605
 
                    workload, workload_size,
606
 
                    result_size, ret_ptr);
607
 
}
608
 
 
609
 
void *gearman_client_do_low(gearman_client_st *client,
610
 
                            const char *function,
611
 
                            const char *unique,
612
 
                            const void *workload, size_t workload_size,
613
 
                            size_t *result_size, gearman_return_t *ret_ptr)
614
 
{
615
 
  return _client_do(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
616
 
                    function,
617
 
                    unique,
618
 
                    workload, workload_size,
619
 
                    result_size, ret_ptr);
620
 
}
621
 
 
622
 
size_t gearman_client_count_tasks(gearman_client_st *client)
623
 
{
624
 
  if (client == NULL)
625
 
  {
626
 
    return 0;
627
 
  }
628
 
 
629
 
  size_t count= 1;
630
 
  gearman_task_st *search= client->task_list;
631
 
 
632
 
  while ((search= search->next))
633
 
  {
634
 
    count++;
635
 
  }
636
 
 
637
 
  return count;
638
 
}
639
 
 
640
 
#if 0
/**
 * Report whether any task in the client's task list is active according to
 * gearman_task_is_active(). Currently compiled out.
 */
static bool _active_tasks(gearman_client_st *client)
{
  assert(client);

  for (gearman_task_st *search= client->task_list; search; search= search->next)
  {
    if (gearman_task_is_active(search))
    {
      return true;
    }
  }

  return false;
}
#endif
660
 
 
661
 
const char *gearman_client_do_job_handle(gearman_client_st *self)
662
 
{
663
 
  if (self == NULL)
664
 
  {
665
 
    errno= EINVAL;
666
 
    return NULL;
667
 
  }
668
 
 
669
 
  return self->_do_handle;
670
 
}
671
 
 
672
 
void gearman_client_do_status(gearman_client_st *, uint32_t *numerator, uint32_t *denominator)
673
 
{
674
 
  if (numerator)
675
 
    *numerator= 0;
676
 
 
677
 
  if (denominator)
678
 
    *denominator= 0;
679
 
}
680
 
 
681
 
gearman_return_t gearman_client_do_background(gearman_client_st *client,
682
 
                                              const char *function_name,
683
 
                                              const char *unique,
684
 
                                              const void *workload_str,
685
 
                                              size_t workload_size,
686
 
                                              gearman_job_handle_t job_handle)
687
 
{
688
 
  gearman_string_t function= { gearman_string_param_cstr(function_name) };
689
 
  gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
690
 
  gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
691
 
 
692
 
  return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_BG,
693
 
                               function,
694
 
                               local_unique,
695
 
                               workload,
696
 
                               job_handle);
697
 
}
698
 
 
699
 
gearman_return_t gearman_client_do_high_background(gearman_client_st *client,
700
 
                                                   const char *function_name,
701
 
                                                   const char *unique,
702
 
                                                   const void *workload_str,
703
 
                                                   size_t workload_size,
704
 
                                                   gearman_job_handle_t job_handle)
705
 
{
706
 
  gearman_string_t function= { gearman_string_param_cstr(function_name) };
707
 
  gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
708
 
  gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
709
 
 
710
 
  return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
711
 
                               function,
712
 
                               local_unique,
713
 
                               workload,
714
 
                               job_handle);
715
 
}
716
 
 
717
 
gearman_return_t gearman_client_do_low_background(gearman_client_st *client,
718
 
                                                  const char *function_name,
719
 
                                                  const char *unique,
720
 
                                                  const void *workload_str,
721
 
                                                  size_t workload_size,
722
 
                                                  gearman_job_handle_t job_handle)
723
 
{
724
 
  gearman_string_t function= { gearman_string_param_cstr(function_name) };
725
 
  gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
726
 
  gearman_string_t workload= { static_cast<const char*>(workload_str), workload_size };
727
 
 
728
 
  return _client_do_background(client, GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
729
 
                               function,
730
 
                               local_unique,
731
 
                               workload,
732
 
                               job_handle);
733
 
}
734
 
 
735
 
gearman_return_t gearman_client_job_status(gearman_client_st *client,
736
 
                                           const gearman_job_handle_t job_handle,
737
 
                                           bool *is_known, bool *is_running,
738
 
                                           uint32_t *numerator,
739
 
                                           uint32_t *denominator)
740
 
{
741
 
  gearman_return_t ret;
742
 
 
743
 
  if (client == NULL)
744
 
  {
745
 
    return GEARMAN_INVALID_ARGUMENT;
746
 
  }
747
 
 
748
 
  gearman_task_st do_task;
749
 
  gearman_task_st *do_task_ptr= gearman_client_add_task_status(client, &do_task, client,
750
 
                                                               job_handle, &ret);
751
 
  if (gearman_failed(ret))
752
 
  {
753
 
    return ret;
754
 
  }
755
 
  assert(do_task_ptr);
756
 
  do_task_ptr->type= GEARMAN_TASK_KIND_DO;
757
 
 
758
 
  gearman_task_clear_fn(do_task_ptr);
759
 
 
760
 
  do {
761
 
    ret= gearman_client_run_tasks(client);
762
 
    
763
 
    // If either of the following is ever true, we will end up in an
764
 
    // infinite loop
765
 
    assert(ret != GEARMAN_IN_PROGRESS and ret != GEARMAN_JOB_EXISTS);
766
 
 
767
 
  } while (gearman_continue(ret));
768
 
 
769
 
  // @note we don't know if our task was run or not, we just know something
770
 
  // happened.
771
 
 
772
 
  if (gearman_success(ret))
773
 
  {
774
 
    if (is_known)
775
 
      *is_known= do_task.options.is_known;
776
 
 
777
 
    if (is_running)
778
 
      *is_running= do_task.options.is_running;
779
 
 
780
 
    if (numerator)
781
 
      *numerator= do_task.numerator;
782
 
 
783
 
    if (denominator)
784
 
      *denominator= do_task.denominator;
785
 
 
786
 
    if (not is_known and not is_running)
787
 
    {
788
 
      if (do_task.options.is_running) 
789
 
      {
790
 
        ret= GEARMAN_IN_PROGRESS;
791
 
      }
792
 
      else if (do_task.options.is_known)
793
 
      {
794
 
        ret= GEARMAN_JOB_EXISTS;
795
 
      }
796
 
    }
797
 
  }
798
 
  else
799
 
  {
800
 
    if (is_known)
801
 
    {
802
 
      *is_known= false;
803
 
    }
804
 
 
805
 
    if (is_running)
806
 
      *is_running= false;
807
 
 
808
 
    if (numerator)
809
 
      *numerator= 0;
810
 
 
811
 
    if (denominator)
812
 
      *denominator= 0;
813
 
  }
814
 
  gearman_task_free(do_task_ptr);
815
 
 
816
 
  return ret;
817
 
}
818
 
 
819
 
gearman_return_t gearman_client_echo(gearman_client_st *client,
820
 
                                     const void *workload,
821
 
                                     size_t workload_size)
822
 
{
823
 
  if (client == NULL)
824
 
  {
825
 
    return GEARMAN_INVALID_ARGUMENT;
826
 
  }
827
 
 
828
 
  return gearman_echo(client->universal, workload, workload_size);
829
 
}
830
 
 
831
 
void gearman_client_task_free_all(gearman_client_st *client)
832
 
{
833
 
  if (client == NULL)
834
 
  {
835
 
    return;
836
 
  }
837
 
 
838
 
  while (client->task_list)
839
 
  {
840
 
    gearman_task_free(client->task_list);
841
 
  }
842
 
}
843
 
 
844
 
void gearman_client_set_task_context_free_fn(gearman_client_st *client,
845
 
                                             gearman_task_context_free_fn *function)
846
 
{
847
 
  if (client == NULL)
848
 
  {
849
 
    return;
850
 
  }
851
 
 
852
 
  client->task_context_free_fn= function;
853
 
 
854
 
}
855
 
 
856
 
gearman_return_t gearman_client_set_memory_allocators(gearman_client_st *client,
857
 
                                                      gearman_malloc_fn *malloc_fn,
858
 
                                                      gearman_free_fn *free_fn,
859
 
                                                      gearman_realloc_fn *realloc_fn,
860
 
                                                      gearman_calloc_fn *calloc_fn,
861
 
                                                      void *context)
862
 
{
863
 
  if (client == NULL)
864
 
  {
865
 
    return GEARMAN_INVALID_ARGUMENT;
866
 
  }
867
 
 
868
 
  return gearman_set_memory_allocator(client->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
869
 
}
870
 
 
871
 
 
872
 
 
873
 
gearman_task_st *gearman_client_add_task(gearman_client_st *client,
874
 
                                         gearman_task_st *task,
875
 
                                         void *context,
876
 
                                         const char *function,
877
 
                                         const char *unique,
878
 
                                         const void *workload, size_t workload_size,
879
 
                                         gearman_return_t *ret_ptr)
880
 
{
881
 
  gearman_return_t unused;
882
 
  if (ret_ptr == NULL)
883
 
  {
884
 
    ret_ptr= &unused;
885
 
  }
886
 
 
887
 
  if (client == NULL)
888
 
  {
889
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
890
 
    return NULL;
891
 
  }
892
 
 
893
 
  return add_task(*client, task,
894
 
                  context, GEARMAN_COMMAND_SUBMIT_JOB,
895
 
                  function,
896
 
                  unique,
897
 
                  workload, workload_size,
898
 
                  time_t(0),
899
 
                  ret_ptr,
900
 
                  client->actions);
901
 
}
902
 
 
903
 
gearman_task_st *gearman_client_add_task_high(gearman_client_st *client,
904
 
                                              gearman_task_st *task,
905
 
                                              void *context,
906
 
                                              const char *function,
907
 
                                              const char *unique,
908
 
                                              const void *workload, size_t workload_size,
909
 
                                              gearman_return_t *ret_ptr)
910
 
{
911
 
  gearman_return_t unused;
912
 
  if (ret_ptr == NULL)
913
 
  {
914
 
    ret_ptr= &unused;
915
 
  }
916
 
 
917
 
  if (client == NULL)
918
 
  {
919
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
920
 
    return NULL;
921
 
  }
922
 
 
923
 
  return add_task(*client, task, context,
924
 
                  GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
925
 
                  function,
926
 
                  unique,
927
 
                  workload, workload_size,
928
 
                  time_t(0),
929
 
                  ret_ptr,
930
 
                  client->actions);
931
 
}
932
 
 
933
 
gearman_task_st *gearman_client_add_task_low(gearman_client_st *client,
934
 
                                             gearman_task_st *task,
935
 
                                             void *context,
936
 
                                             const char *function,
937
 
                                             const char *unique,
938
 
                                             const void *workload, size_t workload_size,
939
 
                                             gearman_return_t *ret_ptr)
940
 
{
941
 
  gearman_return_t unused;
942
 
  if (ret_ptr == NULL)
943
 
  {
944
 
    ret_ptr= &unused;
945
 
  }
946
 
 
947
 
  if (client == NULL)
948
 
  {
949
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
950
 
    return NULL;
951
 
  }
952
 
 
953
 
  return add_task(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_LOW,
954
 
                  function,
955
 
                  unique,
956
 
                  workload, workload_size,
957
 
                  time_t(0),
958
 
                  ret_ptr,
959
 
                  client->actions);
960
 
}
961
 
 
962
 
gearman_task_st *gearman_client_add_task_background(gearman_client_st *client,
963
 
                                                    gearman_task_st *task,
964
 
                                                    void *context,
965
 
                                                    const char *function,
966
 
                                                    const char *unique,
967
 
                                                    const void *workload, size_t workload_size,
968
 
                                                    gearman_return_t *ret_ptr)
969
 
{
970
 
  gearman_return_t unused;
971
 
  if (ret_ptr == NULL)
972
 
  {
973
 
    ret_ptr= &unused;
974
 
  }
975
 
 
976
 
  if (client == NULL)
977
 
  {
978
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
979
 
    return NULL;
980
 
  }
981
 
 
982
 
  return add_task(*client, task, context, GEARMAN_COMMAND_SUBMIT_JOB_BG,
983
 
                  function,
984
 
                  unique,
985
 
                  workload, workload_size,
986
 
                  time_t(0),
987
 
                  ret_ptr,
988
 
                  client->actions);
989
 
}
990
 
 
991
 
gearman_task_st *
992
 
gearman_client_add_task_high_background(gearman_client_st *client,
993
 
                                        gearman_task_st *task,
994
 
                                        void *context,
995
 
                                        const char *function,
996
 
                                        const char *unique,
997
 
                                        const void *workload, size_t workload_size,
998
 
                                        gearman_return_t *ret_ptr)
999
 
{
1000
 
  gearman_return_t unused;
1001
 
  if (ret_ptr == NULL)
1002
 
  {
1003
 
    ret_ptr= &unused;
1004
 
  }
1005
 
 
1006
 
  if (client == NULL)
1007
 
  {
1008
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1009
 
    return NULL;
1010
 
  }
1011
 
 
1012
 
  return add_task(*client, task, context,
1013
 
                  GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
1014
 
                  function,
1015
 
                  unique,
1016
 
                  workload, workload_size,
1017
 
                  time_t(0),
1018
 
                  ret_ptr,
1019
 
                  client->actions);
1020
 
}
1021
 
 
1022
 
gearman_task_st *
1023
 
gearman_client_add_task_low_background(gearman_client_st *client,
1024
 
                                       gearman_task_st *task,
1025
 
                                       void *context,
1026
 
                                       const char *function,
1027
 
                                       const char *unique,
1028
 
                                       const void *workload, size_t workload_size,
1029
 
                                       gearman_return_t *ret_ptr)
1030
 
{
1031
 
  gearman_return_t unused;
1032
 
  if (ret_ptr == NULL)
1033
 
  {
1034
 
    ret_ptr= &unused;
1035
 
  }
1036
 
 
1037
 
  if (client == NULL)
1038
 
  {
1039
 
    *ret_ptr= GEARMAN_INVALID_ARGUMENT;
1040
 
    return NULL;
1041
 
  }
1042
 
 
1043
 
  return add_task(*client, task, context,
1044
 
                  GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
1045
 
                  function,
1046
 
                  unique,
1047
 
                  workload, workload_size,
1048
 
                  time_t(0),
1049
 
                  ret_ptr,
1050
 
                  client->actions);
1051
 
 
1052
 
}
1053
 
 
1054
 
gearman_task_st *gearman_client_add_task_status(gearman_client_st *client,
1055
 
                                                gearman_task_st *task,
1056
 
                                                void *context,
1057
 
                                                const gearman_job_handle_t job_handle,
1058
 
                                                gearman_return_t *ret_ptr)
1059
 
{
1060
 
  const void *args[1];
1061
 
  size_t args_size[1];
1062
 
 
1063
 
  gearman_return_t unused;
1064
 
  if (ret_ptr == NULL)
1065
 
  {
1066
 
    ret_ptr= &unused;
1067
 
  }
1068
 
 
1069
 
  if (not (task= gearman_task_internal_create(client, task)))
1070
 
  {
1071
 
    *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1072
 
    return NULL;
1073
 
  }
1074
 
 
1075
 
  task->context= context;
1076
 
  snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s", job_handle);
1077
 
 
1078
 
  args[0]= job_handle;
1079
 
  args_size[0]= strlen(job_handle);
1080
 
  gearman_return_t rc= gearman_packet_create_args(client->universal, task->send,
1081
 
                                                  GEARMAN_MAGIC_REQUEST,
1082
 
                                                  GEARMAN_COMMAND_GET_STATUS,
1083
 
                                                  args, args_size, 1);
1084
 
  if (gearman_success(rc))
1085
 
  {
1086
 
    client->new_tasks++;
1087
 
    client->running_tasks++;
1088
 
    task->options.send_in_use= true;
1089
 
  }
1090
 
  *ret_ptr= rc;
1091
 
 
1092
 
  return task;
1093
 
}
1094
 
 
1095
 
void gearman_client_set_workload_fn(gearman_client_st *client,
1096
 
                                    gearman_workload_fn *function)
1097
 
{
1098
 
  if (client == NULL)
1099
 
  {
1100
 
    return;
1101
 
  }
1102
 
 
1103
 
  client->actions.workload_fn= function;
1104
 
}
1105
 
 
1106
 
void gearman_client_set_created_fn(gearman_client_st *client,
1107
 
                                   gearman_created_fn *function)
1108
 
{
1109
 
  if (client == NULL)
1110
 
  {
1111
 
    return;
1112
 
  }
1113
 
 
1114
 
  client->actions.created_fn= function;
1115
 
}
1116
 
 
1117
 
void gearman_client_set_data_fn(gearman_client_st *client,
1118
 
                                gearman_data_fn *function)
1119
 
{
1120
 
  if (client == NULL)
1121
 
  {
1122
 
    return;
1123
 
  }
1124
 
 
1125
 
  client->actions.data_fn= function;
1126
 
}
1127
 
 
1128
 
void gearman_client_set_warning_fn(gearman_client_st *client,
1129
 
                                   gearman_warning_fn *function)
1130
 
{
1131
 
  if (client == NULL)
1132
 
  {
1133
 
    return;
1134
 
  }
1135
 
 
1136
 
  client->actions.warning_fn= function;
1137
 
}
1138
 
 
1139
 
void gearman_client_set_status_fn(gearman_client_st *client,
1140
 
                                  gearman_universal_status_fn *function)
1141
 
{
1142
 
  if (client == NULL)
1143
 
  {
1144
 
    return;
1145
 
  }
1146
 
 
1147
 
  client->actions.status_fn= function;
1148
 
}
1149
 
 
1150
 
void gearman_client_set_complete_fn(gearman_client_st *client,
1151
 
                                    gearman_complete_fn *function)
1152
 
{
1153
 
  if (client == NULL)
1154
 
  {
1155
 
    return;
1156
 
  }
1157
 
 
1158
 
  client->actions.complete_fn= function;
1159
 
}
1160
 
 
1161
 
void gearman_client_set_exception_fn(gearman_client_st *client,
1162
 
                                     gearman_exception_fn *function)
1163
 
{
1164
 
  if (client == NULL)
1165
 
  {
1166
 
    return;
1167
 
  }
1168
 
 
1169
 
  client->actions.exception_fn= function;
1170
 
}
1171
 
 
1172
 
void gearman_client_set_fail_fn(gearman_client_st *client,
1173
 
                                gearman_fail_fn *function)
1174
 
{
1175
 
  if (client == NULL)
1176
 
  {
1177
 
    return;
1178
 
  }
1179
 
 
1180
 
  client->actions.fail_fn= function;
1181
 
}
1182
 
 
1183
 
void gearman_client_clear_fn(gearman_client_st *client)
1184
 
{
1185
 
  if (client == NULL)
1186
 
  {
1187
 
    return;
1188
 
  }
1189
 
 
1190
 
  client->actions= gearman_actions_default();
1191
 
}
1192
 
 
1193
 
static inline void _push_non_blocking(gearman_client_st *client)
1194
 
{
1195
 
  client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
1196
 
  client->universal.options.non_blocking= true;
1197
 
}
1198
 
 
1199
 
static inline void _pop_non_blocking(gearman_client_st *client)
1200
 
{
1201
 
  client->universal.options.non_blocking= client->options.non_blocking;
1202
 
  assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
1203
 
}
1204
 
 
1205
 
static inline void _push_blocking(gearman_client_st *client)
1206
 
{
1207
 
  client->universal.options.stored_non_blocking= client->universal.options.non_blocking;
1208
 
  client->universal.options.non_blocking= false;
1209
 
}
1210
 
 
1211
 
static inline void _pop_blocking(gearman_client_st *client)
1212
 
{
1213
 
  client->universal.options.non_blocking= client->options.non_blocking;
1214
 
  assert(client->universal.options.stored_non_blocking == client->options.non_blocking);
1215
 
}
1216
 
 
1217
 
static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
1218
 
{
1219
 
  gearman_return_t ret= GEARMAN_MAX_RETURN;
1220
 
 
1221
 
  switch(client->state)
1222
 
  {
1223
 
  case GEARMAN_CLIENT_STATE_IDLE:
1224
 
    while (1)
1225
 
    {
1226
 
      /* Start any new tasks. */
1227
 
      if (client->new_tasks > 0 && ! (client->options.no_new))
1228
 
      {
1229
 
        for (client->task= client->task_list; client->task;
1230
 
             client->task= client->task->next)
1231
 
        {
1232
 
          if (client->task->state != GEARMAN_TASK_STATE_NEW)
1233
 
          {
1234
 
            continue;
1235
 
          }
1236
 
 
1237
 
  case GEARMAN_CLIENT_STATE_NEW:
1238
 
          gearman_return_t local_ret= _client_run_task(client, client->task);
1239
 
          if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1240
 
          {
1241
 
            client->state= GEARMAN_CLIENT_STATE_NEW;
1242
 
 
1243
 
            return local_ret;
1244
 
          }
1245
 
        }
1246
 
 
1247
 
        if (client->new_tasks == 0)
1248
 
        {
1249
 
          gearman_return_t local_ret= gearman_flush_all(client->universal);
1250
 
          if (gearman_failed(local_ret))
1251
 
          {
1252
 
            return local_ret;
1253
 
          }
1254
 
        }
1255
 
      }
1256
 
 
1257
 
      /* See if there are any connections ready for I/O. */
1258
 
      while ((client->con= gearman_ready(client->universal)))
1259
 
      {
1260
 
        if (client->con->revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
1261
 
        {
1262
 
          /* Socket is ready for writing, continue submitting jobs. */
1263
 
          for (client->task= client->task_list; client->task;
1264
 
               client->task= client->task->next)
1265
 
          {
1266
 
            if (client->task->con != client->con or
1267
 
                (client->task->state != GEARMAN_TASK_STATE_SUBMIT and
1268
 
                 client->task->state != GEARMAN_TASK_STATE_WORKLOAD))
1269
 
            {
1270
 
              continue;
1271
 
            }
1272
 
 
1273
 
  case GEARMAN_CLIENT_STATE_SUBMIT:
1274
 
            gearman_return_t local_ret= _client_run_task(client, client->task);
1275
 
            if (local_ret == GEARMAN_COULD_NOT_CONNECT)
1276
 
            {
1277
 
              client->state= GEARMAN_CLIENT_STATE_IDLE;
1278
 
              return local_ret;
1279
 
            }
1280
 
            else if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1281
 
            {
1282
 
              client->state= GEARMAN_CLIENT_STATE_SUBMIT;
1283
 
              return local_ret;
1284
 
            }
1285
 
          }
1286
 
        }
1287
 
 
1288
 
        if (not (client->con->revents & POLLIN))
1289
 
          continue;
1290
 
 
1291
 
        /* Socket is ready for reading. */
1292
 
        while (1)
1293
 
        {
1294
 
          /* Read packet on connection and find which task it belongs to. */
1295
 
          if (client->options.unbuffered_result)
1296
 
          {
1297
 
            /* If client is handling the data read, make sure it's complete. */
1298
 
            if (client->con->recv_state == GEARMAN_CON_RECV_STATE_READ_DATA)
1299
 
            {
1300
 
              for (client->task= client->task_list; client->task;
1301
 
                   client->task= client->task->next)
1302
 
              {
1303
 
                if (client->task->con == client->con &&
1304
 
                    (client->task->state == GEARMAN_TASK_STATE_DATA or
1305
 
                     client->task->state == GEARMAN_TASK_STATE_COMPLETE))
1306
 
                {
1307
 
                  break;
1308
 
                }
1309
 
              }
1310
 
 
1311
 
              /*
1312
 
                Someone has set GEARMAN_CLIENT_UNBUFFERED_RESULT but hasn't setup the client to fetch data correctly.
1313
 
                Fatal error :(
1314
 
              */
1315
 
              return gearman_universal_set_error(client->universal, GEARMAN_INVALID_ARGUMENT, AT,
1316
 
                                                 "client created with GEARMAN_CLIENT_UNBUFFERED_RESULT, but was not setup to use it. %s", __func__);
1317
 
            }
1318
 
            else
1319
 
            {
1320
 
              /* Read the next packet, without buffering the data part. */
1321
 
              client->task= NULL;
1322
 
              (void)client->con->receiving(client->con->_packet, ret, false);
1323
 
            }
1324
 
          }
1325
 
          else
1326
 
          {
1327
 
            /* Read the next packet, buffering the data part. */
1328
 
            client->task= NULL;
1329
 
            (void)client->con->receiving(client->con->_packet, ret, true);
1330
 
          }
1331
 
 
1332
 
          if (client->task == NULL)
1333
 
          {
1334
 
            assert(ret != GEARMAN_MAX_RETURN);
1335
 
 
1336
 
            /* Check the return of the gearman_connection_recv() calls above. */
1337
 
            if (gearman_failed(ret))
1338
 
            {
1339
 
              if (ret == GEARMAN_IO_WAIT)
1340
 
                break;
1341
 
 
1342
 
              client->state= GEARMAN_CLIENT_STATE_IDLE;
1343
 
              return ret;
1344
 
            }
1345
 
 
1346
 
            client->con->options.packet_in_use= true;
1347
 
 
1348
 
            /* We have a packet, see which task it belongs to. */
1349
 
            for (client->task= client->task_list; client->task;
1350
 
                 client->task= client->task->next)
1351
 
            {
1352
 
              if (client->task->con != client->con)
1353
 
                continue;
1354
 
 
1355
 
              if (client->con->_packet.command == GEARMAN_COMMAND_JOB_CREATED)
1356
 
              {
1357
 
                if (client->task->created_id != client->con->created_id)
1358
 
                  continue;
1359
 
 
1360
 
                /* New job created, drop through below and notify task. */
1361
 
                client->con->created_id++;
1362
 
              }
1363
 
              else if (client->con->_packet.command == GEARMAN_COMMAND_ERROR)
1364
 
              {
1365
 
                gearman_universal_set_error(client->universal, GEARMAN_SERVER_ERROR, AT,
1366
 
                                            "%s:%.*s",
1367
 
                                            static_cast<char *>(client->con->_packet.arg[0]),
1368
 
                                            int(client->con->_packet.arg_size[1]),
1369
 
                                            static_cast<char *>(client->con->_packet.arg[1]));
1370
 
 
1371
 
                return GEARMAN_SERVER_ERROR;
1372
 
              }
1373
 
              else if (strncmp(client->task->job_handle,
1374
 
                               static_cast<char *>(client->con->_packet.arg[0]),
1375
 
                               client->con->_packet.arg_size[0]) ||
1376
 
                       (client->con->_packet.command != GEARMAN_COMMAND_WORK_FAIL &&
1377
 
                        strlen(client->task->job_handle) != client->con->_packet.arg_size[0] - 1) ||
1378
 
                       (client->con->_packet.command == GEARMAN_COMMAND_WORK_FAIL &&
1379
 
                        strlen(client->task->job_handle) != client->con->_packet.arg_size[0]))
1380
 
              {
1381
 
                continue;
1382
 
              }
1383
 
 
1384
 
              /* Else, we have a matching result packet of some kind. */
1385
 
 
1386
 
              break;
1387
 
            }
1388
 
 
1389
 
            if (not client->task)
1390
 
            {
1391
 
              /* The client has stopped waiting for the response, ignore it. */
1392
 
              client->con->free_private_packet();
1393
 
              continue;
1394
 
            }
1395
 
 
1396
 
            client->task->recv= &(client->con->_packet);
1397
 
          }
1398
 
 
1399
 
  case GEARMAN_CLIENT_STATE_PACKET:
1400
 
          /* Let task process job created or result packet. */
1401
 
          gearman_return_t local_ret= _client_run_task(client, client->task);
1402
 
 
1403
 
          if (local_ret == GEARMAN_IO_WAIT)
1404
 
            break;
1405
 
 
1406
 
          if (gearman_failed(local_ret))
1407
 
          {
1408
 
            client->state= GEARMAN_CLIENT_STATE_PACKET;
1409
 
            return local_ret;
1410
 
          }
1411
 
 
1412
 
          /* Clean up the packet. */
1413
 
          client->con->free_private_packet();
1414
 
 
1415
 
          /* If all tasks are done, return. */
1416
 
          if (client->running_tasks == 0)
1417
 
            break;
1418
 
        }
1419
 
      }
1420
 
 
1421
 
      /* If all tasks are done, return. */
1422
 
      if (client->running_tasks == 0)
1423
 
      {
1424
 
        break;
1425
 
      }
1426
 
 
1427
 
      if (client->new_tasks > 0 && ! (client->options.no_new))
1428
 
        continue;
1429
 
 
1430
 
      if (client->options.non_blocking)
1431
 
      {
1432
 
        /* Let the caller wait for activity. */
1433
 
        client->state= GEARMAN_CLIENT_STATE_IDLE;
1434
 
        gearman_gerror(client->universal, GEARMAN_IO_WAIT);
1435
 
 
1436
 
        return GEARMAN_IO_WAIT;
1437
 
      }
1438
 
 
1439
 
      /* Wait for activity on one of the connections. */
1440
 
      gearman_return_t local_ret= gearman_wait(client->universal);
1441
 
      if (gearman_failed(local_ret) and local_ret != GEARMAN_IO_WAIT)
1442
 
      {
1443
 
        client->state= GEARMAN_CLIENT_STATE_IDLE;
1444
 
 
1445
 
        return local_ret;
1446
 
      }
1447
 
    }
1448
 
 
1449
 
    break;
1450
 
  }
1451
 
 
1452
 
  client->state= GEARMAN_CLIENT_STATE_IDLE;
1453
 
 
1454
 
  return GEARMAN_SUCCESS;
1455
 
}
1456
 
 
1457
 
gearman_return_t gearman_client_run_tasks(gearman_client_st *client)
1458
 
{
1459
 
  if (client == NULL)
1460
 
  {
1461
 
    return GEARMAN_INVALID_ARGUMENT;
1462
 
  }
1463
 
 
1464
 
  if (not client->task_list) // We are immediatly successful if all tasks are completed
1465
 
  {
1466
 
    return GEARMAN_SUCCESS;
1467
 
  }
1468
 
 
1469
 
 
1470
 
  _push_non_blocking(client);
1471
 
 
1472
 
  gearman_return_t rc= _client_run_tasks(client);
1473
 
 
1474
 
  _pop_non_blocking(client);
1475
 
 
1476
 
  if (rc == GEARMAN_COULD_NOT_CONNECT)
1477
 
  {
1478
 
    gearman_reset(client->universal);
1479
 
  }
1480
 
 
1481
 
  return rc;
1482
 
}
1483
 
 
1484
 
gearman_return_t gearman_client_run_block_tasks(gearman_client_st *client)
1485
 
{
1486
 
  if (client == NULL)
1487
 
  {
1488
 
    return GEARMAN_INVALID_ARGUMENT;
1489
 
  }
1490
 
 
1491
 
  if (not client->task_list) // We are immediatly successful if all tasks are completed
1492
 
  {
1493
 
    return GEARMAN_SUCCESS;
1494
 
  }
1495
 
 
1496
 
 
1497
 
  _push_blocking(client);
1498
 
 
1499
 
  gearman_return_t rc= _client_run_tasks(client);
1500
 
 
1501
 
  _pop_blocking(client);
1502
 
 
1503
 
  if (gearman_failed(rc))
1504
 
  {
1505
 
    if (rc == GEARMAN_COULD_NOT_CONNECT)
1506
 
    {
1507
 
      gearman_reset(client->universal);
1508
 
    }
1509
 
 
1510
 
    assert(gearman_universal_error_code(client->universal) == rc);
1511
 
  }
1512
 
 
1513
 
  return rc;
1514
 
}
1515
 
 
1516
 
/*
1517
 
 * Static Definitions
1518
 
 */
1519
 
 
1520
 
bool gearman_client_compare(const gearman_client_st *first, const gearman_client_st *second)
1521
 
{
1522
 
  if (first == NULL or second == NULL)
1523
 
  {
1524
 
    return false;
1525
 
  }
1526
 
 
1527
 
  if (strcmp(first->universal.con_list->host, second->universal.con_list->host))
1528
 
  {
1529
 
    return false;
1530
 
  }
1531
 
 
1532
 
  if (first->universal.con_list->port != second->universal.con_list->port)
1533
 
  {
1534
 
    return false;
1535
 
  }
1536
 
 
1537
 
  return true;
1538
 
}
1539
 
 
1540
 
bool gearman_client_set_server_option(gearman_client_st *self, const char *option_arg, size_t option_arg_size)
1541
 
{
1542
 
  if (self == NULL)
1543
 
  {
1544
 
    return false;
1545
 
  }
1546
 
 
1547
 
  gearman_string_t option= { option_arg, option_arg_size };
1548
 
 
1549
 
  return gearman_request_option(self->universal, option);
1550
 
}
1551
 
 
1552
 
void gearman_client_set_namespace(gearman_client_st *self, const char *namespace_key, size_t namespace_key_size)
1553
 
{
1554
 
  if (self == NULL)
1555
 
  {
1556
 
    return;
1557
 
  }
1558
 
 
1559
 
  gearman_universal_set_namespace(self->universal, namespace_key, namespace_key_size);
1560
 
}