~brianaker/libmemcached/1164440

« back to all changes in this revision

Viewing changes to example/storage_innodb.c

Merge in build trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2
 
#include <stdlib.h>
3
 
#include <inttypes.h>
4
 
#include <time.h>
5
 
#include <stdbool.h>
6
 
#include <string.h>
7
 
#include <unistd.h>
8
 
#include <assert.h>
9
 
#include <embedded_innodb-1.0/innodb.h>
10
 
 
11
 
#include "storage.h"
12
 
 
13
 
const char *tablename= "memcached/items";
14
 
 
15
 
#define key_col_idx 0
16
 
#define data_col_idx 1
17
 
#define flags_col_idx 2
18
 
#define cas_col_idx 3
19
 
#define exp_col_idx 4
20
 
 
21
 
static uint64_t cas;
22
 
 
23
 
/**
24
 
 * To avoid cluttering down all the code with error checking I use the
25
 
 * following macro. It will execute the statement and verify that the
26
 
 * result of the operation is DB_SUCCESS. If any other error code is
27
 
 * returned it will print an "assert-like" output and jump to the
28
 
 * label error_exit. There I release resources before returning out of
29
 
 * the function.
30
 
 *
31
 
 * @param a the expression to execute
32
 
 *
33
 
 */
34
 
#define checked(expression)                                    \
35
 
do {                                                           \
36
 
  ib_err_t checked_err= expression;                            \
37
 
  if (checked_err != DB_SUCCESS)                               \
38
 
  {                                                            \
39
 
    fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n",   \
40
 
            __FILE__, __LINE__, #expression,                   \
41
 
            ib_strerror(checked_err));                         \
42
 
    goto error_exit;                                           \
43
 
  }                                                            \
44
 
} while (0);
45
 
 
46
 
/**
47
 
 * Create the database schema.
48
 
 * @return true if the database schema was created without any problems
49
 
 *         false otherwise.
50
 
 */
51
 
static bool create_schema(void) 
52
 
{
53
 
  ib_tbl_sch_t schema= NULL;
54
 
  ib_idx_sch_t dbindex= NULL;
55
 
 
56
 
  if (ib_database_create("memcached") != IB_TRUE)
57
 
  {
58
 
    fprintf(stderr, "Failed to create database\n");
59
 
    return false;
60
 
  }
61
 
 
62
 
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
63
 
  ib_id_t table_id;
64
 
 
65
 
  checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
66
 
  checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
67
 
                                  IB_COL_NOT_NULL, 0, 32767));
68
 
  checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
69
 
                                  IB_COL_NONE, 0, 1024*1024));
70
 
  checked(ib_table_schema_add_col(schema, "flags", IB_INT,
71
 
                                  IB_COL_UNSIGNED, 0, 4));
72
 
  checked(ib_table_schema_add_col(schema, "cas", IB_INT,
73
 
                                  IB_COL_UNSIGNED, 0, 8));
74
 
  checked(ib_table_schema_add_col(schema, "exp", IB_INT,
75
 
                                  IB_COL_UNSIGNED, 0, 4));
76
 
  checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
77
 
  checked(ib_index_schema_add_col(dbindex, "key", 0));
78
 
  checked(ib_index_schema_set_clustered(dbindex));
79
 
  checked(ib_schema_lock_exclusive(transaction));
80
 
  checked(ib_table_create(transaction, schema, &table_id));
81
 
  checked(ib_trx_commit(transaction));
82
 
  ib_table_schema_delete(schema);
83
 
 
84
 
  return true;
85
 
 
86
 
 error_exit:
87
 
  /* @todo release resources! */
88
 
  {
89
 
    ib_err_t error= ib_trx_rollback(transaction);
90
 
    if (error != DB_SUCCESS)
91
 
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
92
 
              ib_strerror(error));
93
 
  }
94
 
  return false;
95
 
}
96
 
 
97
 
/**
98
 
 * Store an item into the database. Update the CAS id on the item before
99
 
 * storing it in the database.
100
 
 *
101
 
 * @param trx the transaction to use
102
 
 * @param item the item to store
103
 
 * @return true if we can go ahead and commit the transaction, false otherwise
104
 
 */
105
 
static bool do_put_item(ib_trx_t trx, struct item* item) 
106
 
