~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman/server.c

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2009-08-11 10:06:22 UTC
  • mto: (1.2.3 upstream) (6.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20090811100622-6ig4iknanc73olum
Import upstream version 0.9

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2008 Brian Aker, Eric Day
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
/**
 
10
 * @file
 
11
 * @brief Server Definitions
 
12
 */
 
13
 
 
14
#include "common.h"
 
15
 
 
16
/*
 
17
 * Private declarations
 
18
 */
 
19
 
 
20
/**
 
21
 * @addtogroup gearman_server_private Private Server Functions
 
22
 * @ingroup gearman_server
 
23
 * @{
 
24
 */
 
25
 
 
26
/**
 
27
 * Add job to queue while replaying queue during startup.
 
28
 */
 
29
gearman_return_t _queue_replay_add(gearman_st *gearman, void *fn_arg,
 
30
                                   const void *unique, size_t unique_size,
 
31
                                   const void *function_name,
 
32
                                   size_t function_name_size, const void *data,
 
33
                                   size_t data_size,
 
34
                                   gearman_job_priority_t priority);
 
35
 
 
36
/**
 
37
 * Queue an error packet.
 
38
 */
 
39
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
 
40
                                             const char *error_code,
 
41
                                             const char *error_string);
 
42
 
 
43
/**
 
44
 * Process text commands for a connection.
 
45
 */
 
46
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
 
47
                                         gearman_packet_st *packet);
 
48
 
 
49
/**
 
50
 * Send work result packets with data back to clients.
 
51
 */
 
52
static gearman_return_t
 
53
_server_queue_work_data(gearman_server_job_st *server_job,
 
54
                        gearman_packet_st *packet, gearman_command_t command);
 
55
 
 
56
/**
 
57
 * Wrapper for log handling.
 
58
 */
 
59
static void _log(gearman_st *gearman, gearman_verbose_t verbose,
 
60
                 const char *line, void *fn_arg);
 
61
 
 
62
/** @} */
 
63
 
 
64
/*
 
65
 * Public definitions
 
66
 */
 
67
 
 
68
/**
 * Initialize a server structure, allocating one if the caller passes NULL.
 *
 * @param server Caller-owned structure to initialize, or NULL to have one
 *               allocated with malloc() (marked GEARMAN_SERVER_ALLOCATED so
 *               gearman_server_free() releases it).
 * @return The initialized structure, or NULL when malloc(), gearman_create()
 *         or uname() fails.
 */
gearman_server_st *gearman_server_create(gearman_server_st *server)
{
  struct utsname host_info;

  if (server != NULL)
    server->options= 0;
  else
  {
    server= malloc(sizeof(gearman_server_st));
    if (server == NULL)
      return NULL;

    server->options= GEARMAN_SERVER_ALLOCATED;
  }

  /* Run-state flags. */
  server->shutdown= false;
  server->shutdown_graceful= false;
  server->proc_wakeup= false;
  server->proc_shutdown= false;

  /* Counters. */
  server->thread_count= 0;
  server->function_count= 0;
  server->job_count= 0;
  server->unique_count= 0;
  server->free_packet_count= 0;
  server->free_job_count= 0;
  server->free_client_count= 0;
  server->free_worker_count= 0;

  /* Active and free lists all start out empty. */
  server->thread_list= NULL;
  server->function_list= NULL;
  server->free_packet_list= NULL;
  server->free_job_list= NULL;
  server->free_client_list= NULL;
  server->free_worker_list= NULL;

  /* No log callback until gearman_server_set_log() is called. */
  server->log_fn= NULL;
  server->log_fn_arg= NULL;

  memset(server->job_hash, 0,
         GEARMAN_JOB_HASH_SIZE * sizeof(gearman_server_job_st *));
  memset(server->unique_hash, 0,
         GEARMAN_JOB_HASH_SIZE * sizeof(gearman_server_job_st *));

  /* uname() is only attempted once the gearman instance exists; either
     failure tears the server down again. */
  server->gearman= gearman_create(&(server->gearman_static));
  if (server->gearman == NULL || uname(&host_info) == -1)
  {
    gearman_server_free(server);
    return NULL;
  }

  /* Job handles are built from "H:<nodename>" plus a running counter. */
  server->job_handle_count= 1;
  snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
           host_info.nodename);

  return server;
}
 
127
 
 
128
/**
 * Release everything owned by a server structure.
 *
 * Frees all jobs still in the job hash, all registered functions, the cached
 * free lists (packets, jobs, clients, workers) and the gearman instance. The
 * structure itself is freed only if it was allocated by
 * gearman_server_create() (GEARMAN_SERVER_ALLOCATED set).
 *
 * @param server Server structure to tear down. All threads must already have
 *               been removed (asserted).
 */
