1
/* Gearman server and library
2
* Copyright (C) 2011 Oleksiy Krivoshey
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
*/
9
#include <libgearman-server/common.h>
10
#include <libgearman-server/byte.h>
12
#include <libgearman-server/plugins/queue/mysql/queue.h>
13
#include <libgearman-server/plugins/queue/base.h>
15
#include <mysql/mysql.h>
16
#include <mysql/errmsg.h>
21
#define GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue"
31
/* Forward declaration: MySQL::initialize() delegates to this file-scope
 * helper, whose definition appears after the queue callback prototypes. */
static gearmand_error_t _initialize(gearman_server_st *server, gearmand::plugins::queue::MySQL *queue);
37
/*
 * MySQL-backed queue plugin.
 *
 * Stores the connection settings that the command-line options write into,
 * plus the prepared-statement handles used by the queue callbacks.
 *
 * NOTE(review): only part of the class body is visible in this excerpt.
 * Members such as con, add_stmt and mysql_db are used elsewhere in the file
 * but their declarations are not shown here.
 */
class MySQL : public gearmand::plugins::Queue {
42
/* Forwards to the file-scope _initialize() helper. */
gearmand_error_t initialize();
43
/* Creates and prepares add_stmt on the current connection. */
gearmand_error_t prepareAddStatement();
44
/* Creates and prepares done_stmt on the current connection. */
gearmand_error_t prepareDoneStatement();
48
/* Prepared statement executed by the "done" callback. */
MYSQL_STMT *done_stmt;
49
/* Target of the "mysql-host" option. */
std::string mysql_host;
50
/* Target of the "mysql-user" option. */
std::string mysql_user;
51
/* Target of the "mysql-password" option. */
std::string mysql_password;
53
/* Target of the "mysql-table" option; spliced into every SQL statement. */
std::string mysql_table;
61
/* Register the plugin's command-line options. Each option stores its value
 * directly into the matching member; all of them have a default, so none is
 * mandatory on the command line.
 * NOTE(review): the enclosing function header is not visible in this excerpt
 * (presumably the constructor) -- confirm. */
command_line_options().add_options()
62
("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.")
63
("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.")
64
("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.")
65
("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.")
66
("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name.");
71
/* Release the prepared add statement.
 * NOTE(review): the enclosing function is not visible in this excerpt
 * (presumably the destructor); confirm that done_stmt and the connection
 * are released there as well. */
mysql_stmt_close(add_stmt);
78
/*
 * Plugin entry point: passes the server object obtained from Gearmand() and
 * this instance to the file-scope _initialize() and returns its result.
 */
gearmand_error_t MySQL::initialize() {
79
return _initialize(&Gearmand()->server, this);
82
/*
 * Allocates a statement handle on the current connection and prepares the
 * statement whose five placeholders map, in order, to unique_key,
 * function_name, priority, data and when_to_run.
 *
 * Returns GEARMAN_SUCCESS, or GEARMAN_QUEUE_ERROR after logging the MySQL
 * error when mysql_stmt_init() or mysql_stmt_prepare() fails.
 *
 * NOTE(review): the first line of the SQL text is not visible in this
 * excerpt (presumably the INSERT clause carrying the %s for the table name).
 * NOTE(review): add_stmt is overwritten without being closed here, so a
 * caller that re-prepares must close the old handle itself.
 */
gearmand_error_t MySQL::prepareAddStatement() {
83
char query_buffer[1024];
85
if((this->add_stmt = mysql_stmt_init(this->con)) == NULL){
86
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
87
return GEARMAN_QUEUE_ERROR;
90
/* The table name is formatted into the SQL text; only the column values
 * are bound parameters. */
snprintf(query_buffer, sizeof(query_buffer),
92
"(unique_key, function_name, priority, data, when_to_run) "
93
"VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str());
95
if (mysql_stmt_prepare(this->add_stmt, query_buffer, strlen(query_buffer))){
96
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
97
return GEARMAN_QUEUE_ERROR;
100
return GEARMAN_SUCCESS;
103
/*
 * Allocates a statement handle on the current connection and prepares the
 * statement that selects a job by (unique_key, function_name); both values
 * are bound parameters.
 *
 * Returns GEARMAN_SUCCESS, or GEARMAN_QUEUE_ERROR after logging the MySQL
 * error when mysql_stmt_init() or mysql_stmt_prepare() fails.
 *
 * NOTE(review): the first line of the SQL text is not visible in this
 * excerpt (presumably a DELETE clause carrying the %s for the table name).
 * NOTE(review): done_stmt is overwritten without being closed here, so a
 * caller that re-prepares must close the old handle itself.
 */
gearmand_error_t MySQL::prepareDoneStatement() {
104
char query_buffer[1024];
106
if((this->done_stmt = mysql_stmt_init(this->con)) == NULL){
107
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
108
return GEARMAN_QUEUE_ERROR;
111
/* The table name is formatted into the SQL text; the key columns are
 * bound parameters. */
snprintf(query_buffer, sizeof(query_buffer),
113
"WHERE unique_key=? "
114
"AND function_name=?", this->mysql_table.c_str());
116
if (mysql_stmt_prepare(this->done_stmt, query_buffer, strlen(query_buffer))){
117
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
118
return GEARMAN_QUEUE_ERROR;
121
return GEARMAN_SUCCESS;
124
/*
 * Instantiates the plugin. The instance is a function-local static, so it is
 * constructed on the first call and is not destroyed when the function returns.
 */
void initialize_mysql() {
125
static MySQL local_instance;
129
} // namespace plugins
130
} // namespace gearmand
132
/* Queue callback functions.
 *
 * Forward-declared so that _initialize() can hand them to
 * gearman_server_set_queue() before their definitions appear further down.
 * Each receives the server and an opaque context pointer; the definitions
 * cast that pointer to the MySQL plugin instance.
 * NOTE(review): some parameter lines of these prototypes are not visible in
 * this excerpt. */
133
static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
134
const char *unique, size_t unique_size,
135
const char *function_name,
136
size_t function_name_size,
137
const void *data, size_t data_size,
138
gearmand_job_priority_t priority,
141
static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context);
143
static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
146
const char *function_name,
147
size_t function_name_size);
149
static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
150
gearman_queue_add_fn *add_fn,
154
/*
 * Connects the MySQL queue plugin to its database and installs it as the
 * server's queue backend.
 *
 * Steps visible in this excerpt, in order:
 *   1. register the add/flush/done/replay callbacks on the server,
 *   2. open the connection from the queue's host/user/password/db settings,
 *   3. look the queue table up with mysql_list_tables() and create it when
 *      no row comes back,
 *   4. prepare the add and done statements.
 *
 * server: server object the callbacks are registered on.
 * queue:  plugin instance; also passed as the callback context.
 *
 * Returns GEARMAN_SUCCESS, or GEARMAN_QUEUE_ERROR after logging the MySQL
 * error when connecting, listing or creating the table, or preparing a
 * statement fails.
 *
 * NOTE(review): several lines of this function are not visible here, among
 * them the declaration of `result` and the start of the table-creation SQL.
 */
gearmand_error_t _initialize(gearman_server_st *server,
155
gearmand::plugins::queue::MySQL *queue) {
158
char query_buffer[1024];
159
my_bool my_true = true;
161
/* NOTE(review): informational message emitted through the error-level logger. */
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"Initializing mysql module");
163
/* The callbacks are registered before the connection is attempted, so they
 * stay registered even if one of the later steps returns an error. */
gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay);
165
/* NOTE(review): no NULL check on the mysql_init() result is visible here. */
queue->con = mysql_init(queue->con);
167
/* Pick up client defaults from the "gearmand" group of the MySQL option files. */
mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand");
169
if (!mysql_real_connect(queue->con,
170
queue->mysql_host.c_str(),
171
queue->mysql_user.c_str(),
172
queue->mysql_password.c_str(),
173
queue->mysql_db.c_str(), 0, NULL, 0)) {
174
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con));
175
return GEARMAN_QUEUE_ERROR;
178
/* Ask the client library to reconnect automatically; set after connecting. */
mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true);
180
/* The configured table name is used as the pattern for the table listing. */
if (!(result = mysql_list_tables(queue->con, queue->mysql_table.c_str()))){
181
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con));
182
return GEARMAN_QUEUE_ERROR;
185
/* An empty listing means the queue table does not exist yet. */
if (mysql_num_rows(result) == 0){
186
snprintf(query_buffer, sizeof(query_buffer),
189
"unique_key VARCHAR(%d),"
190
"function_name VARCHAR(255),"
194
"unique key (unique_key, function_name)"
196
queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE);
198
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql module: creating table %s", queue->mysql_table.c_str());
200
if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){
201
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql module: create table failed: %s", mysql_error(queue->con));
202
/* NOTE(review): this return is reached without freeing `result`. */
return GEARMAN_QUEUE_ERROR;
206
mysql_free_result(result);
208
if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR)
209
return GEARMAN_QUEUE_ERROR;
211
if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR)
212
return GEARMAN_QUEUE_ERROR;
214
return GEARMAN_SUCCESS;
222
/*
 * Queue "add" callback: binds one job to the prepared add statement and
 * executes it.
 *
 * context is the MySQL plugin instance registered in _initialize().
 * Bind slots follow the column order used in prepareAddStatement():
 *   0 unique_key, 1 function_name, 2 priority, 3 data, 4 when_to_run.
 *
 * Returns GEARMAN_SUCCESS, or GEARMAN_QUEUE_ERROR when binding, executing
 * or re-preparing the statement fails.
 *
 * NOTE(review): the declarations of `bind` and `when`, and parts of the
 * error-recovery branches, are not visible in this excerpt.
 */
static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
223
const char *unique, size_t unique_size,
224
const char *function_name,
225
size_t function_name_size,
226
const void *data, size_t data_size,
227
gearmand_job_priority_t priority,
234
gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context;
236
/* NOTE(review): per-job trace message emitted through the error-level logger. */
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
237
(uint32_t) function_name_size, (char *) function_name);
239
bind[0].buffer_type= MYSQL_TYPE_STRING;
240
bind[0].buffer= (char *)unique;
241
bind[0].buffer_length= unique_size;
243
/* NOTE(review): the size_t* -> unsigned long* casts below assume both types
 * have the same width -- confirm for every supported platform. */
