1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Server Definitions
17
* Private declarations
21
* @addtogroup gearman_server_private Private Server Functions
22
* @ingroup gearman_server
27
* Add job to queue while replaying queue during startup.
29
/* Forward declarations of helpers whose definitions appear later in this
   file.
   NOTE(review): the definition of _queue_replay_add further down passes
   data_size to gearman_server_job_add(), but no data_size parameter is visible
   in this listing; a line appears to be missing here -- confirm against the
   original file. */
gearman_return_t _queue_replay_add(gearman_st *gearman, void *fn_arg,
30
const void *unique, size_t unique_size,
31
const void *function_name,
32
size_t function_name_size, const void *data,
34
gearman_job_priority_t priority);
37
* Queue an error packet.
39
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
40
const char *error_code,
41
const char *error_string);
44
* Process text commands for a connection.
46
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
47
gearman_packet_st *packet);
50
* Send work result packets with data back to clients.
52
static gearman_return_t
53
_server_queue_work_data(gearman_server_job_st *server_job,
54
gearman_packet_st *packet, gearman_command_t command);
57
* Wrapper for log handling.
59
static void _log(gearman_st *gearman, gearman_verbose_t verbose,
60
const char *line, void *fn_arg);
68
/*
 * Set up a server structure: reset its flags, counters, list heads and both
 * hash tables, create the gearman library instance in the server's own
 * gearman_static storage, and build the job handle prefix, which starts with
 * "H:".
 *
 * NOTE(review): several lines of this function are absent from this listing
 * (braces, the condition guarding malloc(), the declaration of un, the final
 * snprintf() argument and every return statement), so the exact conditions
 * and return values are not documented here.
 */
