~vjsamuel/gearmand/fix-bug-868883

« back to all changes in this revision

Viewing changes to libgearman/worker.cc

  • Committer: Brian Aker
  • Date: 2011-06-22 15:49:27 UTC
  • mfrom: (439.1.7 gearmand-trunk)
  • Revision ID: brian@tangent.org-20110622154927-ezro6sntfu48zf1q
Merge in curent ttrunk into the mainline.

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
#include <libgearman/common.h>
40
40
#include <libgearman/connection.h>
41
41
#include <libgearman/packet.hpp>
 
42
#include <libgearman/allocator.hpp>
42
43
#include <libgearman/universal.hpp>
43
44
#include <libgearman/function/base.hpp>
44
45
#include <libgearman/function/make.hpp>
92
93
 * Allocate and add a function to the register list.
93
94
 */
94
95
static gearman_return_t _worker_function_create(gearman_worker_st *worker,
95
 
                                                const char *function_name,
96
 
                                                size_t function_length,
 
96
                                                const char *function_name, size_t function_length,
 
97
                                                const gearman_function_t &function,
97
98
                                                uint32_t timeout,
98
 
                                                gearman_worker_fn *worker_fb,
99
 
                                                gearman_function_fn *partitioner_fn,
100
 
                                                gearman_aggregator_fn *aggregator_fn,
101
99
                                                void *context);
102
100
 
