~mordred/gearmand/fix-1.0-build

« back to all changes in this revision

Viewing changes to libgearman-server/server.cc

  • Committer: Brian Aker
  • Date: 2010-06-14 19:29:21 UTC
  • Revision ID: brian@gaz-20100614192921-2mqdsnwallo0eog3
Merge in our current tree (this is a collection of all patches, except some
dealing with the frontend).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Gearman server and library
 
2
 * Copyright (C) 2010 Data Differential
2
3
 * Copyright (C) 2008 Brian Aker, Eric Day
3
4
 * All rights reserved.
4
5
 *
11
12
 * @brief Server Definitions
12
13
 */
13
14
 
14
 
#include "common.h"
 
15
#include "libgearman-server/common.h"
15
16
 
16
17
/*
17
18
 * Private declarations
24
25
 */
25
26
 
26
27
/**
 
28
 * Generate hash key for job handles and unique IDs.
 
29
 */
 
30
static uint32_t _server_job_hash(const char *key, size_t key_size)
 
31
{
 
32
  const char *ptr= key;
 
33
  int32_t value= 0;
 
34
 
 
35
  while (key_size--)
 
36
  {
 
37
    value += (int32_t)*ptr++;
 
38
    value += (value << 10);
 
39
    value ^= (value >> 6);
 
40
  }
 
41
  value += (value << 3);
 
42
  value ^= (value >> 11);
 
43
  value += (value << 15);
 
44
 
 
45
  return (uint32_t)(value == 0 ? 1 : value);
 
46
}
 
47
 
 
48
 
 
49
 
 
50
/**
27
51
 * Add job to queue wihle replaying queue during startup.
28
52
 */
29
53
gearman_return_t _queue_replay_add(gearman_server_st *server, void *context,
32
56
                                   size_t function_name_size, const void *data,
33
57
                                   size_t data_size,
34
58
                                   gearman_job_priority_t priority);
35
 
 
36
 
/**
37
 
 * Queue an error packet.
38
 
 */
39
 
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
40
 
                                             const char *error_code,
41
 
                                             const char *error_string);
42
 
 
43
 
/**
44
 
 * Process text commands for a connection.
45
 
 */
46
 
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
47
 
                                         gearman_packet_st *packet);
48
 
 
49
59
/**
50
60
 * Send work result packets with data back to clients.
51
61
 */
53
63
_server_queue_work_data(gearman_server_job_st *server_job,
54
64
                        gearman_packet_st *packet, gearman_command_t command);
55
65
 
56
 
/**
57
 
 * Wrapper for log handling.
58
 
 */
59
 
static void _log(const char *line, gearman_verbose_t verbose, void *context);
60
 
 
61
66
/** @} */
62
67
 
63
68
/*
64
69
 * Public definitions
65
70
 */
66
71
 
67
 
gearman_server_st *gearman_server_create(gearman_server_st *server)
 
