~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman/queue_libsqlite3.c

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2009-08-11 10:06:22 UTC
  • mto: (1.2.3 upstream) (6.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20090811100622-6ig4iknanc73olum
ImportĀ upstreamĀ versionĀ 0.9

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2009 Cory Bennett
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
/**
 
10
 * @file
 
11
 * @brief Sqlite Queue Storage Definitions
 
12
 */
 
13
 
 
14
#include "common.h"
 
15
 
 
16
#include <libgearman/queue_libsqlite3.h>
 
17
#include <sqlite3.h>
 
18
 
 
19
/**
 
20
 * @addtogroup gearman_queue_sqlite sqlite Queue Storage Functions
 
21
 * @ingroup gearman_queue
 
22
 * @{
 
23
 */
 
24
 
 
25
/**
 
26
 * Default values.
 
27
 */
 
28
#define GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE "gearman_queue"
 
29
#define GEARMAN_QUEUE_QUERY_BUFFER 256
 
30
 
 
31
/*
 
32
 * Private declarations
 
33
 */
 
34
#define SQLITE_MAX_TABLE_SIZE 256
 
35
#define SQLITE_MAX_CREATE_TABLE_SIZE 1024
 
36
 
 
37
/**
 
38
 * Structure for sqlite specific data.
 
39
 */
 
40
typedef struct
 
41
{
 
42
  sqlite3* db;
 
43
  char table[SQLITE_MAX_TABLE_SIZE];
 
44
  char *query;
 
45
  size_t query_size;
 
46
  int in_trans;
 
47
} gearman_queue_sqlite_st;
 
48
 
 
49
/**
 
50
 * Query error handling function.
 
51
 */
 
52
static int _sqlite_query(gearman_st *gearman,
 
53
                         gearman_queue_sqlite_st *queue,
 
54
                         const char *query, size_t query_size,
 
55
                         sqlite3_stmt ** sth);
 
56
static int _sqlite_lock(gearman_st *gearman,
 
57
                        gearman_queue_sqlite_st *queue);
 
58
static int _sqlite_commit(gearman_st *gearman,
 
59
                          gearman_queue_sqlite_st *queue);
 
60
static int _sqlite_rollback(gearman_st *gearman,
 
61
                            gearman_queue_sqlite_st *queue);
 
62
 
 
63
/* Queue callback functions. */
 
64
static gearman_return_t _sqlite_add(gearman_st *gearman, void *fn_arg,
 
65
                                    const void *unique, size_t unique_size,
 
66
                                    const void *function_name,
 
67
                                    size_t function_name_size,
 
68
                                    const void *data, size_t data_size,
 
69
                                    gearman_job_priority_t priority);
 
70
static gearman_return_t _sqlite_flush(gearman_st *gearman, void *fn_arg);
 
71
static gearman_return_t _sqlite_done(gearman_st *gearman, void *fn_arg,
 
72
                                     const void *unique,
 
73
                                     size_t unique_size,
 
74
                                     const void *function_name,
 
75
                                     size_t function_name_size);
 
76
static gearman_return_t _sqlite_replay(gearman_st *gearman, void *fn_arg,
 
77
                                       gearman_queue_add_fn *add_fn,
 
78
                                       void *add_fn_arg);
 
79
 
 
80
/** @} */
 
81
 
 
82
/*
 
83
 * Public definitions
 
84
 */
 
85
 
 
86
gearman_return_t gearman_queue_libsqlite3_conf(gearman_conf_st *conf)
 
87
{
 
88
  gearman_conf_module_st *module;
 
89
 
 
90
  module= gearman_conf_module_create(conf, NULL, "libsqlite3");
 
91
  if (module == NULL)
 
92
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
93
 
 
94
#define MCO(__name, __value, __help) \
 
95
  gearman_conf_module_add_option(module, __name, 0, __value, __help);
 
96
 
 
97
  MCO("db", "DB", "Database file to use.")
 
98
  MCO("table", "TABLE", "Table to use.")
 
99
 
 
100
  return gearman_conf_return(conf);
 
101
}
 
102
 
 
103
gearman_return_t gearman_queue_libsqlite3_init(gearman_st *gearman,
 
104
                                               gearman_conf_st *conf)
 
105
{
 
106
  gearman_queue_sqlite_st *queue;
 
107
  gearman_conf_module_st *module;
 
108
  const char *name;
 
109
  const char *value;
 
110
  char *table= NULL;
 
111
  const char *query;
 
112
  sqlite3_stmt* sth;
 
113
  char create[SQLITE_MAX_CREATE_TABLE_SIZE];
 
114
 
 
115
  GEARMAN_INFO(gearman, "Initializing libsqlite3 module");
 
116
 
 
117
  queue= malloc(sizeof(gearman_queue_sqlite_st));
 
118
  if (queue == NULL)
 
119
  {
 
120
    GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init", "malloc")
 
121
    return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
122
  }
 
123
 
 
124
  memset(queue, 0, sizeof(gearman_queue_sqlite_st));
 
125
  snprintf(queue->table, SQLITE_MAX_TABLE_SIZE,
 
126
           GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE);
 
127
 
 
128
  /* Get module and parse the option values that were given. */
 
129
  module= gearman_conf_module_find(conf, "libsqlite3");
 
130
  if (module == NULL)
 
131
  {
 
132
    GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
133
                      "gearman_conf_module_find:NULL");
 
134
    free(queue);
 
135
    return GEARMAN_QUEUE_ERROR;
 
136
  }
 
137
 
 
138
  gearman_set_queue_fn_arg(gearman, queue);
 
139
 
 
140
  while (gearman_conf_module_value(module, &name, &value))
 
141
  {
 
142
    if (!strcmp(name, "db"))
 
143
    {
 
144
      if (sqlite3_open(value, &(queue->db)) != SQLITE_OK)
 
145
      {
 
146
        gearman_queue_libsqlite3_deinit(gearman);        
 
147
        GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
148
                          "Can't open database: %s\n", sqlite3_errmsg(queue->db));
 
149
        free(queue);
 
150
        return GEARMAN_QUEUE_ERROR;
 
151
      }
 
152
    }
 
153
    else if (!strcmp(name, "table"))
 
154
      snprintf(queue->table, SQLITE_MAX_TABLE_SIZE, "%s", value);
 
155
    else
 
156
    {
 
157
      gearman_queue_libsqlite3_deinit(gearman);
 
158
      GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
159
                        "Unknown argument: %s", name);
 
160
      return GEARMAN_QUEUE_ERROR;
 
161
    }
 
