~oleksiyk/gearmand/mysql

« back to all changes in this revision

Viewing changes to libgearman-server/plugins/queue/mysql/queue.cc

  • Committer: Oleksiy Krivoshey
  • Date: 2012-02-13 16:16:01 UTC
  • Revision ID: oleksiyk@gmail.com-20120213161601-ukuwc9xwgfbolrd7
Initial commit for mysqlclient queue support (-q MySQL)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2011 Oleksiy Krivoshey
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
#include <libgearman-server/common.h>
 
10
#include <libgearman-server/byte.h>
 
11
 
 
12
#include <libgearman-server/plugins/queue/mysql/queue.h>
 
13
#include <libgearman-server/plugins/queue/base.h>
 
14
 
 
15
#include <mysql/mysql.h>
 
16
#include <mysql/errmsg.h>
 
17
 
 
18
/**
 
19
 * Default values.
 
20
 */
 
21
#define GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue"
 
22
 
 
23
namespace gearmand {
 
24
    namespace plugins {
 
25
        namespace queue {
 
26
            class MySQL;
 
27
        }
 
28
    }
 
29
}
 
30
 
 
31
static gearmand_error_t _initialize(gearman_server_st *server, gearmand::plugins::queue::MySQL *queue);
 
32
 
 
33
namespace gearmand {
 
34
    namespace plugins {
 
35
        namespace queue {
 
36
 
 
37
            class MySQL : public gearmand::plugins::Queue {
 
38
            public:
 
39
                MySQL();
 
40
                ~MySQL();
 
41
 
 
42
                gearmand_error_t initialize();
 
43
                gearmand_error_t prepareAddStatement();
 
44
                gearmand_error_t prepareDoneStatement();
 
45
 
 
46
                MYSQL *con;
 
47
                MYSQL_STMT *add_stmt;
 
48
                MYSQL_STMT *done_stmt;
 
49
                std::string mysql_host;
 
50
                std::string mysql_user;
 
51
                std::string mysql_password;
 
52
                std::string mysql_db;
 
53
                std::string mysql_table;
 
54
            private:
 
55
            };
 
56
 
 
57
            MySQL::MySQL() :
 
58
            Queue("MySQL"),
 
59
            con(NULL),
 
60
            add_stmt(NULL) {
 
61
                command_line_options().add_options()
 
62
                        ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.")
 
63
                        ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.")
 
64
                        ("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.")
 
65
                        ("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.")
 
66
                        ("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAN_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name.");
 
67
            }
 
68
 
 
69
            MySQL::~MySQL() {
 
70
                if (add_stmt){
 
71
                    mysql_stmt_close(add_stmt);
 
72
                }
 
73
                if (con){
 
74
                    mysql_close(con);
 
75
                }
 
76
            }
 
77
 
 
78
            gearmand_error_t MySQL::initialize() {
 
79
                return _initialize(&Gearmand()->server, this);
 
80
            }
 
81
 
 
82
            gearmand_error_t MySQL::prepareAddStatement() {
 
83
                char query_buffer[1024];
 
84
 
 
85
                if((this->add_stmt = mysql_stmt_init(this->con)) == NULL){
 
86
                    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
 
87
                    return GEARMAN_QUEUE_ERROR;
 
88
                }
 
89
 
 
90
                snprintf(query_buffer, sizeof(query_buffer),
 
91
                    "INSERT INTO %s "
 
92
                    "(unique_key, function_name, priority, data, when_to_run) "
 
93
                    "VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str());
 
94
 
 
95
                if (mysql_stmt_prepare(this->add_stmt, query_buffer, strlen(query_buffer))){
 
96
                    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
 
97
                    return GEARMAN_QUEUE_ERROR;
 
98
                }
 
99
 
 
100
                return GEARMAN_SUCCESS;
 
101
            }
 
102
 
 
103
            gearmand_error_t MySQL::prepareDoneStatement() {
 
104
                char query_buffer[1024];
 
105
 
 
106
                if((this->done_stmt = mysql_stmt_init(this->con)) == NULL){
 
107
                    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
 
108
                    return GEARMAN_QUEUE_ERROR;
 
109
                }
 
110
 
 
111
                snprintf(query_buffer, sizeof(query_buffer),
 
112
                    "DELETE FROM %s "
 
113
                    "WHERE unique_key=? "
 
114
                    "AND function_name=?", this->mysql_table.c_str());
 
115
 
 
116
                if (mysql_stmt_prepare(this->done_stmt, query_buffer, strlen(query_buffer))){
 
117
                    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
 
118
                    return GEARMAN_QUEUE_ERROR;
 
119
                }
 
120
 
 
121
                return GEARMAN_SUCCESS;
 
122
            }
 
123
 
 
124
            void initialize_mysql() {
 
125
                static MySQL local_instance;
 
126
            }
 
127
 
 
128
        } // namespace queue
 
129
    } // namespace plugin
 
130
} // namespace gearmand
 
131
 
 
132
/* Queue callback functions. */
 
133
static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
 
134
        const char *unique, size_t unique_size,
 
135
        const char *function_name,
 
136
        size_t function_name_size,
 
137
        const void *data, size_t data_size,
 
138
        gearmand_job_priority_t priority,
 
139
        int64_t when);
 