72
gearman_server_st::gearman_server_st()
68
73
{
69
74
  struct utsname un;
70
75
 
71
 
  if (server == NULL)
72
 
  {
73
 
    server= malloc(sizeof(gearman_server_st));
74
 
    if (server == NULL)
75
 
      return NULL;
76
 
 
77
 
    server->options.allocated= true;
78
 
  }
79
 
  else
80
 
    server->options.allocated= false;
81
 
 
82
 
  server->state.queue_startup= false;
83
 
  server->flags.round_robin= false;
84
 
  server->flags.threaded= false;
85
 
  server->shutdown= false;
86
 
  server->shutdown_graceful= false;
87
 
  server->proc_wakeup= false;
88
 
  server->proc_shutdown= false;
89
 
  server->job_retries= 0;
90
 
  server->worker_wakeup= 0;
91
 
  server->thread_count= 0;
92
 
  server->free_packet_count= 0;
93
 
  server->function_count= 0;
94
 
  server->job_count= 0;
95
 
  server->unique_count= 0;
96
 
  server->free_job_count= 0;
97
 
  server->free_client_count= 0;
98
 
  server->free_worker_count= 0;
99
 
  server->thread_list= NULL;
100
 
  server->free_packet_list= NULL;
101
 
  server->function_list= NULL;
102
 
  server->free_job_list= NULL;
103
 
  server->free_client_list= NULL;
104
 
  server->free_worker_list= NULL;
105
 
  server->log_fn= NULL;
106
 
  server->log_context= NULL;
107
 
  server->queue_context= NULL;
108
 
  server->queue_add_fn= NULL;
109
 
  server->queue_flush_fn= NULL;
110
 
  server->queue_done_fn= NULL;
111
 
  server->queue_replay_fn= NULL;
112
 
  memset(server->job_hash, 0,
113
 
         sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
114
 
  memset(server->unique_hash, 0,
115
 
         sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
116
 
 
117
 
  server->gearman= gearman_universal_create(&(server->gearman_universal_static), NULL);
118
 
  if (server->gearman == NULL)
119
 
  {
120
 
    gearman_server_free(server);
121
 
    return NULL;
122
 
  }
123
 
 
124
 
  if (uname(&un) == -1)
125
 
  {
126
 
    gearman_server_free(server);
127
 
    return NULL;
128
 
  }
129
 
 
130
 
  snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
131
 
           un.nodename);
132
 
  server->job_handle_count= 1;
133
 
 
134
 
  return server;
 
76
  state.queue_startup= false;
 
77
  flags.round_robin= false;
 
78
  flags.threaded= false;
 
79
  shutdown= false;
 
80
  shutdown_graceful= false;
 
81
  proc_wakeup= false;
 
82
  proc_shutdown= false;
 
83
  job_retries= 0;
 
84
  worker_wakeup= 0;
 
85
  thread_count= 0;
 
86
  function_count= 0;
 
87
  job_count= 0;
 
88
  unique_count= 0;
 
89
  thread_list= NULL;
 
90
  function_list= NULL;
 
91
  queue= NULL;
 
92
 
 
93
  job_hash.resize(GEARMAN_JOB_HASH_SIZE);
 
94
  unique_hash.resize(GEARMAN_JOB_HASH_SIZE);
 
95
 
 
96
  gearman_universal_create(&gearman, NULL);
 
97
 
 
98
  uname(&un);
 
99
 
 
100
  snprintf(job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s", un.nodename);
 
101
  job_handle_count= 1;
135
102
}
136
103
 
137
 
void gearman_server_free(gearman_server_st *server)
 
104
gearman_server_st::~gearman_server_st()
138
105
{
139
106
  uint32_t key;
140
 
  gearman_server_packet_st *packet;
141
 
  gearman_server_job_st *job;
142
 
  gearman_server_client_st *client;
143
 
  gearman_server_worker_st *worker;
144
107
 
145
108
  /* All threads should be cleaned up before calling this. */
146
 
  assert(server->thread_list == NULL);
147
 
 
148
 
  for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
149
 
  {
150
 
    while (server->job_hash[key] != NULL)
151
 
      gearman_server_job_free(server->job_hash[key]);
152
 
  }
153
 
 
154
 
  while (server->function_list != NULL)
155
 
    gearman_server_function_free(server->function_list);
156
 
 
157
 
  while (server->free_packet_list != NULL)
158
 
  {
159
 
    packet= server->free_packet_list;
160
 
    server->free_packet_list= packet->next;
161
 
    free(packet);
162
 
  }
163
 
 
164
 
  while (server->free_job_list != NULL)
165
 
  {
166
 
    job= server->free_job_list;
167
 
    server->free_job_list= job->next;
168
 
    free(job);
169
 
  }
170
 
 
171
 
  while (server->free_client_list != NULL)
172
 
  {
173
 
    client= server->free_client_list;
174
 
    server->free_client_list= client->con_next;
175
 
    free(client);
176
 
  }
177
 
 
178
 
  while (server->free_worker_list != NULL)
179
 
  {
180
 
    worker= server->free_worker_list;
181
 
    server->free_worker_list= worker->con_next;
182
 
    free(worker);
183
 
  }
184
 
 
185
 
  if (server->gearman != NULL)
186
 
    gearman_universal_free(server->gearman);
187
 
 
188
 
  if (server->options.allocated)
189
 
    free(server);
190
 
}
191
 
 
192
 
void gearman_server_set_job_retries(gearman_server_st *server,
193
 
                                    uint8_t job_retries)
194
 
{
195
 
  server->job_retries= job_retries;
196
 
}
197
 
 
198
 
void gearman_server_set_worker_wakeup(gearman_server_st *server,
199
 
                                      uint8_t worker_wakeup)
200
 
{
201
 
  server->worker_wakeup= worker_wakeup;
202
 
}
203
 
 
204
 
void gearman_server_set_log_fn(gearman_server_st *server,
205
 
                               gearman_log_fn *function,
206
 
                               void *context, gearman_verbose_t verbose)
207
 
{
208
 
  server->log_fn= function;
209
 
  server->log_context= context;
210
 
  gearman_set_log_fn(server->gearman, _log, server, verbose);
 
109
#if 0
 
110
  assert(thread_list == 0)
 
111
#endif
 
112
 
 
113
    for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
 
114
    {
 
115
      while (job_hash[key] != NULL)
 
116
      {
 
117
        delete job_hash[key];
 
118
      }
 
119
    }
 
120
 
 
121
  while (function_list != NULL)
 
122
  {
 
123
    gearman_server_function_free(function_list);
 
124
  }
 
125
 
 
126
  gearman_universal_free(&gearman);
 
127
}
 
128
 
 
129
void gearman_server_st::client_free(gearman_server_client_st *client)
 
130
{
 
131
  GEARMAN_LIST_DEL(client->con->client, client, con_)
 
132
 
 
133
  if (client->job != NULL)
 
134
  {
 
135
    GEARMAN_LIST_DEL(client->job->client, client, job_)
 
136
 
 
137
    /* If this was a foreground job and is now abandoned, mark to not run. */
 
138
    if (client->job->client_list == NULL)
 
139
    {
 
140
      client->job->set_ignore_job(true);
 
141
      client->job->set_job_queued(false);
 
142
    }
 
143
  }
 
144
 
 
145
  delete client;
 
146
  client= NULL;
211
147
}
212
148
 
213
149
gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
225
161
 
226
162
  if (packet->magic == GEARMAN_MAGIC_RESPONSE)
227
163
  {
228
 
    return _server_error_packet(server_con, "bad_magic",
229
 
                                "Request magic expected");
 
164
    return server_con->_server_error_packet("bad_magic", "Request magic expected");
230
165
  }
231
166
 
232
167
  switch (packet->command)
233
168
  {
234
 
  /* Client/worker requests. */
 
169
    /* Client/worker requests. */
235
170
  case GEARMAN_COMMAND_ECHO_REQ:
236
171
    /* Reuse the data buffer and just shove the data back. */
237
 
    ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
238
 
                                      GEARMAN_COMMAND_ECHO_RES, packet->data,
239
 
                                      packet->data_size, NULL);
 
172
    ret= server_con->io_packet_add(true, GEARMAN_MAGIC_RESPONSE,
 
173
                                   GEARMAN_COMMAND_ECHO_RES, packet->data,
 
174
                                   packet->data_size, NULL);
240
175
    if (ret != GEARMAN_SUCCESS)
241
176
      return ret;
242
177
 
244
179
 
245
180
    break;
246
181
 
247
 
  /* Client requests. */
 
182
    /* Client requests. */
248
183
  case GEARMAN_COMMAND_SUBMIT_JOB:
249
184
  case GEARMAN_COMMAND_SUBMIT_JOB_BG:
250
185
  case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
273
208
    }
274
209
    else
275
210
    {
276
 
      server_client= gearman_server_client_add(server_con);
 
211
      server_client= server_con->client_add();
277
212
      if (server_client == NULL)
278
213
        return GEARMAN_MEMORY_ALLOCATION_FAILURE;
279
214
    }
280
215
 
281
216
    /* Create a job. */
282
 
    server_job= gearman_server_job_add(server_con->thread->server,
283
 
                                       (char *)(packet->arg[0]),
284
 
                                       packet->arg_size[0] - 1,
285
 
                                       (char *)(packet->arg[1]),
286
 
                                       packet->arg_size[1] - 1, packet->data,
287
 
                                       packet->data_size, priority,
288
 
                                       server_client, &ret);
 
217
    server_job= server_con->thread->server->gearman_server_job_add((char *)(packet->arg[0]),
 
218
                                                                   packet->arg_size[0] - 1,
 
219
                                                                   (char *)(packet->arg[1]),
 
220
                                                                   packet->arg_size[1] - 1, packet->data,
 
221
                                                                   packet->data_size, priority,
 
222
                                                                   server_client, &ret);
289
223
    if (ret == GEARMAN_SUCCESS)
290
224
    {
291
225
      packet->options.free_data= false;
292
226
    }
293
227
    else if (ret == GEARMAN_JOB_QUEUE_FULL)
294
228
    {
295
 
      return _server_error_packet(server_con, "queue_full",
296
 
                                  "Job queue is full");
 
229
      return server_con->_server_error_packet("queue_full", "Job queue is full");
297
230
    }
298
231
    else if (ret != GEARMAN_JOB_EXISTS)
299
232
      return ret;
300
233
 
301
234
    /* Queue the job created packet. */
302
 
    ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
303
 
                                      GEARMAN_COMMAND_JOB_CREATED,
304
 
                                      server_job->job_handle,
305
 
                                      (size_t)strlen(server_job->job_handle),
306
 
                                      NULL);
 
235
    ret= server_con->io_packet_add(false, GEARMAN_MAGIC_RESPONSE,
 
236
                                   GEARMAN_COMMAND_JOB_CREATED,
 
237
                                   server_job->job_handle,
 
238
                                   (size_t)strlen(server_job->job_handle),
 
239
                                   NULL);
307
240
    if (ret != GEARMAN_SUCCESS)
308
241
      return ret;
309
242
 
314
247
    snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