162
  }
 
163
 
 
164
  if (!queue->db)
 
165
  {
 
166
    gearman_queue_libsqlite3_deinit(gearman);
 
167
    GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
168
                      "missing required --libsqlite3-db=<dbfile> argument");
 
169
    return GEARMAN_QUEUE_ERROR;
 
170
  }    
 
171
 
 
172
  query= "SELECT name FROM sqlite_master WHERE type='table'";
 
173
  if (_sqlite_query(gearman, queue, query, strlen(query), &sth) != SQLITE_OK)
 
174
  {
 
175
    gearman_queue_libsqlite3_deinit(gearman);
 
176
    return GEARMAN_QUEUE_ERROR;
 
177
  }
 
178
 
 
179
  while (sqlite3_step(sth) == SQLITE_ROW)
 
180
  {
 
181
    if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
 
182
      table= (char*)sqlite3_column_text(sth, 0);
 
183
    else
 
184
    {
 
185
      sqlite3_finalize(sth);
 
186
      GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
187
                        "column %d is not type TEXT", 0);
 
188
      return GEARMAN_QUEUE_ERROR;
 
189
    }
 
190
 
 
191
    if (!strcasecmp(queue->table, table))
 
192
    {
 
193
      GEARMAN_INFO(gearman, "sqlite module using table '%s'", table);
 
194
      break;
 
195
    }
 
196
  }
 
197
 
 
198
  if (sqlite3_finalize(sth) != SQLITE_OK)
 
199
  {
 
200
    GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
201
                      "sqlite_finalize:%s", sqlite3_errmsg(queue->db));
 
202
    gearman_queue_libsqlite3_deinit(gearman);
 
203
    return GEARMAN_QUEUE_ERROR;
 
204
  }
 
205
 
 
206
  if (table == NULL)
 
