3
* @brief Tokyo Cabinet Queue Storage Definitions
6
#include <libgearman-server/common.h>
9
#include <libgearman-server/plugins/queue/tokyocabinet/queue.h>
10
#include <libgearman-server/plugins/queue/base.h>
15
namespace gearmand { namespace plugins { namespace queue { class TokyoCabinet; }}}
18
* It is unclear from tokyocabinet's public headers what, if any, limit there is. 4k seems sane.
21
#define GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN 4096
22
gearmand_error_t _initialize(gearman_server_st *server,
23
gearmand::plugins::queue::TokyoCabinet *queue);
29
class TokyoCabinet : public Queue {
34
gearmand_error_t initialize();
41
TokyoCabinet::TokyoCabinet() :
42
Queue("libtokyocabinet"),
46
command_line_options().add_options()
47
("libtokyocabinet-file", boost::program_options::value(&filename), "File name of the database. [see: man tcadb, tcadbopen() for name guidelines]")
48
("libtokyocabinet-optimize", boost::program_options::bool_switch(&optimize)->default_value(true), "Optimize database on open. [default=true]");
53
TokyoCabinet::~TokyoCabinet()
58
gearmand_error_t TokyoCabinet::initialize()
60
return _initialize(&Gearmand()->server, this);
63
void initialize_tokyocabinet()
65
static TokyoCabinet local_instance;
69
} // namespace plugins
70
} // namespace gearmand
74
* @addtogroup gearman_queue_libtokyocabinet libtokyocabinet Queue Storage Functions
75
* @ingroup gearman_queue
80
* Private declarations
83
/* Queue callback functions. */
84
static gearmand_error_t _libtokyocabinet_add(gearman_server_st *server, void *context,
87
const char *function_name,
88
size_t function_name_size,
89
const void *data, size_t data_size,
90
gearmand_job_priority_t priority,
93
static gearmand_error_t _libtokyocabinet_flush(gearman_server_st *server, void *context);
95
static gearmand_error_t _libtokyocabinet_done(gearman_server_st *server, void *context,
98
const char *function_name,
99
size_t function_name_size);
101
static gearmand_error_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
102
gearman_queue_add_fn *add_fn,
105
#pragma GCC diagnostic ignored "-Wold-style-cast"
108
* Missing function from tcadb.c ??
110
static const char * _libtokyocabinet_tcaerrmsg(TCADB *db)
112
switch (tcadbomode(db))
115
return tcerrmsg(tchdbecode((TCHDB *)tcadbreveal(db)));
117
return tcerrmsg(tcbdbecode((TCBDB *)tcadbreveal(db)));
119
return tcerrmsg(TCEMISC);
123
gearmand_error_t _initialize(gearman_server_st *server,
124
gearmand::plugins::queue::TokyoCabinet *queue)
126
gearmand_info("Initializing libtokyocabinet module");
128
if ((queue->db= tcadbnew()) == NULL)
130
gearmand_error("tcadbnew");
131
return GEARMAN_QUEUE_ERROR;
134
if (queue->filename.empty())
136
gearmand_error("No --file given");
137
return GEARMAN_QUEUE_ERROR;
140
if (not tcadbopen(queue->db, queue->filename.c_str()))
144
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
145
"tcadbopen(%s): %s", queue->filename.c_str(), _libtokyocabinet_tcaerrmsg(queue->db));
147
return GEARMAN_QUEUE_ERROR;
152
gearmand_info("libtokyocabinet optimizing database file");
153
if (not tcadboptimize(queue->db, NULL))
156
return gearmand_gerror("tcadboptimize", GEARMAN_QUEUE_ERROR);
160
gearman_server_set_queue(server, queue, _libtokyocabinet_add, _libtokyocabinet_flush, _libtokyocabinet_done, _libtokyocabinet_replay);
162
return GEARMAN_SUCCESS;
166
* Private definitions
169
static gearmand_error_t _libtokyocabinet_add(gearman_server_st *server, void *context,
172
const char *function_name,
173
size_t function_name_size,
174
const void *data, size_t data_size,
175
gearmand_job_priority_t priority,
179
gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
183
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet add: %.*s at %lld", (uint32_t)unique_size, (char *)unique, (long long int)when);
185
char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
186
size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
187
(int)function_name_size,
188
(const char *)function_name, (int)unique_size,
189
(const char *)unique);
192
tcxstrcat(key, key_str, (int)key_length);
194
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet key: %.*s", (int)key_length, key_str);
196
job_data= tcxstrnew();
198
tcxstrcat(job_data, (const char *)function_name, (int)function_name_size);
199
tcxstrcat(job_data, "\0", 1);
200
tcxstrcat(job_data, (const char *)unique, (int)unique_size);
201
tcxstrcat(job_data, "\0", 1);
205
case GEARMAND_JOB_PRIORITY_HIGH:
206
case GEARMAND_JOB_PRIORITY_MAX:
207
tcxstrcat2(job_data, "0");
209
case GEARMAND_JOB_PRIORITY_LOW:
210
tcxstrcat2(job_data, "2");
212
case GEARMAND_JOB_PRIORITY_NORMAL:
214
tcxstrcat2(job_data, "1");
217
// get int64_t as string
219
snprintf(timestr, sizeof(timestr), "%lld", (long long int)when);
221
// append to job_data
222
tcxstrcat(job_data, (const char *)timestr, (int)strlen(timestr));
223
tcxstrcat(job_data, "\0", 1);
226
tcxstrcat(job_data, (const char *)data, (int)data_size);
228
bool rc= tcadbput(queue->db, tcxstrptr(key), tcxstrsize(key),
229
tcxstrptr(job_data), tcxstrsize(job_data));
235
return GEARMAN_SUCCESS;
237
return GEARMAN_QUEUE_ERROR;
240
static gearmand_error_t _libtokyocabinet_flush(gearman_server_st *, void *context)
242
gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
244
gearmand_debug("libtokyocabinet flush");
246
if (not tcadbsync(queue->db))
247
return GEARMAN_QUEUE_ERROR;
249
return GEARMAN_SUCCESS;
252
static gearmand_error_t _libtokyocabinet_done(gearman_server_st *, void *context,
255
const char *function_name,
256
size_t function_name_size)
258
gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
261
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet add: %.*s", (uint32_t)unique_size, (char *)unique);
263
char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
264
size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
265
(int)function_name_size,
266
(const char *)function_name, (int)unique_size,
267
(const char *)unique);
270
tcxstrcat(key, key_str, (int)key_length);
271
bool rc= tcadbout(queue->db, tcxstrptr(key), tcxstrsize(key));
275
return GEARMAN_SUCCESS;
277
return GEARMAN_QUEUE_ERROR;
280
static gearmand_error_t _callback_for_record(gearman_server_st *server,
281
TCXSTR *key, TCXSTR *data,
282
gearman_queue_add_fn *add_fn,
286
size_t data_cstr_size;
287
const char *function;
291
gearmand_job_priority_t priority;
292
gearmand_error_t gret;
295
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "replaying: %s", (char *) tcxstrptr(key));
297
data_cstr= (char *)tcxstrptr(data);
298
data_cstr_size= (size_t)tcxstrsize(data);
301
function_len= strlen(function);
303
unique= data_cstr+function_len+1;
304
unique_len= strlen(unique); // strlen is only safe because tcxstrptr guarantees nul term
307
data_cstr += unique_len+function_len+2;
308
data_cstr_size -= unique_len+function_len+2;
313
assert(function_len);
315
// single char for priority
316
if (*data_cstr == '2')
317
priority = GEARMAND_JOB_PRIORITY_LOW;
318
else if (*data_cstr == '0')
319
priority = GEARMAND_JOB_PRIORITY_HIGH;
321
priority = GEARMAND_JOB_PRIORITY_NORMAL;
326
// out ptr for strtoul
327
char *new_data_cstr= NULL;
329
// parse time from record
330
when= (int64_t)strtoul(data_cstr, &new_data_cstr, 10);
332
// decrease opaque data size by the length of the numbers read by strtoul
333
data_cstr_size -= (new_data_cstr - data_cstr) + 1;
335
// move data pointer to end of timestamp + 1 (null)
336
data_cstr= new_data_cstr + 1;
338
// data is freed later so we must make a copy
339
void *data_ptr= (void *)malloc(data_cstr_size);
340
if (data_ptr == NULL)
342
return GEARMAN_QUEUE_ERROR;
344
memcpy(data_ptr, data_cstr, data_cstr_size);
346
gret = (*add_fn)(server, add_context, unique, unique_len,
347
function, function_len,
348
data_ptr, data_cstr_size,
351
if (gret != GEARMAN_SUCCESS)
355
return GEARMAN_SUCCESS;
359
static gearmand_error_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
360
gearman_queue_add_fn *add_fn,
363
gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
368
gearmand_error_t gret;
369
gearmand_error_t tmp_gret;
371
gearmand_info("libtokyocabinet replay start");
373
if (!tcadbiterinit(queue->db))
375
return GEARMAN_QUEUE_ERROR;
379
gret= GEARMAN_SUCCESS;
381
while ((iter= tcadbiternext(queue->db, &iter_size)))
385
tcxstrcat(key, iter, iter_size);
387
iter= tcadbget(queue->db, tcxstrptr(key), tcxstrsize(key), &iter_size);
390
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet replay key disappeared: %s", (char *)tcxstrptr(key));
393
tcxstrcat(data, iter, iter_size);
395
tmp_gret= _callback_for_record(server, key, data, add_fn, add_context);
396
if (tmp_gret != GEARMAN_SUCCESS)
398
gret= GEARMAN_QUEUE_ERROR;
406
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet replayed %ld records", x);