~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman-server/plugins/queue/tokyocabinet/queue.cc

  • Committer: Package Import Robot
  • Author(s): Stig Sandbeck Mathisen, Michael Fladischer, Stig Sandbeck Mathisen
  • Date: 2012-01-23 11:31:08 UTC
  • mfrom: (6.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20120123113108-wl1yhiba13q9jusb
Tags: 0.27-1
[Michael Fladischer]
* Patch: fix spelling
* Patch: remove dependency on googleanalytics
* Patch: fix tests
* Use non-authenticating URL for Vcs-Git.
* Add "status" action to init script.

[Stig Sandbeck Mathisen]
* New upstream release (Closes: #621486) (LP: #682680)
* Remove build dependency on drizzle
  (until it reaches testing again)
* Build with support for tokyocabinet
* Remove backported ipv6 patch
* Patch: disable hostile build tests, they take hours...
* Patch: workaround duplicate address issue for tests
* Do not build API documentation, the sources are not shipped in
  upstream tarball
* Update debian/copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * @file
 
3
 * @brief Tokyo Cabinet Queue Storage Definitions
 
4
 */
 
5
 
 
6
#include <libgearman-server/common.h>
 
7
#include <inttypes.h>
 
8
 
 
9
#include <libgearman-server/plugins/queue/tokyocabinet/queue.h>
 
10
#include <libgearman-server/plugins/queue/base.h>
 
11
 
 
12
#include <tcutil.h>
 
13
#include <tcadb.h>
 
14
 
 
15
namespace gearmand { namespace plugins { namespace queue { class TokyoCabinet;  }}}
 
16
 
 
17
/**
 
18
 * It is unclear from tokyocabinet's public headers what, if any, limit there is. 4k seems sane.
 
19
 */
 
20
 
 
21
#define GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN 4096
 
22
gearmand_error_t _initialize(gearman_server_st *server,
 
23
                             gearmand::plugins::queue::TokyoCabinet *queue);
 
24
 
 
25
namespace gearmand {
 
26
namespace plugins {
 
27
namespace queue {
 
28
 
 
29
class TokyoCabinet : public Queue {
 
30
public:
 
31
  TokyoCabinet();
 
32
  ~TokyoCabinet();
 
33
 
 
34
  gearmand_error_t initialize();
 
35
 
 
36
  TCADB *db;
 
37
  std::string filename;
 
38
  bool optimize;
 
39
};
 
40
 
 
41
TokyoCabinet::TokyoCabinet() :
 
42
  Queue("libtokyocabinet"),
 
43
  db(NULL),
 
44
  optimize(false)
 
45
{
 
46
  command_line_options().add_options()
 
47
    ("libtokyocabinet-file", boost::program_options::value(&filename), "File name of the database. [see: man tcadb, tcadbopen() for name guidelines]")
 
48
    ("libtokyocabinet-optimize", boost::program_options::bool_switch(&optimize)->default_value(true), "Optimize database on open. [default=true]");
 
49
 
 
50
  db= tcadbnew();
 
51
}
 
52
 
 
53
TokyoCabinet::~TokyoCabinet()
 
54
{
 
55
  tcadbdel(db);
 
56
}
 
57
 
 
58
gearmand_error_t TokyoCabinet::initialize()
 
59
{
 
60
  return _initialize(&Gearmand()->server, this);
 
61
}
 
62
 
 
63
void initialize_tokyocabinet()
 
64
{
 
65
  static TokyoCabinet local_instance;
 
66
}
 
67
 
 
68
} // namespace queue
 
69
} // namespace plugins
 
70
} // namespace gearmand
 
71
 
 
72
 
 
73
/**
 
74
 * @addtogroup gearman_queue_libtokyocabinet libtokyocabinet Queue Storage Functions
 
75
 * @ingroup gearman_queue
 
76
 * @{
 
77
 */
 
78
 
 
79
/*
 
80
 * Private declarations
 
81
 */
 
82
 
 
83
/* Queue callback functions. */
 
84
static gearmand_error_t _libtokyocabinet_add(gearman_server_st *server, void *context,
 
85
                                             const char *unique,
 
86
                                             size_t unique_size,
 
87
                                             const char *function_name,
 
88
                                             size_t function_name_size,
 
89
                                             const void *data, size_t data_size,
 
90
                                             gearmand_job_priority_t priority,
 
91
                                             int64_t when);
 
92
 
 
93
static gearmand_error_t _libtokyocabinet_flush(gearman_server_st *server, void *context);
 
94
 
 
95
static gearmand_error_t _libtokyocabinet_done(gearman_server_st *server, void *context,
 
96
                                              const char *unique,
 
97
                                              size_t unique_size, 
 
98
                                              const char *function_name, 
 
99
                                              size_t function_name_size);
 
100
 
 
101
static gearmand_error_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
 
102
                                                gearman_queue_add_fn *add_fn,
 
103
                                                void *add_context);
 
104
 
 
105
#pragma GCC diagnostic ignored "-Wold-style-cast"
 
106
 
 
107
/**
 
108
 * Missing function from tcadb.c ??
 
109
 */
 
110
static const char * _libtokyocabinet_tcaerrmsg(TCADB *db)
 
111
{
 
112
  switch (tcadbomode(db))
 
113
  {
 
114
  case ADBOHDB:
 
115
    return tcerrmsg(tchdbecode((TCHDB *)tcadbreveal(db)));
 
116
  case ADBOBDB:
 
117
    return tcerrmsg(tcbdbecode((TCBDB *)tcadbreveal(db)));
 
118
  default:
 
119
    return tcerrmsg(TCEMISC);
 
120
  }
 
121
}
 
122
 
 
123
gearmand_error_t _initialize(gearman_server_st *server,
 
124
                             gearmand::plugins::queue::TokyoCabinet *queue)
 
125
{
 
126
  gearmand_info("Initializing libtokyocabinet module");
 
127
 
 
128
  if ((queue->db= tcadbnew()) == NULL)
 
129
  {
 
130
    gearmand_error("tcadbnew");
 
131
    return GEARMAN_QUEUE_ERROR;
 
132
  }
 
133
     
 
134
  if (queue->filename.empty())
 
135
  {
 
136
    gearmand_error("No --file given");
 
137
    return GEARMAN_QUEUE_ERROR;
 
138
  }
 
139
 
 
140
  if (not tcadbopen(queue->db, queue->filename.c_str()))
 
141
  {
 
142
    tcadbdel(queue->db);
 
143
 
 
144
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, 
 
145
                       "tcadbopen(%s): %s", queue->filename.c_str(), _libtokyocabinet_tcaerrmsg(queue->db));
 
146
 
 
147
    return GEARMAN_QUEUE_ERROR;
 
148
  }
 
149
 
 
150
  if (queue->optimize)
 
151
  {
 
152
    gearmand_info("libtokyocabinet optimizing database file");
 
153
    if (not tcadboptimize(queue->db, NULL))
 
154
    {
 
155
      tcadbdel(queue->db);
 
156
      return gearmand_gerror("tcadboptimize", GEARMAN_QUEUE_ERROR);
 
157
    }
 
158
  }
 
159
 
 
160
  gearman_server_set_queue(server, queue, _libtokyocabinet_add, _libtokyocabinet_flush, _libtokyocabinet_done, _libtokyocabinet_replay);   
 
161
   
 
162
  return GEARMAN_SUCCESS;
 
163
}
 