void gearman_server_free(gearman_server_st *server)
{
  uint32_t bucket;
  gearman_server_packet_st *spare_packet;
  gearman_server_job_st *spare_job;
  gearman_server_client_st *spare_client;
  gearman_server_worker_st *spare_worker;

  /* All threads should be cleaned up before calling this. */
  assert(server->thread_list == NULL);

  /* Drain each hash bucket; only advance once the bucket head is NULL.
     NOTE(review): this relies on gearman_server_job_free() unlinking the job
     from job_hash - confirm against its definition. */
  for (bucket= 0; bucket < GEARMAN_JOB_HASH_SIZE; )
  {
    if (server->job_hash[bucket] == NULL)
      bucket++;
    else
      gearman_server_job_free(server->job_hash[bucket]);
  }

  /* Same pattern: freeing the head is expected to advance function_list. */
  while (server->function_list != NULL)
    gearman_server_function_free(server->function_list);

  /* The free lists hold plain allocations; pop and free each entry. */
  while ((spare_packet= server->free_packet_list) != NULL)
  {
    server->free_packet_list= spare_packet->next;
    free(spare_packet);
  }

  while ((spare_job= server->free_job_list) != NULL)
  {
    server->free_job_list= spare_job->next;
    free(spare_job);
  }

  while ((spare_client= server->free_client_list) != NULL)
  {
    server->free_client_list= spare_client->con_next;
    free(spare_client);
  }

  while ((spare_worker= server->free_worker_list) != NULL)
  {
    server->free_worker_list= spare_worker->con_next;
    free(spare_worker);
  }

  if (server->gearman != NULL)
    gearman_free(server->gearman);

  if (server->options & GEARMAN_SERVER_ALLOCATED)
    free(server);
}
 
182
 
 
183
void gearman_server_set_log(gearman_server_st *server,
 
184
                            gearman_server_log_fn log_fn, void *log_fn_arg,
 
185
                            gearman_verbose_t verbose)
 
186
{
 
187
  server->log_fn= log_fn;
 
188
  server->log_fn_arg= log_fn_arg;
 
189
  gearman_set_log(server->gearman, _log, server, verbose);
 
190
}
 
191
 
 
192
gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
 
193
                                            gearman_packet_st *packet)
 
194
{
 
195
  gearman_return_t ret;
 
196
  gearman_server_job_st *server_job;
 
197
  char job_handle[GEARMAN_JOB_HANDLE_SIZE];
 
198
  char option[GEARMAN_OPTION_SIZE];
 
199
  gearman_server_client_st *server_client;
 
200
  char numerator_buffer[11]; /* Max string size to hold a uint32_t. */
 
201
  char denominator_buffer[11]; /* Max string size to hold a uint32_t. */
 
202
  gearman_job_priority_t priority;
 
203
  gearman_st *gearman= gearman= server_con->thread->server->gearman;
 
204
 
 
205
  if (packet->magic == GEARMAN_MAGIC_RESPONSE)
 
206
  {
 
207
    return _server_error_packet(server_con, "bad_magic",
 
208
                                "Request magic expected");
 
209
  }
 
210
 
 
211
  switch (packet->command)
 
212
  {
 
213
  /* Client/worker requests. */
 
214
  case GEARMAN_COMMAND_ECHO_REQ:
 
215
    /* Reuse the data buffer and just shove the data back. */
 
216
    ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
 
217
                                      GEARMAN_COMMAND_ECHO_RES, packet->data,
 
218
                                      packet->data_size, NULL);
 
219
    if (ret != GEARMAN_SUCCESS)
 
220
      return ret;
 
221
 
 
222
    packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
 
223
 
 
224
    break;
 
225
 
 
226
  /* Client requests. */
 
227
  case GEARMAN_COMMAND_SUBMIT_JOB:
 
228
  case GEARMAN_COMMAND_SUBMIT_JOB_BG:
 
229
  case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
 
230
  case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
 
231
  case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
 
232
  case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
 
233
 
 
234
    if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
 
235
        packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG)
 
236
    {
 
237
      priority= GEARMAN_JOB_PRIORITY_NORMAL;
 
238
    }
 
239
    else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
 
240
             packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
 
241
    {
 
242
      priority= GEARMAN_JOB_PRIORITY_HIGH;
 
243
    }
 
244
    else
 
245
      priority= GEARMAN_JOB_PRIORITY_LOW;
 
246
 
 
247
    if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
 
248
        packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
 
249
        packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
 
250
    {
 
251
      server_client= NULL;
 
252
    }
 
253
    else
 
254
    {
 
255
      server_client= gearman_server_client_add(server_con);
 
256
      if (server_client == NULL)
 
257
        return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
258
    }
 
