1
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
9
#include <embedded_innodb-1.0/innodb.h>
13
const char *tablename= "memcached/items";
16
#define data_col_idx 1
17
#define flags_col_idx 2
24
* To avoid cluttering down all the code with error checking I use the
25
* following macro. It will execute the statement and verify that the
26
* result of the operation is DB_SUCCESS. If any other error code is
27
* returned it will print an "assert-like" output and jump to the
28
* label error_exit. There I release resources before returning out of
31
* @param a the expression to execute
34
#define checked(expression) \
36
ib_err_t checked_err= expression; \
37
if (checked_err != DB_SUCCESS) \
39
fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \
40
__FILE__, __LINE__, #expression, \
41
ib_strerror(checked_err)); \
47
* Create the database schema.
48
* @return true if the database schema was created without any problems
51
static bool create_schema(void)
53
ib_tbl_sch_t schema= NULL;
54
ib_idx_sch_t dbindex= NULL;
56
if (ib_database_create("memcached") != IB_TRUE)
58
fprintf(stderr, "Failed to create database\n");
62
ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
65
checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
66
checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
67
IB_COL_NOT_NULL, 0, 32767));
68
checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
69
IB_COL_NONE, 0, 1024*1024));
70
checked(ib_table_schema_add_col(schema, "flags", IB_INT,
71
IB_COL_UNSIGNED, 0, 4));
72
checked(ib_table_schema_add_col(schema, "cas", IB_INT,
73
IB_COL_UNSIGNED, 0, 8));
74
checked(ib_table_schema_add_col(schema, "exp", IB_INT,
75
IB_COL_UNSIGNED, 0, 4));
76
checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
77
checked(ib_index_schema_add_col(dbindex, "key", 0));
78
checked(ib_index_schema_set_clustered(dbindex));
79
checked(ib_schema_lock_exclusive(transaction));
80
checked(ib_table_create(transaction, schema, &table_id));
81
checked(ib_trx_commit(transaction));
82
ib_table_schema_delete(schema);
87
/* @todo release resources! */
89
ib_err_t error= ib_trx_rollback(transaction);
90
if (error != DB_SUCCESS)
91
fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
98
* Store an item into the database. Update the CAS id on the item before
99
* storing it in the database.
101
* @param trx the transaction to use
102
* @param item the item to store
103
* @return true if we can go ahead and commit the transaction, false otherwise
105
static bool do_put_item(ib_trx_t trx, struct item* item)
109
ib_crsr_t cursor= NULL;
110
ib_tpl_t tuple= NULL;
113
checked(ib_cursor_open_table(tablename, trx, &cursor));
114
checked(ib_cursor_lock(cursor, IB_LOCK_X));
115
tuple= ib_clust_read_tuple_create(cursor);
117
checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
118
checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
119
checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
120
checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
121
checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
122
checked(ib_cursor_insert_row(cursor, tuple));
125
/* Release resources: */
130
ib_tuple_delete(tuple);
132
ib_err_t currsor_error;
134
currsor_error= ib_cursor_close(cursor);
140
* Try to locate an item in the database. Return a cursor and the tuple to
141
* the item if I found it in the database.
143
* @param trx the transaction to use
144
* @param key the key of the item to look up
145
* @param nkey the size of the key
146
* @param cursor where to store the cursor (OUT)
147
* @param tuple where to store the tuple (OUT)
148
* @return true if I found the object, false otherwise
150
static bool do_locate_item(ib_trx_t trx,
156
ib_tpl_t tuple= NULL;
160
checked(ib_cursor_open_table(tablename, trx, cursor));
161
tuple= ib_clust_search_tuple_create(*cursor);
164
fprintf(stderr, "Failed to allocate tuple object\n");
168
checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
169
ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
171
if (err == DB_SUCCESS && res == 0)
173
ib_tuple_delete(tuple);
176
else if (err != DB_SUCCESS &&
177
err != DB_RECORD_NOT_FOUND &&
178
err != DB_END_OF_INDEX)
180
fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
185
ib_tuple_delete(tuple);
187
ib_err_t cursor_error;
189
cursor_error= ib_cursor_close(*cursor);
197
* Try to get an item from the database
199
* @param trx the transaction to use
200
* @param key the key to get
201
* @param nkey the lenght of the key
202
* @return a pointer to the item if I found it in the database
204
static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey)
206
ib_crsr_t cursor= NULL;
207
ib_tpl_t tuple= NULL;
208
struct item* retval= NULL;
210
if (do_locate_item(trx, key, nkey, &cursor))
212
tuple= ib_clust_read_tuple_create(cursor);
215
fprintf(stderr, "Failed to create read tuple\n");
218
checked(ib_cursor_read_row(cursor, tuple));
220
ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
221
ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
222
ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
223
ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
224
const void *dataptr= ib_col_get_value(tuple, data_col_idx);
226
retval= create_item(key, nkey, dataptr, datalen, 0, 0);
229
fprintf(stderr, "Failed to allocate memory\n");
236
checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
237
retval->flags= (uint32_t)val;
242
checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
243
retval->cas= (uint64_t)val;
248
checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
249
retval->exp= (time_t)val;
253
/* Release resources */
258
ib_tuple_delete(tuple);
260
ib_err_t cursor_error;
262
cursor_error= ib_cursor_close(cursor);
268
* Delete an item from the cache
269
* @param trx the transaction to use
270
* @param key the key of the item to delete
271
* @param nkey the length of the key
272
* @return true if we should go ahead and commit the transaction
273
* or false if we should roll back (if the key didn't exists)
275
static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
276
ib_crsr_t cursor= NULL;
279
if (do_locate_item(trx, key, nkey, &cursor))
281
checked(ib_cursor_lock(cursor, IB_LOCK_X));
282
checked(ib_cursor_delete_row(cursor));
285
/* Release resources */
291
ib_err_t cursor_error;
292
cursor_error= ib_cursor_close(cursor);
299
/****************************************************************************
301
***************************************************************************/
304
* Initialize the database storage
305
* @return true if the database was initialized successfully, false otherwise
307
bool initialize_storage(void)
313
checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
314
checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
315
checked(ib_cfg_set_bool_on("file_per_table"));
316
checked(ib_startup("barracuda"));
318
/* check to see if the table exists or if we should create the schema */
319
error= ib_table_get_id(tablename, &tid);
320
if (error == DB_TABLE_NOT_FOUND)
322
if (!create_schema())
327
else if (error != DB_SUCCESS)
329
fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
341
* Shut down this storage engine
343
void shutdown_storage(void)
345
checked(ib_shutdown(IB_SHUTDOWN_NORMAL));
351
* Store an item in the databse
353
* @param item the item to store
355
void put_item(struct item* item)
357
ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
358
if (do_put_item(transaction, item))
360
ib_err_t error= ib_trx_commit(transaction);
361
if (error != DB_SUCCESS)
363
fprintf(stderr, "Failed to store key:\n\t%s\n",
369
ib_err_t error= ib_trx_rollback(transaction);
370
if (error != DB_SUCCESS)
371
fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
377
* Get an item from the engine
378
* @param key the key to grab
379
* @param nkey number of bytes in the key
380
* @return pointer to the item if found
382
struct item* get_item(const void* key, size_t nkey)
384
ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
385
struct item* ret= do_get_item(transaction, key, nkey);
386
ib_err_t error= ib_trx_rollback(transaction);
388
if (error != DB_SUCCESS)
389
fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
396
* Create an item structure and initialize it with the content
398
* @param key the key for the item
399
* @param nkey the number of bytes in the key
400
* @param data pointer to the value for the item (may be NULL)
401
* @param size the size of the data
402
* @param flags the flags to store with the data
403
* @param exp the expiry time for the item
404
* @return pointer to an initialized item object or NULL if allocation failed
406
struct item* create_item(const void* key, size_t nkey, const void* data,
407
size_t size, uint32_t flags, time_t exp)
409
struct item* ret= calloc(1, sizeof(*ret));
412
ret->key= malloc(nkey);
415
ret->data= malloc(size);
418
if (ret->key == NULL || (size > 0 && ret->data == NULL))
426
memcpy(ret->key, key, nkey);
429
memcpy(ret->data, data, size);
442
* Delete an item from the cache
443
* @param key the key of the item to delete
444
* @param nkey the length of the key
445
* @return true if the item was deleted from the cache
447
bool delete_item(const void* key, size_t nkey) {
448
ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
450
bool ret= do_delete_item(transaction, key, nkey);
454
/* object found. commit transaction */
455
ib_err_t error= ib_trx_commit(transaction);
456
if (error != DB_SUCCESS)
458
fprintf(stderr, "Failed to delete key:\n\t%s\n",
465
ib_err_t error= ib_trx_rollback(transaction);
466
if (error != DB_SUCCESS)
467
fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
475
* Flush the entire cache
476
* @param when when the cache should be flushed (0 == immediately)
478
void flush(uint32_t when __attribute__((unused)))
480
/* @TODO implement support for when != 0 */
481
ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
482
ib_crsr_t cursor= NULL;
483
ib_err_t err= DB_SUCCESS;
485
checked(ib_cursor_open_table(tablename, transaction, &cursor));
486
checked(ib_cursor_first(cursor));
487
checked(ib_cursor_lock(cursor, IB_LOCK_X));
491
checked(ib_cursor_delete_row(cursor));
492
} while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
494
if (err != DB_END_OF_INDEX)
496
fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
499
ib_err_t cursor_error;
500
cursor_error= ib_cursor_close(cursor);
502
checked(ib_trx_commit(transaction));
508
cursor_error= ib_cursor_close(cursor);
511
ib_err_t error= ib_trx_rollback(transaction);
512
if (error != DB_SUCCESS)
513
fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
518
* Update the cas ID in the item structure
519
* @param item the item to update
521
void update_cas(struct item* item)
527
* Release all the resources allocated by the item
528
* @param item the item to release
530
void release_item(struct item* item)