315
248
             (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
316
249
 
317
 
    server_job= gearman_server_job_get(server_con->thread->server, job_handle, NULL);
 
250
    server_job= server_con->thread->server->gearman_server_job_get(job_handle, NULL);
318
251
 
319
252
    /* Queue status result packet. */
320
253
    if (server_job == NULL)
321
254
    {
322
 
      ret= gearman_server_io_packet_add(server_con, false,
323
 
                                        GEARMAN_MAGIC_RESPONSE,
324
 
                                        GEARMAN_COMMAND_STATUS_RES, job_handle,
325
 
                                        (size_t)(strlen(job_handle) + 1),
326
 
                                        "0", (size_t)2, "0", (size_t)2, "0",
327
 
                                        (size_t)2, "0", (size_t)1, NULL);
 
255
      ret= server_con->io_packet_add(false,
 
256
                                     GEARMAN_MAGIC_RESPONSE,
 
257
                                     GEARMAN_COMMAND_STATUS_RES, job_handle,
 
258
                                     (size_t)(strlen(job_handle) + 1),
 
259
                                     "0", (size_t)2, "0", (size_t)2, "0",
 
260
                                     (size_t)2, "0", (size_t)1, NULL);
328
261
    }
329
262
    else
330
263
    {
331
264
      snprintf(numerator_buffer, 11, "%u", server_job->numerator);
332
265
      snprintf(denominator_buffer, 11, "%u", server_job->denominator);
333
266
 
334
 
      ret= gearman_server_io_packet_add(server_con, false,
335
 
                                        GEARMAN_MAGIC_RESPONSE,
336
 
                                        GEARMAN_COMMAND_STATUS_RES, job_handle,
337
 
                                        (size_t)(strlen(job_handle) + 1),
338
 
                                        "1", (size_t)2,
339
 
                                        server_job->worker == NULL ? "0" : "1",
340
 
                                        (size_t)2, numerator_buffer,
341
 
                                        (size_t)(strlen(numerator_buffer) + 1),
342
 
                                        denominator_buffer,
343
 
                                        (size_t)strlen(denominator_buffer),
344
 
                                        NULL);
 
267
      ret= server_con->io_packet_add(false,
 
268
                                     GEARMAN_MAGIC_RESPONSE,
 
269
                                     GEARMAN_COMMAND_STATUS_RES, job_handle,
 
270
                                     (size_t)(strlen(job_handle) + 1),
 
271
                                     "1", (size_t)2,
 
272
                                     server_job->worker == NULL ? "0" : "1",
 
273
                                     (size_t)2, numerator_buffer,
 
274
                                     (size_t)(strlen(numerator_buffer) + 1),
 
275
                                     denominator_buffer,
 
276
                                     (size_t)strlen(denominator_buffer),
 
277
                                     NULL);
345
278
    }
346
279
 
347
280
    if (ret != GEARMAN_SUCCESS)
360
293
    }
361
294
    else
362
295
    {
363
 
      return _server_error_packet(server_con, "unknown_option",
364
 
                                  "Server does not recognize given option");
 
296
      return server_con->_server_error_packet("unknown_option", "Server does not recognize given option");
365
297
    }
366
298
 
367
 
    ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
368
 
                                      GEARMAN_COMMAND_OPTION_RES,
369
 
                                      packet->arg[0], packet->arg_size[0],
370
 
                                      NULL);
 
299
    ret= server_con->io_packet_add(false, GEARMAN_MAGIC_RESPONSE,
 
300
                                   GEARMAN_COMMAND_OPTION_RES,
 
301
                                   packet->arg[0], packet->arg_size[0],
 
302
                                   NULL);
371
303
    if (ret != GEARMAN_SUCCESS)
372
304
      return ret;
373
305
 
374
306
    break;
375
307
 
376
 
  /* Worker requests. */
 
308
    /* Worker requests. */
377
309
  case GEARMAN_COMMAND_CAN_DO:
378
 
    if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
379
 
                                  packet->arg_size[0], 0) == NULL)
 
310
    if (server_con->worker_add((const char *)(packet->arg[0]),
 
311
                               packet->arg_size[0], 0) == NULL)
380
312
    {
381
313
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
382
314
    }
384
316
    break;
385
317
 
386
318
  case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
387
 
    if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
388
 
                                  packet->arg_size[0] - 1,
389
 
                                  (in_port_t)atoi((char *)(packet->arg[1])))
390
 
         == NULL)
 
319
    if (server_con->worker_add((const char *)(packet->arg[0]),
 
320
                               packet->arg_size[0] - 1,
 
321
                               (in_port_t)atoi((char *)(packet->arg[1])))
 
322
        == NULL)
391
323
    {
392
324
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
393
325
    }
395
327
    break;
396
328
 
397
329
  case GEARMAN_COMMAND_CANT_DO:
398
 
    gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
399
 
                                   packet->arg_size[0]);
 
330
    server_con->free_worker((const char *)(packet->arg[0]), packet->arg_size[0]);
400
331
    break;
401
332
 
402
333
  case GEARMAN_COMMAND_RESET_ABILITIES:
403
 
    gearman_server_con_free_workers(server_con);
 
334
    server_con->free_workers();
404
335
    break;
405
336
 
406
337
  case GEARMAN_COMMAND_PRE_SLEEP:
407
 
    server_job= gearman_server_job_peek(server_con);
 
338
    server_job= server_con->job_peek();
408
339
    if (server_job == NULL)