gearman_server_st *gearman_server_create(gearman_server_st *server)
74
server= malloc(sizeof(gearman_server_st));
78
server->options= GEARMAN_SERVER_ALLOCATED;
/* Reset flags, counters and list heads to their empty state. */
83
server->shutdown= false;
84
server->shutdown_graceful= false;
85
server->proc_wakeup= false;
86
server->proc_shutdown= false;
87
server->thread_count= 0;
88
server->free_packet_count= 0;
89
server->function_count= 0;
91
server->unique_count= 0;
92
server->free_job_count= 0;
93
server->free_client_count= 0;
94
server->free_worker_count= 0;
95
server->thread_list= NULL;
96
server->free_packet_list= NULL;
97
server->function_list= NULL;
98
server->free_job_list= NULL;
99
server->free_client_list= NULL;
100
server->free_worker_list= NULL;
101
server->log_fn= NULL;
102
server->log_fn_arg= NULL;
/* Clear both hash tables (GEARMAN_JOB_HASH_SIZE pointer slots each). */
103
memset(server->job_hash, 0,
104
sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
105
memset(server->unique_hash, 0,
106
sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
/* Create the library instance; the server is torn down again if that fails. */
108
server->gearman= gearman_create(&(server->gearman_static));
109
if (server->gearman == NULL)
111
gearman_server_free(server);
/* uname() failure also tears the server down. */
115
if (uname(&un) == -1)
117
gearman_server_free(server);
121
snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
123
server->job_handle_count= 1;
128
/*
 * Release everything owned by a server: every job still in job_hash, every
 * registered function, the entries on the four free lists (packets, jobs,
 * clients, workers) and the gearman instance.
 *
 * The caller must already have removed all threads; this is asserted.
 *
 * NOTE(review): the statements that release each free-list entry, the
 * declaration of key, and the action taken when GEARMAN_SERVER_ALLOCATED is
 * set are not visible in this listing.
 */
void gearman_server_free(gearman_server_st *server)
131
gearman_server_packet_st *packet;
132
gearman_server_job_st *job;
133
gearman_server_client_st *client;
134
gearman_server_worker_st *worker;
136
/* All threads should be cleaned up before calling this. */
137
assert(server->thread_list == NULL);
/* Free jobs bucket by bucket; the loop relies on gearman_server_job_free()
   removing the job from job_hash[key]. */
139
for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
141
while (server->job_hash[key] != NULL)
142
gearman_server_job_free(server->job_hash[key]);
/* Likewise relies on gearman_server_function_free() unlinking the function. */
145
while (server->function_list != NULL)
146
gearman_server_function_free(server->function_list);
/* Drain the free lists by popping the head entry each iteration. */
148
while (server->free_packet_list != NULL)
150
packet= server->free_packet_list;
151
server->free_packet_list= packet->next;
155
while (server->free_job_list != NULL)
157
job= server->free_job_list;
158
server->free_job_list= job->next;
/* Free clients and workers are chained through con_next rather than next. */
162
while (server->free_client_list != NULL)
164
client= server->free_client_list;
165
server->free_client_list= client->con_next;
169
while (server->free_worker_list != NULL)
171
worker= server->free_worker_list;
172
server->free_worker_list= worker->con_next;
176
if (server->gearman != NULL)
177
gearman_free(server->gearman);
179
if (server->options & GEARMAN_SERVER_ALLOCATED)
183
/*
 * Install a log callback on the server.
 *
 * log_fn and log_fn_arg are stored on the server, and the file-local _log()
 * wrapper is registered with the gearman instance, with the server itself as
 * the callback argument and the requested verbosity.
 */
void gearman_server_set_log(gearman_server_st *server,
184
gearman_server_log_fn log_fn, void *log_fn_arg,
185
gearman_verbose_t verbose)
187
server->log_fn= log_fn;
188
server->log_fn_arg= log_fn_arg;
189
gearman_set_log(server->gearman, _log, server, verbose);
192
/*
 * Handle one request packet received on server_con by dispatching on
 * packet->command.
 *
 * Packets carrying GEARMAN_MAGIC_RESPONSE are answered with a "bad_magic"
 * error packet, and the response-only commands listed at the end of the
 * switch with a "bad_command" error packet. Other paths return
 * GEARMAN_SUCCESS at the end of the function,
 * GEARMAN_MEMORY_ALLOCATION_FAILURE when a client or worker cannot be added,
 * or the result of a helper call.
 *
 * NOTE(review): this listing omits lines of the function (braces, else/break
 * and some return statements, a few call arguments), so error propagation
 * after the "ret != GEARMAN_SUCCESS" checks is not described here.
 */
gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
193
gearman_packet_st *packet)
195
gearman_return_t ret;
196
gearman_server_job_st *server_job;
197
char job_handle[GEARMAN_JOB_HANDLE_SIZE];
198
char option[GEARMAN_OPTION_SIZE];
199
gearman_server_client_st *server_client;
200
char numerator_buffer[11]; /* Max string size to hold a uint32_t. */
201
char denominator_buffer[11]; /* Max string size to hold a uint32_t. */
202
gearman_job_priority_t priority;
/* NOTE(review): "gearman= gearman=" assigns to the variable inside its own
   initializer; a single assignment is presumably what was intended. */
203
gearman_st *gearman= gearman= server_con->thread->server->gearman;
/* A request must not carry the response magic. */
205
if (packet->magic == GEARMAN_MAGIC_RESPONSE)
207
return _server_error_packet(server_con, "bad_magic",
208
"Request magic expected");
211
switch (packet->command)
213
/* Client/worker requests. */
214
case GEARMAN_COMMAND_ECHO_REQ:
215
/* Reuse the data buffer and just shove the data back. */
216
ret= gearman_server_io_packet_add(server_con, true, GEARMAN_MAGIC_RESPONSE,
217
GEARMAN_COMMAND_ECHO_RES, packet->data,
218
packet->data_size, NULL);
219
if (ret != GEARMAN_SUCCESS)
/* Clear FREE_DATA on the request, presumably because the response packet now
   refers to the same buffer -- TODO confirm ownership rules. */
222
packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
226
/* Client requests. */
227
case GEARMAN_COMMAND_SUBMIT_JOB:
228
case GEARMAN_COMMAND_SUBMIT_JOB_BG:
229
case GEARMAN_COMMAND_SUBMIT_JOB_HIGH:
230
case GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG:
231
case GEARMAN_COMMAND_SUBMIT_JOB_LOW:
232
case GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG:
/* Derive the job priority from the command variant. */
234
if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB ||
235
packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG)
237
priority= GEARMAN_JOB_PRIORITY_NORMAL;
239
else if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH ||
240
packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG)
242
priority= GEARMAN_JOB_PRIORITY_HIGH;
245
priority= GEARMAN_JOB_PRIORITY_LOW;
/* The *_BG variants are tested separately; the statements taken for them are
   not visible in this listing (presumably no client is attached to a
   background job -- TODO confirm). */