259
 
 
260
    /* Create a job. */
 
261
    server_job= gearman_server_job_add(server_con->thread->server,
 
262
                                       (char *)(packet->arg[0]),
 
263
                                       packet->arg_size[0] - 1,
 
264
                                       (char *)(packet->arg[1]),
 
265
                                       packet->arg_size[1] - 1, packet->data,
 
266
                                       packet->data_size, priority,
 
267
                                       server_client, &ret);
 
268
    if (ret == GEARMAN_SUCCESS)
 
269
      packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
 
270
    else if (ret == GEARMAN_JOB_QUEUE_FULL)
 
271
    {
 
272
      return _server_error_packet(server_con, "queue_full",
 
273
                                  "Job queue is full");
 
274
    }
 
275
    else if (ret != GEARMAN_JOB_EXISTS)
 
276
      return ret;
 
277
 
 
278
    /* Queue the job created packet. */
 
279
    ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
 
280
                                      GEARMAN_COMMAND_JOB_CREATED,
 
281
                                      server_job->job_handle,
 
282
                                      (size_t)strlen(server_job->job_handle),
 
283
                                      NULL);
 
284
    if (ret != GEARMAN_SUCCESS)
 
285
      return ret;
 
286
 
 
287
    break;
 
288
 
 
289
  case GEARMAN_COMMAND_GET_STATUS:
 
290
    /* This may not be NULL terminated, so copy to make sure it is. */
 
291
    snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
 
292
             (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
 
293
 
 
294
    server_job= gearman_server_job_get(server_con->thread->server, job_handle);
 
295
 
 
296
    /* Queue status result packet. */
 
297
    if (server_job == NULL)
 
298
    {
 
299
      ret= gearman_server_io_packet_add(server_con, false,
 
300
                                        GEARMAN_MAGIC_RESPONSE,
 
301
                                        GEARMAN_COMMAND_STATUS_RES, job_handle,
 
302
                                        (size_t)(strlen(job_handle) + 1),
 
303
                                        "0", (size_t)2, "0", (size_t)2, "0",
 
304
                                        (size_t)2, "0", (size_t)1, NULL);
 
305
    }
 
306
    else
 
307
    {
 
308
      snprintf(numerator_buffer, 11, "%u", server_job->numerator);
 
309
      snprintf(denominator_buffer, 11, "%u", server_job->denominator);
 
310
 
 
311
      ret= gearman_server_io_packet_add(server_con, false,
 
312
                                        GEARMAN_MAGIC_RESPONSE,
 
313
                                        GEARMAN_COMMAND_STATUS_RES, job_handle,
 
314
                                        (size_t)(strlen(job_handle) + 1),
 
315
                                        "1", (size_t)2,
 
316
                                        server_job->worker == NULL ? "0" : "1",
 
317
                                        (size_t)2, numerator_buffer,
 
318
                                        (size_t)(strlen(numerator_buffer) + 1),
 
319
                                        denominator_buffer,
 
320
                                        (size_t)strlen(denominator_buffer),
 
321
                                        NULL);
 
322
    }
 
323
 
 
324
    if (ret != GEARMAN_SUCCESS)
 
325
      return ret;
 
326
 
 
327
    break;
 
328
 
 
329
  case GEARMAN_COMMAND_OPTION_REQ:
 
330
    /* This may not be NULL terminated, so copy to make sure it is. */
 
331
    snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
 
332
             (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
 
333
    if (!strcasecmp(option, "exceptions"))
 
334
      server_con->options|= GEARMAN_SERVER_CON_EXCEPTIONS;
 
335
    else
 
336
    {
 
337
      return _server_error_packet(server_con, "unknown_option",
 
338
                                  "Server does not recognize given option");
 
339
    }
 
340
 
 
341
    ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
 
342
                                      GEARMAN_COMMAND_OPTION_RES,
 
343
                                      packet->arg[0], packet->arg_size[0],
 
344
                                      NULL);
 
345
    if (ret != GEARMAN_SUCCESS)
 
346
      return ret;
 
347
 
 
348
    break;
 
349
 
 
350
  /* Worker requests. */
 
351
  case GEARMAN_COMMAND_CAN_DO:
 
352
    if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
 
353
                                  packet->arg_size[0], 0) == NULL)
 
354
    {
 
355
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
356
    }
 
357
 
 
358
    break;
 
359
 
 
360
  case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
 
361
    if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
 
362
                                  packet->arg_size[0] - 1,
 
363
                                  (in_port_t)atoi((char *)(packet->arg[1])))
 
364
         == NULL)
 
365
    {
 
366
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
367
    }
 
368
 
 
369
    break;
 
370
 
 
371
  case GEARMAN_COMMAND_CANT_DO:
 
372
    gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
 
373
                                   packet->arg_size[0]);
 