207
  {
 
208
    snprintf(create, SQLITE_MAX_CREATE_TABLE_SIZE,
 
209
             "CREATE TABLE %s"
 
210
             "("
 
211
             "unique_key TEXT PRIMARY KEY,"
 
212
             "function_name TEXT,"
 
213
             "priority INTEGER,"
 
214
             "data BLOB"
 
215
             ")",
 
216
             queue->table);
 
217
 
 
218
    GEARMAN_INFO(gearman, "sqlite module creating table '%s'", queue->table);
 
219
 
 
220
    if (_sqlite_query(gearman, queue, create, strlen(create), &sth)
 
221
        != SQLITE_OK)
 
222
    {
 
223
      gearman_queue_libsqlite3_deinit(gearman);
 
224
      return GEARMAN_QUEUE_ERROR;
 
225
    }
 
226
 
 
227
    if (sqlite3_step(sth) != SQLITE_DONE)
 
228
    {
 
229
      GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
230
                        "create table error: %s", sqlite3_errmsg(queue->db));
 
231
      sqlite3_finalize(sth);
 
232
      return GEARMAN_QUEUE_ERROR;
 
233
    }
 
234
 
 
235
    if (sqlite3_finalize(sth) != SQLITE_OK)
 
236
    {
 
237
      GEARMAN_ERROR_SET(gearman, "gearman_queue_libsqlite3_init",
 
238
                        "sqlite_finalize:%s", sqlite3_errmsg(queue->db));
 
239
      gearman_queue_libsqlite3_deinit(gearman);
 
240
      return GEARMAN_QUEUE_ERROR;
 
241
    }
 
242
  }
 
243
 
 
244
  gearman_set_queue_add(gearman, _sqlite_add);
 
245
  gearman_set_queue_flush(gearman, _sqlite_flush);
 
246
  gearman_set_queue_done(gearman, _sqlite_done);
 
247
  gearman_set_queue_replay(gearman, _sqlite_replay);
 
248
 
 
249
  return GEARMAN_SUCCESS;
 
250
}
 
251
 
 
252
gearman_return_t gearman_queue_libsqlite3_deinit(gearman_st *gearman)
 
253
{
 
254
  gearman_queue_sqlite_st *queue;
 
255
 
 
256
  GEARMAN_INFO(gearman, "Shutting down sqlite queue module");
 
257
 
 
258
  queue= (gearman_queue_sqlite_st *)gearman_queue_fn_arg(gearman);
 
259
  gearman_set_queue_fn_arg(gearman, NULL);
 
260
  sqlite3_close(queue->db);
 
261
  if (queue->query != NULL)
 
262
    free(queue->query);
 
263
  free(queue);
 
264
 
 
265
  return GEARMAN_SUCCESS;
 
266
}
 
267
 
 
268
gearman_return_t gearmand_queue_libsqlite3_init(gearmand_st *gearmand,
 
269
                                                gearman_conf_st *conf)
 
270
{
 
271
  return gearman_queue_libsqlite3_init(gearmand->server.gearman, conf);
 
272
}
 
273
 
 
274
gearman_return_t gearmand_queue_libsqlite3_deinit(gearmand_st *gearmand)
 
275
{
 
276
  return gearman_queue_libsqlite3_deinit(gearmand->server.gearman);
 
277
}
 
278
 
 
279
/*
 
280
 * Private definitions
 
281
 */
 
282
 
 
283
int _sqlite_query(gearman_st *gearman,
 
284
                  gearman_queue_sqlite_st *queue,
 
285
                  const char *query, size_t query_size,
 
286
                  sqlite3_stmt ** sth)
 
287
{
 
288
  int ret;
 
289
 
 
290
  if (query_size > UINT32_MAX)
 
291
  {
 
292
    GEARMAN_ERROR_SET(gearman, "_sqlite_query", "query size too big [%u]",
 
293
                      (uint32_t)query_size);
 
294
    return SQLITE_ERROR;
 
295
  }
 
296
 
 
297
  GEARMAN_CRAZY(gearman, "sqlite query: %s", query);
 
298
  ret= sqlite3_prepare(queue->db, query, (int)query_size, sth, NULL);
 
299
  if (ret  != SQLITE_OK)
 
300
  {
 
301
    if (*sth)
 
302
      sqlite3_finalize(*sth);
 
303
    *sth= NULL;
 
304
    GEARMAN_ERROR_SET(gearman, "_sqlite_query", "sqlite_prepare:%s", 
 
305
                      sqlite3_errmsg(queue->db));
 
306
  }
 
307
 
 
308
  return ret;
 
309
}
 
310
 
 
311
int _sqlite_lock(gearman_st *gearman,
 
312
                 gearman_queue_sqlite_st *queue)
 