{
107
 
  update_cas(item);
108
 
 
109
 
  ib_crsr_t cursor= NULL;
110
 
  ib_tpl_t tuple= NULL;
111
 
  bool retval= false;
112
 
 
113
 
  checked(ib_cursor_open_table(tablename, trx, &cursor));
114
 
  checked(ib_cursor_lock(cursor, IB_LOCK_X));
115
 
  tuple= ib_clust_read_tuple_create(cursor);
116
 
 
117
 
  checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
118
 
  checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
119
 
  checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
120
 
  checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
121
 
  checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
122
 
  checked(ib_cursor_insert_row(cursor, tuple));
123
 
 
124
 
  retval= true;
125
 
  /* Release resources: */
126
 
  /* FALLTHROUGH */
127
 
 
128
 
 error_exit:
129
 
  if (tuple != NULL)
130
 
    ib_tuple_delete(tuple);
131
 
 
132
 
  ib_err_t currsor_error;
133
 
  if (cursor != NULL)
134
 
    currsor_error= ib_cursor_close(cursor);
135
 
 
136
 
  return retval;
137
 
}
138
 
 
139
 
/**
140
 
 * Try to locate an item in the database. Return a cursor and the tuple to
141
 
 * the item if I found it in the database.
142
 
 *
143
 
 * @param trx the transaction to use
144
 
 * @param key the key of the item to look up
145
 
 * @param nkey the size of the key
146
 
 * @param cursor where to store the cursor (OUT)
147
 
 * @param tuple where to store the tuple (OUT)
148
 
 * @return true if I found the object, false otherwise
149
 
 */
150
 
static bool do_locate_item(ib_trx_t trx,
151
 
                           const void* key,
152
 
                           size_t nkey,
153
 
                           ib_crsr_t *cursor)
154
 
{
155
 
  int res;
156
 
  ib_tpl_t tuple= NULL;
157
 
 
158
 
  *cursor= NULL;
159
 
 
160
 
  checked(ib_cursor_open_table(tablename, trx, cursor));
161
 
  tuple= ib_clust_search_tuple_create(*cursor);
162
 
  if (tuple == NULL)
163
 
  {
164
 
    fprintf(stderr, "Failed to allocate tuple object\n");
165
 
    goto error_exit;
166
 
  }
167
 
 
168
 
  checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
169
 
  ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
170
 
 
171
 
  if (err == DB_SUCCESS && res == 0)
172
 
  {
173
 
    ib_tuple_delete(tuple);
174
 
    return true;
175
 
  }
176
 
  else if (err != DB_SUCCESS &&
177
 
           err != DB_RECORD_NOT_FOUND &&
178
 
           err != DB_END_OF_INDEX)
179
 
  {
180
 
    fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
181
 
  }
182
 
  /* FALLTHROUGH */
183
 
 error_exit:
184
 
  if (tuple != NULL)
185
 
    ib_tuple_delete(tuple);
186
 
 
187
 
  ib_err_t cursor_error;
188
 
  if (*cursor != NULL)
189
 
    cursor_error= ib_cursor_close(*cursor);
190
 
 
191
 
  *cursor= NULL;
192
 
 
193
 
  return false;
194
 
}
195
 
 
196
 
/**
197
 
 * Try to get an item from the database
198
 
 *
199
 
 * @param trx the transaction to use
200
 
 * @param key the key to get
201
 
 * @param nkey the lenght of the key
202
 
 * @return a pointer to the item if I found it in the database
203
 
 */
204
 
static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) 
205
 
{
206
 
  ib_crsr_t cursor= NULL;
207
 
  ib_tpl_t tuple= NULL;
208
 
  struct item* retval= NULL;
209
 
 
210
 
  if (do_locate_item(trx, key, nkey, &cursor)) 
211
 
  {
212
 
    tuple= ib_clust_read_tuple_create(cursor);
213
 
    if (tuple == NULL)
214
 
    {
215
 
      fprintf(stderr, "Failed to create read tuple\n");
216
 
      goto error_exit;
217
 
    }
218
 
    checked(ib_cursor_read_row(cursor, tuple));
219
 
    ib_col_meta_t meta;
220
 
    ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
221
 
    ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
222
 
    ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
223
 
    ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
224
 
    const void *dataptr= ib_col_get_value(tuple, data_col_idx);
225
 
 
226
 
    retval= create_item(key, nkey, dataptr, datalen, 0, 0);
227
 
    if (retval == NULL) 
228
 
    {
229
 
      fprintf(stderr, "Failed to allocate memory\n");
230
 
      goto error_exit;
231
 
    }
232
 
 
233
 
    if (flaglen != 0) 
234
 
    {
235
 
      ib_u32_t val;
236
 
      checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
237
 
      retval->flags= (uint32_t)val;
238
 
    }
239
 
    if (caslen != 0) 
240
 
    {
241
 
      ib_u64_t val;
242
 
      checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
243
 
      retval->cas= (uint64_t)val;
244
 
    }
245
 
    if (explen != 0) 
246
 
    {
247
 
      ib_u32_t val;
248
 
      checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
249
 
      retval->exp= (time_t)val;
250
 
    }
251
 
  }
252
 
 
253
 
  /* Release resources */
254
 
  /* FALLTHROUGH */
255
 
 
256
 
 error_exit:
257
 
  if (tuple != NULL)
258
 
    ib_tuple_delete(tuple);
259
 
 
260
 
  ib_err_t cursor_error;
261
 
  if (cursor != NULL)
262
 
    cursor_error= ib_cursor_close(cursor);
263
 
 
264
 
  return retval;
265
 
}
266
 
 
267
 