374
    break;
 
375
 
 
376
  case GEARMAN_COMMAND_RESET_ABILITIES:
 
377
    gearman_server_con_free_workers(server_con);
 
378
    break;
 
379
 
 
380
  case GEARMAN_COMMAND_PRE_SLEEP:
 
381
    server_job= gearman_server_job_peek(server_con);
 
382
    if (server_job == NULL)
 
383
      server_con->options|= GEARMAN_SERVER_CON_SLEEPING;
 
384
    else
 
385
    {
 
386
      /* If there are jobs that could be run, queue a NOOP packet to wake the
 
387
         worker up. This could be the result of a race codition. */
 
388
      ret= gearman_server_io_packet_add(server_con, false,
 
389
                                        GEARMAN_MAGIC_RESPONSE,
 
390
                                        GEARMAN_COMMAND_NOOP, NULL);
 
391
      if (ret != GEARMAN_SUCCESS)
 
392
        return ret;
 
393
    }
 
394
 
 
395
    break;
 
396
 
 
397
  case GEARMAN_COMMAND_GRAB_JOB:
 
398
  case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
 
399
    server_con->options&=
 
400
                     (gearman_server_con_options_t)~GEARMAN_SERVER_CON_SLEEPING;
 
401
 
 
402
    server_job= gearman_server_job_take(server_con);
 
403
    if (server_job == NULL)
 
404
    {
 
405
      /* No jobs found, queue no job packet. */
 
406
      ret= gearman_server_io_packet_add(server_con, false,
 
407
                                        GEARMAN_MAGIC_RESPONSE,
 
408
                                        GEARMAN_COMMAND_NO_JOB, NULL);
 
409
    }
 
410
    else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
 
411
    {
 
412
      /* We found a runnable job, queue job assigned packet and take the job
 
413
         off the queue. */
 
414
      ret= gearman_server_io_packet_add(server_con, false,
 
415
                                   GEARMAN_MAGIC_RESPONSE,
 
416
                                   GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
 
417
                                   server_job->job_handle,
 
418
                                   (size_t)(strlen(server_job->job_handle) + 1),
 
419
                                   server_job->function->function_name,
 
420
                                   server_job->function->function_name_size + 1,
 
421
                                   server_job->unique,
 
422
                                   (size_t)(strlen(server_job->unique) + 1),
 
423
                                   server_job->data, server_job->data_size,
 
424
                                   NULL);
 
425
    }
 
426
    else
 
427
    {
 
428
      /* Same, but without unique ID. */
 
429
      ret= gearman_server_io_packet_add(server_con, false,
 
430
                                   GEARMAN_MAGIC_RESPONSE,
 
431
                                   GEARMAN_COMMAND_JOB_ASSIGN,
 
432
                                   server_job->job_handle,
 
433
                                   (size_t)(strlen(server_job->job_handle) + 1),
 
434
                                   server_job->function->function_name,
 
435
                                   server_job->function->function_name_size + 1,
 
436
                                   server_job->data, server_job->data_size,
 
437
                                   NULL);
 
438
    }
 
439
 
 
440
    if (ret != GEARMAN_SUCCESS)
 
441
    {
 
442
      if (server_job != NULL)
 
443
        return gearman_server_job_queue(server_job);
 
444
      return ret;
 
445
    }
 
446
 
 
447
    break;
 
448
 
 
449
  case GEARMAN_COMMAND_WORK_DATA:
 
450
  case GEARMAN_COMMAND_WORK_WARNING:
 
451
    server_job= gearman_server_job_get(server_con->thread->server,
 
452
                                       (char *)(packet->arg[0]));
 
453
    if (server_job == NULL)
 
454
    {
 
455
      return _server_error_packet(server_con, "job_not_found",
 
456
                                  "Job given in work result not found");
 
457
    }
 
458
 
 
459
    /* Queue the data/warning packet for all clients. */
 
460
    ret= _server_queue_work_data(server_job, packet, packet->command);
 
461
    if (ret != GEARMAN_SUCCESS)
 
462
      return ret;
 
463
 
 
464
    break;
 
465
 
 
466
  case GEARMAN_COMMAND_WORK_STATUS:
 
467
    server_job= gearman_server_job_get(server_con->thread->server,
 
468
                                       (char *)(packet->arg[0]));
 
469
    if (server_job == NULL)
 
470
    {
 
471
      return _server_error_packet(server_con, "job_not_found",
 
472
                                  "Job given in work result not found");
 
473
    }
 
474
 
 
475
    /* Update job status. */
 
476
    server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
 
477
 
 
478
    /* This may not be NULL terminated, so copy to make sure it is. */
 
479
    snprintf(denominator_buffer, 11, "%.*s", (uint32_t)(packet->arg_size[2]),
 
480
             (char *)(packet->arg[2]));
 