313
{
 
314
  sqlite3_stmt* sth;
 
315
  int ret;
 
316
  if (queue->in_trans)
 
317
  {
 
318
    /* already in transaction */
 
319
    return SQLITE_OK;
 
320
  }
 
321
 
 
322
  ret= _sqlite_query(gearman, queue, "BEGIN TRANSACTION", 
 
323
                     sizeof("BEGIN TRANSACTION") - 1, &sth);
 
324
  if (ret != SQLITE_OK)
 
325
  {
 
326
    GEARMAN_ERROR_SET(gearman, "_sqlite_lock",
 
327
                      "failed to begin transaction: %s", 
 
328
                      sqlite3_errmsg(queue->db));
 
329
    if(sth)
 
330
      sqlite3_finalize(sth);
 
331
 
 
332
    return ret;
 
333
  }
 
334
 
 
335
  ret= sqlite3_step(sth);
 
336
  if (ret != SQLITE_DONE)
 
337
  {
 
338
    GEARMAN_ERROR_SET(gearman, "_sqlite_lock", "lock error: %s",
 
339
                      sqlite3_errmsg(queue->db));
 
340
    sqlite3_finalize(sth);
 
341
    return ret;
 
342
  }
 
343
 
 
344
  sqlite3_finalize(sth);
 
345
  queue->in_trans++;
 
346
 
 
347
  return SQLITE_OK;
 
348
}
 
349
 
 
350
int _sqlite_commit(gearman_st *gearman,
 
351
                   gearman_queue_sqlite_st *queue)
 
352
{
 
353
  sqlite3_stmt* sth;
 
354
  int ret;
 
355
 
 
356
  if (! queue->in_trans)
 
357
  {
 
358
    /* not in transaction */
 
359
    return SQLITE_OK;
 
360
  }
 
361
 
 
362
  ret= _sqlite_query(gearman, queue, "COMMIT", sizeof("COMMIT") - 1, &sth);
 
363
  if (ret != SQLITE_OK)
 
364
  {
 
365
    GEARMAN_ERROR_SET(gearman, "_sqlite_commit",
 
366
                      "failed to commit transaction: %s", 
 
367
                      sqlite3_errmsg(queue->db));
 
368
    if(sth)
 
369
      sqlite3_finalize(sth);
 
370
    return ret;
 
371
  }
 
372
  ret= sqlite3_step(sth);
 
373
  if (ret != SQLITE_DONE)
 
374
  {
 
375
    GEARMAN_ERROR_SET(gearman, "_sqlite_commit", "commit error: %s",
 
376
                      sqlite3_errmsg(queue->db));
 
377
    sqlite3_finalize(sth);
 
378
    return ret;
 
379
  }
 
380
  sqlite3_finalize(sth);
 
381
  queue->in_trans= 0;
 
382
  return SQLITE_OK;
 
383
}
 
384
 
 
385
int _sqlite_rollback(gearman_st *gearman,
 
386
                     gearman_queue_sqlite_st *queue)
 
387
{
 
388
  sqlite3_stmt* sth;
 
389
  int ret;
 
390
  const char* query;
 
391
 
 
392
  if (! queue->in_trans)
 
393
  {
 
394
    /* not in transaction */
 
395
    return SQLITE_OK;
 
396
  }
 
397
 
 
398
  query= "ROLLBACK";
 
399
  ret= _sqlite_query(gearman, queue, query, strlen(query), &sth);
 
400
  if (ret != SQLITE_OK)
 
401
  {
 
402
    GEARMAN_ERROR_SET(gearman, "_sqlite_rollback",
 
403
                      "failed to rollback transaction: %s",
 
404
                      sqlite3_errmsg(queue->db));
 
405
    if(sth)
 
406
      sqlite3_finalize(sth);
 
407
    return ret;
 
408
  }
 
409
  ret= sqlite3_step(sth);
 
410
  if (ret != SQLITE_DONE)
 
411
  {
 
412
    GEARMAN_ERROR_SET(gearman, "_sqlite_rollback", "rollback error: %s",
 
413
                      sqlite3_errmsg(queue->db));
 
414
    sqlite3_finalize(sth);
 
415
    return ret;
 
416
  }
 
417
  sqlite3_finalize(sth);
 
418
  queue->in_trans= 0;
 
419
  return SQLITE_OK;
 
420
}
 