/**
268
 
 * Delete an item from the cache
269
 
 * @param trx the transaction to use
270
 
 * @param key the key of the item to delete
271
 
 * @param nkey the length of the key
272
 
 * @return true if we should go ahead and commit the transaction
273
 
 *         or false if we should roll back (if the key didn't exists)
274
 
 */
275
 
static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
276
 
  ib_crsr_t cursor= NULL;
277
 
  bool retval= false;
278
 
 
279
 
  if (do_locate_item(trx, key, nkey, &cursor))
280
 
  {
281
 
    checked(ib_cursor_lock(cursor, IB_LOCK_X));
282
 
    checked(ib_cursor_delete_row(cursor));
283
 
    retval= true;
284
 
  }
285
 
  /* Release resources */
286
 
  /* FALLTHROUGH */
287
 
 
288
 
 error_exit:
289
 
  if (cursor != NULL)
290
 
  {
291
 
    ib_err_t cursor_error;
292
 
    cursor_error= ib_cursor_close(cursor);
293
 
  }
294
 
 
295
 
  return retval;
296
 
}
297
 
 
298
 
 
299
 
/****************************************************************************
300
 
 * External interface
301
 
 ***************************************************************************/
302
 
 
303
 
/**
304
 
 * Initialize the database storage
305
 
 * @return true if the database was initialized successfully, false otherwise
306
 
 */
307
 
bool initialize_storage(void) 
308
 
{
309
 
  ib_err_t error;
310
 
  ib_id_t tid;
311
 
 
312
 
  checked(ib_init());
313
 
  checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
314
 
  checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
315
 
  checked(ib_cfg_set_bool_on("file_per_table"));
316
 
  checked(ib_startup("barracuda"));
317
 
 
318
 
  /* check to see if the table exists or if we should create the schema */
319
 
  error= ib_table_get_id(tablename, &tid);
320
 
  if (error == DB_TABLE_NOT_FOUND) 
321
 
  {
322
 
    if (!create_schema()) 
323
 
    {
324
 
      return false;
325
 
    }
326
 
  } 
327
 
  else if (error != DB_SUCCESS) 
328
 
  {
329
 
    fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
330
 
    return false;
331
 
  }
332
 
 
333
 
  return true;
334
 
 
335
 
 error_exit:
336
 
 
337
 
  return false;
338
 
}
339
 
 
340
 
/**
341
 
 * Shut down this storage engine
342
 
 */
343
 
void shutdown_storage(void) 
344
 
{
345
 
  checked(ib_shutdown(IB_SHUTDOWN_NORMAL));
346
 
 error_exit:
347
 
  ;
348
 
}
349
 
 
350
 
/**
351
 
 * Store an item in the databse
352
 
 *
353
 
 * @param item the item to store
354
 
 */
355
 
void put_item(struct item* item) 
356
 
{
357
 
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
358
 
  if (do_put_item(transaction, item)) 
359
 
  {
360
 
    ib_err_t error= ib_trx_commit(transaction);
361
 
    if (error != DB_SUCCESS) 
362
 
    {
363
 
      fprintf(stderr, "Failed to store key:\n\t%s\n",
364
 
              ib_strerror(error));
365
 
    }
366
 
  } 
367
 
  else 
368
 
  {
369
 
    ib_err_t error= ib_trx_rollback(transaction);
370
 
    if (error != DB_SUCCESS)
371
 
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
372
 
              ib_strerror(error));
373
 
  }
374
 
}
375
 
 
376
 
/**
377
 
 * Get an item from the engine
378
 
 * @param key the key to grab
379
 
 * @param nkey number of bytes in the key
380
 
 * @return pointer to the item if found
381
 
 */
382
 
struct item* get_item(const void* key, size_t nkey) 
383
 
{
384
 
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
385
 
  struct item* ret= do_get_item(transaction, key, nkey);
386
 
  ib_err_t error= ib_trx_rollback(transaction);
387
 
 
388
 
  if (error != DB_SUCCESS)
389
 
    fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
390
 
            ib_strerror(error));
391
 
 
392
 
  return ret;
393
 
}
394
 
 
395
 