164
 
 
165
/*
 
166
 * Private definitions
 
167
 */
 
168
 
 
169
static gearmand_error_t _libtokyocabinet_add(gearman_server_st *server, void *context,
 
170
                                             const char *unique,
 
171
                                             size_t unique_size,
 
172
                                             const char *function_name,
 
173
                                             size_t function_name_size,
 
174
                                             const void *data, size_t data_size,
 
175
                                             gearmand_job_priority_t priority,
 
176
                                             int64_t when)
 
177
{
 
178
  (void)server;
 
179
  gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
 
180
  TCXSTR *key;
 
181
  TCXSTR *job_data;
 
182
 
 
183
  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet add: %.*s at %lld", (uint32_t)unique_size, (char *)unique, (long long int)when);
 
184
 
 
185
  char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
 
186
  size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
 
187
                               (int)function_name_size,
 
188
                               (const char *)function_name, (int)unique_size,
 
189
                               (const char *)unique);
 
190
 
 
191
  key= tcxstrnew();
 
192
  tcxstrcat(key, key_str, (int)key_length);
 
193
 
 
194
  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet key: %.*s", (int)key_length, key_str);
 
195
 
 
196
  job_data= tcxstrnew();
 
197
 
 
198
  tcxstrcat(job_data, (const char *)function_name, (int)function_name_size);
 