140
 
 
141
static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context);
 
142
 
 
143
static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
 
144
        const char *unique,
 
145
        size_t unique_size,
 
146
        const char *function_name,
 
147
        size_t function_name_size);
 
148
 
 
149
static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
 
150
        gearman_queue_add_fn *add_fn,
 
151
        void *add_context);
 
152
 
 
153
 
 
154
gearmand_error_t _initialize(gearman_server_st *server,
 
155
        gearmand::plugins::queue::MySQL *queue) {
 
156
 
 
157
    MYSQL_RES * result;
 
158
    char query_buffer[1024];
 
159
    my_bool  my_true = true;
 
160
 
 
161
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"Initializing mysql module");
 
162
 
 
163
    gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay);
 
164
 
 
165
    queue->con = mysql_init(queue->con);
 
166
 
 
167
    mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand");
 
168
 
 
169
    if (!mysql_real_connect(queue->con,
 
170
            queue->mysql_host.c_str(),
 
171
            queue->mysql_user.c_str(),
 
172
            queue->mysql_password.c_str(),
 
173
            queue->mysql_db.c_str(), 0, NULL, 0)) {
 
174
        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con));
 
175
        return GEARMAN_QUEUE_ERROR;
 
176
    }
 
177
 
 
178
    mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true);
 
179
 
 
180
    if (!(result = mysql_list_tables(queue->con, queue->mysql_table.c_str()))){
 
181
        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con));
 
182
        return GEARMAN_QUEUE_ERROR;
 
183
    }
 
184
 
 
185
    if (mysql_num_rows(result) == 0){
 
186
        snprintf(query_buffer, sizeof(query_buffer),
 
187
             "CREATE TABLE %s"
 
188
             "("
 
189
               "unique_key VARCHAR(%d),"
 
190
               "function_name VARCHAR(255),"
 
191
               "priority INT,"
 
192
               "data LONGBLOB,"
 
193
               "when_to_run INT,"
 
194
               "unique key (unique_key, function_name)"
 
195
             ")",
 
196
             queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE);
 
197
 
 
198
        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql module: creating table %s", queue->mysql_table.c_str());
 
199
 
 
200
        if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){
 
201
            gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql module: create table failed: %s", mysql_error(queue->con));
 
202
            return GEARMAN_QUEUE_ERROR;
 
203
        }
 
204
    }
 
205
 
 
206
    mysql_free_result(result);
 
207
 
 
208
    if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR)
 
209
        return GEARMAN_QUEUE_ERROR;
 
210
 
 
211
    if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR)
 
212
        return GEARMAN_QUEUE_ERROR;
 
213
 
 
214
    return GEARMAN_SUCCESS;
 
215
}
 
216
 
 
217
/*
 
218
 * Static definitions
 
219
 */
 
220
 
 
221
 
 
222
static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
 
223
        const char *unique, size_t unique_size,
 
224
        const char *function_name,
 
