/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */ #include #include #include #include #include #include #include #include #include "storage.h" const char *tablename= "memcached/items"; #define key_col_idx 0 #define data_col_idx 1 #define flags_col_idx 2 #define cas_col_idx 3 #define exp_col_idx 4 static uint64_t cas; /** * To avoid cluttering down all the code with error checking I use the * following macro. It will execute the statement and verify that the * result of the operation is DB_SUCCESS. If any other error code is * returned it will print an "assert-like" output and jump to the * label error_exit. There I release resources before returning out of * the function. * * @param a the expression to execute * */ #define checked(expression) \ do { \ ib_err_t checked_err= expression; \ if (checked_err != DB_SUCCESS) \ { \ fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \ __FILE__, __LINE__, #expression, \ ib_strerror(checked_err)); \ goto error_exit; \ } \ } while (0); /** * Create the database schema. * @return true if the database schema was created without any problems * false otherwise. */ static bool create_schema(void) { ib_tbl_sch_t schema= NULL; ib_idx_sch_t dbindex= NULL; if (ib_database_create("memcached") != IB_TRUE) { fprintf(stderr, "Failed to create database\n"); return false; } ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE); ib_id_t table_id; checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0)); checked(ib_table_schema_add_col(schema, "key", IB_BLOB, IB_COL_NOT_NULL, 0, 32767)); checked(ib_table_schema_add_col(schema, "data", IB_BLOB, IB_COL_NONE, 0, 1024*1024)); checked(ib_table_schema_add_col(schema, "flags", IB_INT, IB_COL_UNSIGNED, 0, 4)); checked(ib_table_schema_add_col(schema, "cas", IB_INT, IB_COL_UNSIGNED, 0, 8)); checked(ib_table_schema_add_col(schema, "exp", IB_INT, IB_COL_UNSIGNED, 0, 4)); checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex)); checked(ib_index_schema_add_col(dbindex, "key", 0)); checked(ib_index_schema_set_clustered(dbindex)); checked(ib_schema_lock_exclusive(transaction)); checked(ib_table_create(transaction, schema, &table_id)); checked(ib_trx_commit(transaction)); ib_table_schema_delete(schema); return true; error_exit: /* @todo release resources! */ { ib_err_t error= ib_trx_rollback(transaction); if (error != DB_SUCCESS) fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", ib_strerror(error)); } return false; } /** * Store an item into the database. Update the CAS id on the item before * storing it in the database. * * @param trx the transaction to use * @param item the item to store * @return true if we can go ahead and commit the transaction, false otherwise */ static bool do_put_item(ib_trx_t trx, struct item* item) { update_cas(item); ib_crsr_t cursor= NULL; ib_tpl_t tuple= NULL; bool retval= false; checked(ib_cursor_open_table(tablename, trx, &cursor)); checked(ib_cursor_lock(cursor, IB_LOCK_X)); tuple= ib_clust_read_tuple_create(cursor); checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey)); checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size)); checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags)); checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas)); checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp)); checked(ib_cursor_insert_row(cursor, tuple)); retval= true; /* Release resources: */ /* FALLTHROUGH */ error_exit: if (tuple != NULL) ib_tuple_delete(tuple); if (cursor != NULL) ib_cursor_close(cursor); return retval; } /** * Try to locate an item in the database. Return a cursor and the tuple to * the item if I found it in the database. * * @param trx the transaction to use * @param key the key of the item to look up * @param nkey the size of the key * @param cursor where to store the cursor (OUT) * @param tuple where to store the tuple (OUT) * @return true if I found the object, false otherwise */ static bool do_locate_item(ib_trx_t trx, const void* key, size_t nkey, ib_crsr_t *cursor) { int res; ib_tpl_t tuple= NULL; *cursor= NULL; checked(ib_cursor_open_table(tablename, trx, cursor)); tuple= ib_clust_search_tuple_create(*cursor); if (tuple == NULL) { fprintf(stderr, "Failed to allocate tuple object\n"); goto error_exit; } checked(ib_col_set_value(tuple, key_col_idx, key, nkey)); ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res); if (err == DB_SUCCESS && res == 0) { ib_tuple_delete(tuple); return true; } else if (err != DB_SUCCESS && err != DB_RECORD_NOT_FOUND && err != DB_END_OF_INDEX) { fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err)); } /* FALLTHROUGH */ error_exit: if (tuple != NULL) ib_tuple_delete(tuple); if (*cursor != NULL) ib_cursor_close(*cursor); *cursor= NULL; return false; } /** * Try to get an item from the database * * @param trx the transaction to use * @param key the key to get * @param nkey the lenght of the key * @return a pointer to the item if I found it in the database */ static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) { ib_crsr_t cursor= NULL; ib_tpl_t tuple= NULL; struct item* retval= NULL; if (do_locate_item(trx, key, nkey, &cursor)) { tuple= ib_clust_read_tuple_create(cursor); if (tuple == NULL) { fprintf(stderr, "Failed to create read tuple\n"); goto error_exit; } checked(ib_cursor_read_row(cursor, tuple)); ib_col_meta_t meta; ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta); ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta); ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta); ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta); const void *dataptr= ib_col_get_value(tuple, data_col_idx); retval= create_item(key, nkey, dataptr, datalen, 0, 0); if (retval == NULL) { fprintf(stderr, "Failed to allocate memory\n"); goto error_exit; } if (flaglen != 0) { ib_u32_t val; checked(ib_tuple_read_u32(tuple, flags_col_idx, &val)); retval->flags= (uint32_t)val; } if (caslen != 0) { ib_u64_t val; checked(ib_tuple_read_u64(tuple, cas_col_idx, &val)); retval->cas= (uint64_t)val; } if (explen != 0) { ib_u32_t val; checked(ib_tuple_read_u32(tuple, exp_col_idx, &val)); retval->exp= (time_t)val; } } /* Release resources */ /* FALLTHROUGH */ error_exit: if (tuple != NULL) ib_tuple_delete(tuple); if (cursor != NULL) ib_cursor_close(cursor); return retval; } /** * Delete an item from the cache * @param trx the transaction to use * @param key the key of the item to delete * @param nkey the length of the key * @return true if we should go ahead and commit the transaction * or false if we should roll back (if the key didn't exists) */ static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) { ib_crsr_t cursor= NULL; bool retval= false; if (do_locate_item(trx, key, nkey, &cursor)) { checked(ib_cursor_lock(cursor, IB_LOCK_X)); checked(ib_cursor_delete_row(cursor)); retval= true; } /* Release resources */ /* FALLTHROUGH */ error_exit: if (cursor != NULL) ib_cursor_close(cursor); return retval; } /**************************************************************************** * External interface ***************************************************************************/ /** * Initialize the database storage * @return true if the database was initialized successfully, false otherwise */ bool initialize_storage(void) { ib_err_t error; ib_id_t tid; checked(ib_init()); checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light")); checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light")); checked(ib_cfg_set_bool_on("file_per_table")); checked(ib_startup("barracuda")); /* check to see if the table exists or if we should create the schema */ error= ib_table_get_id(tablename, &tid); if (error == DB_TABLE_NOT_FOUND) { if (!create_schema()) { return false; } } else if (error != DB_SUCCESS) { fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error)); return false; } return true; error_exit: return false; } /** * Shut down this storage engine */ void shutdown_storage(void) { checked(ib_shutdown()); error_exit: ; } /** * Store an item in the databse * * @param item the item to store */ void put_item(struct item* item) { ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE); if (do_put_item(transaction, item)) { ib_err_t error= ib_trx_commit(transaction); if (error != DB_SUCCESS) { fprintf(stderr, "Failed to store key:\n\t%s\n", ib_strerror(error)); } } else { ib_err_t error= ib_trx_rollback(transaction); if (error != DB_SUCCESS) fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", ib_strerror(error)); } } /** * Get an item from the engine * @param key the key to grab * @param nkey number of bytes in the key * @return pointer to the item if found */ struct item* get_item(const void* key, size_t nkey) { ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE); struct item* ret= do_get_item(transaction, key, nkey); ib_err_t error= ib_trx_rollback(transaction); if (error != DB_SUCCESS) fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", ib_strerror(error)); return ret; } /** * Create an item structure and initialize it with the content * * @param key the key for the item * @param nkey the number of bytes in the key * @param data pointer to the value for the item (may be NULL) * @param size the size of the data * @param flags the flags to store with the data * @param exp the expiry time for the item * @return pointer to an initialized item object or NULL if allocation failed */ struct item* create_item(const void* key, size_t nkey, const void* data, size_t size, uint32_t flags, time_t exp) { struct item* ret= calloc(1, sizeof(*ret)); if (ret != NULL) { ret->key= malloc(nkey); if (size > 0) { ret->data= malloc(size); } if (ret->key == NULL || (size > 0 && ret->data == NULL)) { free(ret->key); free(ret->data); free(ret); return NULL; } memcpy(ret->key, key, nkey); if (data != NULL) { memcpy(ret->data, data, size); } ret->nkey= nkey; ret->size= size; ret->flags= flags; ret->exp= exp; } return ret; } /** * Delete an item from the cache * @param key the key of the item to delete * @param nkey the length of the key * @return true if the item was deleted from the cache */ bool delete_item(const void* key, size_t nkey) { ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ); bool ret= do_delete_item(transaction, key, nkey); if (ret) { /* object found. commit transaction */ ib_err_t error= ib_trx_commit(transaction); if (error != DB_SUCCESS) { fprintf(stderr, "Failed to delete key:\n\t%s\n", ib_strerror(error)); ret= false; } } else { ib_err_t error= ib_trx_rollback(transaction); if (error != DB_SUCCESS) fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", ib_strerror(error)); } return ret; } /** * Flush the entire cache * @param when when the cache should be flushed (0 == immediately) */ void flush(uint32_t when __attribute__((unused))) { /* @TODO implement support for when != 0 */ ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ); ib_crsr_t cursor= NULL; ib_err_t err= DB_SUCCESS; checked(ib_cursor_open_table(tablename, transaction, &cursor)); checked(ib_cursor_first(cursor)); checked(ib_cursor_lock(cursor, IB_LOCK_X)); do { checked(ib_cursor_delete_row(cursor)); } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS); if (err != DB_END_OF_INDEX) { fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err)); goto error_exit; } ib_cursor_close(cursor); cursor= NULL; checked(ib_trx_commit(transaction)); return; error_exit: if (cursor != NULL) ib_cursor_close(cursor); ib_err_t error= ib_trx_rollback(transaction); if (error != DB_SUCCESS) fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n", ib_strerror(error)); } /** * Update the cas ID in the item structure * @param item the item to update */ void update_cas(struct item* item) { item->cas= ++cas; } /** * Release all the resources allocated by the item * @param item the item to release */ void release_item(struct item* item) { free(item->key); free(item->data); free(item); }