247
if (packet->command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
248
packet->command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
249
packet->command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG)
255
server_client= gearman_server_client_add(server_con);
256
if (server_client == NULL)
257
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
/* arg[0]/arg[1] are passed with size - 1, presumably to exclude a trailing
   NUL counted in arg_size -- TODO confirm. */
261
server_job= gearman_server_job_add(server_con->thread->server,
262
(char *)(packet->arg[0]),
263
packet->arg_size[0] - 1,
264
(char *)(packet->arg[1]),
265
packet->arg_size[1] - 1, packet->data,
266
packet->data_size, priority,
267
server_client, &ret);
/* On success the request packet gives up FREE_DATA; a full queue is reported
   to the client as a "queue_full" error packet. */
268
if (ret == GEARMAN_SUCCESS)
269
packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
270
else if (ret == GEARMAN_JOB_QUEUE_FULL)
272
return _server_error_packet(server_con, "queue_full",
273
"Job queue is full");
275
else if (ret != GEARMAN_JOB_EXISTS)
278
/* Queue the job created packet. */
279
ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
280
GEARMAN_COMMAND_JOB_CREATED,
281
server_job->job_handle,
282
(size_t)strlen(server_job->job_handle),
284
if (ret != GEARMAN_SUCCESS)
289
case GEARMAN_COMMAND_GET_STATUS:
290
/* This may not be NULL terminated, so copy to make sure it is. */
291
snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
292
(uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
294
server_job= gearman_server_job_get(server_con->thread->server, job_handle);
296
/* Queue status result packet. */
297
if (server_job == NULL)
/* Unknown job: every status field is sent as "0". Sizes of 2 include the NUL;
   the last field is sent with size 1, i.e. without it. */
299
ret= gearman_server_io_packet_add(server_con, false,
300
GEARMAN_MAGIC_RESPONSE,
301
GEARMAN_COMMAND_STATUS_RES, job_handle,
302
(size_t)(strlen(job_handle) + 1),
303
"0", (size_t)2, "0", (size_t)2, "0",
304
(size_t)2, "0", (size_t)1, NULL);
/* Known job: format the progress counters as decimal strings. */
308
snprintf(numerator_buffer, 11, "%u", server_job->numerator);
309
snprintf(denominator_buffer, 11, "%u", server_job->denominator);
311
ret= gearman_server_io_packet_add(server_con, false,
312
GEARMAN_MAGIC_RESPONSE,
313
GEARMAN_COMMAND_STATUS_RES, job_handle,
314
(size_t)(strlen(job_handle) + 1),
316
server_job->worker == NULL ? "0" : "1",
317
(size_t)2, numerator_buffer,
318
(size_t)(strlen(numerator_buffer) + 1),
320
(size_t)strlen(denominator_buffer),
324
if (ret != GEARMAN_SUCCESS)
329
case GEARMAN_COMMAND_OPTION_REQ:
330
/* This may not be NULL terminated, so copy to make sure it is. */
331
snprintf(option, GEARMAN_OPTION_SIZE, "%.*s",
332
(uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
/* "exceptions" (case-insensitive) is the only option name compared here. */
333
if (!strcasecmp(option, "exceptions"))
334
server_con->options|= GEARMAN_SERVER_CON_EXCEPTIONS;
337
return _server_error_packet(server_con, "unknown_option",
338
"Server does not recognize given option");
/* Echo the option name back in the OPTION_RES packet. */
341
ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
342
GEARMAN_COMMAND_OPTION_RES,
343
packet->arg[0], packet->arg_size[0],
345
if (ret != GEARMAN_SUCCESS)
350
/* Worker requests. */
351
case GEARMAN_COMMAND_CAN_DO:
352
if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
353
packet->arg_size[0], 0) == NULL)
355
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
/* NOTE(review): atoi() below reads packet->arg[1] directly, while other cases
   copy an argument first because it "may not be NULL terminated"; confirm
   arg[1] is terminated here. The parsed value is also cast to in_port_t --
   confirm that matches the parameter type of gearman_server_worker_add(). */
360
case GEARMAN_COMMAND_CAN_DO_TIMEOUT:
361
if (gearman_server_worker_add(server_con, (char *)(packet->arg[0]),
362
packet->arg_size[0] - 1,
363
(in_port_t)atoi((char *)(packet->arg[1])))
366
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
371
case GEARMAN_COMMAND_CANT_DO:
372
gearman_server_con_free_worker(server_con, (char *)(packet->arg[0]),
373
packet->arg_size[0]);
376
case GEARMAN_COMMAND_RESET_ABILITIES:
377
gearman_server_con_free_workers(server_con);
380
case GEARMAN_COMMAND_PRE_SLEEP:
381
server_job= gearman_server_job_peek(server_con);
382
if (server_job == NULL)
383
server_con->options|= GEARMAN_SERVER_CON_SLEEPING;
386
/* If there are jobs that could be run, queue a NOOP packet to wake the
387
worker up. This could be the result of a race condition. */
388
ret= gearman_server_io_packet_add(server_con, false,
389
GEARMAN_MAGIC_RESPONSE,
390
GEARMAN_COMMAND_NOOP, NULL);
391
if (ret != GEARMAN_SUCCESS)
397
case GEARMAN_COMMAND_GRAB_JOB:
398
case GEARMAN_COMMAND_GRAB_JOB_UNIQ:
/* A connection asking for work is no longer marked as sleeping. */
399
server_con->options&=
400
(gearman_server_con_options_t)~GEARMAN_SERVER_CON_SLEEPING;
402
server_job= gearman_server_job_take(server_con);
403
if (server_job == NULL)
405
/* No jobs found, queue no job packet. */
406
ret= gearman_server_io_packet_add(server_con, false,
407
GEARMAN_MAGIC_RESPONSE,
408
GEARMAN_COMMAND_NO_JOB, NULL);
410
else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
412
/* We found a runnable job, queue job assigned packet and take the job
414
ret= gearman_server_io_packet_add(server_con, false,
415
GEARMAN_MAGIC_RESPONSE,
416
GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
417
server_job->job_handle,
418
(size_t)(strlen(server_job->job_handle) + 1),
419
server_job->function->function_name,
420
server_job->function->function_name_size + 1,
422
(size_t)(strlen(server_job->unique) + 1),
423
server_job->data, server_job->data_size,
428
/* Same, but without unique ID. */
429
ret= gearman_server_io_packet_add(server_con, false,
430
GEARMAN_MAGIC_RESPONSE,
431
GEARMAN_COMMAND_JOB_ASSIGN,
432
server_job->job_handle,
433
(size_t)(strlen(server_job->job_handle) + 1),
434
server_job->function->function_name,
435
server_job->function->function_name_size + 1,
436
server_job->data, server_job->data_size,
/* When queuing the reply failed and a job had been taken, the job appears to
   be handed back through gearman_server_job_queue(). */
440
if (ret != GEARMAN_SUCCESS)
442
if (server_job != NULL)
443
return gearman_server_job_queue(server_job);
449
case GEARMAN_COMMAND_WORK_DATA:
450
case GEARMAN_COMMAND_WORK_WARNING:
451
server_job= gearman_server_job_get(server_con->thread->server,
452
(char *)(packet->arg[0]));
453
if (server_job == NULL)
455
return _server_error_packet(server_con, "job_not_found",
456
"Job given in work result not found");
459
/* Queue the data/warning packet for all clients. */
460
ret= _server_queue_work_data(server_job, packet, packet->command);
461
if (ret != GEARMAN_SUCCESS)
466
case GEARMAN_COMMAND_WORK_STATUS:
467
server_job= gearman_server_job_get(server_con->thread->server,
468
(char *)(packet->arg[0]));
469
if (server_job == NULL)
471
return _server_error_packet(server_con, "job_not_found",
472
"Job given in work result not found");
475
/* Update job status. */
476
server_job->numerator= (uint32_t)atoi((char *)(packet->arg[1]));
478
/* This may not be NULL terminated, so copy to make sure it is. */
479
snprintf(denominator_buffer, 11, "%.*s", (uint32_t)(packet->arg_size[2]),
480
(char *)(packet->arg[2]));
481
server_job->denominator= (uint32_t)atoi(denominator_buffer);
483
/* Queue the status packet for all clients. */
484
for (server_client= server_job->client_list; server_client;
485
server_client= server_client->job_next)
487
ret= gearman_server_io_packet_add(server_client->con, false,
488
GEARMAN_MAGIC_RESPONSE,
489
GEARMAN_COMMAND_WORK_STATUS,
490
packet->arg[0], packet->arg_size[0],
491
packet->arg[1], packet->arg_size[1],
492
packet->arg[2], packet->arg_size[2],
494
if (ret != GEARMAN_SUCCESS)
500
case GEARMAN_COMMAND_WORK_COMPLETE:
501
server_job= gearman_server_job_get(server_con->thread->server,
502
(char *)(packet->arg[0]));
503
if (server_job == NULL)
505
return _server_error_packet(server_con, "job_not_found",
506
"Job given in work result not found");
509
/* Queue the complete packet for all clients. */
510
ret= _server_queue_work_data(server_job, packet,
511
GEARMAN_COMMAND_WORK_COMPLETE);
512
if (ret != GEARMAN_SUCCESS)
515
/* Remove from persistent queue if one exists. */
516
if (server_job->options & GEARMAN_SERVER_JOB_QUEUED &&
517
gearman->queue_done_fn != NULL)
519
ret= (*(gearman->queue_done_fn))(gearman, (void *)gearman->queue_fn_arg,
521
(size_t)strlen(server_job->unique),
522
server_job->function->function_name,
523
server_job->function->function_name_size);
524
if (ret != GEARMAN_SUCCESS)
528
/* Job is done, remove it. */
529
gearman_server_job_free(server_job);
532
case GEARMAN_COMMAND_WORK_EXCEPTION:
533
server_job= gearman_server_job_get(server_con->thread->server,
534
(char *)(packet->arg[0]));
535
if (server_job == NULL)
537
return _server_error_packet(server_con, "job_not_found",
538
"Job given in work result not found");
541
/* Queue the exception packet for all clients. */
542
ret= _server_queue_work_data(server_job, packet,
543
GEARMAN_COMMAND_WORK_EXCEPTION);
544
if (ret != GEARMAN_SUCCESS)
548
case GEARMAN_COMMAND_WORK_FAIL:
549
/* This may not be NULL terminated, so copy to make sure it is. */
550
snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
551
(uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
553
server_job= gearman_server_job_get(server_con->thread->server, job_handle);
554
if (server_job == NULL)
556
return _server_error_packet(server_con, "job_not_found",
557
"Job given in work result not found");
560
/* Queue the fail packet for all clients. */
561
for (server_client= server_job->client_list; server_client;
562
server_client= server_client->job_next)
564
ret= gearman_server_io_packet_add(server_client->con, false,
565
GEARMAN_MAGIC_RESPONSE,
566
GEARMAN_COMMAND_WORK_FAIL,
567
packet->arg[0], packet->arg_size[0],
569
if (ret != GEARMAN_SUCCESS)
573
/* Remove from persistent queue if one exists. */
574
if (server_job->options & GEARMAN_SERVER_JOB_QUEUED &&
575
gearman->queue_done_fn != NULL)
577
ret= (*(gearman->queue_done_fn))(gearman, (void *)gearman->queue_fn_arg,
579
(size_t)strlen(server_job->unique),
580
server_job->function->function_name,
581
server_job->function->function_name_size);
582
if (ret != GEARMAN_SUCCESS)
586
/* Job is done, remove it. */
587
gearman_server_job_free(server_job);
590
case GEARMAN_COMMAND_SET_CLIENT_ID:
591
gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
592
packet->arg_size[0]);
595
case GEARMAN_COMMAND_TEXT:
596
return _server_run_text(server_con, packet);
/* The remaining commands are not handled as requests; they lead to the
   "bad_command" error packet below. */
598
case GEARMAN_COMMAND_UNUSED:
599
case GEARMAN_COMMAND_NOOP:
600
case GEARMAN_COMMAND_JOB_CREATED:
601
case GEARMAN_COMMAND_NO_JOB:
602
case GEARMAN_COMMAND_JOB_ASSIGN:
603
case GEARMAN_COMMAND_ECHO_RES:
604
case GEARMAN_COMMAND_ERROR:
605
case GEARMAN_COMMAND_STATUS_RES:
606
case GEARMAN_COMMAND_ALL_YOURS:
607
case GEARMAN_COMMAND_OPTION_RES:
608
case GEARMAN_COMMAND_SUBMIT_JOB_SCHED:
609
case GEARMAN_COMMAND_SUBMIT_JOB_EPOCH:
610
case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
611
case GEARMAN_COMMAND_MAX:
613
return _server_error_packet(server_con, "bad_command",
614
"Command not expected");
617
return GEARMAN_SUCCESS;
620
/*
 * Flag the server for graceful shutdown.
 *
 * Returns GEARMAN_SHUTDOWN when server->job_count is already zero, otherwise
 * GEARMAN_SHUTDOWN_GRACEFUL.
 */
gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
622
server->shutdown_graceful= true;
624
if (server->job_count == 0)
625
return GEARMAN_SHUTDOWN;
627
return GEARMAN_SHUTDOWN_GRACEFUL;
630
/*
 * Replay the persistent queue through the gearman instance's
 * queue_replay_fn, passing _queue_replay_add as the per-job callback and the
 * server as its argument.
 *
 * Returns GEARMAN_SUCCESS immediately when no queue_replay_fn is set.
 * GEARMAN_SERVER_QUEUE_REPLAY is set in server->options for the duration of
 * the replay call and cleared afterwards.
 *
 * NOTE(review): the final return statement is not visible in this listing
 * (presumably it returns ret -- TODO confirm).
 */
gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
632
gearman_return_t ret;
634
if (server->gearman->queue_replay_fn == NULL)
635
return GEARMAN_SUCCESS;
637
server->options|= GEARMAN_SERVER_QUEUE_REPLAY;
639
ret= (*(server->gearman->queue_replay_fn))(server->gearman,
640
(void *)server->gearman->queue_fn_arg,
641
_queue_replay_add, server);
643
server->options&= (gearman_server_options_t)~GEARMAN_SERVER_QUEUE_REPLAY;
649
* Private definitions
652
/*
 * Callback handed to queue_replay_fn by gearman_server_queue_replay(): adds
 * one replayed job to the server passed in fn_arg.
 *
 * The gearman parameter is unused. The job is added with a NULL client, and
 * the job pointer returned by gearman_server_job_add() is deliberately
 * discarded; only the status written to ret is kept.
 *
 * NOTE(review): the unique_size and data_size parameters and the return
 * statement are not visible in this listing (presumably it returns ret --
 * TODO confirm).
 */
gearman_return_t _queue_replay_add(gearman_st *gearman __attribute__ ((unused)),
653
void *fn_arg, const void *unique,
655
const void *function_name,
656
size_t function_name_size, const void *data,
658
gearman_job_priority_t priority)
660
gearman_server_st *server= (gearman_server_st *)fn_arg;
661
gearman_return_t ret;
663
(void)gearman_server_job_add(server, (char *)function_name,
664
function_name_size, (char *)unique, unique_size,
665
data, data_size, priority, NULL, &ret);
669
/*
 * Queue a GEARMAN_COMMAND_ERROR response on server_con and return the result
 * of gearman_server_io_packet_add().
 *
 * error_code is sent with its terminating NUL (strlen + 1); the size given
 * for the final argument is strlen(error_string), i.e. without the NUL.
 * NOTE(review): the line passing the error_string pointer itself is not
 * visible in this listing.
 */
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
670
const char *error_code,
671
const char *error_string)
673
return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
674
GEARMAN_COMMAND_ERROR, error_code,
675
(size_t)(strlen(error_code) + 1),
677
(size_t)strlen(error_string), NULL);
680
/*
 * Handle a text-protocol command.
 *
 * packet->arg[0] is compared case-insensitively against "workers", "status",
 * "maxqueue", "shutdown" and "version"; an empty argument list or any other
 * word produces an "ERR unknown_command" line. The response text is built in
 * a heap buffer that starts at GEARMAN_TEXT_RESPONSE_SIZE bytes and is grown
 * with realloc() in steps of the same size. It is then attached to a
 * GEARMAN_MAGIC_TEXT packet, which is appended to the connection's io_packet
 * FIFO while the thread lock is held.
 *
 * Returns GEARMAN_SUCCESS, or GEARMAN_MEMORY_ALLOCATION_FAILURE when an
 * allocation or packet creation fails.
 *
 * NOTE(review): the declarations of data, new_data, size, total and
 * max_queue_size, the NULL checks after malloc(), and any cleanup of data on
 * the failure paths are not visible in this listing.
 */
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
681
gearman_packet_st *packet)
688
gearman_server_thread_st *thread;
689
gearman_server_con_st *con;
690
gearman_server_worker_st *worker;
691
gearman_server_function_st *function;
692
gearman_server_packet_st *server_packet;
694
data= malloc(GEARMAN_TEXT_RESPONSE_SIZE);
697
GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
698
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
700
total= GEARMAN_TEXT_RESPONSE_SIZE;
702
if (packet->argc == 0)
704
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
705
"ERR unknown_command Unknown+server+command\n");
/* "workers": one line per connection -- fd, host, id, then " :" followed by
   the names of the functions its workers registered. */
707
else if (!strcasecmp("workers", (char *)(packet->arg[0])))
711
for (thread= server_con->thread->server->thread_list; thread != NULL;
712
thread= thread->next)
/* Each thread's connection list is walked under that thread's lock. */
714
GEARMAN_SERVER_THREAD_LOCK(thread)
716
for (con= thread->con_list; con != NULL; con= con->next)
718
if (con->host == NULL)
724
/* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
725
if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
727
new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
728
if (new_data == NULL)
/* The lock is released before the allocation failure is reported. */
730
GEARMAN_SERVER_THREAD_UNLOCK(thread)
732
GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
733
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
737
total+= GEARMAN_TEXT_RESPONSE_SIZE;
740
size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
741
con->con.fd, con->host, con->id);
745
for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
747
size+= (size_t)snprintf(data + size, total - size, " %.*s",
748
(int)(worker->function->function_name_size),
749
worker->function->function_name);
757
size+= (size_t)snprintf(data + size, total - size, "\n");
760
GEARMAN_SERVER_THREAD_UNLOCK(thread)
/* The listing is terminated by a line holding a single dot. */
764
snprintf(data + size, total - size, ".\n");
/* "status": one tab-separated line per function -- name, job_total,
   job_running, worker_count -- again terminated by ".\n". */
766
else if (!strcasecmp("status", (char *)(packet->arg[0])))
770
for (function= server_con->thread->server->function_list; function != NULL;
771
function= function->next)
773
if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
775
new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
776
if (new_data == NULL)
779
GEARMAN_ERROR_SET(packet->gearman, "_server_run_text", "malloc")
780
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
784
total+= GEARMAN_TEXT_RESPONSE_SIZE;
787
size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
788
(int)(function->function_name_size),
789
function->function_name, function->job_total,
790
function->job_running, function->worker_count);
796
snprintf(data + size, total - size, ".\n");
/* "maxqueue <function> [size]": set max_queue_size on the function whose name
   matches arg[1] exactly (same length and bytes). */