409
340
    {
410
341
      server_con->is_sleeping= true;
412
343
    else
413
344
    {
414
345
      /* If there are jobs that could be run, queue a NOOP packet to wake the
415
 
         worker up. This could be the result of a race codition. */
416
 
      ret= gearman_server_io_packet_add(server_con, false,
417
 
                                        GEARMAN_MAGIC_RESPONSE,
418
 
                                        GEARMAN_COMMAND_NOOP, NULL);
 
346
        worker up. This could be the result of a race codition. */
 
347
      ret= server_con->io_packet_add(false,
 
348
                                     GEARMAN_MAGIC_RESPONSE,
 
349
                                     GEARMAN_COMMAND_NOOP, NULL);
419
350
      if (ret != GEARMAN_SUCCESS)
420
351
        return ret;
421
352
    }
427
358
    server_con->is_sleeping= false;
428
359
    server_con->is_noop_sent= false;
429
360
 
430
 
    server_job= gearman_server_job_take(server_con);
 
361
    server_job= server_con->job_take();
431
362
    if (server_job == NULL)
432
363
    {
433
364
      /* No jobs found, queue no job packet. */
434
 
      ret= gearman_server_io_packet_add(server_con, false,
435
 
                                        GEARMAN_MAGIC_RESPONSE,
436
 
                                        GEARMAN_COMMAND_NO_JOB, NULL);
 
365
      ret= server_con->io_packet_add(false,
 
366
                                     GEARMAN_MAGIC_RESPONSE,
 
367
                                     GEARMAN_COMMAND_NO_JOB, NULL);
437
368
    }
438
369
    else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
439
370
    {
440
371
      /* We found a runnable job, queue job assigned packet and take the job
441
 
         off the queue. */
442
 
      ret= gearman_server_io_packet_add(server_con, false,
443
 
                                   GEARMAN_MAGIC_RESPONSE,
444
 
                                   GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
445
 
                                   server_job->job_handle,
446
 
                                   (size_t)(strlen(server_job->job_handle) + 1),
447
 
                                   server_job->function->function_name,
448
 
                                   server_job->function->function_name_size + 1,
449
 
                                   server_job->unique,
450
 
                                   (size_t)(strlen(server_job->unique) + 1),
451
 
                                   server_job->data, server_job->data_size,
452
 
                                   NULL);
 
372
        off the queue. */
 
373
      ret= server_con->io_packet_add(false,
 
374
                                     GEARMAN_MAGIC_RESPONSE,
 
375
                                     GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
 
376
                                     server_job->job_handle,
 
377
                                     (size_t)(strlen(server_job->job_handle) + 1),
 
378
                                     server_job->function->function_name.c_str(),
 
379
                                     server_job->function->function_name.size() + 1,
 
380
                                     server_job->unique,
 
381
                                     (size_t)(strlen(server_job->unique) + 1),
 
382
                                     server_job->data(), server_job->data_size(),
 
383
                                     NULL);
453
384
    }
454
385
    else
455
386
    {
456
387
      /* Same, but without unique ID. */
457
 
      ret= gearman_server_io_packet_add(server_con, false,
458
 
                                   GEARMAN_MAGIC_RESPONSE,
459
 
                                   GEARMAN_COMMAND_JOB_ASSIGN,
460
 
                                   server_job->job_handle,
461
 
                                   (size_t)(strlen(server_job->job_handle) + 1),
462
 
                                   server_job->function->function_name,
463
 
                                   server_job->function->function_name_size + 1,
464
 
                                   server_job->data, server_job->data_size,
465
 
                                   NULL);
 
388
      ret= server_con->io_packet_add(false,
 
389
                                     GEARMAN_MAGIC_RESPONSE,
 
390
                                     GEARMAN_COMMAND_JOB_ASSIGN,
 
391
                                     server_job->job_handle,
 
392
                                     (size_t)(strlen(server_job->job_handle) + 1),
 
393
                                     server_job->function->function_name.c_str(),
 
394
                                     server_job->function->function_name.size() + 1,
 
395
                                     server_job->data(), server_job->data_size(),
 
396
                                     NULL);
466
397
    }
467
398
 
468
399
    if (ret != GEARMAN_SUCCESS)
469
400
    {
470
401
      if (server_job != NULL)
471
 
        return gearman_server_job_queue(server_job);
 
402
        return server_job->job_queue();
472
403
      return ret;
473
404
    }
474
405
 
476
407
 
477
408
  case GEARMAN_COMMAND_WORK_DATA:
478
409
  case GEARMAN_COMMAND_WORK_WARNING:
479
 
    server_job= gearman_server_job_get(server_con->thread->server,
480
 
                                       (char *)(packet->arg[0]),
481
 
                                       server_con);
 
410
    server_job= server_con->thread->server->gearman_server_job_get((char *)(packet->arg[0]), server_con);
 
411
 
482
412
    if (server_job == NULL)
483
413
    {
484
 
      return _server_error_packet(server_con, "job_not_found",
485
 
                                  "Job given in work result not found");
 
414
      return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
486
415
    }
487
416
 
488
417
    /* Queue the data/warning packet for all clients. */
493
422
    break;
494
423
 
495
424
  case GEARMAN_COMMAND_WORK_STATUS:
496
 
    server_job= gearman_server_job_get(server_con->thread->server,
497
 
                                       (char *)(packet->arg[0]),
498
 
                                       server_con);
 
425
    server_job= server_con->thread->server->gearman_server_job_get((char *)(packet->arg[0]), server_con);
499
426
    if (server_job == NULL)
500
427
    {
501
 
      return _server_error_packet(server_con, "job_not_found",
502
 
                                  "Job given in work result not found");
 
428
      return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
503
429
    }
504
430
 
505
431
    /* Update job status. */
514
440
    for (server_client= server_job->client_list; server_client;
515
441
         server_client= server_client->job_next)
516
442
    {
517
 
      ret= gearman_server_io_packet_add(server_client->con, false,
518
 
                                        GEARMAN_MAGIC_RESPONSE,
519
 
                                        GEARMAN_COMMAND_WORK_STATUS,
520
 
                                        packet->arg[0], packet->arg_size[0],
521
 
                                        packet->arg[1], packet->arg_size[1],
522
 
                                        packet->arg[2], packet->arg_size[2],
523
 
                                        NULL);
 
443
      ret= server_client->con->io_packet_add(false,
 
444
                                             GEARMAN_MAGIC_RESPONSE,
 
445
                                             GEARMAN_COMMAND_WORK_STATUS,
 
446
                                             packet->arg[0], packet->arg_size[0],
 
447
                                             packet->arg[1], packet->arg_size[1],
 
448
                                             packet->arg[2], packet->arg_size[2],
 
449
                                             NULL);
524
450
      if (ret != GEARMAN_SUCCESS)
525
451
        return ret;
526
452
    }
528
454
    break;
529
455
 
530
456
  case GEARMAN_COMMAND_WORK_COMPLETE:
531
 
    server_job= gearman_server_job_get(server_con->thread->server,
532
 
                                       (char *)(packet->arg[0]),
533
 
                                       server_con);
 
457
    server_job= server_con->thread->server->gearman_server_job_get((char *)(packet->arg[0]), server_con);
534
458
    if (server_job == NULL)
535
459
    {
536
 
      return _server_error_packet(server_con, "job_not_found",
537
 
                                  "Job given in work result not found");
 
460
      return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
538
461
    }
539
462
 
540
463
    /* Queue the complete packet for all clients. */
544
467
      return ret;
545
468
 
546
469
    /* Remove from persistent queue if one exists. */
547
 
    if (server_job->job_queued && server->queue_done_fn != NULL)
 
470
    if (server_job->job_queued() && server->queue != NULL)
548
471
    {
549
 
      ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
550
 
                                      server_job->unique,
551
 
                                      (size_t)strlen(server_job->unique),
552
 
                                      server_job->function->function_name,
553
 
                                      server_job->function->function_name_size);
 
472
      ret= server->queue->done(server,
 
473
                               server_job->unique,
 
474
                               (size_t)strlen(server_job->unique),
 
475
                               server_job->function->function_name.c_str());
554
476
      if (ret != GEARMAN_SUCCESS)
555
477
        return ret;
556
478
    }
557
479
 
558
480
    /* Job is done, remove it. */
559
 
    gearman_server_job_free(server_job);
 
481
    delete server_job;
560
482
    break;
561
483
 
562
484
  case GEARMAN_COMMAND_WORK_EXCEPTION:
563
 
    server_job= gearman_server_job_get(server_con->thread->server,
564
 
                                       (char *)(packet->arg[0]),
565
 
                                       server_con);
 
485
    server_job= server_con->thread->server->gearman_server_job_get((char *)(packet->arg[0]), server_con);
566
486
    if (server_job == NULL)
567
487
    {
568
 
      return _server_error_packet(server_con, "job_not_found",
569
 
                                  "Job given in work result not found");
 
488
      return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
570
489
    }
571
490
 
572
491
    /* Queue the exception packet for all clients. */
581
500
    snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