103
101
/**
177
175
 
178
176
  if (worker->work_result)
179
177
  {
180
 
    if (worker->universal.workload_free_fn)
181
 
    {
182
 
      worker->universal.workload_free_fn(worker->work_result,
183
 
                                         static_cast<void *>((&worker->universal)->workload_free_context));
184
 
    }
185
 
    else
186
 
    {
187
 
      // Created with malloc
188
 
      free(worker->work_result);
189
 
    }
 
178
    gearman_free(worker->universal, worker->work_result);
190
179
  }
191
180
 
192
181
  while (worker->function_list)
435
424
                                         const char *function_name,
436
425
                                         uint32_t timeout)
437
426
{
438
 
  return _worker_function_create(worker, function_name, strlen(function_name), timeout, NULL, NULL, NULL, NULL);
 
427
  gearman_function_t null_func= gearman_function_create_null();
 
428
  return _worker_function_create(worker, function_name, strlen(function_name), null_func, timeout, NULL);
439
429
}
440
430
 
441
431
bool gearman_worker_function_exist(gearman_worker_st *worker,
836
826
  {
837
827
    return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function not given");
838
828
  }
 
829
  gearman_function_t local= gearman_function_create_v1(worker_fn);
839
830
 
840
831
  return _worker_function_create(worker,
841
832
                                 function_name, strlen(function_name),
 
833
                                 local,
842
834
                                 timeout,
843
 
                                 worker_fn,
844
 
                                 NULL, NULL,
845
835
                                 context);
846
836
}
847
837
 
861
851
    return gearman_error(worker->universal, GEARMAN_INVALID_ARGUMENT, "function name not given");
862
852
  }
863
853
 
864
 
  switch (function.kind)
865
 
  {
866
 
  case GEARMAN_WORKER_FUNCTION_V2:
867
 
    return _worker_function_create(worker,
868
 
                                   function_name, function_name_length,
869
 
                                   timeout,
870
 
                                   NULL,
871
 
                                   function.callback.function_v2.func,
872
 
                                   NULL,
873
 
                                   context);
874
 
 
875
 
  case GEARMAN_WORKER_FUNCTION_PARTITION:
876
 
    {
877
 
      gearman_return_t ret= _worker_function_create(worker,
878
 
                                                    function_name, function_name_length,
879
 
                                                    timeout,
880
 
                                                    NULL,
881
 
                                                    function.callback.partitioner.func,
882
 
                                                    function.callback.partitioner.aggregator,
883
 
                                                    context);
884
 
      worker->options.grab_all= gearman_success(ret);
885
 
 
886
 
      return ret;
887
 
    }
888
 
 
889
 
  case GEARMAN_WORKER_FUNCTION_V1:
890
 
    break;
891
 
  }
 
854
  return _worker_function_create(worker,
 
855
                                 function_name, function_name_length,
 
856
                                 function,
 
857
                                 timeout,
 
858
                                 context);
892
859
 
893
860
  return GEARMAN_INVALID_ARGUMENT;
894
861
}
895
862
 
896
863
gearman_return_t gearman_worker_work(gearman_worker_st *worker)
897
864
{
 
865
  bool shutdown= false;
 
866
 
898
867
  if (not worker)
899
868
  {
900
869
    return GEARMAN_INVALID_ARGUMENT;
965
934
        worker->work_job->error_code= GEARMAN_LOST_CONNECTION;
966
935
        break;
967
936
 
 
937
      case GEARMAN_FUNCTION_SHUTDOWN:
 
938
        shutdown= true;
 
939
 
968
940
      case GEARMAN_FUNCTION_SUCCESS:
969
941
        break;
970
942
      }
986
958
 
987
959
      if (worker->work_result)
988
960
      {
989
 
        if (worker->universal.workload_free_fn)
990
 
        {
991
 
          worker->universal.workload_free_fn(worker->work_result,
992
 
                                             (&worker->universal)->workload_free_context);
993
 
        }
994
 
        else
995
 
        {
996
 
          free(worker->work_result);
997
 
        }
 
961
        gearman_free(worker->universal, worker->work_result);
998
962
        worker->work_result= NULL;
999
963
      }
1000
964
 
1032
996
 
1033
997
  worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
1034
998
 
1035
 
  return GEARMAN_SUCCESS;
 
999
  return shutdown ? GEARMAN_SHUTDOWN : GEARMAN_SUCCESS;
1036
1000
}
1037
1001
 
1038
1002
gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
1125
1089
}
1126
1090
 
1127
1091
static gearman_return_t _worker_function_create(gearman_worker_st *worker,
1128
 
                                                const char *function_name,
1129
 
                                                size_t function_length,
 
1092
                                                const char *function_name, size_t function_length,
 
1093
                                                const gearman_function_t &function_arg,
1130
1094
                                                uint32_t timeout,
1131
 
                                                gearman_worker_fn *worker_fn,
1132
 
                                                gearman_function_fn *partitioner_fn,
1133
 
                                                gearman_aggregator_fn *aggregator_fn,
1134
1095
                                                void *context)
1135
1096
{
1136
1097
  const void *args[2];
1137
1098
  size_t args_size[2];
1138
1099
 
1139
 
  _worker_function_st *function;
1140
 
  if (partitioner_fn and aggregator_fn)
1141
 
  {
1142
 
    function= make(worker->universal._namespace, function_name, function_length, partitioner_fn, aggregator_fn, context);
1143
 
  }
1144
 
  else
1145
 
  {
1146
 
    function= make(worker->universal._namespace, function_name, function_length, worker_fn, context);
1147
 
  }
 
1100
  _worker_function_st *function= make(worker->universal._namespace, function_name, function_length, function_arg, context);
1148
1101
 
1149
1102
  if (not function)
1150
1103
  {
1209
1162
  delete function;
1210
1163
}
1211
1164
 
 
1165
gearman_return_t gearman_worker_set_memory_allocators(gearman_worker_st *worker,
 
1166
                                                      gearman_malloc_fn *malloc_fn,
 
1167
                                                      gearman_free_fn *free_fn,
 
1168
                                                      gearman_realloc_fn *realloc_fn,
 
1169
                                                      gearman_calloc_fn *calloc_fn,
 
1170
                                                      void *context)
 
1171
{
 
1172
  if (not worker)
 
1173
    return GEARMAN_INVALID_ARGUMENT;
 
1174
 
 
1175
  return gearman_set_memory_allocator(worker->universal.allocator, malloc_fn, free_fn, realloc_fn, calloc_fn, context);
 
1176
}
 
1177
 
1212
1178
bool gearman_worker_set_server_option(gearman_worker_st *self, const char *option_arg, size_t option_arg_size)
1213
1179
{
1214
1180
  gearman_string_t option= { option_arg, option_arg_size };