199
  tcxstrcat(job_data, "\0", 1);
 
200
  tcxstrcat(job_data, (const char *)unique, (int)unique_size);
 
201
  tcxstrcat(job_data, "\0", 1);
 
202
 
 
203
  switch (priority)
 
204
  {
 
205
   case GEARMAND_JOB_PRIORITY_HIGH:
 
206
   case GEARMAND_JOB_PRIORITY_MAX:     
 
207
     tcxstrcat2(job_data, "0");
 
208
     break;
 
209
   case GEARMAND_JOB_PRIORITY_LOW:
 
210
     tcxstrcat2(job_data, "2");
 
211
     break;
 
212
   case GEARMAND_JOB_PRIORITY_NORMAL:
 
213
   default:
 
214
     tcxstrcat2(job_data, "1");
 
215
  }
 
216
 
 
217
  // get int64_t as string
 
218
  char timestr[32];
 
219
  snprintf(timestr, sizeof(timestr), "%lld", (long long int)when);
 
220
 
 
221
  // append to job_data
 
222
  tcxstrcat(job_data, (const char *)timestr, (int)strlen(timestr));
 
223
  tcxstrcat(job_data, "\0", 1);
 
224
  
 
225
  // add the rest...
 
226
  tcxstrcat(job_data, (const char *)data, (int)data_size);
 
227
 
 
228
  bool rc= tcadbput(queue->db, tcxstrptr(key), tcxstrsize(key),
 
229
                    tcxstrptr(job_data), tcxstrsize(job_data));
 
230
 
 
231
  tcxstrdel(key);
 
232
  tcxstrdel(job_data);
 
233
 
 
234
  if (rc) // Success
 
235
    return GEARMAN_SUCCESS;
 
236
 
 
237
  return GEARMAN_QUEUE_ERROR;
 
238
}
 
239
 
 
240
static gearmand_error_t _libtokyocabinet_flush(gearman_server_st *, void *context)
 
241
{
 
242
  gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
 
243
   
 
244
  gearmand_debug("libtokyocabinet flush");
 
245
 
 
246
  if (not tcadbsync(queue->db))
 
247
     return GEARMAN_QUEUE_ERROR;
 
248
   
 
249
  return GEARMAN_SUCCESS;
 
250
}
 
251
 
 
252
static gearmand_error_t _libtokyocabinet_done(gearman_server_st *, void *context,
 
253
                                              const char *unique,
 
254
                                              size_t unique_size, 
 
255
                                              const char *function_name,
 
256
                                              size_t function_name_size)
 
257
{
 
258
  gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
 
259
  TCXSTR *key;
 
260
 
 
261
  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet add: %.*s", (uint32_t)unique_size, (char *)unique);
 
262
  
 
263
  char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
 
264
  size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
 
265
                                      (int)function_name_size,
 
266
                                      (const char *)function_name, (int)unique_size,
 
267
                                      (const char *)unique);
 
268
 
 
269
  key= tcxstrnew();
 
270
  tcxstrcat(key, key_str, (int)key_length);
 
271
  bool rc= tcadbout(queue->db, tcxstrptr(key), tcxstrsize(key));
 
272
  tcxstrdel(key);
 
273
 
 
274
  if (rc)
 
275
    return GEARMAN_SUCCESS;
 
276
 
 
277
  return GEARMAN_QUEUE_ERROR;
 
278
}
 
279
 
 
280
static gearmand_error_t _callback_for_record(gearman_server_st *server,
 
281
                                             TCXSTR *key, TCXSTR *data,
 
282
                                             gearman_queue_add_fn *add_fn,
 
283
                                             void *add_context)
 
284
{
 
285
  char *data_cstr;
 
286
  size_t data_cstr_size;
 
287
  const char *function;
 
288
  size_t function_len;
 
289
  char *unique;
 
290
  size_t unique_len;
 
291
  gearmand_job_priority_t priority;
 
292
  gearmand_error_t gret;
 
293
  int64_t when; 
 
294
  
 
295
  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "replaying: %s", (char *) tcxstrptr(key));
 
296
 
 
297
  data_cstr= (char *)tcxstrptr(data);
 
298
  data_cstr_size= (size_t)tcxstrsize(data);
 
299
 
 
300
  function= data_cstr;
 
301
  function_len= strlen(function);
 
302
 
 
303
  unique= data_cstr+function_len+1;
 