582
501
             (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
583
502
 
584
 
    server_job= gearman_server_job_get(server_con->thread->server, job_handle,
585
 
                                       server_con);
 
503
    server_job= server_con->thread->server->gearman_server_job_get(job_handle, server_con);
586
504
    if (server_job == NULL)
587
505
    {
588
 
      return _server_error_packet(server_con, "job_not_found",
589
 
                                  "Job given in work result not found");
 
506
      return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
590
507
    }
591
508
 
592
509
    /* Queue the fail packet for all clients. */
593
510
    for (server_client= server_job->client_list; server_client;
594
511
         server_client= server_client->job_next)
595
512
    {
596
 
      ret= gearman_server_io_packet_add(server_client->con, false,
597
 
                                        GEARMAN_MAGIC_RESPONSE,
598
 
                                        GEARMAN_COMMAND_WORK_FAIL,
599
 
                                        packet->arg[0], packet->arg_size[0],
600
 
                                        NULL);
 
513
      ret= server_client->con->io_packet_add(false,
 
514
                                             GEARMAN_MAGIC_RESPONSE,
 
515
                                             GEARMAN_COMMAND_WORK_FAIL,
 
516
                                             packet->arg[0], packet->arg_size[0],
 
517
                                             NULL);
601
518
      if (ret != GEARMAN_SUCCESS)
602
519
        return ret;
603
520
    }
604
521
 
605
522
    /* Remove from persistent queue if one exists. */
606
 
    if (server_job->job_queued && server->queue_done_fn != NULL)
 
523
    if (server_job->job_queued() && server->queue != NULL)
607
524
    {
608
 
      ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
609
 
                                      server_job->unique,
610
 
                                      (size_t)strlen(server_job->unique),
611
 
                                      server_job->function->function_name,
612
 
                                      server_job->function->function_name_size);
 
525
      ret= server->queue->done(server,
 
526
                               server_job->unique,
 
527
                               (size_t)strlen(server_job->unique),
 
528
                               server_job->function->function_name);
 
529
 
613
530
      if (ret != GEARMAN_SUCCESS)
614
531
        return ret;
615
532
    }
616
533
 
617
534
    /* Job is done, remove it. */
618
 
    gearman_server_job_free(server_job);
 
535
    delete server_job;
619
536
    break;
620
537
 
621
538
  case GEARMAN_COMMAND_SET_CLIENT_ID:
622
 
    gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
623
 
                              packet->arg_size[0]);
 
539
    server_con->set_id((char *)(packet->arg[0]), packet->arg_size[0]);
624
540
    break;
625
541
 
626
542
  case GEARMAN_COMMAND_TEXT:
627
 
    return _server_run_text(server_con, packet);
 
543
    return server_con->_server_run_text(packet);
628
544
 
629
545
  case GEARMAN_COMMAND_UNUSED:
630
546
  case GEARMAN_COMMAND_NOOP:
641
557
  case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
642
558
  case GEARMAN_COMMAND_MAX:
643
559
  default:
644
 
    return _server_error_packet(server_con, "bad_command",
645
 
                                "Command not expected");
 
560
    return server_con->_server_error_packet("bad_command", "Command not expected");
646
561
  }
647
562
 
648
563
  return GEARMAN_SUCCESS;
649
564
}
650
565
 
651
 
gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
 
566
gearman_return_t gearman_server_st::gearman_server_shutdown_graceful()
652
567
{
653
 
  server->shutdown_graceful= true;
 
568
  shutdown_graceful= true;
654
569
 
655
 
  if (server->job_count == 0)
 
570
  if (job_count == 0)
656
571
    return GEARMAN_SHUTDOWN;
657
572
 
658
573
  return GEARMAN_SHUTDOWN_GRACEFUL;
659
574
}
660
575
 
661
 
gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
 
576
gearman_return_t gearman_server_st::queue_replay()
662
577
{
663
578
  gearman_return_t ret;
664
579
 
665
 
  if (server->queue_replay_fn == NULL)
 
580
  if (queue == NULL)
666
581
    return GEARMAN_SUCCESS;
667
582
 
668
 
  server->state.queue_startup= true;
669
 
 
670
 
  ret= (*(server->queue_replay_fn))(server, (void *)server->queue_context,
671
 
                                    _queue_replay_add, server);
672
 
 
673
 
  server->state.queue_startup= false;
 
583
  state.queue_startup= true;
 
584
 
 
585
  ret= queue->replay(this, _queue_replay_add, this);
 
586
 
 
587
  state.queue_startup= false;
674
588
 
675
589
  return ret;
676
590
}
677
591
 
678
 
void *gearman_server_queue_context(const gearman_server_st *server)
679
 
{
680
 
  return (void *)server->queue_context;
681
 
}
682
 
 
683
 
void gearman_server_set_queue_context(gearman_server_st *server,
684
 
                                      void *context)
685
 
{
686
 
  server->queue_context= context;
687
 
}
688
 
 
689
 
void gearman_server_set_queue_add_fn(gearman_server_st *server,
690
 
                                     gearman_queue_add_fn *function)
691
 
{
692
 
  server->queue_add_fn= function;
693
 
}
694
 
 
695
 
void gearman_server_set_queue_flush_fn(gearman_server_st *server,
696
 
                                       gearman_queue_flush_fn *function)
697
 
{
698
 
  server->queue_flush_fn= function;
699
 
}
700
 
 
701
 
void gearman_server_set_queue_done_fn(gearman_server_st *server,
702
 
                                      gearman_queue_done_fn *function)
703
 
{
704
 
  server->queue_done_fn= function;
705
 
}
706
 
 
707
 
void gearman_server_set_queue_replay_fn(gearman_server_st *server,
708
 
                                        gearman_queue_replay_fn *function)
709
 
{
710
 
  server->queue_replay_fn= function;
711
 
}
712
 
 
713
592
/*
714
593
 * Private definitions
715
594
 */
724
603
{
725
604
  gearman_return_t ret;
726
605
 
727
 
  (void)gearman_server_job_add(server, (char *)function_name,
728
 
                               function_name_size, (char *)unique, unique_size,
729
 
                               data, data_size, priority, NULL, &ret);
 
606
  (void)server->gearman_server_job_add((char *)function_name,
 
607
                                       function_name_size, (char *)unique, unique_size,
 
608
                                       data, data_size, priority, NULL, &ret);
730
609
  return ret;
731
610
}
732
611
 
733
 
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
734
 
                                             const char *error_code,
735
 
                                             const char *error_string)
736
 
{
737
 
  return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
738
 
                                      GEARMAN_COMMAND_ERROR, error_code,
739
 
                                      (size_t)(strlen(error_code) + 1),
740
 
                                      error_string,
741
 
                                      (size_t)strlen(error_string), NULL);
742
 
}
743
612
 
744
 
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
745
 
                                         gearman_packet_st *packet)
 