421
 
 
422
static gearman_return_t _sqlite_add(gearman_st *gearman, void *fn_arg,
 
423
                                    const void *unique, size_t unique_size,
 
424
                                    const void *function_name,
 
425
                                    size_t function_name_size,
 
426
                                    const void *data, size_t data_size,
 
427
                                    gearman_job_priority_t priority)
 
428
{
 
429
  gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)fn_arg;
 
430
  char *query;
 
431
  size_t query_size;
 
432
  sqlite3_stmt* sth;
 
433
 
 
434
  if (unique_size > UINT32_MAX || function_name_size > UINT32_MAX ||
 
435
      data_size > UINT32_MAX)
 
436
  {
 
437
    GEARMAN_ERROR_SET(gearman, "_sqlite_add", "size too big [%u]",
 
438
                      (uint32_t)unique_size);
 
439
    return SQLITE_ERROR;
 
440
  }
 
441
 
 
442
  GEARMAN_DEBUG(gearman, "sqlite add: %.*s", (uint32_t)unique_size,
 
443
                (char *)unique);
 
444
 
 
445
  if (_sqlite_lock(gearman, queue) !=  SQLITE_OK)
 
446
    return GEARMAN_QUEUE_ERROR;
 
447
 
 
448
  query_size= ((unique_size + function_name_size + data_size) * 2) +
 
449
    GEARMAN_QUEUE_QUERY_BUFFER;
 
450
  if (query_size > queue->query_size)
 
451
  {
 
452
    query= realloc(queue->query, query_size);
 
453
    if (query == NULL)
 
454
    {
 
455
      GEARMAN_ERROR_SET(gearman, "_sqlite_add", "realloc")
 
456
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
457
    }
 
458
 
 
459
    queue->query= query;
 
460
    queue->query_size= query_size;
 
461
  }
 
462
  else
 
463
    query= queue->query;
 
464
 
 
465
  query_size= (size_t)snprintf(query, query_size,
 
466
                               "INSERT INTO %s (priority,unique_key,"
 
467
                               "function_name,data) VALUES (?,?,?,?)",
 
468
                               queue->table);
 
469
 
 
470
  if (_sqlite_query(gearman, queue, query, query_size, &sth) != SQLITE_OK)
 
471
    return GEARMAN_QUEUE_ERROR;
 
472
 
 
473
  if (sqlite3_bind_int(sth,  1, priority) != SQLITE_OK)
 
474
  {
 
475
    _sqlite_rollback(gearman, queue);
 
476
    GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind int [%d]: %s",
 
477
                      priority, sqlite3_errmsg(queue->db));
 
478
    sqlite3_finalize(sth);
 
479
    return GEARMAN_QUEUE_ERROR;
 
480
  }
 
481
 
 
482
  if (sqlite3_bind_text(sth, 2, unique, (int)unique_size,
 
483
                        SQLITE_TRANSIENT) != SQLITE_OK)
 
484
  {
 
485
    _sqlite_rollback(gearman, queue);
 
486
    GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind text [%.*s]: %s", 
 
487
                      (uint32_t)unique_size, (char*)unique,
 
488
                      sqlite3_errmsg(queue->db));
 
489
    sqlite3_finalize(sth);
 
490
    return GEARMAN_QUEUE_ERROR;
 
491
  }
 
492
 
 
493
  if (sqlite3_bind_text(sth, 3, function_name, (int)function_name_size,
 
494
                        SQLITE_TRANSIENT) != SQLITE_OK)
 
495
  {
 
496
    _sqlite_rollback(gearman, queue);
 
497
    GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind text [%.*s]: %s", 
 
498
                      (uint32_t)function_name_size, (char*)function_name,
 
499
                      sqlite3_errmsg(queue->db));
 
500
    sqlite3_finalize(sth);
 
501
    return GEARMAN_QUEUE_ERROR;
 
502
  }
 
503
 
 
504
  if (sqlite3_bind_blob(sth, 4, data, (int)data_size,
 
505
                        SQLITE_TRANSIENT) != SQLITE_OK)
 
506
  {
 
507
    _sqlite_rollback(gearman, queue);
 
508
    GEARMAN_ERROR_SET(gearman, "_sqlite_add", "failed to bind blob: %s",
 
509
                      sqlite3_errmsg(queue->db));
 
510
    sqlite3_finalize(sth);
 
511
    return GEARMAN_QUEUE_ERROR;
 
512
  }
 