225
        size_t function_name_size,
 
226
        const void *data, size_t data_size,
 
227
        gearmand_job_priority_t priority,
 
228
        int64_t when) {
 
229
 
 
230
    MYSQL_BIND    bind[5];
 
231
    
 
232
    (void) server;
 
233
 
 
234
    gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context;
 
235
 
 
236
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
 
237
        (uint32_t) function_name_size, (char *) function_name);
 
238
 
 
239
    bind[0].buffer_type= MYSQL_TYPE_STRING;
 
240
    bind[0].buffer= (char *)unique;
 
241
    bind[0].buffer_length= unique_size;
 
242
    bind[0].is_null= 0;
 
243
    bind[0].length= (unsigned long*)&unique_size;
 
244
 
 
245
    bind[1].buffer_type= MYSQL_TYPE_STRING;
 
246
    bind[1].buffer= (char *)function_name;
 
247
    bind[1].buffer_length= function_name_size;
 
248
    bind[1].is_null= 0;
 
249
    bind[1].length= (unsigned long*)&function_name_size;
 
250
 
 
251
    bind[2].buffer_type= MYSQL_TYPE_LONG;
 
252
    bind[2].buffer= (char *)&priority;
 
253
    bind[2].is_null= 0;
 
254
    bind[2].length= 0;
 
255
 
 
256
    bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB;
 
257
    bind[3].buffer= (char *)data;
 
258
    bind[3].buffer_length= data_size;
 
259
    bind[3].is_null= 0;
 
260
    bind[3].length= (unsigned long*)&data_size;
 
261
 
 
262
    bind[4].buffer_type= MYSQL_TYPE_LONG;
 
263
    bind[4].buffer= (char *)&when;
 
264
    bind[4].is_null= 0;
 
265
    bind[4].length= 0;
 
266
 
 
267
    while(1){
 
268
        if (mysql_stmt_bind_param(queue->add_stmt, bind)){
 
269
            if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT ){
 
270
                if(queue->prepareAddStatement() == GEARMAN_QUEUE_ERROR){
 
271
                    return GEARMAN_QUEUE_ERROR;
 
272
                }
 
273
                continue;
 
274
            } else {
 
275
                gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
 
276
                return GEARMAN_QUEUE_ERROR;
 
277
            }
 
278
        }
 
279
 
 
280
        if (mysql_stmt_execute(queue->add_stmt)){
 
281
            if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST ){
 
282
                mysql_stmt_close(queue->add_stmt);
 
283
                if(queue->prepareAddStatement() != GEARMAN_QUEUE_ERROR){
 
284
                    continue;
 
285
                }
 
286
            }
 
287
            gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
 
288
            return GEARMAN_QUEUE_ERROR;
 
289
        }
 
290
        
 
291
        break;
 
292
    }
 
293
 
 
294
    return GEARMAN_SUCCESS;
 
295
}
 
296
 
 
297
static gearmand_error_t _mysql_queue_flush(gearman_server_st *server,
 
298
        void *context __attribute__((unused))) {
 
299
 
 
300
    (void) server;
 
301
    (void) context;
 
302
 
 
303
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue flush");
 
304
 
 
305
    return GEARMAN_SUCCESS;
 
306
}
 
307
 
 
308
static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
 
309
        const char *unique,
 
310
        size_t unique_size,
 
311
        const char *function_name,
 
312
        size_t function_name_size) {
 
313
 
 
314
    MYSQL_BIND    bind[2];
 
315
 
 
316
    (void) server;
 
317
 
 
318
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
 
319
        (uint32_t) function_name_size, (char *) function_name);
 
320
 
 
321
    gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context;
 
322
 
 
323
    bind[0].buffer_type= MYSQL_TYPE_STRING;
 
324
    bind[0].buffer= (char *)unique;
 
325
    bind[0].buffer_length= unique_size;
 
326
    bind[0].is_null= 0;
 
327
    bind[0].length= (unsigned long*)&unique_size;
 
328
 
 
329
    bind[1].buffer_type= MYSQL_TYPE_STRING;
 