613
gearman_return_t gearman_server_con_st::_server_run_text(gearman_packet_st *packet_arg)
746
614
{
747
 
  char *data;
 
615
  gearman_server_con_st *server_con= this;
 
616
  char *_data;
748
617
  char *new_data;
749
618
  size_t size;
750
619
  size_t total;
751
620
  int max_queue_size;
752
 
  gearman_server_thread_st *thread;
753
 
  gearman_server_con_st *con;
 
621
  gearman_server_thread_st *_thread;
 
622
  gearman_server_con_st *con_local;
754
623
  gearman_server_worker_st *worker;
755
624
  gearman_server_function_st *function;
756
 
  gearman_server_packet_st *server_packet;
757
625
 
758
 
  data= malloc(GEARMAN_TEXT_RESPONSE_SIZE);
759
 
  if (data == NULL)
 
626
  _data= (char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
 
627
  if (_data == NULL)
760
628
  {
761
 
    gearman_log_error(packet->universal, "_server_run_text", "malloc");
 
629
    Log::Instance()->error("_server_run_text: malloc()");
 
630
 
762
631
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
763
632
  }
764
633
  total= GEARMAN_TEXT_RESPONSE_SIZE;
765
634
 
766
 
  if (packet->argc == 0)
 
635
  if (packet_arg->argc == 0)
767
636
  {
768
 
    snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
 
637
    snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE,
769
638
             "ERR unknown_command Unknown+server+command\n");
770
639
  }
771
 
  else if (!strcasecmp("workers", (char *)(packet->arg[0])))
 
640
  else if (!strcasecmp("workers", (char *)(packet_arg->arg[0])))
772
641
  {
773
642
    size= 0;
774
643
 
775
 
    for (thread= server_con->thread->server->thread_list; thread != NULL;
776
 
         thread= thread->next)
 
644
    for (_thread= server_con->thread->server->thread_list; _thread != NULL;
 
645
         _thread= thread->next)
777
646
    {
778
 
      (void) pthread_mutex_lock(&thread->lock);
 
647
      boost::mutex::scoped_lock l(_thread->lock);
779
648
 
780
 
      for (con= thread->con_list; con != NULL; con= con->next)
 
649
      BOOST_FOREACH(ServerConMap::value_type iter, _thread->con_list)
781
650
      {
782
 
        if (con->host == NULL)
 
651
        con_local= iter.second;
 
652
 
 
653
        if (con_local->_host == NULL)
783
654
          continue;
784
655
 
785
656
        if (size > total)
788
659
        /* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
789
660
        if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
790
661
        {
791
 
          new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
 
662
          new_data= (char *)realloc(_data, total + GEARMAN_TEXT_RESPONSE_SIZE);
792
663
          if (new_data == NULL)
793
664
          {
794
 
            (void) pthread_mutex_unlock(&thread->lock);
795
 
            free(data);
796
 
            gearman_log_error(packet->universal, "_server_run_text", "malloc");
 
665
            free(_data);
 
666
            Log::Instance()->error("_server_run_text: malloc()");
 
667
 
797
668
            return GEARMAN_MEMORY_ALLOCATION_FAILURE;
798
669
          }
799
670
 
800
 
          data= new_data;
 
671
          _data= new_data;
801
672
          total+= GEARMAN_TEXT_RESPONSE_SIZE;
802
673
        }
803
674
 
804
 
        size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
805
 
                                con->con.fd, con->host, con->id);
 
675
        size+= (size_t)snprintf(_data + size, total - size, "%d %s %s :",
 
676
                                con_local->con()->fd, con_local->_host, con_local->_id);
806
677
        if (size > total)
807
678
          continue;
808
679
 
809
 
        for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
 
680
        for (worker= con_local->worker_list; worker != NULL; worker= worker->con_next)
810
681
        {
811
 
          size+= (size_t)snprintf(data + size, total - size, " %.*s",
812
 
                                  (int)(worker->function->function_name_size),
813
 
                                  worker->function->function_name);
 
682
          size+= (size_t)snprintf(_data + size, total - size, " %.*s",
 
683
                                  (int)(worker->function->function_name.size()),
 
684
                                  worker->function->function_name.c_str());
814
685
          if (size > total)
815
686
            break;
816
687
        }
818
689
        if (size > total)
819
690
          continue;
820
691
 
821
 
        size+= (size_t)snprintf(data + size, total - size, "\n");
 
692
        size+= (size_t)snprintf(_data + size, total - size, "\n");
822
693
      }
823
 
 
824
 
      (void) pthread_mutex_unlock(&thread->lock);
825
694
    }
826
695
 
827
696
    if (size < total)
828
 
      snprintf(data + size, total - size, ".\n");
 
697
      snprintf(_data + size, total - size, ".\n");
829
698
  }
830
 
  else if (!strcasecmp("status", (char *)(packet->arg[0])))
 
699
  else if (!strcasecmp("status", (char *)(packet_arg->arg[0])))
831
700
  {
832
701
    size= 0;
833
702
 
836
705
    {
837
706
      if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
838
707
      {
839
 
        new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
 
708
        new_data= (char *)realloc(_data, total + GEARMAN_TEXT_RESPONSE_SIZE);
840
709
        if (new_data == NULL)
841
710
        {
842
 
          free(data);
843
 
          gearman_log_error(packet->universal, "_server_run_text", "malloc");
 
711
          free(_data);
 
712
          Log::Instance()->error("_server_run_text: malloc()");
 
713
 
844
714
          return GEARMAN_MEMORY_ALLOCATION_FAILURE;
845
715
        }
846
716
 
847
 
        data= new_data;
 
717
        _data= new_data;
848
718
        total+= GEARMAN_TEXT_RESPONSE_SIZE;
849
719
      }
850
720
 
851
 
      size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
852
 
                              (int)(function->function_name_size),
853
 
                              function->function_name, function->job_total,
 
721
      size+= (size_t)snprintf(_data + size, total - size, "%.*s\t%u\t%u\t%u\n",
 
722
                              (int)(function->function_name.size()),
 
723
                              function->function_name.c_str(), function->job_total,
854
724
                              function->job_running, function->worker_count);
855
725
      if (size > total)
856
726
        size= total;
857
727
    }
858
728
 
859
729
    if (size < total)
860
 
      snprintf(data + size, total - size, ".\n");
 
730
      snprintf(_data + size, total - size, ".\n");
861
731
  }
862
 
  else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
 
732
  else if (!strcasecmp("maxqueue", (char *)(packet_arg->arg[0])))
863
733
  {
864
 
    if (packet->argc == 1)
 
734
    if (packet_arg->argc == 1)
865
735
    {
866
 
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
 
736
      snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
867
737
               "An+incomplete+set+of+arguments+was+sent+to+this+command\n");
868
738
    }
869
739
    else
870
740
    {
871
 
      if (packet->argc == 2)
 
741
      if (packet_arg->argc == 2)
872
742
        max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
873
743
      else
874
744
      {
875
 
        max_queue_size= atoi((char *)(packet->arg[2]));
 
745
        max_queue_size= atoi((char *)(packet_arg->arg[2]));
876
746
        if (max_queue_size < 0)
877
747
          max_queue_size= 0;
878
748
      }
880
750
      for (function= server_con->thread->server->function_list;
881
751
           function != NULL; function= function->next)
882
752
      {
883
 
        if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
884
 
            !memcmp(packet->arg[1], function->function_name,
885
 
                    function->function_name_size))
 
753
        if (strlen((char *)(packet_arg->arg[1])) == function->function_name.size() &&
 
754
            !memcmp(packet_arg->arg[1], function->function_name.c_str(),
 
755
                    function->function_name.size()))
886
756
        {
887
757
          function->max_queue_size= (uint32_t)max_queue_size;
888
758
        }
889
759
      }
890
760
 
891
 
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
 
761
      snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
892
762
    }
893
763
  }
894
 
  else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
 
764
  else if (!strcasecmp("shutdown", (char *)(packet_arg->arg[0])))
895
765
  {
896
 
    if (packet->argc == 1)
 
766
    if (packet_arg->argc == 1)
897
767
    {
898
768
      server_con->thread->server->shutdown= true;
899
 
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
 
769
      snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
900
770
    }
901
 
    else if (packet->argc == 2 &&
902
 
             !strcasecmp("graceful", (char *)(packet->arg[1])))
 
771
    else if (packet_arg->argc == 2 &&
 
772
             !strcasecmp("graceful", (char *)(packet_arg->arg[1])))
903
773
    {
904
774
      server_con->thread->server->shutdown_graceful= true;
905
 
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
 
775
      snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
906
776
    }
907
777
    else
908
778
    {
909
 
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
 
779
      snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE,
910
780
               "ERR unknown_args Unknown+arguments+to+server+command\n");
911
781
    }
912
782
  }
913
 
  else if (!strcasecmp("version", (char *)(packet->arg[0])))
914
 
    snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
 
783
  else if (!strcasecmp("version", (char *)(packet_arg->arg[0])))
 
784
    snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
915
785
  else
916
786
  {
917
 
    snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
 
787
    snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE,
918
788
             "ERR unknown_command Unknown+server+command\n");
919
789
  }
920
790
 
921
 
  server_packet= gearman_server_packet_create(server_con->thread, false);
 
791
  gearman_server_packet_st *server_packet= new gearman_server_packet_st(server_con->thread->gearman, 
 
792
                                                                        GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
 
793
 
922
794
  if (server_packet == NULL)
923
795
  {
924
 
    free(data);
925
 
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
926
 
  }
927
 
 
928
 
  if (gearman_packet_create(server_con->thread->gearman,
929
 
                            &(server_packet->packet)) == NULL)
930
 
  {
931
 
    free(data);
932
 
    gearman_server_packet_free(server_packet, server_con->thread, false);
933
 
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
934
 
  }
935
 
 
936
 
  server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
937
 
  server_packet->packet.command= GEARMAN_COMMAND_TEXT;
 
796
    free(_data);
 
797
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
798
  }
 
799
 
938
800
  server_packet->packet.options.complete= true;
939
801
  server_packet->packet.options.free_data= true;
940
802
 
941
 
  server_packet->packet.data= data;
942
 
  server_packet->packet.data_size= strlen(data);
943
 
 
944
 
  (void) pthread_mutex_lock(&server_con->thread->lock);
945
 
  GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
946
 
  (void) pthread_mutex_unlock(&server_con->thread->lock);
947
 
 
948
 
  gearman_server_con_io_add(server_con);
 
803
  server_packet->packet.data= _data;
 
804
  server_packet->packet.data_size= strlen(_data);
 
805
 
 
806
  {
 
807
    boost::mutex::scoped_lock l(server_con->thread->lock);
 
808
    server_con->io_packet_list.push(server_packet);
 
809
  }
 
810
 
 
811
  server_con->io_add();
949
812
 
950
813
  return GEARMAN_SUCCESS;
951
814
}
976
839
      }
977
840
      else
978
841
      {
979
 
        data= malloc(packet->data_size);
 
842
        data= (uint8_t *)malloc(packet->data_size);
980
843
        if (data == NULL)
981
844
        {
982
 
          gearman_log_error(packet->universal, "_server_run_command", "malloc");
 
845
          Log::Instance()->error("_server_run_command: malloc()");
983
846
          return GEARMAN_MEMORY_ALLOCATION_FAILURE;
984
847
        }
985
848
 
987
850
      }
988
851
    }
989
852
    else
 
853
    {
990
854
      data= NULL;
 
855
    }
991
856
 
992
 
    ret= gearman_server_io_packet_add(server_client->con, true,
993
 
                                      GEARMAN_MAGIC_RESPONSE, command,
994
 
                                      packet->arg[0], packet->arg_size[0],
995
 
                                      data, packet->data_size, NULL);
 
857
    ret= server_client->con->io_packet_add(true,
 
858
                                           GEARMAN_MAGIC_RESPONSE, command,
 
859
                                           packet->arg[0], packet->arg_size[0],
 
860
                                           data, packet->data_size, NULL);
996
861
    if (ret != GEARMAN_SUCCESS)
997
862
      return ret;
998
863
  }
1000
865
  return GEARMAN_SUCCESS;
1001
866
}
1002
867
 
1003
 
static void _log(const char *line, gearman_verbose_t verbose, void *context)
1004
 
{
1005
 
  gearman_server_st *server= (gearman_server_st *)context;
1006
 
  (*(server->log_fn))(line, verbose, (void *)server->log_context);
 
868
gearman_return_t gearman_server_st::_proc_thread_start()
 
869
{
 
870
  m_thread= new boost::thread(boost::bind(&gearman_server_st::_proc, this));
 
871
 
 
872
  assert(m_thread);
 
873
 
 
874
  flags.threaded= true;
 
875
 
 
876
  return GEARMAN_SUCCESS;
 
877
}
 
878
 
 
879
void gearman_server_st::_proc_thread_kill()
 
880
{
 
881
  gearman_server_st *server= this;
 
882
  if (! (server->flags.threaded) || server->proc_shutdown)
 
883
    return;
 
884
 
 
885
  server->proc_shutdown= true;
 
886
 
 
887
  /* Signal proc thread to shutdown. */
 
888
  {
 
889
    boost::mutex::scoped_lock l(server->proc_lock);
 
890
    server->proc_cond.notify_all();
 
891
  }
 
892
 
 
893
  /* Wait for the proc thread to exit and then cleanup. */
 
894
  m_thread->join();
 
895
  delete m_thread;
 
896
}
 
897
 
 
898
void *gearman_server_st::_proc()
 
899
{
 
900
  gearman_server_st *server= this;
 
901
  gearman_server_thread_st *thread;
 
902
  gearman_server_con_st *con;
 
903
  gearman_server_packet_st *packet;
 
904
 
 
905
  while (1)
 
906
  {
 
907
    {
 
908
      boost::mutex::scoped_lock l(server->proc_lock);
 
909
 
 
910
      while (server->proc_wakeup == false)
 
911
      {
 
912
        if (server->proc_shutdown)
 
913
        {
 
914
          return NULL;
 
915
        }
 
916
 
 
917
        server->proc_cond.wait(l);
 
918
      }
 
919
      server->proc_wakeup= false;
 
920
    }
 
921
 
 
922
    for (thread= server->thread_list; thread != NULL; thread= thread->next)
 
923
    {
 
924
      while ((con= thread->proc_next()) != NULL)
 
925
      {
 
926
        if (con->is_dead)
 
927
        {
 
928
          con->free_workers();
 
929
 
 
930
          while (con->client_list != NULL)
 
931
          {
 
932
            con->client_list->con->thread->server->client_free(con->client_list);
 
933
          }
 
934
 
 
935
          con->proc_removed= true;
 
936
          con->io_add();
 
937
          continue;
 
938
        }
 
939
 
 
940
        while (1)
 
941
        {
 
942
          packet= con->proc_packet_remove();
 
943
          if (packet == NULL)
 
944
            break;
 
945
 
 
946
          con->ret= gearman_server_run_command(con, &(packet->packet));
 
947
          delete packet;
 
948
        }
 
949
      }
 
950
    }
 
951
  }
 
952
}
 
953
 
 
954
gearman_server_job_st *gearman_server_st::gearman_server_job_add(const char *function_name,
 
955
                                                                 size_t , const char *unique,
 
956
                                                                 size_t unique_size, const void *data, size_t data_size,
 
957
                                                                 gearman_job_priority_t priority,
 
958
                                                                 gearman_server_client_st *server_client,
 
959
                                                                 gearman_return_t *ret_ptr)
 
960
{
 
961
  gearman_server_st *server= this;
 
962
  gearman_server_job_st *server_job;
 
963
  gearman_server_function_st *server_function;
 
964
  uint32_t key;
 
965
 
 
966
  server_function= server->function_get(function_name);
 
967
  if (server_function == NULL)
 
968
  {
 
969
    *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
970
    return NULL;
 
971
  }
 
972
 
 
973
  if (unique_size == 0)
 
974
  {
 
975
    server_job= NULL;
 
976
    key= 0;
 
977
  }
 
978
  else
 
979
  {
 
980
    if (unique_size == 1 && *unique ==  '-')
 
981
    {
 
982
      if (data_size == 0)
 
983
      {
 
984
        key= 0;
 
985
        server_job= NULL;
 
986
      }
 
987
      else
 
988
      {
 
989
        /* Look up job via unique data when unique = '-'. */
 
990
        key= _server_job_hash((const char*)data, data_size);
 
991
        server_job= server->_server_job_get_unique(key, server_function, (const char*)data, data_size);
 
992
      }
 
993
    }
 
994
    else
 
995
    {
 
996
      /* Look up job via unique ID first to make sure it's not a duplicate. */
 
997
      key= _server_job_hash(unique, unique_size);
 
998
      server_job= server->_server_job_get_unique(key, server_function, unique, 0);
 
999
    }
 
1000
  }
 
1001
 
 
1002
  if (server_job == NULL)
 
1003
  {
 
1004
    if (server_function->max_queue_size > 0 &&
 
1005
        server_function->job_total >= server_function->max_queue_size)
 
1006
    {
 
1007
      Log::Instance()->error("Queue full, failed to add %s : %s", function_name, unique);
 
1008
      *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
 
1009
 
 
1010
      return NULL;
 
1011
    }
 
1012
 
 
1013
    server_job= server->job_create();
 
1014
    if (server_job == NULL)
 
1015
    {
 
1016
      *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
1017
      return NULL;
 
1018
    }
 
1019
    Log::Instance()->event(Logger::JOB_ACCEPTED, NULL);
 
1020
 
 
1021
    server_job->set_priority(priority);
 
1022
 
 
1023
    server_job->function= server_function;
 
1024
    server_function->job_total++;
 
1025
 
 
1026
    snprintf(server_job->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s:%u",
 
1027
             server->job_handle_prefix, server->job_handle_count);
 
1028
    snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
 
1029
             (uint32_t)unique_size, unique);
 
1030
    server->job_handle_count++;
 
1031
    server_job->set_data(data, data_size);
 
1032
 
 
1033
    server_job->unique_key= key;
 
1034
    key= key % GEARMAN_JOB_HASH_SIZE;
 
1035
    GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
 
1036
 
 
1037
    key= _server_job_hash(server_job->job_handle,
 
1038
                          strlen(server_job->job_handle));
 
1039
    server_job->set_job_handle_key(key);
 
1040
    key= key % GEARMAN_JOB_HASH_SIZE;
 
1041
    GEARMAN_HASH_ADD(server->job, key, server_job,);
 
1042
 
 
1043
    if (server->state.queue_startup)
 
1044
    {
 
1045
      server_job->set_job_queued(true);
 
1046
    }
 
1047
    else if (server_client == NULL && server->queue != NULL)
 
1048
    {
 
1049
      *ret_ptr= server->queue->add(server,
 
1050
                                   server_job->unique,
 
1051
                                   unique_size,
 
1052
                                   function_name,
 
1053
                                   data, data_size, priority);
 
1054
      if (*ret_ptr != GEARMAN_SUCCESS)
 
1055
      {
 
1056
        server_job->set_data();
 
1057
        delete server_job;
 
1058
        return NULL;
 
1059
      }
 
1060
 
 
1061
      *ret_ptr= server->queue->flush(server);
 
1062
      if (*ret_ptr != GEARMAN_SUCCESS)
 
1063
      {
 
1064
        server_job->set_data();
 
1065
        delete server_job;
 
1066
        return NULL;
 
1067
      }
 
1068
 
 
1069
      server_job->set_job_queued(true);
 
1070
    }
 
1071
 
 
1072
    *ret_ptr= server_job->job_queue();
 
1073
    if (*ret_ptr != GEARMAN_SUCCESS)
 
1074
    {
 
1075
      if (server_client == NULL && server->queue != NULL)
 
1076
      {
 
1077
        /* Do our best to remove the job from the queue. */
 
1078
        (void)server->queue->done(server,
 
1079
                                  server_job->unique, unique_size,
 
1080
                                  server_job->function->function_name);
 
1081
      }
 
1082
 
 
1083
      delete server_job;
 
1084
      return NULL;
 
1085
    }
 
1086
  }
 
1087
  else
 
1088
  {
 
1089
    *ret_ptr= GEARMAN_JOB_EXISTS;
 
1090
  }
 
1091
 
 
1092
  if (server_client != NULL)
 
1093
  {
 
1094
    server_client->job= server_job;
 
1095
    GEARMAN_LIST_ADD(server_job->client, server_client, job_)
 
1096
  }
 
1097
 
 
1098
  return server_job;
 
1099
}
 
1100
 
 
1101
gearman_server_job_st *gearman_server_st::job_create()
 
1102
{
 
1103
  gearman_server_job_st *server_job;
 
1104
 
 
1105
  server_job= new gearman_server_job_st(this);
 
1106
 
 
1107
  return server_job;
 
1108
}
 
1109
 
 
1110
gearman_server_job_st *gearman_server_st::gearman_server_job_get(const char *job_handle,
 
1111
                                                                 gearman_server_con_st *worker_con)
 
1112
{
 
1113
  gearman_server_st *server= this;
 
1114
  uint32_t key;
 
1115
 
 
1116
  key= _server_job_hash(job_handle, strlen(job_handle));
 
1117
 
 
1118
  for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAN_JOB_HASH_SIZE];
 
1119
       server_job != NULL; server_job= server_job->next)
 
1120
  {
 
1121
    if (server_job->job_handle_key() == key &&
 
1122
        !strcmp(server_job->job_handle, job_handle))
 
1123
    {
 
1124
      /* Check to make sure the worker asking for the job still owns the job. */
 
1125
      if (worker_con != NULL && not server_job->owns_job(worker_con))
 
1126
      {
 
1127
        return NULL;
 
1128
      }
 
1129
 
 
1130
      return server_job;
 
1131
    }
 
1132
  }
 
1133
 
 
1134
  return NULL;
 
1135
}
 