481
    server_job->denominator= (uint32_t)atoi(denominator_buffer);
 
482
 
 
483
    /* Queue the status packet for all clients. */
 
484
    for (server_client= server_job->client_list; server_client;
 
485
         server_client= server_client->job_next)
 
486
    {
 
487
      ret= gearman_server_io_packet_add(server_client->con, false,
 
488
                                        GEARMAN_MAGIC_RESPONSE,
 
489
                                        GEARMAN_COMMAND_WORK_STATUS,
 
490
                                        packet->arg[0], packet->arg_size[0],
 
491
                                        packet->arg[1], packet->arg_size[1],
 
492
                                        packet->arg[2], packet->arg_size[2],
 
493
                                        NULL);
 
494
      if (ret != GEARMAN_SUCCESS)
 
495
        return ret;
 
496
    }
 
497
 
 
498
    break;
 
499
 
 
500
  case GEARMAN_COMMAND_WORK_COMPLETE:
 
501
    server_job= gearman_server_job_get(server_con->thread->server,
 
502
                                       (char *)(packet->arg[0]));
 
503
    if (server_job == NULL)
 
504
    {
 
505
      return _server_error_packet(server_con, "job_not_found",
 
506
                                  "Job given in work result not found");
 
507
    }
 
508
 
 
509
    /* Queue the complete packet for all clients. */
 
510
    ret= _server_queue_work_data(server_job, packet,
 
511
                                 GEARMAN_COMMAND_WORK_COMPLETE);
 
512
    if (ret != GEARMAN_SUCCESS)
 
513
      return ret;
 
514
 
 
515
    /* Remove from persistent queue if one exists. */
 
516
    if (server_job->options & GEARMAN_SERVER_JOB_QUEUED &&
 
517
        gearman->queue_done_fn != NULL)
 
518
    {
 
519
      ret= (*(gearman->queue_done_fn))(gearman, (void *)gearman->queue_fn_arg,
 
520
                                      server_job->unique,
 
521
                                      (size_t)strlen(server_job->unique),
 
522
                                      server_job->function->function_name,
 
523
                                      server_job->function->function_name_size);
 
524
      if (ret != GEARMAN_SUCCESS)
 
525
        return ret;
 
526
    }
 
527
 
 
528
    /* Job is done, remove it. */
 
529
    gearman_server_job_free(server_job);
 
530
    break;
 
531
 
 
532
  case GEARMAN_COMMAND_WORK_EXCEPTION:
 
533
    server_job= gearman_server_job_get(server_con->thread->server,
 
534
                                       (char *)(packet->arg[0]));
 
535
    if (server_job == NULL)
 
536
    {
 
537
      return _server_error_packet(server_con, "job_not_found",
 
538
                                  "Job given in work result not found");
 
539
    }
 
540
 
 
541
    /* Queue the exception packet for all clients. */
 
542
    ret= _server_queue_work_data(server_job, packet,
 
543
                                 GEARMAN_COMMAND_WORK_EXCEPTION);
 
544
    if (ret != GEARMAN_SUCCESS)
 
545
      return ret;
 
546
    break;
 
547
 
 
548
  case GEARMAN_COMMAND_WORK_FAIL:
 
549
    /* This may not be NULL terminated, so copy to make sure it is. */
 
550
    snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
 
551
             (uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
 
552
 
 
553
    server_job= gearman_server_job_get(server_con->thread->server, job_handle);
 
554
    if (server_job == NULL)
 
555
    {
 
556
      return _server_error_packet(server_con, "job_not_found",
 
557
                                  "Job given in work result not found");
 
558
    }
 
559
 
 
560
    /* Queue the fail packet for all clients. */
 
561
    for (server_client= server_job->client_list; server_client;
 
562
         server_client= server_client->job_next)
 
563
    {
 
564
      ret= gearman_server_io_packet_add(server_client->con, false,
 
565
                                        GEARMAN_MAGIC_RESPONSE,
 
566
                                        GEARMAN_COMMAND_WORK_FAIL,
 
567
                                        packet->arg[0], packet->arg_size[0],
 
568
                                        NULL);
 
569
      if (ret != GEARMAN_SUCCESS)
 
570
        return ret;
 
571
    }
 
572
 
 
573
    /* Remove from persistent queue if one exists. */
 
574
    if (server_job->options & GEARMAN_SERVER_JOB_QUEUED &&
 
575
        gearman->queue_done_fn != NULL)
 
576
    {
 
577
      ret= (*(gearman->queue_done_fn))(gearman, (void *)gearman->queue_fn_arg,
 
578
                                      server_job->unique,
 
579
                                      (size_t)strlen(server_job->unique),
 
580
                                      server_job->function->function_name,
 
581
                                      server_job->function->function_name_size);
 
582
      if (ret != GEARMAN_SUCCESS)
 
583
        return ret;
 
584
    }
 
585
 
 
586
    /* Job is done, remove it. */
 
587
    gearman_server_job_free(server_job);
 
588
    break;
 
589
 
 
590
  case GEARMAN_COMMAND_SET_CLIENT_ID:
 
591
    gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
 
592
                              packet->arg_size[0]);
 