513
 
 
514
  if (sqlite3_step(sth) != SQLITE_DONE)
 
515
  {
 
516
    GEARMAN_ERROR_SET(gearman, "_sqlite_add", "insert error: %s",
 
517
                      sqlite3_errmsg(queue->db));
 
518
    if (sqlite3_finalize(sth) != SQLITE_OK )
 
519
    {
 
520
      GEARMAN_ERROR_SET(gearman, "_sqlite_add", "finalize error: %s",
 
521
                        sqlite3_errmsg(queue->db));
 
522
    }
 
523
 
 
524
    return GEARMAN_QUEUE_ERROR;
 
525
  }
 
526
 
 
527
  sqlite3_finalize(sth);
 
528
 
 
529
  if (_sqlite_commit(gearman, queue) !=  SQLITE_OK)
 
530
    return GEARMAN_QUEUE_ERROR;
 
531
 
 
532
  return GEARMAN_SUCCESS;
 
533
}
 
534
 
 
535
static gearman_return_t _sqlite_flush(gearman_st *gearman,
 
536
                                      void *fn_arg __attribute__((unused)))
 
537
{
 
538
  GEARMAN_DEBUG(gearman, "sqlite flush");
 
539
 
 
540
  return GEARMAN_SUCCESS;
 
541
}
 
542
 
 
543
static gearman_return_t _sqlite_done(gearman_st *gearman, void *fn_arg,
 
544
                                     const void *unique,
 
545
                                     size_t unique_size,
 
546
                                     const void *function_name __attribute__((unused)),
 
547
                                     size_t function_name_size __attribute__((unused)))
 
548
{
 
549
  gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)fn_arg;
 
550
  char *query;
 
551
  size_t query_size;
 
552
  sqlite3_stmt* sth;
 
553
 
 
554
  if (unique_size > UINT32_MAX)
 
555
  {
 
556
    GEARMAN_ERROR_SET(gearman, "_sqlite_query", "unique key size too big [%u]",
 
557
                      (uint32_t)unique_size);
 
558
    return SQLITE_ERROR;
 
559
  }
 
560
 
 
561
  GEARMAN_DEBUG(gearman, "sqlite done: %.*s", (uint32_t)unique_size,
 
562
                (char *)unique);
 
563
 
 
564
    if (_sqlite_lock(gearman, queue) !=  SQLITE_OK)
 
565
      return GEARMAN_QUEUE_ERROR;
 
566
 
 
567
  query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
 
568
  if (query_size > queue->query_size)
 
569
  {
 
570
    query= realloc(queue->query, query_size);
 
571
    if (query == NULL)
 
572
    {
 
573
      GEARMAN_ERROR_SET(gearman, "_sqlite_add", "realloc")
 
574
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
575
    }
 
576
 
 
577
    queue->query= query;
 
578
    queue->query_size= query_size;
 
579
  }
 
580
  else
 
581
    query= queue->query;
 
582
 
 
583
  query_size= (size_t)snprintf(query, query_size,
 
584
                               "DELETE FROM %s WHERE unique_key=?",
 
585
                               queue->table);
 
586
 
 
587
  if (_sqlite_query(gearman, queue, query, query_size, &sth) != SQLITE_OK)
 
588
    return GEARMAN_QUEUE_ERROR;
 
589
 
 
590
  sqlite3_bind_text(sth, 1, unique, (int)unique_size, SQLITE_TRANSIENT);
 
591
 
 
592
  if (sqlite3_step(sth) != SQLITE_DONE)
 
593
  {
 
594
    GEARMAN_ERROR_SET(gearman, "_sqlite_done", "delete error: %s",
 
595
                      sqlite3_errmsg(queue->db));
 
596
    sqlite3_finalize(sth);
 
597
    return GEARMAN_QUEUE_ERROR;
 
598
  }
 
599
 
 
600
  sqlite3_finalize(sth);
 
601
 
 
602
  if (_sqlite_commit(gearman, queue) !=  SQLITE_OK)
 
603
    return GEARMAN_QUEUE_ERROR;
 
604
 
 
605
  return GEARMAN_SUCCESS;
 
606
}
 
607
 
 
608
static gearman_return_t _sqlite_replay(gearman_st *gearman, void *fn_arg,
 
609
                                       gearman_queue_add_fn *add_fn,
 
610
                                       void *add_fn_arg)
 
611
{
 
612
  gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)fn_arg;
 