330
    bind[1].buffer= (char *)function_name;
 
331
    bind[1].buffer_length= function_name_size;
 
332
    bind[1].is_null= 0;
 
333
    bind[1].length= (unsigned long*)&function_name_size;
 
334
 
 
335
    while(1){
 
336
        if (mysql_stmt_bind_param(queue->done_stmt, bind)){
 
337
            if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT ){
 
338
                if(queue->prepareDoneStatement() == GEARMAN_QUEUE_ERROR){
 
339
                    return GEARMAN_QUEUE_ERROR;
 
340
                }
 
341
                continue;
 
342
            } else {
 
343
                gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
 
344
                return GEARMAN_QUEUE_ERROR;
 
345
            }
 
346
        }
 
347
 
 
348
        if (mysql_stmt_execute(queue->done_stmt)){
 
349
            if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST ){
 
350
                mysql_stmt_close(queue->done_stmt);
 
351
                if(queue->prepareDoneStatement() != GEARMAN_QUEUE_ERROR){
 
352
                    continue;
 
353
                }
 
354
            }
 
355
            gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
 
356
            return GEARMAN_QUEUE_ERROR;
 
357
        }
 
358
 
 
359
        break;
 
360
    }
 
361
 
 
362
    return GEARMAN_SUCCESS;
 
363
}
 
364
 
 
365
static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
 
366
        gearman_queue_add_fn *add_fn,
 
367
        void *add_context) {
 
368
 
 
369
    MYSQL_RES * result;
 
370
    MYSQL_ROW row;
 
371
    char query_buffer[1024];
 
372
 
 
373
    (void) server;
 
374
 
 
375
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,"mysql queue replay");
 
376
 
 
377
    gearmand::plugins::queue::MySQL *queue = (gearmand::plugins::queue::MySQL *)context;
 
378
 
 
379
    snprintf(query_buffer, sizeof(query_buffer),
 
380
        "SELECT unique_key, function_name, data, priority, when_to_run FROM %s",
 
381
        queue->mysql_table.c_str());
 
382
 
 
383
    if(mysql_real_query(queue->con, query_buffer, strlen(query_buffer))){
 
384
        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con));
 
385
        return GEARMAN_QUEUE_ERROR;
 
386
    }
 
387
 
 
388
    if(!(result = mysql_store_result(queue->con))){
 
389
        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con));
 
390
        return GEARMAN_QUEUE_ERROR;
 
391
    }
 
392
 
 
393
    if(mysql_num_fields(result) < 5){
 
394
        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql queue: insufficient row fields in queue table");
 
395
        return GEARMAN_QUEUE_ERROR;
 
396
    }
 
397
 
 
398
    gearmand_error_t ret = GEARMAN_SUCCESS;
 
399
 
 
400
    while ((row = mysql_fetch_row(result))) {
 
401
        unsigned long *lengths;
 
402
        gearmand_job_priority_t priority = (gearmand_job_priority_t)0;
 
403
        int when = 0;
 
404
 
 
405
        lengths = mysql_fetch_lengths(result);
 
406
 
 
407
        /* need to make a copy here ... gearman_server_job_free will free it later */
 
408
        size_t data_size= lengths[2];
 
409
        char * data= (char *)malloc(data_size);
 
410
        if (data == NULL){
 
411
            gearmand_perror("malloc failed");
 
412
            return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
413
        }
 
414
        memcpy(data, row[2], data_size);
 
415
 
 
416
        if(lengths[3]) priority = (gearmand_job_priority_t) atoi(row[3]);
 
417
        if(lengths[4]) when = atoi(row[4]);
 
418
 
 
419
        ret = (*add_fn)(server, add_context,
 
420
            row[0], (size_t) lengths[0],
 
421
            row[1], (size_t) lengths[1],
 
422
            data, data_size,
 
423
            priority,
 
424
            when);
 
425
 
 
426
        if (ret != GEARMAN_SUCCESS) {
 
427
            break;
 
428
        }
 
429
    }
 
430
 
 
431
    mysql_free_result(result);
 
432
 
 
433
    return ret;
 
434
}