bind[0].length= (unsigned long*)&unique_size;
245
bind[1].buffer_type= MYSQL_TYPE_STRING;
246
bind[1].buffer= (char *)function_name;
247
bind[1].buffer_length= function_name_size;
249
bind[1].length= (unsigned long*)&function_name_size;
251
/* NOTE(review): MYSQL_TYPE_LONG describes a 4-byte integer buffer; confirm
 * that this matches the size of gearmand_job_priority_t. */
bind[2].buffer_type= MYSQL_TYPE_LONG;
252
bind[2].buffer= (char *)&priority;
256
bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB;
257
bind[3].buffer= (char *)data;
258
bind[3].buffer_length= data_size;
260
bind[3].length= (unsigned long*)&data_size;
262
/* NOTE(review): same width question for `when`, whose type is not visible here. */
bind[4].buffer_type= MYSQL_TYPE_LONG;
263
bind[4].buffer= (char *)&when;
268
/* A bind failure reporting CR_NO_PREPARE_STMT triggers a re-prepare. */
if (mysql_stmt_bind_param(queue->add_stmt, bind)){
269
if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT ){
270
if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR){
271
return GEARMAN_QUEUE_ERROR;
275
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
276
return GEARMAN_QUEUE_ERROR;
280
/* An execute failure reporting CR_SERVER_LOST closes the statement and
 * re-prepares it. */
if (mysql_stmt_execute(queue->add_stmt)){
281
if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST ){
282
mysql_stmt_close(queue->add_stmt);
283
if(queue->prepareAddStatement() != GEARMAN_QUEUE_ERROR){
287
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
288
return GEARMAN_QUEUE_ERROR;
294
return GEARMAN_SUCCESS;
297
/*
 * Queue "flush" callback. In the lines visible here it only logs a message
 * and reports success; the context argument is marked unused.
 */
static gearmand_error_t _mysql_queue_flush(gearman_server_st *server,
298
void *context __attribute__((unused))) {
303
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue flush");
305
return GEARMAN_SUCCESS;
308
/*
 * Queue "done" callback: binds a job's (unique, function_name) pair to the
 * prepared done statement and executes it.
 *
 * context is the MySQL plugin instance registered in _initialize().
 * Bind slots follow the placeholder order used in prepareDoneStatement():
 *   0 unique_key, 1 function_name.
 *
 * Returns GEARMAN_SUCCESS, or GEARMAN_QUEUE_ERROR when binding, executing
 * or re-preparing the statement fails.
 *
 * NOTE(review): the unique/unique_size parameters, the declaration of
 * `bind`, and parts of the error-recovery branches are not visible in this
 * excerpt.
 */
static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
311
const char *function_name,
312
size_t function_name_size) {
318
/* NOTE(review): per-job trace message emitted through the error-level logger. */
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
319
(uint32_t) function_name_size, (char *) function_name);
321
gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context;
323
bind[0].buffer_type= MYSQL_TYPE_STRING;
324
bind[0].buffer= (char *)unique;
325
bind[0].buffer_length= unique_size;
327
/* NOTE(review): the size_t* -> unsigned long* casts below assume both types
 * have the same width -- confirm for every supported platform. */
bind[0].length= (unsigned long*)&unique_size;
329
bind[1].buffer_type= MYSQL_TYPE_STRING;
330
bind[1].buffer= (char *)function_name;
331
bind[1].buffer_length= function_name_size;
333
bind[1].length= (unsigned long*)&function_name_size;
336
/* A bind failure reporting CR_NO_PREPARE_STMT triggers a re-prepare. */
if (mysql_stmt_bind_param(queue->done_stmt, bind)){
337
if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT ){
338
if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR){
339
return GEARMAN_QUEUE_ERROR;
343
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
344
return GEARMAN_QUEUE_ERROR;
348
/* An execute failure reporting CR_SERVER_LOST closes the statement and
 * re-prepares it. */
if (mysql_stmt_execute(queue->done_stmt)){
349
if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST ){
350
mysql_stmt_close(queue->done_stmt);
351
if(queue->prepareDoneStatement() != GEARMAN_QUEUE_ERROR){
355
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
356
return GEARMAN_QUEUE_ERROR;
362
return GEARMAN_SUCCESS;
365
/*
 * Queue "replay" callback: reads every row of the queue table and feeds it
 * to add_fn.
 *
 * context is the MySQL plugin instance registered in _initialize().
 * Row columns follow the SELECT list below:
 *   0 unique_key, 1 function_name, 2 data, 3 priority, 4 when_to_run.
 *
 * Returns GEARMAN_QUEUE_ERROR when the query, the result fetch or the
 * column-count check fails, and GEARMAN_MEMORY_ALLOCATION_FAILURE on the
 * malloc-failure path.
 *
 * NOTE(review): the end of this function, the remaining parameters and the
 * declarations of `result`, `row` and `when` are not visible in this excerpt.
 */
static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
366
gearman_queue_add_fn *add_fn,
371
char query_buffer[1024];
375
/* NOTE(review): trace message emitted through the error-level logger. */
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue replay");
377
gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context;
379
snprintf(query_buffer, sizeof(query_buffer),
380
"SELECT unique_key, function_name, data, priority, when_to_run FROM %s",
381
queue->mysql_table.c_str());
383
if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){
384
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con));
385
return GEARMAN_QUEUE_ERROR;
388
/* mysql_store_result() reads the whole result set into client memory. */
if(!(result = mysql_store_result(queue->con))){
389
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con));
390
return GEARMAN_QUEUE_ERROR;
393
if(mysql_num_fields(result) < 5){
394
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql queue: insufficient row fields in queue table");
395
/* NOTE(review): this return is reached without freeing `result`. */
return GEARMAN_QUEUE_ERROR;
398
gearmand_error_t ret = GEARMAN_SUCCESS;
400
while ((row = mysql_fetch_row(result))) {
401
unsigned long *lengths;
402
gearmand_job_priority_t priority = (gearmand_job_priority_t)0;
405
/* Byte length of each column of the current row. */
lengths = mysql_fetch_lengths(result);
407
/* need to make a copy here ... gearman_server_job_free will free it later */
408
size_t data_size= lengths[2];
409
char * data= (char *)malloc(data_size);
411
gearmand_perror("malloc failed");
412
/* NOTE(review): no mysql_free_result() call is visible before this return. */
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
414
memcpy(data, row[2], data_size);
416
/* A zero-length column keeps the value assigned earlier instead of being parsed. */
if(lengths[3]) priority = (gearmand_job_priority_t) atoi(row[3]);
417
if(lengths[4]) when = atoi(row[4]);
419
ret = (*add_fn)(server, add_context,
420
row[0], (size_t) lengths[0],
421
row[1], (size_t) lengths[1],
426
if (ret != GEARMAN_SUCCESS) {
431
mysql_free_result(result);