1136
 
 
1137
 
 
1138
gearman_server_job_st *gearman_server_st::_server_job_get_unique(uint32_t unique_key,
 
1139
                                                                 gearman_server_function_st *server_function,
 
1140
                                                                 const char *unique, size_t data_size)
 
1141
{
 
1142
  gearman_server_st *server= this;
 
1143
  gearman_server_job_st *server_job;
 
1144
 
 
1145
  for (server_job= server->unique_hash[unique_key % GEARMAN_JOB_HASH_SIZE];
 
1146
       server_job != NULL; server_job= server_job->unique_next)
 
1147
  {
 
1148
    if (data_size == 0)
 
1149
    {
 
1150
      if (server_job->function == server_function &&
 
1151
          server_job->unique_key == unique_key &&
 
1152
          !strcmp(server_job->unique, unique))
 
1153
      {
 
1154
        return server_job;
 
1155
      }
 
1156
    }
 
1157
    else
 
1158
    {
 
1159
      if (server_job->function == server_function &&
 
1160
          server_job->unique_key == unique_key &&
 
1161
          server_job->compare_data(unique, data_size))
 
1162
      {
 
1163
        return server_job;
 
1164
      }
 
1165
    }
 
1166
  }
 
1167
 
 
1168
  return NULL;
1007
1169
}