593
    break;
 
594
 
 
595
  case GEARMAN_COMMAND_TEXT:
 
596
    return _server_run_text(server_con, packet);
 
597
 
 
598
  case GEARMAN_COMMAND_UNUSED:
 
599
  case GEARMAN_COMMAND_NOOP:
 
600
  case GEARMAN_COMMAND_JOB_CREATED:
 
601
  case GEARMAN_COMMAND_NO_JOB:
 
602
  case GEARMAN_COMMAND_JOB_ASSIGN:
 
603
  case GEARMAN_COMMAND_ECHO_RES:
 
604
  case GEARMAN_COMMAND_ERROR:
 
605
  case GEARMAN_COMMAND_STATUS_RES:
 
606
  case GEARMAN_COMMAND_ALL_YOURS:
 
607
  case GEARMAN_COMMAND_OPTION_RES:
 
608
  case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
 
609
  case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
 
610
  case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
 
611
  case GEARMAN_COMMAND_MAX:
 
612
  default:
 
613
    return _server_error_packet(server_con, "bad_command",
 
614
                                "Command not expected");
 
615
  }
 
616
 
 
617
  return GEARMAN_SUCCESS;
 
618
}
 
619
 
 
620
/**
 * Request a graceful shutdown of the server.
 *
 * Sets the shutdown_graceful flag unconditionally.
 *
 * @param server Server to shut down.
 * @return GEARMAN_SHUTDOWN if the server currently has no jobs, otherwise
 *         GEARMAN_SHUTDOWN_GRACEFUL.
 */
gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
{
  server->shutdown_graceful= true;

  return (server->job_count == 0) ? GEARMAN_SHUTDOWN
                                  : GEARMAN_SHUTDOWN_GRACEFUL;
}
 
629
 
 
630
/**
 * Replay the persistent queue through the configured replay callback.
 *
 * Does nothing when no queue_replay_fn is set on the gearman instance.
 * Otherwise the GEARMAN_SERVER_QUEUE_REPLAY option is set on the server for
 * the duration of the callback, which is given _queue_replay_add() and the
 * server as the function/argument pair used to add each job.
 *
 * @param server Server whose queue should be replayed.
 * @return GEARMAN_SUCCESS if there is no replay callback, otherwise whatever
 *         the replay callback returns.
 */
gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
{
  gearman_st *gearman= server->gearman;
  gearman_return_t replay_status;

  if (gearman->queue_replay_fn != NULL)
  {
    server->options|= GEARMAN_SERVER_QUEUE_REPLAY;

    replay_status= (*(gearman->queue_replay_fn))(gearman,
                                                 (void *)gearman->queue_fn_arg,
                                                 _queue_replay_add, server);

    server->options&= (gearman_server_options_t)~GEARMAN_SERVER_QUEUE_REPLAY;

    return replay_status;
  }

  return GEARMAN_SUCCESS;
}
 
647
 
 
648
/*
 
649
 * Private definitions
 
650
 */
 
651
 
 
652
gearman_return_t _queue_replay_add(gearman_st *gearman __attribute__ ((unused)),
 
653
                                   void *fn_arg, const void *unique,
 
654
                                   size_t unique_size,
 
655
                                   const void *function_name,
 
656
                                   size_t function_name_size, const void *data,
 
657
                                   size_t data_size,
 
658
                                   gearman_job_priority_t priority)
 
659
{
 
660
  gearman_server_st *server= (gearman_server_st *)fn_arg;
 
661
  gearman_return_t ret;
 
662
 
 
663
  (void)gearman_server_job_add(server, (char *)function_name,
 
664
                               function_name_size, (char *)unique, unique_size,
 
665
                               data, data_size, priority, NULL, &ret);
 
666
  return ret;
 
667
}
 
668
 
 
669
/**
 * Queue an ERROR response packet on a connection.
 *
 * The error code is sent including its terminating NULL byte; the error
 * string is sent without one.
 *
 * @param server_con   Connection to queue the packet on.
 * @param error_code   NULL terminated error code, e.g. "bad_command".
 * @param error_string NULL terminated human readable description.
 * @return Result of gearman_server_io_packet_add().
 */
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
                                             const char *error_code,
                                             const char *error_string)
{
  const size_t code_size= strlen(error_code) + 1;
  const size_t string_size= strlen(error_string);

  return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
                                      GEARMAN_COMMAND_ERROR,
                                      error_code, code_size,
                                      error_string, string_size,
                                      NULL);
}
 