/**
396
 
 * Create an item structure and initialize it with the content
397
 
 *
398
 
 * @param key the key for the item
399
 
 * @param nkey the number of bytes in the key
400
 
 * @param data pointer to the value for the item (may be NULL)
401
 
 * @param size the size of the data
402
 
 * @param flags the flags to store with the data
403
 
 * @param exp the expiry time for the item
404
 
 * @return pointer to an initialized item object or NULL if allocation failed
405
 
 */
406
 
struct item* create_item(const void* key, size_t nkey, const void* data,
407
 
                         size_t size, uint32_t flags, time_t exp)
408
 
{
409
 
  struct item* ret= calloc(1, sizeof(*ret));
410
 
  if (ret != NULL)
411
 
  {
412
 
    ret->key= malloc(nkey);
413
 
    if (size > 0)
414
 
    {
415
 
      ret->data= malloc(size);
416
 
    }
417
 
 
418
 
    if (ret->key == NULL || (size > 0 && ret->data == NULL))
419
 
    {
420
 
      free(ret->key);
421
 
      free(ret->data);
422
 
      free(ret);
423
 
      return NULL;
424
 
    }
425
 
 
426
 
    memcpy(ret->key, key, nkey);
427
 
    if (data != NULL)
428
 
    {
429
 
      memcpy(ret->data, data, size);
430
 
    }
431
 
 
432
 
    ret->nkey= nkey;
433
 
    ret->size= size;
434
 
    ret->flags= flags;
435
 
    ret->exp= exp;
436
 
  }
437
 
 
438
 
  return ret;
439
 
}
440
 
 
441
 
/**
442
 
 * Delete an item from the cache
443
 
 * @param key the key of the item to delete
444
 
 * @param nkey the length of the key
445
 
 * @return true if the item was deleted from the cache
446
 
 */
447
 
bool delete_item(const void* key, size_t nkey) {
448
 
  ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
449
 
 
450
 
  bool ret= do_delete_item(transaction, key, nkey);
451
 
 
452
 
  if (ret)
453
 
  {
454
 
    /* object found. commit transaction */
455
 
    ib_err_t error= ib_trx_commit(transaction);
456
 
    if (error != DB_SUCCESS)
457
 
    {
458
 
      fprintf(stderr, "Failed to delete key:\n\t%s\n",
459
 
              ib_strerror(error));
460
 
      ret= false;
461
 
    }
462
 
  }
463
 
  else
464
 
  {
465
 
    ib_err_t error= ib_trx_rollback(transaction);
466
 
    if (error != DB_SUCCESS)
467
 
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
468
 
              ib_strerror(error));
469
 
  }
470
 
 
471
 
  return ret;
472
 
}
473
 
 
474
 
/**
475
 
 * Flush the entire cache
476
 
 * @param when when the cache should be flushed (0 == immediately)
477
 
 */
478
 
void flush(uint32_t when __attribute__((unused))) 
479
 
{
480
 
  /* @TODO implement support for when != 0 */
481
 
  ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
482
 
  ib_crsr_t cursor= NULL;
483
 
        ib_err_t err= DB_SUCCESS;
484
 
 
485
 
  checked(ib_cursor_open_table(tablename, transaction, &cursor));
486
 
  checked(ib_cursor_first(cursor));
487
 
  checked(ib_cursor_lock(cursor, IB_LOCK_X));
488
 
 
489
 
  do 
490
 
  {
491
 
    checked(ib_cursor_delete_row(cursor));
492
 
  } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
493
 
 
494
 
  if (err != DB_END_OF_INDEX)
495
 
  {
496
 
    fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
497
 
    goto error_exit;
498
 
  }
499
 
  ib_err_t cursor_error;
500
 
  cursor_error= ib_cursor_close(cursor);
501
 
  cursor= NULL;
502
 
  checked(ib_trx_commit(transaction));
503
 
  return;
504
 
 
505
 
 error_exit:
506
 
  if (cursor != NULL)
507
 
  {
508
 
    cursor_error= ib_cursor_close(cursor);
509
 
  }
510
 
 
511
 
  ib_err_t error= ib_trx_rollback(transaction);
512
 
  if (error != DB_SUCCESS)
513
 
    fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
514
 
            ib_strerror(error));
515
 
}
516
 
 
517
 
/**
518
 
 * Update the cas ID in the item structure
519
 
 * @param item the item to update
520
 
 */
521
 
void update_cas(struct item* item) 
522
 
{
523
 
  item->cas= ++cas;
524
 
}
525
 
 
526
 
/**
527
 
 * Release all the resources allocated by the item
528
 
 * @param item the item to release
529
 
 */
530
 
void release_item(struct item* item) 
531
 
{
532
 
  free(item->key);
533
 
  free(item->data);
534
 
  free(item);
535
 
}