613
  char *query;
 
614
  size_t query_size;
 
615
  sqlite3_stmt* sth;
 
616
  gearman_return_t gret;
 
617
 
 
618
  GEARMAN_INFO(gearman, "sqlite replay start")
 
619
 
 
620
  if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
 
621
  {
 
622
    query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
 
623
    if (query == NULL)
 
624
    {
 
625
      GEARMAN_ERROR_SET(gearman, "_sqlite_replay", "realloc")
 
626
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
627
    }
 
628
 
 
629
    queue->query= query;
 
630
    queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
 
631
  }
 
632
  else
 
633
    query= queue->query;
 
634
 
 
635
  query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
 
636
                               "SELECT unique_key,function_name,priority,data "
 
637
                               "FROM %s",
 
638
                               queue->table);
 
639
 
 
640
  if (_sqlite_query(gearman, queue, query, query_size, &sth) != SQLITE_OK)
 
641
    return GEARMAN_QUEUE_ERROR;
 
642
  while (sqlite3_step(sth) == SQLITE_ROW)
 
643
  {
 
644
    const void *unique, *function_name;
 
645
    void *data;
 
646
    size_t unique_size, function_name_size, data_size;
 
647
    gearman_job_priority_t priority;
 
648
 
 
649
    if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
 
650
    {
 
651
      unique= sqlite3_column_text(sth,0);
 
652
      unique_size= (size_t) sqlite3_column_bytes(sth,0);
 
653
    }
 
654
    else
 
655
    {
 
656
      sqlite3_finalize(sth);
 
657
      GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
 
658
                        "column %d is not type TEXT", 0);
 
659
      return GEARMAN_QUEUE_ERROR;
 
660
    }
 
661
 
 
662
    if (sqlite3_column_type(sth,1) == SQLITE_TEXT)
 
663
    {
 
664
      function_name= sqlite3_column_text(sth,1);
 
665
      function_name_size= (size_t)sqlite3_column_bytes(sth,1);
 
666
    }
 
667
    else
 
668
    {
 
669
      sqlite3_finalize(sth);
 
670
      GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
 
671
                        "column %d is not type TEXT", 1);
 
672
      return GEARMAN_QUEUE_ERROR;
 
673
    }
 
674
 
 
675
    if (sqlite3_column_type(sth,2) == SQLITE_INTEGER)
 
676
    {
 
677
      priority= (double)sqlite3_column_int64(sth,2);
 
678
    }
 
679
    else
 
680
    {
 
681
      sqlite3_finalize(sth);
 
682
      GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
 
683
                        "column %d is not type INTEGER", 2);
 
684
      return GEARMAN_QUEUE_ERROR;
 
685
    }
 
686
 
 
687
    if (sqlite3_column_type(sth,3) == SQLITE_BLOB)
 
688
    {
 
689
      data_size= (size_t)sqlite3_column_bytes(sth,3);
 
690
      /* need to make a copy here ... gearman_server_job_free will free it later */
 
691
      data= malloc(data_size);
 
692
      if (data == NULL)
 
693
      {
 
694
        sqlite3_finalize(sth);
 
695
        GEARMAN_ERROR_SET(gearman, "_sqlite_replay", "malloc");
 
696
        return GEARMAN_MEMORY_ALLOCATION_FAILURE;
 
697
      }
 
698
      memcpy(data, sqlite3_column_blob(sth,3), data_size);
 
699
    }
 
700
    else
 
701
    {
 
702
      sqlite3_finalize(sth);
 
703
      GEARMAN_ERROR_SET(gearman, "_sqlite_replay",
 
704
                        "column %d is not type TEXT", 3);
 
705
      return GEARMAN_QUEUE_ERROR;
 
706
    }
 
707
 
 
708
    GEARMAN_DEBUG(gearman, "sqlite replay: %s", (char*)function_name);
 
709
 
 
710
    gret= (*add_fn)(gearman, add_fn_arg,
 
711
                    unique, unique_size,
 
712
                    function_name, function_name_size,
 
713
                    data, data_size,
 
714
                    priority);
 
715
    if (gret != GEARMAN_SUCCESS)
 
716
    {
 
717
      sqlite3_finalize(sth);
 
718
      return gret;
 
719
    }
 
720
  }
 
721
 
 
722
  sqlite3_finalize(sth);
 
723
 
 
724
  return GEARMAN_SUCCESS;
 
725
}