/* Gearman server and library
 * Copyright (C) 2009 Cory Bennett
 *
 * Use and distribution licensed under the BSD license. See
 * the COPYING file in the parent directory for full text.
 */

/**
 * @file
 * @brief Sqlite Queue Storage Definitions
 */
#include <libgearman/queue_libsqlite3.h>

#include <limits.h>
/**
 * @addtogroup gearman_queue_sqlite sqlite Queue Storage Functions
 * @ingroup gearman_queue
 */
/**
 * Default values.
 */

/** Table name copied into the queue before any "table" option is parsed. */
#define GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE "gearman_queue"

/** Bytes of headroom reserved for SQL text when sizing the query buffer. */
#define GEARMAN_QUEUE_QUERY_BUFFER 256

/*
 * Private declarations
 */

/** Size in bytes of the table-name array in gearman_queue_sqlite_st. */
#define SQLITE_MAX_TABLE_SIZE 256

/** Size in bytes of the stack buffer holding the CREATE TABLE statement. */
#define SQLITE_MAX_CREATE_TABLE_SIZE 1024
38
* Structure for sqlite specific data.
43
char table[SQLITE_MAX_TABLE_SIZE];
47
} gearman_queue_sqlite_st;
50
* Query error handling function.
52
static int _sqlite_query(gearman_st *gearman,
53
gearman_queue_sqlite_st *queue,
54
const char *query, size_t query_size,
56
static int _sqlite_lock(gearman_st *gearman,
57
gearman_queue_sqlite_st *queue);
58
static int _sqlite_commit(gearman_st *gearman,
59
gearman_queue_sqlite_st *queue);
60
static int _sqlite_rollback(gearman_st *gearman,
61
gearman_queue_sqlite_st *queue);
63
/* Queue callback functions. */
64
static gearman_return_t _sqlite_add(gearman_st *gearman, void *fn_arg,
65
const void *unique, size_t unique_size,
66
const void *function_name,
67
size_t function_name_size,
68
const void *data, size_t data_size,
69
gearman_job_priority_t priority);
70
static gearman_return_t _sqlite_flush(gearman_st *gearman, void *fn_arg);
71
static gearman_return_t _sqlite_done(gearman_st *gearman, void *fn_arg,
74
const void *function_name,
75
size_t function_name_size);
76
static gearman_return_t _sqlite_replay(gearman_st *gearman, void *fn_arg,
77
gearman_queue_add_fn *add_fn,
86
gearman_return_t gearman_queue_libsqlite3_conf(gearman_conf_st *conf)
88
gearman_conf_module_st *module;
90
module= gearman_conf_module_create(conf, NULL, "libsqlite3");
92
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
94
#define MCO(__name, __value, __help) \
95
gearman_conf_module_add_option(module, __name, 0, __value, __help);
97
MCO("db", "DB", "Database file to use.")
98
MCO("table", "TABLE", "Table to use.")
100
return gearman_conf_return(conf);
103
gearman_return_t gearman_queue_libsqlite3_init(gearman_st *gearman,
104
gearman_conf_st *conf)
106
gearman_queue_sqlite_st *queue;
107
gearman_conf_module_st *module;
113
char create[SQLITE_MAX_CREATE_TABLE_SIZE];
115
GEARMAN_INFO(gearman, "Initializing libsqlite3 module");
117
queue= malloc(sizeof(gearman_queue_sqlite_st));
120
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init", "malloc")
121
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
124
memset(queue, 0, sizeof(gearman_queue_sqlite_st));
125
snprintf(queue->table, SQLITE_MAX_TABLE_SIZE,
126
GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE);
128
/* Get module and parse the option values that were given. */
129
module= gearman_conf_module_find(conf, "libsqlite3");
132
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
133
"gearman_conf_module_find:NULL");
135
return GEARMAN_QUEUE_ERROR;
138
gearman_set_queue_fn_arg(gearman, queue);
140
while (gearman_conf_module_value(module, &name, &value))
142
if (!strcmp(name, "db"))
144
if (sqlite3_open(value, &(queue->db)) != SQLITE_OK)
146
gearman_queue_libsqlite3_deinit(gearman);
147
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
148
"Can't open database: %s\n", sqlite3_errmsg(queue->db));
150
return GEARMAN_QUEUE_ERROR;
153
else if (!strcmp(name, "table"))
154
snprintf(queue->table, SQLITE_MAX_TABLE_SIZE, "%s", value);
157
gearman_queue_libsqlite3_deinit(gearman);
158
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
159
"Unknown argument: %s", name);
160
return GEARMAN_QUEUE_ERROR;
166
gearman_queue_libsqlite3_deinit(gearman);
167
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
168
"missing required --libsqlite3-db=<dbfile> argument");
169
return GEARMAN_QUEUE_ERROR;
172
query= "SELECT name FROM sqlite_master WHERE type='table'";
173
if (_sqlite_query(gearman, queue, query, strlen(query), &sth) != SQLITE_OK)
175
gearman_queue_libsqlite3_deinit(gearman);
176
return GEARMAN_QUEUE_ERROR;
179
while (sqlite3_step(sth) == SQLITE_ROW)
181
if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
182
table= (char*)sqlite3_column_text(sth, 0);
185
sqlite3_finalize(sth);
186
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
187
"column %d is not type TEXT", 0);
188
return GEARMAN_QUEUE_ERROR;
191
if (!strcasecmp(queue->table, table))
193
GEARMAN_INFO(gearman, "sqlite module using table '%s'", table);
198
if (sqlite3_finalize(sth) != SQLITE_OK)
200
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
201
"sqlite_finalize:%s", sqlite3_errmsg(queue->db));
202
gearman_queue_libsqlite3_deinit(gearman);
203
return GEARMAN_QUEUE_ERROR;
208
snprintf(create, SQLITE_MAX_CREATE_TABLE_SIZE,
211
"unique_key TEXT PRIMARY KEY,"
212
"function_name TEXT,"
218
GEARMAN_INFO(gearman, "sqlite module creating table '%s'", queue->table);
220
if (_sqlite_query(gearman, queue, create, strlen(create), &sth)
223
gearman_queue_libsqlite3_deinit(gearman);
224
return GEARMAN_QUEUE_ERROR;
227
if (sqlite3_step(sth) != SQLITE_DONE)
229
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
230
"create table error: %s", sqlite3_errmsg(queue->db));
231
sqlite3_finalize(sth);
232
return GEARMAN_QUEUE_ERROR;
235
if (sqlite3_finalize(sth) != SQLITE_OK)
237
GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
238
"sqlite_finalize:%s", sqlite3_errmsg(queue->db));
239
gearman_queue_libsqlite3_deinit(gearman);
240
return GEARMAN_QUEUE_ERROR;
244
gearman_set_queue_add(gearman, _sqlite_add);
245
gearman_set_queue_flush(gearman, _sqlite_flush);
246
gearman_set_queue_done(gearman, _sqlite_done);
247
gearman_set_queue_replay(gearman, _sqlite_replay);
249
return GEARMAN_SUCCESS;
252
gearman_return_t gearman_queue_libsqlite3_deinit(gearman_st *gearman)
254
gearman_queue_sqlite_st *queue;
256
GEARMAN_INFO(gearman, "Shutting down sqlite queue module");
258
queue= (gearman_queue_sqlite_st *)gearman_queue_fn_arg(gearman);
259
gearman_set_queue_fn_arg(gearman, NULL);
260
sqlite3_close(queue->db);
261
if (queue->query != NULL)
265
return GEARMAN_SUCCESS;
268
gearman_return_t gearmand_queue_libsqlite3_init(gearmand_st *gearmand,
269
gearman_conf_st *conf)
271
return gearman_queue_libsqlite3_init(gearmand->server.gearman, conf);
274
gearman_return_t gearmand_queue_libsqlite3_deinit(gearmand_st *gearmand)
276
return gearman_queue_libsqlite3_deinit(gearmand->server.gearman);
280
* Private definitions
283
int _sqlite_query(gearman_st *gearman,
284
gearman_queue_sqlite_st *queue,
285
const char *query, size_t query_size,
290
if (query_size > UINT32_MAX)
292
GEARMAN_ERROR_SET(gearman, "_sqlite_query", "query size too big [%u]",
293
(uint32_t)query_size);
297
GEARMAN_CRAZY(gearman, "sqlite query: %s", query);
298
ret= sqlite3_prepare(queue->db, query, (int)query_size, sth, NULL);
299
if (ret != SQLITE_OK)
302
sqlite3_finalize(*sth);
304
GEARMAN_ERROR_SET(gearman, "_sqlite_query", "sqlite_prepare:%s",
305
sqlite3_errmsg(queue->db));
311
int _sqlite_lock(gearman_st *gearman,
312
gearman_queue_sqlite_st *queue)
318
/* already in transaction */
322
ret= _sqlite_query(gearman, queue, "BEGIN TRANSACTION",
323
sizeof("BEGIN TRANSACTION") - 1, &sth);
324
if (ret != SQLITE_OK)
326
GEARMAN_ERROR_SET(gearman, "_sqlite_lock",
327
"failed to begin transaction: %s",
328
sqlite3_errmsg(queue->db));
330
sqlite3_finalize(sth);
335
ret= sqlite3_step(sth);
336
if (ret != SQLITE_DONE)
338
GEARMAN_ERROR_SET(gearman, "_sqlite_lock", "lock error: %s",
339
sqlite3_errmsg(queue->db));
340
sqlite3_finalize(sth);
344
sqlite3_finalize(sth);
350
int _sqlite_commit(gearman_st *gearman,
351
gearman_queue_sqlite_st *queue)
356
if (! queue->in_trans)
358
/* not in transaction */
362
ret= _sqlite_query(gearman, queue, "COMMIT", sizeof("COMMIT") - 1, &sth);
363
if (ret != SQLITE_OK)
365
GEARMAN_ERROR_SET(gearman, "_sqlite_commit",
366
"failed to commit transaction: %s",
367
sqlite3_errmsg(queue->db));
369
sqlite3_finalize(sth);
372
ret= sqlite3_step(sth);
373
if (ret != SQLITE_DONE)
375
GEARMAN_ERROR_SET(gearman, "_sqlite_commit", "commit error: %s",
376
sqlite3_errmsg(queue->db));
377
sqlite3_finalize(sth);
380
sqlite3_finalize(sth);
385
int _sqlite_rollback(gearman_st *gearman,
386
gearman_queue_sqlite_st *queue)
392
if (! queue->in_trans)
394
/* not in transaction */
399
ret= _sqlite_query(gearman, queue, query, strlen(query), &sth);
400
if (ret != SQLITE_OK)
402
GEARMAN_ERROR_SET(gearman, "_sqlite_rollback",
403
"failed to rollback transaction: %s",
404
sqlite3_errmsg(queue->db));
406
sqlite3_finalize(sth);
409
ret= sqlite3_step(sth);
410
if (ret != SQLITE_DONE)
412
GEARMAN_ERROR_SET(gearman, "_sqlite_rollback", "rollback error: %s",
413
sqlite3_errmsg(queue->db));
414
sqlite3_finalize(sth);
417
sqlite3_finalize(sth);
422
static gearman_return_t _sqlite_add(gearman_st *gearman, void *fn_arg,
423
const void *unique, size_t unique_size,
424
const void *function_name,
425
size_t function_name_size,
426
const void *data, size_t data_size,
427
gearman_job_priority_t priority)
429
gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)fn_arg;
434
if (unique_size > UINT32_MAX || function_name_size > UINT32_MAX ||
435
data_size > UINT32_MAX)
437
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "size too big [%u]",
438
(uint32_t)unique_size);
442
GEARMAN_DEBUG(gearman, "sqlite add: %.*s", (uint32_t)unique_size,
445
if (_sqlite_lock(gearman, queue) != SQLITE_OK)
446
return GEARMAN_QUEUE_ERROR;
448
query_size= ((unique_size + function_name_size + data_size) * 2) +
449
GEARMAN_QUEUE_QUERY_BUFFER;
450
if (query_size > queue->query_size)
452
query= realloc(queue->query, query_size);
455
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "realloc")
456
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
460
queue->query_size= query_size;
465
query_size= (size_t)snprintf(query, query_size,
466
"INSERT INTO %s (priority,unique_key,"
467
"function_name,data) VALUES (?,?,?,?)",
470
if (_sqlite_query(gearman, queue, query, query_size, &sth) != SQLITE_OK)
471
return GEARMAN_QUEUE_ERROR;
473
if (sqlite3_bind_int(sth, 1, priority) != SQLITE_OK)
475
_sqlite_rollback(gearman, queue);
476
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind int [%d]: %s",
477
priority, sqlite3_errmsg(queue->db));
478
sqlite3_finalize(sth);
479
return GEARMAN_QUEUE_ERROR;
482
if (sqlite3_bind_text(sth, 2, unique, (int)unique_size,
483
SQLITE_TRANSIENT) != SQLITE_OK)
485
_sqlite_rollback(gearman, queue);
486
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind text [%.*s]: %s",
487
(uint32_t)unique_size, (char*)unique,
488
sqlite3_errmsg(queue->db));
489
sqlite3_finalize(sth);
490
return GEARMAN_QUEUE_ERROR;
493
if (sqlite3_bind_text(sth, 3, function_name, (int)function_name_size,
494
SQLITE_TRANSIENT) != SQLITE_OK)
496
_sqlite_rollback(gearman, queue);
497
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind text [%.*s]: %s",
498
(uint32_t)function_name_size, (char*)function_name,
499
sqlite3_errmsg(queue->db));
500
sqlite3_finalize(sth);
501
return GEARMAN_QUEUE_ERROR;
504
if (sqlite3_bind_blob(sth, 4, data, (int)data_size,
505
SQLITE_TRANSIENT) != SQLITE_OK)
507
_sqlite_rollback(gearman, queue);
508
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind blob: %s",
509
sqlite3_errmsg(queue->db));
510
sqlite3_finalize(sth);
511
return GEARMAN_QUEUE_ERROR;
514
if (sqlite3_step(sth) != SQLITE_DONE)
516
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "insert error: %s",
517
sqlite3_errmsg(queue->db));
518
if (sqlite3_finalize(sth) != SQLITE_OK )
520
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "finalize error: %s",
521
sqlite3_errmsg(queue->db));
524
return GEARMAN_QUEUE_ERROR;
527
sqlite3_finalize(sth);
529
if (_sqlite_commit(gearman, queue) != SQLITE_OK)
530
return GEARMAN_QUEUE_ERROR;
532
return GEARMAN_SUCCESS;
535
static gearman_return_t _sqlite_flush(gearman_st *gearman,
536
void *fn_arg __attribute__((unused)))
538
GEARMAN_DEBUG(gearman, "sqlite flush");
540
return GEARMAN_SUCCESS;
543
static gearman_return_t _sqlite_done(gearman_st *gearman, void *fn_arg,
546
const void *function_name __attribute__((unused)),
547
size_t function_name_size __attribute__((unused)))
549
gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)fn_arg;
554
if (unique_size > UINT32_MAX)
556
GEARMAN_ERROR_SET(gearman, "_sqlite_query", "unique key size too big [%u]",
557
(uint32_t)unique_size);
561
GEARMAN_DEBUG(gearman, "sqlite done: %.*s", (uint32_t)unique_size,
564
if (_sqlite_lock(gearman, queue) != SQLITE_OK)
565
return GEARMAN_QUEUE_ERROR;
567
query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
568
if (query_size > queue->query_size)
570
query= realloc(queue->query, query_size);
573
GEARMAN_ERROR_SET(gearman, "_sqlite_add", "realloc")
574
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
578
queue->query_size= query_size;
583
query_size= (size_t)snprintf(query, query_size,
584
"DELETE FROM %s WHERE unique_key=?",
587
if (_sqlite_query(gearman, queue, query, query_size, &sth) != SQLITE_OK)
588
return GEARMAN_QUEUE_ERROR;
590
sqlite3_bind_text(sth, 1, unique, (int)unique_size, SQLITE_TRANSIENT);
592
if (sqlite3_step(sth) != SQLITE_DONE)
594
GEARMAN_ERROR_SET(gearman, "_sqlite_done", "delete error: %s",
595
sqlite3_errmsg(queue->db));
596
sqlite3_finalize(sth);
597
return GEARMAN_QUEUE_ERROR;
600
sqlite3_finalize(sth);
602
if (_sqlite_commit(gearman, queue) != SQLITE_OK)
603
return GEARMAN_QUEUE_ERROR;
605
return GEARMAN_SUCCESS;
608
static gearman_return_t _sqlite_replay(gearman_st *gearman, void *fn_arg,
609
gearman_queue_add_fn *add_fn,
612
gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)fn_arg;
616
gearman_return_t gret;
618
GEARMAN_INFO(gearman, "sqlite replay start")
620
if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
622
query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
625
GEARMAN_ERROR_SET(gearman, "_sqlite_replay", "realloc")
626
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
630
queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
635
query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
636
"SELECT unique_key,function_name,priority,data "
640
if (_sqlite_query(gearman, queue, query, query_size, &sth) != SQLITE_OK)
641
return GEARMAN_QUEUE_ERROR;
642
while (sqlite3_step(sth) == SQLITE_ROW)
644
const void *unique, *function_name;
646
size_t unique_size, function_name_size, data_size;
647
gearman_job_priority_t priority;
649
if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
651
unique= sqlite3_column_text(sth,0);
652
unique_size= (size_t) sqlite3_column_bytes(sth,0);
656
sqlite3_finalize(sth);
657
GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
658
"column %d is not type TEXT", 0);
659
return GEARMAN_QUEUE_ERROR;
662
if (sqlite3_column_type(sth,1) == SQLITE_TEXT)
664
function_name= sqlite3_column_text(sth,1);
665
function_name_size= (size_t)sqlite3_column_bytes(sth,1);
669
sqlite3_finalize(sth);
670
GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
671
"column %d is not type TEXT", 1);
672
return GEARMAN_QUEUE_ERROR;
675
if (sqlite3_column_type(sth,2) == SQLITE_INTEGER)
677
priority= (double)sqlite3_column_int64(sth,2);
681
sqlite3_finalize(sth);
682
GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
683
"column %d is not type INTEGER", 2);
684
return GEARMAN_QUEUE_ERROR;
687
if (sqlite3_column_type(sth,3) == SQLITE_BLOB)
689
data_size= (size_t)sqlite3_column_bytes(sth,3);
690
/* need to make a copy here ... gearman_server_job_free will free it later */
691
data= malloc(data_size);
694
sqlite3_finalize(sth);
695
GEARMAN_ERROR_SET(gearman, "_sqlite_replay", "malloc");
696
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
698
memcpy(data, sqlite3_column_blob(sth,3), data_size);
702
sqlite3_finalize(sth);
703
GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
704
"column %d is not type TEXT", 3);
705
return GEARMAN_QUEUE_ERROR;
708
GEARMAN_DEBUG(gearman, "sqlite replay: %s", (char*)function_name);
710
gret= (*add_fn)(gearman, add_fn_arg,
712
function_name, function_name_size,
715
if (gret != GEARMAN_SUCCESS)
717
sqlite3_finalize(sth);
722
sqlite3_finalize(sth);
724
return GEARMAN_SUCCESS;