53
63
_server_queue_work_data(gearman_server_job_st *server_job,
54
64
gearman_packet_st *packet, gearman_command_t command);
57
* Wrapper for log handling.
59
static void _log(const char *line, gearman_verbose_t verbose, void *context);
64
69
* Public definitions
67
gearman_server_st *gearman_server_create(gearman_server_st *server)
72
gearman_server_st::gearman_server_st()
73
server= malloc(sizeof(gearman_server_st));
77
server->options.allocated= true;
80
server->options.allocated= false;
82
server->state.queue_startup= false;
83
server->flags.round_robin= false;
84
server->flags.threaded= false;
85
server->shutdown= false;
86
server->shutdown_graceful= false;
87
server->proc_wakeup= false;
88
server->proc_shutdown= false;
89
server->job_retries= 0;
90
server->worker_wakeup= 0;
91
server->thread_count= 0;
92
server->free_packet_count= 0;
93
server->function_count= 0;
95
server->unique_count= 0;
96
server->free_job_count= 0;
97
server->free_client_count= 0;
98
server->free_worker_count= 0;
99
server->thread_list= NULL;
100
server->free_packet_list= NULL;
101
server->function_list= NULL;
102
server->free_job_list= NULL;
103
server->free_client_list= NULL;
104
server->free_worker_list= NULL;
105
server->log_fn= NULL;
106
server->log_context= NULL;
107
server->queue_context= NULL;
108
server->queue_add_fn= NULL;
109
server->queue_flush_fn= NULL;
110
server->queue_done_fn= NULL;
111
server->queue_replay_fn= NULL;
112
memset(server->job_hash, 0,
113
sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
114
memset(server->unique_hash, 0,
115
sizeof(gearman_server_job_st *) * GEARMAN_JOB_HASH_SIZE);
117
server->gearman= gearman_universal_create(&(server->gearman_universal_static), NULL);
118
if (server->gearman == NULL)
120
gearman_server_free(server);
124
if (uname(&un) == -1)
126
gearman_server_free(server);
130
snprintf(server->job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s",
132
server->job_handle_count= 1;
76
state.queue_startup= false;
77
flags.round_robin= false;
78
flags.threaded= false;
80
shutdown_graceful= false;
93
job_hash.resize(GEARMAN_JOB_HASH_SIZE);
94
unique_hash.resize(GEARMAN_JOB_HASH_SIZE);
96
gearman_universal_create(&gearman, NULL);
100
snprintf(job_handle_prefix, GEARMAN_JOB_HANDLE_SIZE, "H:%s", un.nodename);
137
void gearman_server_free(gearman_server_st *server)
104
gearman_server_st::~gearman_server_st()
140
gearman_server_packet_st *packet;
141
gearman_server_job_st *job;
142
gearman_server_client_st *client;
143
gearman_server_worker_st *worker;
145
108
/* All threads should be cleaned up before calling this. */
146
assert(server->thread_list == NULL);
148
for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
150
while (server->job_hash[key] != NULL)
151
gearman_server_job_free(server->job_hash[key]);
154
while (server->function_list != NULL)
155
gearman_server_function_free(server->function_list);
157
while (server->free_packet_list != NULL)
159
packet= server->free_packet_list;
160
server->free_packet_list= packet->next;
164
while (server->free_job_list != NULL)
166
job= server->free_job_list;
167
server->free_job_list= job->next;
171
while (server->free_client_list != NULL)
173
client= server->free_client_list;
174
server->free_client_list= client->con_next;
178
while (server->free_worker_list != NULL)
180
worker= server->free_worker_list;
181
server->free_worker_list= worker->con_next;
185
if (server->gearman != NULL)
186
gearman_universal_free(server->gearman);
188
if (server->options.allocated)
192
void gearman_server_set_job_retries(gearman_server_st *server,
195
server->job_retries= job_retries;
198
void gearman_server_set_worker_wakeup(gearman_server_st *server,
199
uint8_t worker_wakeup)
201
server->worker_wakeup= worker_wakeup;
204
void gearman_server_set_log_fn(gearman_server_st *server,
205
gearman_log_fn *function,
206
void *context, gearman_verbose_t verbose)
208
server->log_fn= function;
209
server->log_context= context;
210
gearman_set_log_fn(server->gearman, _log, server, verbose);
110
assert(thread_list == 0)
113
for (key= 0; key < GEARMAN_JOB_HASH_SIZE; key++)
115
while (job_hash[key] != NULL)
117
delete job_hash[key];
121
while (function_list != NULL)
123
gearman_server_function_free(function_list);
126
gearman_universal_free(&gearman);
129
void gearman_server_st::client_free(gearman_server_client_st *client)
131
GEARMAN_LIST_DEL(client->con->client, client, con_)
133
if (client->job != NULL)
135
GEARMAN_LIST_DEL(client->job->client, client, job_)
137
/* If this was a foreground job and is now abandoned, mark to not run. */
138
if (client->job->client_list == NULL)
140
client->job->set_ignore_job(true);
141
client->job->set_job_queued(false);
213
149
gearman_return_t gearman_server_run_command(gearman_server_con_st *server_con,
276
server_client= gearman_server_client_add(server_con);
211
server_client= server_con->client_add();
277
212
if (server_client == NULL)
278
213
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
281
216
/* Create a job. */
282
server_job= gearman_server_job_add(server_con->thread->server,
283
(char *)(packet->arg[0]),
284
packet->arg_size[0] - 1,
285
(char *)(packet->arg[1]),
286
packet->arg_size[1] - 1, packet->data,
287
packet->data_size, priority,
288
server_client, &ret);
217
server_job= server_con->thread->server->gearman_server_job_add((char *)(packet->arg[0]),
218
packet->arg_size[0] - 1,
219
(char *)(packet->arg[1]),
220
packet->arg_size[1] - 1, packet->data,
221
packet->data_size, priority,
222
server_client, &ret);
289
223
if (ret == GEARMAN_SUCCESS)
291
225
packet->options.free_data= false;
293
227
else if (ret == GEARMAN_JOB_QUEUE_FULL)
295
return _server_error_packet(server_con, "queue_full",
296
"Job queue is full");
229
return server_con->_server_error_packet("queue_full", "Job queue is full");
298
231
else if (ret != GEARMAN_JOB_EXISTS)
301
234
/* Queue the job created packet. */
302
ret= gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
303
GEARMAN_COMMAND_JOB_CREATED,
304
server_job->job_handle,
305
(size_t)strlen(server_job->job_handle),
235
ret= server_con->io_packet_add(false, GEARMAN_MAGIC_RESPONSE,
236
GEARMAN_COMMAND_JOB_CREATED,
237
server_job->job_handle,
238
(size_t)strlen(server_job->job_handle),
307
240
if (ret != GEARMAN_SUCCESS)
314
247
snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
315
248
(uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
317
server_job= gearman_server_job_get(server_con->thread->server, job_handle, NULL);
250
server_job= server_con->thread->server->gearman_server_job_get(job_handle, NULL);
319
252
/* Queue status result packet. */
320
253
if (server_job == NULL)
322
ret= gearman_server_io_packet_add(server_con, false,
323
GEARMAN_MAGIC_RESPONSE,
324
GEARMAN_COMMAND_STATUS_RES, job_handle,
325
(size_t)(strlen(job_handle) + 1),
326
"0", (size_t)2, "0", (size_t)2, "0",
327
(size_t)2, "0", (size_t)1, NULL);
255
ret= server_con->io_packet_add(false,
256
GEARMAN_MAGIC_RESPONSE,
257
GEARMAN_COMMAND_STATUS_RES, job_handle,
258
(size_t)(strlen(job_handle) + 1),
259
"0", (size_t)2, "0", (size_t)2, "0",
260
(size_t)2, "0", (size_t)1, NULL);
331
264
snprintf(numerator_buffer, 11, "%u", server_job->numerator);
332
265
snprintf(denominator_buffer, 11, "%u", server_job->denominator);
334
ret= gearman_server_io_packet_add(server_con, false,
335
GEARMAN_MAGIC_RESPONSE,
336
GEARMAN_COMMAND_STATUS_RES, job_handle,
337
(size_t)(strlen(job_handle) + 1),
339
server_job->worker == NULL ? "0" : "1",
340
(size_t)2, numerator_buffer,
341
(size_t)(strlen(numerator_buffer) + 1),
343
(size_t)strlen(denominator_buffer),
267
ret= server_con->io_packet_add(false,
268
GEARMAN_MAGIC_RESPONSE,
269
GEARMAN_COMMAND_STATUS_RES, job_handle,
270
(size_t)(strlen(job_handle) + 1),
272
server_job->worker == NULL ? "0" : "1",
273
(size_t)2, numerator_buffer,
274
(size_t)(strlen(numerator_buffer) + 1),
276
(size_t)strlen(denominator_buffer),
347
280
if (ret != GEARMAN_SUCCESS)
427
358
server_con->is_sleeping= false;
428
359
server_con->is_noop_sent= false;
430
server_job= gearman_server_job_take(server_con);
361
server_job= server_con->job_take();
431
362
if (server_job == NULL)
433
364
/* No jobs found, queue no job packet. */
434
ret= gearman_server_io_packet_add(server_con, false,
435
GEARMAN_MAGIC_RESPONSE,
436
GEARMAN_COMMAND_NO_JOB, NULL);
365
ret= server_con->io_packet_add(false,
366
GEARMAN_MAGIC_RESPONSE,
367
GEARMAN_COMMAND_NO_JOB, NULL);
438
369
else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
440
371
/* We found a runnable job, queue job assigned packet and take the job
442
ret= gearman_server_io_packet_add(server_con, false,
443
GEARMAN_MAGIC_RESPONSE,
444
GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
445
server_job->job_handle,
446
(size_t)(strlen(server_job->job_handle) + 1),
447
server_job->function->function_name,
448
server_job->function->function_name_size + 1,
450
(size_t)(strlen(server_job->unique) + 1),
451
server_job->data, server_job->data_size,
373
ret= server_con->io_packet_add(false,
374
GEARMAN_MAGIC_RESPONSE,
375
GEARMAN_COMMAND_JOB_ASSIGN_UNIQ,
376
server_job->job_handle,
377
(size_t)(strlen(server_job->job_handle) + 1),
378
server_job->function->function_name.c_str(),
379
server_job->function->function_name.size() + 1,
381
(size_t)(strlen(server_job->unique) + 1),
382
server_job->data(), server_job->data_size(),
456
387
/* Same, but without unique ID. */
457
ret= gearman_server_io_packet_add(server_con, false,
458
GEARMAN_MAGIC_RESPONSE,
459
GEARMAN_COMMAND_JOB_ASSIGN,
460
server_job->job_handle,
461
(size_t)(strlen(server_job->job_handle) + 1),
462
server_job->function->function_name,
463
server_job->function->function_name_size + 1,
464
server_job->data, server_job->data_size,
388
ret= server_con->io_packet_add(false,
389
GEARMAN_MAGIC_RESPONSE,
390
GEARMAN_COMMAND_JOB_ASSIGN,
391
server_job->job_handle,
392
(size_t)(strlen(server_job->job_handle) + 1),
393
server_job->function->function_name.c_str(),
394
server_job->function->function_name.size() + 1,
395
server_job->data(), server_job->data_size(),
468
399
if (ret != GEARMAN_SUCCESS)
470
401
if (server_job != NULL)
471
return gearman_server_job_queue(server_job);
402
return server_job->job_queue();
546
469
/* Remove from persistent queue if one exists. */
547
if (server_job->job_queued && server->queue_done_fn != NULL)
470
if (server_job->job_queued() && server->queue != NULL)
549
ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
551
(size_t)strlen(server_job->unique),
552
server_job->function->function_name,
553
server_job->function->function_name_size);
472
ret= server->queue->done(server,
474
(size_t)strlen(server_job->unique),
475
server_job->function->function_name.c_str());
554
476
if (ret != GEARMAN_SUCCESS)
558
480
/* Job is done, remove it. */
559
gearman_server_job_free(server_job);
562
484
case GEARMAN_COMMAND_WORK_EXCEPTION:
563
server_job= gearman_server_job_get(server_con->thread->server,
564
(char *)(packet->arg[0]),
485
server_job= server_con->thread->server->gearman_server_job_get((char *)(packet->arg[0]), server_con);
566
486
if (server_job == NULL)
568
return _server_error_packet(server_con, "job_not_found",
569
"Job given in work result not found");
488
return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
572
491
/* Queue the exception packet for all clients. */
581
500
snprintf(job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
582
501
(uint32_t)(packet->arg_size[0]), (char *)(packet->arg[0]));
584
server_job= gearman_server_job_get(server_con->thread->server, job_handle,
503
server_job= server_con->thread->server->gearman_server_job_get(job_handle, server_con);
586
504
if (server_job == NULL)
588
return _server_error_packet(server_con, "job_not_found",
589
"Job given in work result not found");
506
return server_con->_server_error_packet("job_not_found", "Job given in work result not found");
592
509
/* Queue the fail packet for all clients. */
593
510
for (server_client= server_job->client_list; server_client;
594
511
server_client= server_client->job_next)
596
ret= gearman_server_io_packet_add(server_client->con, false,
597
GEARMAN_MAGIC_RESPONSE,
598
GEARMAN_COMMAND_WORK_FAIL,
599
packet->arg[0], packet->arg_size[0],
513
ret= server_client->con->io_packet_add(false,
514
GEARMAN_MAGIC_RESPONSE,
515
GEARMAN_COMMAND_WORK_FAIL,
516
packet->arg[0], packet->arg_size[0],
601
518
if (ret != GEARMAN_SUCCESS)
605
522
/* Remove from persistent queue if one exists. */
606
if (server_job->job_queued && server->queue_done_fn != NULL)
523
if (server_job->job_queued() && server->queue != NULL)
608
ret= (*(server->queue_done_fn))(server, (void *)server->queue_context,
610
(size_t)strlen(server_job->unique),
611
server_job->function->function_name,
612
server_job->function->function_name_size);
525
ret= server->queue->done(server,
527
(size_t)strlen(server_job->unique),
528
server_job->function->function_name);
613
530
if (ret != GEARMAN_SUCCESS)
617
534
/* Job is done, remove it. */
618
gearman_server_job_free(server_job);
621
538
case GEARMAN_COMMAND_SET_CLIENT_ID:
622
gearman_server_con_set_id(server_con, (char *)(packet->arg[0]),
623
packet->arg_size[0]);
539
server_con->set_id((char *)(packet->arg[0]), packet->arg_size[0]);
626
542
case GEARMAN_COMMAND_TEXT:
627
return _server_run_text(server_con, packet);
543
return server_con->_server_run_text(packet);
629
545
case GEARMAN_COMMAND_UNUSED:
630
546
case GEARMAN_COMMAND_NOOP:
641
557
case GEARMAN_COMMAND_JOB_ASSIGN_UNIQ:
642
558
case GEARMAN_COMMAND_MAX:
644
return _server_error_packet(server_con, "bad_command",
645
"Command not expected");
560
return server_con->_server_error_packet("bad_command", "Command not expected");
648
563
return GEARMAN_SUCCESS;
651
gearman_return_t gearman_server_shutdown_graceful(gearman_server_st *server)
566
gearman_return_t gearman_server_st::gearman_server_shutdown_graceful()
653
server->shutdown_graceful= true;
568
shutdown_graceful= true;
655
if (server->job_count == 0)
656
571
return GEARMAN_SHUTDOWN;
658
573
return GEARMAN_SHUTDOWN_GRACEFUL;
661
gearman_return_t gearman_server_queue_replay(gearman_server_st *server)
576
gearman_return_t gearman_server_st::queue_replay()
663
578
gearman_return_t ret;
665
if (server->queue_replay_fn == NULL)
666
581
return GEARMAN_SUCCESS;
668
server->state.queue_startup= true;
670
ret= (*(server->queue_replay_fn))(server, (void *)server->queue_context,
671
_queue_replay_add, server);
673
server->state.queue_startup= false;
583
state.queue_startup= true;
585
ret= queue->replay(this, _queue_replay_add, this);
587
state.queue_startup= false;
678
void *gearman_server_queue_context(const gearman_server_st *server)
680
return (void *)server->queue_context;
683
void gearman_server_set_queue_context(gearman_server_st *server,
686
server->queue_context= context;
689
void gearman_server_set_queue_add_fn(gearman_server_st *server,
690
gearman_queue_add_fn *function)
692
server->queue_add_fn= function;
695
void gearman_server_set_queue_flush_fn(gearman_server_st *server,
696
gearman_queue_flush_fn *function)
698
server->queue_flush_fn= function;
701
void gearman_server_set_queue_done_fn(gearman_server_st *server,
702
gearman_queue_done_fn *function)
704
server->queue_done_fn= function;
707
void gearman_server_set_queue_replay_fn(gearman_server_st *server,
708
gearman_queue_replay_fn *function)
710
server->queue_replay_fn= function;
714
593
* Private definitions
725
604
gearman_return_t ret;
727
(void)gearman_server_job_add(server, (char *)function_name,
728
function_name_size, (char *)unique, unique_size,
729
data, data_size, priority, NULL, &ret);
606
(void)server->gearman_server_job_add((char *)function_name,
607
function_name_size, (char *)unique, unique_size,
608
data, data_size, priority, NULL, &ret);
733
static gearman_return_t _server_error_packet(gearman_server_con_st *server_con,
734
const char *error_code,
735
const char *error_string)
737
return gearman_server_io_packet_add(server_con, false, GEARMAN_MAGIC_RESPONSE,
738
GEARMAN_COMMAND_ERROR, error_code,
739
(size_t)(strlen(error_code) + 1),
741
(size_t)strlen(error_string), NULL);
744
static gearman_return_t _server_run_text(gearman_server_con_st *server_con,
745
gearman_packet_st *packet)
613
gearman_return_t gearman_server_con_st::_server_run_text(gearman_packet_st *packet_arg)
615
gearman_server_con_st *server_con= this;
751
620
int max_queue_size;
752
gearman_server_thread_st *thread;
753
gearman_server_con_st *con;
621
gearman_server_thread_st *_thread;
622
gearman_server_con_st *con_local;
754
623
gearman_server_worker_st *worker;
755
624
gearman_server_function_st *function;
756
gearman_server_packet_st *server_packet;
758
data= malloc(GEARMAN_TEXT_RESPONSE_SIZE);
626
_data= (char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
761
gearman_log_error(packet->universal, "_server_run_text", "malloc");
629
Log::Instance()->error("_server_run_text: malloc()");
762
631
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
764
633
total= GEARMAN_TEXT_RESPONSE_SIZE;
766
if (packet->argc == 0)
635
if (packet_arg->argc == 0)
768
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
637
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE,
769
638
"ERR unknown_command Unknown+server+command\n");
771
else if (!strcasecmp("workers", (char *)(packet->arg[0])))
640
else if (!strcasecmp("workers", (char *)(packet_arg->arg[0])))
775
for (thread= server_con->thread->server->thread_list; thread != NULL;
776
thread= thread->next)
644
for (_thread= server_con->thread->server->thread_list; _thread != NULL;
645
_thread= thread->next)
778
(void) pthread_mutex_lock(&thread->lock);
647
boost::mutex::scoped_lock l(_thread->lock);
780
for (con= thread->con_list; con != NULL; con= con->next)
649
BOOST_FOREACH(ServerConMap::value_type iter, _thread->con_list)
782
if (con->host == NULL)
651
con_local= iter.second;
653
if (con_local->_host == NULL)
785
656
if (size > total)
788
659
/* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
789
660
if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
791
new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
662
new_data= (char *)realloc(_data, total + GEARMAN_TEXT_RESPONSE_SIZE);
792
663
if (new_data == NULL)
794
(void) pthread_mutex_unlock(&thread->lock);
796
gearman_log_error(packet->universal, "_server_run_text", "malloc");
666
Log::Instance()->error("_server_run_text: malloc()");
797
668
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
801
672
total+= GEARMAN_TEXT_RESPONSE_SIZE;
804
size+= (size_t)snprintf(data + size, total - size, "%d %s %s :",
805
con->con.fd, con->host, con->id);
675
size+= (size_t)snprintf(_data + size, total - size, "%d %s %s :",
676
con_local->con()->fd, con_local->_host, con_local->_id);
806
677
if (size > total)
809
for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
680
for (worker= con_local->worker_list; worker != NULL; worker= worker->con_next)
811
size+= (size_t)snprintf(data + size, total - size, " %.*s",
812
(int)(worker->function->function_name_size),
813
worker->function->function_name);
682
size+= (size_t)snprintf(_data + size, total - size, " %.*s",
683
(int)(worker->function->function_name.size()),
684
worker->function->function_name.c_str());
814
685
if (size > total)
837
706
if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
839
new_data= realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
708
new_data= (char *)realloc(_data, total + GEARMAN_TEXT_RESPONSE_SIZE);
840
709
if (new_data == NULL)
843
gearman_log_error(packet->universal, "_server_run_text", "malloc");
712
Log::Instance()->error("_server_run_text: malloc()");
844
714
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
848
718
total+= GEARMAN_TEXT_RESPONSE_SIZE;
851
size+= (size_t)snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
852
(int)(function->function_name_size),
853
function->function_name, function->job_total,
721
size+= (size_t)snprintf(_data + size, total - size, "%.*s\t%u\t%u\t%u\n",
722
(int)(function->function_name.size()),
723
function->function_name.c_str(), function->job_total,
854
724
function->job_running, function->worker_count);
855
725
if (size > total)
859
729
if (size < total)
860
snprintf(data + size, total - size, ".\n");
730
snprintf(_data + size, total - size, ".\n");
862
else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
732
else if (!strcasecmp("maxqueue", (char *)(packet_arg->arg[0])))
864
if (packet->argc == 1)
734
if (packet_arg->argc == 1)
866
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
736
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "ERR incomplete_args "
867
737
"An+incomplete+set+of+arguments+was+sent+to+this+command\n");
871
if (packet->argc == 2)
741
if (packet_arg->argc == 2)
872
742
max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
875
max_queue_size= atoi((char *)(packet->arg[2]));
745
max_queue_size= atoi((char *)(packet_arg->arg[2]));
876
746
if (max_queue_size < 0)
877
747
max_queue_size= 0;
880
750
for (function= server_con->thread->server->function_list;
881
751
function != NULL; function= function->next)
883
if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
884
!memcmp(packet->arg[1], function->function_name,
885
function->function_name_size))
753
if (strlen((char *)(packet_arg->arg[1])) == function->function_name.size() &&
754
!memcmp(packet_arg->arg[1], function->function_name.c_str(),
755
function->function_name.size()))
887
757
function->max_queue_size= (uint32_t)max_queue_size;
891
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
761
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
894
else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
764
else if (!strcasecmp("shutdown", (char *)(packet_arg->arg[0])))
896
if (packet->argc == 1)
766
if (packet_arg->argc == 1)
898
768
server_con->thread->server->shutdown= true;
899
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
769
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
901
else if (packet->argc == 2 &&
902
!strcasecmp("graceful", (char *)(packet->arg[1])))
771
else if (packet_arg->argc == 2 &&
772
!strcasecmp("graceful", (char *)(packet_arg->arg[1])))
904
774
server_con->thread->server->shutdown_graceful= true;
905
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
775
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "OK\n");
909
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
779
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE,
910
780
"ERR unknown_args Unknown+arguments+to+server+command\n");
913
else if (!strcasecmp("version", (char *)(packet->arg[0])))
914
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
783
else if (!strcasecmp("version", (char *)(packet_arg->arg[0])))
784
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE, "%s\n", PACKAGE_VERSION);
917
snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE,
787
snprintf(_data, GEARMAN_TEXT_RESPONSE_SIZE,
918
788
"ERR unknown_command Unknown+server+command\n");
921
server_packet= gearman_server_packet_create(server_con->thread, false);
791
gearman_server_packet_st *server_packet= new gearman_server_packet_st(server_con->thread->gearman,
792
GEARMAN_MAGIC_TEXT, GEARMAN_COMMAND_TEXT);
922
794
if (server_packet == NULL)
925
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
928
if (gearman_packet_create(server_con->thread->gearman,
929
&(server_packet->packet)) == NULL)
932
gearman_server_packet_free(server_packet, server_con->thread, false);
933
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
936
server_packet->packet.magic= GEARMAN_MAGIC_TEXT;
937
server_packet->packet.command= GEARMAN_COMMAND_TEXT;
797
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
938
800
server_packet->packet.options.complete= true;
939
801
server_packet->packet.options.free_data= true;
941
server_packet->packet.data= data;
942
server_packet->packet.data_size= strlen(data);
944
(void) pthread_mutex_lock(&server_con->thread->lock);
945
GEARMAN_FIFO_ADD(server_con->io_packet, server_packet,)
946
(void) pthread_mutex_unlock(&server_con->thread->lock);
948
gearman_server_con_io_add(server_con);
803
server_packet->packet.data= _data;
804
server_packet->packet.data_size= strlen(_data);
807
boost::mutex::scoped_lock l(server_con->thread->lock);
808
server_con->io_packet_list.push(server_packet);
811
server_con->io_add();
950
813
return GEARMAN_SUCCESS;
1000
865
return GEARMAN_SUCCESS;
1003
static void _log(const char *line, gearman_verbose_t verbose, void *context)
1005
gearman_server_st *server= (gearman_server_st *)context;
1006
(*(server->log_fn))(line, verbose, (void *)server->log_context);
868
gearman_return_t gearman_server_st::_proc_thread_start()
870
m_thread= new boost::thread(boost::bind(&gearman_server_st::_proc, this));
874
flags.threaded= true;
876
return GEARMAN_SUCCESS;
879
void gearman_server_st::_proc_thread_kill()
881
gearman_server_st *server= this;
882
if (! (server->flags.threaded) || server->proc_shutdown)
885
server->proc_shutdown= true;
887
/* Signal proc thread to shutdown. */
889
boost::mutex::scoped_lock l(server->proc_lock);
890
server->proc_cond.notify_all();
893
/* Wait for the proc thread to exit and then cleanup. */
898
void *gearman_server_st::_proc()
900
gearman_server_st *server= this;
901
gearman_server_thread_st *thread;
902
gearman_server_con_st *con;
903
gearman_server_packet_st *packet;
908
boost::mutex::scoped_lock l(server->proc_lock);
910
while (server->proc_wakeup == false)
912
if (server->proc_shutdown)
917
server->proc_cond.wait(l);
919
server->proc_wakeup= false;
922
for (thread= server->thread_list; thread != NULL; thread= thread->next)
924
while ((con= thread->proc_next()) != NULL)
930
while (con->client_list != NULL)
932
con->client_list->con->thread->server->client_free(con->client_list);
935
con->proc_removed= true;
942
packet= con->proc_packet_remove();
946
con->ret= gearman_server_run_command(con, &(packet->packet));
954
gearman_server_job_st *gearman_server_st::gearman_server_job_add(const char *function_name,
955
size_t , const char *unique,
956
size_t unique_size, const void *data, size_t data_size,
957
gearman_job_priority_t priority,
958
gearman_server_client_st *server_client,
959
gearman_return_t *ret_ptr)
961
gearman_server_st *server= this;
962
gearman_server_job_st *server_job;
963
gearman_server_function_st *server_function;
966
server_function= server->function_get(function_name);
967
if (server_function == NULL)
969
*ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
973
if (unique_size == 0)
980
if (unique_size == 1 && *unique == '-')
989
/* Look up job via unique data when unique = '-'. */
990
key= _server_job_hash((const char*)data, data_size);
991
server_job= server->_server_job_get_unique(key, server_function, (const char*)data, data_size);
996
/* Look up job via unique ID first to make sure it's not a duplicate. */
997
key= _server_job_hash(unique, unique_size);
998
server_job= server->_server_job_get_unique(key, server_function, unique, 0);
1002
if (server_job == NULL)
1004
if (server_function->max_queue_size > 0 &&
1005
server_function->job_total >= server_function->max_queue_size)
1007
Log::Instance()->error("Queue full, failed to add %s : %s", function_name, unique);
1008
*ret_ptr= GEARMAN_JOB_QUEUE_FULL;
1013
server_job= server->job_create();
1014
if (server_job == NULL)
1016
*ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
1019
Log::Instance()->event(Logger::JOB_ACCEPTED, NULL);
1021
server_job->set_priority(priority);
1023
server_job->function= server_function;
1024
server_function->job_total++;
1026
snprintf(server_job->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s:%u",
1027
server->job_handle_prefix, server->job_handle_count);
1028
snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
1029
(uint32_t)unique_size, unique);
1030
server->job_handle_count++;
1031
server_job->set_data(data, data_size);
1033
server_job->unique_key= key;
1034
key= key % GEARMAN_JOB_HASH_SIZE;
1035
GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
1037
key= _server_job_hash(server_job->job_handle,
1038
strlen(server_job->job_handle));
1039
server_job->set_job_handle_key(key);
1040
key= key % GEARMAN_JOB_HASH_SIZE;
1041
GEARMAN_HASH_ADD(server->job, key, server_job,);
1043
if (server->state.queue_startup)
1045
server_job->set_job_queued(true);
1047
else if (server_client == NULL && server->queue != NULL)
1049
*ret_ptr= server->queue->add(server,
1053
data, data_size, priority);
1054
if (*ret_ptr != GEARMAN_SUCCESS)
1056
server_job->set_data();
1061
*ret_ptr= server->queue->flush(server);
1062
if (*ret_ptr != GEARMAN_SUCCESS)
1064
server_job->set_data();
1069
server_job->set_job_queued(true);
1072
*ret_ptr= server_job->job_queue();
1073
if (*ret_ptr != GEARMAN_SUCCESS)
1075
if (server_client == NULL && server->queue != NULL)
1077
/* Do our best to remove the job from the queue. */
1078
(void)server->queue->done(server,
1079
server_job->unique, unique_size,
1080
server_job->function->function_name);
1089
*ret_ptr= GEARMAN_JOB_EXISTS;
1092
if (server_client != NULL)
1094
server_client->job= server_job;
1095
GEARMAN_LIST_ADD(server_job->client, server_client, job_)
1101
gearman_server_job_st *gearman_server_st::job_create()
1103
gearman_server_job_st *server_job;
1105
server_job= new gearman_server_job_st(this);
1110
gearman_server_job_st *gearman_server_st::gearman_server_job_get(const char *job_handle,
1111
gearman_server_con_st *worker_con)
1113
gearman_server_st *server= this;
1116
key= _server_job_hash(job_handle, strlen(job_handle));
1118
for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAN_JOB_HASH_SIZE];
1119
server_job != NULL; server_job= server_job->next)
1121
if (server_job->job_handle_key() == key &&
1122
!strcmp(server_job->job_handle, job_handle))
1124
/* Check to make sure the worker asking for the job still owns the job. */
1125
if (worker_con != NULL && not server_job->owns_job(worker_con))
1138
gearman_server_job_st *gearman_server_st::_server_job_get_unique(uint32_t unique_key,
1139
gearman_server_function_st *server_function,
1140
const char *unique, size_t data_size)
1142
gearman_server_st *server= this;
1143
gearman_server_job_st *server_job;
1145
for (server_job= server->unique_hash[unique_key % GEARMAN_JOB_HASH_SIZE];
1146
server_job != NULL; server_job= server_job->unique_next)
1150
if (server_job->function == server_function &&
1151
server_job->unique_key == unique_key &&
1152
!strcmp(server_job->unique, unique))
1159
if (server_job->function == server_function &&
1160
server_job->unique_key == unique_key &&
1161
server_job->compare_data(unique, data_size))