304
  unique_len= strlen(unique); // strlen is only safe because tcxstrptr guarantees nul term
 
305
 
 
306
  // +2 for nulls
 
307
  data_cstr += unique_len+function_len+2;
 
308
  data_cstr_size -= unique_len+function_len+2;
 
309
 
 
310
  assert(unique);
 
311
  assert(unique_len);
 
312
  assert(function);
 
313
  assert(function_len);
 
314
 
 
315
  // single char for priority
 
316
  if (*data_cstr == '2')
 
317
    priority = GEARMAND_JOB_PRIORITY_LOW;
 
318
  else if (*data_cstr == '0')
 
319
    priority = GEARMAND_JOB_PRIORITY_HIGH;
 
320
  else
 
321
    priority = GEARMAND_JOB_PRIORITY_NORMAL;
 
322
 
 
323
  ++data_cstr;
 
324
  --data_cstr_size;
 
325
 
 
326
  // out ptr for strtoul
 
327
  char *new_data_cstr= NULL;
 
328
  
 
329
  // parse time from record
 
330
  when= (int64_t)strtoul(data_cstr, &new_data_cstr, 10);
 
331
  
 
332
  // decrease opaque data size by the length of the numbers read by strtoul
 
333
  data_cstr_size -= (new_data_cstr - data_cstr) + 1;
 
334
  
 
335
  // move data pointer to end of timestamp + 1 (null)
 
336
  data_cstr= new_data_cstr + 1; 
 
337
  
 
338
  // data is freed later so we must make a copy
 
339
  void *data_ptr= (void *)malloc(data_cstr_size);
 
340
  if (data_ptr == NULL)
 
341
  {
 
342
    return GEARMAN_QUEUE_ERROR;
 
343
  }
 
344
  memcpy(data_ptr, data_cstr, data_cstr_size); 
 
345
 
 
346
  gret = (*add_fn)(server, add_context, unique, unique_len,
 
347
                   function, function_len,
 
348
                   data_ptr, data_cstr_size,
 
349
                   priority, when);
 
350
 
 
351
  if (gret != GEARMAN_SUCCESS)
 
352
  {
 
353
     return gret;
 
354
  }   
 
355
  return GEARMAN_SUCCESS;
 
356
}
 
357
 
 
358
 
 
359
static gearmand_error_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
 
360
                                                gearman_queue_add_fn *add_fn,
 
361
                                                void *add_context)
 
362
{
 
363
  gearmand::plugins::queue::TokyoCabinet *queue= (gearmand::plugins::queue::TokyoCabinet *)context;
 
364
  TCXSTR *key;
 
365
  TCXSTR *data;
 
366
  void *iter= NULL;
 
367
  int iter_size= 0;
 
368
  gearmand_error_t gret;
 
369
  gearmand_error_t tmp_gret;   
 
370
   
 
371
  gearmand_info("libtokyocabinet replay start");
 
372
  
 
373
  if (!tcadbiterinit(queue->db))
 
374
  {
 
375
    return GEARMAN_QUEUE_ERROR;
 
376
  }
 
377
  key= tcxstrnew();
 
378
  data= tcxstrnew();
 
379
  gret= GEARMAN_SUCCESS;
 
380
  uint64_t x= 0;
 
381
  while ((iter= tcadbiternext(queue->db, &iter_size)))
 
382
  {     
 
383
    tcxstrclear(key);
 
384
    tcxstrclear(data);
 
385
    tcxstrcat(key, iter, iter_size);
 
386
    free(iter);
 
387
    iter= tcadbget(queue->db, tcxstrptr(key), tcxstrsize(key), &iter_size);
 
388
    if (not iter)
 
389
    {
 
390
      gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet replay key disappeared: %s", (char *)tcxstrptr(key));
 
391
      continue;
 
392
    }
 
393
    tcxstrcat(data, iter, iter_size);
 
394
    free(iter);
 
395
    tmp_gret= _callback_for_record(server, key, data, add_fn, add_context);
 
396
    if (tmp_gret != GEARMAN_SUCCESS)
 
397
    {
 
398
      gret= GEARMAN_QUEUE_ERROR;
 
399
      break;
 
400
    }
 
401
    ++x;
 
402
  }
 
403
  tcxstrdel(key);
 
404
  tcxstrdel(data);
 
405
 
 
406
  gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "libtokyocabinet replayed %ld records", x);
 
407
 
 
408
  return gret;
 
409
}