798
else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
800
if (packet->argc == 1)
802
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
803
"An+incomplete+set+of+arguments+was+sent+to+this+command\n");
/* With no size argument the default is used; otherwise arg[2] is parsed with
   atoi(). The handling of a negative value is not visible in this listing. */
807
if (packet->argc == 2)
808
max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
811
max_queue_size= atoi((char *)(packet->arg[2]));
812
if (max_queue_size < 0)
816
for (function= server_con->thread->server->function_list;
817
function != NULL; function= function->next)
819
if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
820
!memcmp(packet->arg[1], function->function_name,
821
function->function_name_size))
823
function->max_queue_size= (uint32_t)max_queue_size;
827
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
/* "shutdown" sets the immediate shutdown flag; "shutdown graceful" sets the
   graceful one; any other argument form is rejected. */
830
else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
832
if (packet->argc == 1)
834
server_con->thread->server->shutdown= true;
835
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
837
else if (packet->argc == 2 &&
838
!strcasecmp("graceful", (char *)(packet->arg[1])))
840
server_con->thread->server->shutdown_graceful= true;
841
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
845
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
846
"ERR unknown_args Unknown+arguments+to+server+command\n");
849
else if (!strcasecmp("version", (char *)(packet->arg[0])))
850
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
853
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
854
"ERR unknown_command Unknown+server+command\n");
/* Wrap the response text in a server packet. */
857
server_packet= gearman_server_packet_create(server_con->thread, false);
858
if (server_packet == NULL)
861
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
864
if (gearman_packet_create(server_con->thread->gearman,
865
&(server_packet->packet)) == NULL)
868
gearman_server_packet_free(server_packet, server_con->thread, false);
869
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
/* The packet is marked complete and flagged FREE_DATA (presumably so that the
   packet layer releases data later -- TODO confirm). */