679
 
 
680
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
 
681
                                         gearman_packet_st *packet)
 
682
{
 
683
  char *data;
 
684
  char *new_data;
 
685
  size_t size;
 
686
  size_t total;
 
687
  int max_queue_size;
 
688
  gearman_server_thread_st *thread;
 
689
  gearman_server_con_st *con;
 
690
  gearman_server_worker_st *worker;
 
691
  gearman_server_function_st *function;
 
692
  gearman_server_packet_st *server_packet;
 
693
 
 
694
  data= malloc(GEARMAN_TEXT_RESPONSE_SIZE);
 
695
  if (data == NULL)
 
696
  {
 
697
    GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
 
698
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
699
  }
 
700
  total= GEARMAN_TEXT_RESPONSE_SIZE;
 
701
 
 
702
  if (packet->argc == 0)
 
703
  {
 
704
    snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
 
705
             "ERR unknown_command Unknown+server+command\n");
 
706
  }
 
707
  else if (!strcasecmp("workers", (char *)(packet->arg[0])))
 
708
  {
 
709
    size= 0;
 
710
 
 
711
    for (thread= server_con->thread->server->thread_list; thread != NULL;
 
712
         thread= thread->next)
 
713
    {
 
714
      GEARMAN_SERVER_THREAD_LOCK(thread)
 
715
 
 
716
      for (con= thread->con_list; con != NULL; con= con->next)
 
717
      {
 
718
        if (con->host == NULL)
 
719
          continue;
 
720
 
 
721
        if (size > total)
 
722
          size= total;
 
723
 
 
724
        /* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
 
725
        if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
 
726
        {
 
727
          new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
 
728
          if (new_data == NULL)
 
729
          {
 
730
            GEARMAN_SERVER_THREAD_UNLOCK(thread)
 
731
            free(data);
 
732
            GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
 
733
            return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
734
          }
 
735
 
 
736
          data= new_data;
 
737
          total+= GEARMAN_TEXT_RESPONSE_SIZE;
 
738
        }
 
739
 
 
740
        size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
 
741
                                con->con.fd, con->host, con->id);
 
742
        if (size > total)
 
743
          continue;
 
744
 
 
745
        for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
 
746
        {
 
747
          size+= (size_t)snprintf(data + size, total - size, " %.*s",
 
748
                                  (int)(worker->function->function_name_size),
 
749
                                  worker->function->function_name);
 
750
          if (size > total)
 
751
            break;
 
752
        }
 
753
 
 
754
        if (size > total)
 
755
          continue;
 
756
 
 
757
        size+= (size_t)snprintf(data + size, total - size, "\n");
 
758
      }
 
759
 
 
760
      GEARMAN_SERVER_THREAD_UNLOCK(thread)
 
761
    }
 
762
 
 
763
    if (size < total)
 
764
      snprintf(data + size, total - size, ".\n");
 
765
  }
 
766
  else if (!strcasecmp("status", (char *)(packet->arg[0])))
 
767
  {
 
768
    size= 0;
 
769
 
 
770
    for (function= server_con->thread->server->function_list; function != NULL;
 
771
         function= function->next)
 
772
    {
 
773
      if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
 
774
      {
 
775
        new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
 
776
        if (new_data == NULL)
 
777
        {
 
778
          free(data);
 
779
          GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
 
780
          return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
781
        }
 
782
 
 
783
        data= new_data;
 
784
        total+= GEARMAN_TEXT_RESPONSE_SIZE;
 
785
      }
 
786
 
 
787
      size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
 
788
                              (int)(function->function_name_size),
 
789
                              function->function_name, function->job_total,
 
790
                              function->job_running, function->worker_count);
 
791
      if (size > total)
 
792
        size= total;
 
793
    }
 
794
 
 
795
    if (size < total)
 
796
      snprintf(data + size, total - size, ".\n");
 
797
  }
 
798
  else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
 
799
  {
 
800
    if (packet->argc == 1)
 
801
    {
 
802
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
 
803
               "An+incomplete+set+of+arguments+was+sent+to+this+command\n");
 
804
    }
 
805
    else
 
806
    {
 
807
      if (packet->argc == 2)
 
808
        max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
 
809
      else
 
810
      {
 
811
        max_queue_size= atoi((char *)(packet->arg[2]));
 
812
        if (max_queue_size < 0)
 
813
          max_queue_size= 0;
 
814
      }
 
815
 
 
816
      for (function= server_con->thread->server->function_list;
 
817
           function != NULL; function= function->next)
 
818
      {
 
819
        if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
 
820
            !memcmp(packet->arg[1], function->function_name,
 
821
                    function->function_name_size))
 
822
        {
 
823
          function->max_queue_size= (uint32_t)max_queue_size;
 
824
        }
 
825
      }
 
826
 
 
827
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
 
828
    }
 
829
  }
 
830
  else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
 
831
  {
 
832
    if (packet->argc == 1)
 
833
    {
 
834
      server_con->thread->server->shutdown= true;
 
835
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
 
836
    }
 
837
    else if (packet->argc == 2 &&
 
838
             !strcasecmp("graceful", (char *)(packet->arg[1])))
 
839
    {
 
840
      server_con->thread->server->shutdown_graceful= true;
 
841
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
 
842
    }
 
843
    else
 
844
    {
 
845
      snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
 
846
               "ERR unknown_args Unknown+arguments+to+server+command\n");
 
847
    }
 
848
  }
 
849
  else if (!strcasecmp("version", (char *)(packet->arg[0])))
 
850
    snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
 
851
  else
 
852
  {
 
853
    snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
 
854
             "ERR unknown_command Unknown+server+command\n");
 
855
  }
 
856
 
 
857
  server_packet= gearman_server_packet_create(server_con->thread, false);
 
858
  if (server_packet == NULL)
 
859
  {
 
860
    free(data);
 
861
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
862
  }
 
863
 
 
864
  if (gearman_packet_create(server_con->thread->gearman,
 
865
                            &(server_packet->packet)) == NULL)
 
866
  {
 
867
    free(data);
 
868
    gearman_server_packet_free(server_packet, server_con->thread, false);
 
869
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
870
  }
 
871
  
 
872
  server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
 
873
  server_packet->packet.command= GEARMAN_COMMAND_TEXT;
 
874
  server_packet->packet.options|= (GEARMAN_PACKET_COMPLETE |
 
875
                                   GEARMAN_PACKET_FREE_DATA);
 
876
  server_packet->packet.data= data;
 
877
  server_packet->packet.data_size= strlen(data);
 
878
 
 
879
  GEARMAN_SERVER_THREAD_LOCK(server_con->thread)
 
880
  GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
 
881
  GEARMAN_SERVER_THREAD_UNLOCK(server_con->thread)
 
882
 
 
883
  gearman_server_con_io_add(server_con);
 
884
 
 
885
  return GEARMAN_SUCCESS;
 
886
}
 
887
 
 
888
static gearman_return_t
 
889
_server_queue_work_data(gearman_server_job_st *server_job,
 
890
                        gearman_packet_st *packet, gearman_command_t command)
 
891
{
 
892
  gearman_server_client_st *server_client;
 
893
  uint8_t *data;
 
894
  gearman_return_t ret;
 
895
 
 
896
  for (server_client= server_job->client_list; server_client;
 
897
       server_client= server_client->job_next)
 
898
  {
 
899
    if (command == GEARMAN_COMMAND_WORK_EXCEPTION &&
 
900
        !(server_client->con->options & GEARMAN_SERVER_CON_EXCEPTIONS))
 
901
    {
 
902
      continue;
 
903
    }
 
904
 
 
905
    if (packet->data_size > 0)
 
906
    {
 
907
      if (packet->options & GEARMAN_PACKET_FREE_DATA &&
 
908
          server_client->job_next == NULL)
 
909
      {
 
910
        data= (uint8_t *)(packet->data);
 
911
        packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
 
912
      }
 
913
      else
 
914
      {
 
915
        data= malloc(packet->data_size);
 
916
        if (data == NULL)
 
917
        {
 
918
          GEARMAN_ERROR_SET(packet->gearman, "_server_run_command", "malloc")
 
919
          return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
920
        }
 
921
 
 
922
        memcpy(data, packet->data, packet->data_size);
 
923
      }
 
924
    }
 
925
    else
 
926
      data= NULL;
 
927
 
 
928
    ret= gearman_server_io_packet_add(server_client->con, true,
 
929
                                      GEARMAN_MAGIC_RESPONSE, command,
 
930
                                      packet->arg[0], packet->arg_size[0],
 
931
                                      data, packet->data_size, NULL);
 
932
    if (ret != GEARMAN_SUCCESS)
 
933
      return ret;
 
934
  }
 
935
 
 
936
  return GEARMAN_SUCCESS;
 
937
}
 
938
 
 
939
static void _log(gearman_st *gearman __attribute__ ((unused)),
 
940
                 gearman_verbose_t verbose, const char *line, void *fn_arg)
 
941
{
 
942
  gearman_server_st *server= (gearman_server_st *)fn_arg;
 
943
  (*(server->log_fn))(server, verbose, line, server->log_fn_arg);
 
944
}