872
server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
873
server_packet->packet.command= GEARMAN_COMMAND_TEXT;
874
server_packet->packet.options|= (GEARMAN_PACKET_COMPLETE |
875
GEARMAN_PACKET_FREE_DATA);
876
server_packet->packet.data= data;
877
server_packet->packet.data_size= strlen(data);
/* Append to the connection's outgoing FIFO under the thread lock. */
879
GEARMAN_SERVER_THREAD_LOCK(server_con->thread)
880
GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
881
GEARMAN_SERVER_THREAD_UNLOCK(server_con->thread)
883
gearman_server_con_io_add(server_con);
885
return GEARMAN_SUCCESS;
888
/*
 * Queue a work result packet (command) carrying the job handle in
 * packet->arg[0] and the packet's data for every client on
 * server_job->client_list.
 *
 * Returns GEARMAN_SUCCESS after the loop, or
 * GEARMAN_MEMORY_ALLOCATION_FAILURE when a data copy cannot be allocated.
 *
 * NOTE(review): the declaration of data, the branch taken when
 * packet->data_size is 0, the statement executed for clients without
 * GEARMAN_SERVER_CON_EXCEPTIONS and the action after a failed
 * gearman_server_io_packet_add() are not visible in this listing.
 */
static gearman_return_t
889
_server_queue_work_data(gearman_server_job_st *server_job,
890
gearman_packet_st *packet, gearman_command_t command)
892
gearman_server_client_st *server_client;
894
gearman_return_t ret;
896
for (server_client= server_job->client_list; server_client;
897
server_client= server_client->job_next)
/* Exception packets are treated specially for clients whose connection has
   not enabled GEARMAN_SERVER_CON_EXCEPTIONS (presumably skipped -- TODO
   confirm). */
899
if (command == GEARMAN_COMMAND_WORK_EXCEPTION &&
900
!(server_client->con->options & GEARMAN_SERVER_CON_EXCEPTIONS))
905
if (packet->data_size > 0)
/* The last client in the list takes over the packet's own buffer when the
   packet owns it; the packet's FREE_DATA flag is cleared in that case. */
907
if (packet->options & GEARMAN_PACKET_FREE_DATA &&
908
server_client->job_next == NULL)
910
data= (uint8_t *)(packet->data);
911
packet->options&= (gearman_packet_options_t)~GEARMAN_PACKET_FREE_DATA;
/* Otherwise the data is copied into a fresh allocation.
   NOTE(review): the error context string below names _server_run_command,
   not this function. */
915
data= malloc(packet->data_size);
918
GEARMAN_ERROR_SET(packet->gearman, "_server_run_command", "malloc")
919
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
922
memcpy(data, packet->data, packet->data_size);
928
ret= gearman_server_io_packet_add(server_client->con, true,
929
GEARMAN_MAGIC_RESPONSE, command,
930
packet->arg[0], packet->arg_size[0],
931
data, packet->data_size, NULL);
932
if (ret != GEARMAN_SUCCESS)
936
return GEARMAN_SUCCESS;
939
/*
 * Log callback registered by gearman_server_set_log(): forwards the line to
 * the server's own log_fn. fn_arg is the server; the gearman parameter is
 * unused.
 *
 * NOTE(review): log_fn is called without a NULL check in the visible code.
 */
static void _log(gearman_st *gearman __attribute__ ((unused)),
940
gearman_verbose_t verbose, const char *line, void *fn_arg)
942
gearman_server_st *server= (gearman_server_st *)fn_arg;
943
(*(server->log_fn))(server, verbose, line, server->log_fn_arg);