~pierre-php/libmemcached/libmemcached-windows

« back to all changes in this revision

Viewing changes to example/storage_innodb.c

  • Committer: pierre
  • Date: 2009-11-28 19:35:07 UTC
  • mfrom: (533.1.89 libmemcached)
  • Revision ID: pierre@pierre-win7-20091128193507-amrg03lsrgyas64x
- merge from main libmemcached branch, not tested yet

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
 
2
#include <stdlib.h>
 
3
#include <inttypes.h>
 
4
#include <time.h>
 
5
#include <stdbool.h>
 
6
#include <string.h>
 
7
#include <unistd.h>
 
8
#include <assert.h>
 
9
#include <embedded_innodb-1.0/innodb.h>
 
10
 
 
11
#include "storage.h"
 
12
 
 
13
const char *tablename= "memcached/items";
 
14
 
 
15
#define key_col_idx 0
 
16
#define data_col_idx 1
 
17
#define flags_col_idx 2
 
18
#define cas_col_idx 3
 
19
#define exp_col_idx 4
 
20
 
 
21
static uint64_t cas;
 
22
 
 
23
/**
 
24
 * To avoid cluttering down all the code with error checking I use the
 
25
 * following macro. It will execute the statement and verify that the
 
26
 * result of the operation is DB_SUCCESS. If any other error code is
 
27
 * returned it will print an "assert-like" output and jump to the
 
28
 * label error_exit. There I release resources before returning out of
 
29
 * the function.
 
30
 *
 
31
 * @param a the expression to execute
 
32
 *
 
33
 */
 
34
#define checked(expression)                                    \
 
35
do {                                                           \
 
36
  ib_err_t checked_err= expression;                            \
 
37
  if (checked_err != DB_SUCCESS)                               \
 
38
  {                                                            \
 
39
    fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n",   \
 
40
            __FILE__, __LINE__, #expression,                   \
 
41
            ib_strerror(checked_err));                         \
 
42
    goto error_exit;                                           \
 
43
  }                                                            \
 
44
} while (0);
 
45
 
 
46
/**
 
47
 * Create the database schema.
 
48
 * @return true if the database schema was created without any problems
 
49
 *         false otherwise.
 
50
 */
 
51
static bool create_schema(void) 
 
52
{
 
53
  ib_tbl_sch_t schema= NULL;
 
54
  ib_idx_sch_t dbindex= NULL;
 
55
 
 
56
  if (ib_database_create("memcached") != IB_TRUE)
 
57
  {
 
58
    fprintf(stderr, "Failed to create database\n");
 
59
    return false;
 
60
  }
 
61
 
 
62
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
 
63
  ib_id_t table_id;
 
64
 
 
65
  checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
 
66
  checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
 
67
                                  IB_COL_NOT_NULL, 0, 32767));
 
68
  checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
 
69
                                  IB_COL_NONE, 0, 1024*1024));
 
70
  checked(ib_table_schema_add_col(schema, "flags", IB_INT,
 
71
                                  IB_COL_UNSIGNED, 0, 4));
 
72
  checked(ib_table_schema_add_col(schema, "cas", IB_INT,
 
73
                                  IB_COL_UNSIGNED, 0, 8));
 
74
  checked(ib_table_schema_add_col(schema, "exp", IB_INT,
 
75
                                  IB_COL_UNSIGNED, 0, 4));
 
76
  checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
 
77
  checked(ib_index_schema_add_col(dbindex, "key", 0));
 
78
  checked(ib_index_schema_set_clustered(dbindex));
 
79
  checked(ib_schema_lock_exclusive(transaction));
 
80
  checked(ib_table_create(transaction, schema, &table_id));
 
81
  checked(ib_trx_commit(transaction));
 
82
  ib_table_schema_delete(schema);
 
83
 
 
84
  return true;
 
85
 
 
86
 error_exit:
 
87
  /* @todo release resources! */
 
88
  {
 
89
    ib_err_t error= ib_trx_rollback(transaction);
 
90
    if (error != DB_SUCCESS)
 
91
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
 
92
              ib_strerror(error));
 
93
  }
 
94
  return false;
 
95
}
 
96
 
 
97
/**
 
98
 * Store an item into the database. Update the CAS id on the item before
 
99
 * storing it in the database.
 
100
 *
 
101
 * @param trx the transaction to use
 
102
 * @param item the item to store
 
103
 * @return true if we can go ahead and commit the transaction, false otherwise
 
104
 */
 
105
static bool do_put_item(ib_trx_t trx, struct item* item) 
 
106
{
 
107
  update_cas(item);
 
108
 
 
109
  ib_crsr_t cursor= NULL;
 
110
  ib_tpl_t tuple= NULL;
 
111
  bool retval= false;
 
112
 
 
113
  checked(ib_cursor_open_table(tablename, trx, &cursor));
 
114
  checked(ib_cursor_lock(cursor, IB_LOCK_X));
 
115
  tuple= ib_clust_read_tuple_create(cursor);
 
116
 
 
117
  checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
 
118
  checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
 
119
  checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
 
120
  checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
 
121
  checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
 
122
  checked(ib_cursor_insert_row(cursor, tuple));
 
123
 
 
124
  retval= true;
 
125
  /* Release resources: */
 
126
  /* FALLTHROUGH */
 
127
 
 
128
 error_exit:
 
129
  if (tuple != NULL)
 
130
    ib_tuple_delete(tuple);
 
131
 
 
132
  if (cursor != NULL)
 
133
    ib_cursor_close(cursor);
 
134
 
 
135
  return retval;
 
136
}
 
137
 
 
138
/**
 
139
 * Try to locate an item in the database. Return a cursor and the tuple to
 
140
 * the item if I found it in the database.
 
141
 *
 
142
 * @param trx the transaction to use
 
143
 * @param key the key of the item to look up
 
144
 * @param nkey the size of the key
 
145
 * @param cursor where to store the cursor (OUT)
 
146
 * @param tuple where to store the tuple (OUT)
 
147
 * @return true if I found the object, false otherwise
 
148
 */
 
149
static bool do_locate_item(ib_trx_t trx,
 
150
                           const void* key,
 
151
                           size_t nkey,
 
152
                           ib_crsr_t *cursor)
 
153
{
 
154
  int res;
 
155
  ib_tpl_t tuple= NULL;
 
156
 
 
157
  *cursor= NULL;
 
158
 
 
159
  checked(ib_cursor_open_table(tablename, trx, cursor));
 
160
  tuple= ib_clust_search_tuple_create(*cursor);
 
161
  if (tuple == NULL)
 
162
  {
 
163
    fprintf(stderr, "Failed to allocate tuple object\n");
 
164
    goto error_exit;
 
165
  }
 
166
 
 
167
  checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
 
168
  ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
 
169
 
 
170
  if (err == DB_SUCCESS && res == 0)
 
171
  {
 
172
    ib_tuple_delete(tuple);
 
173
    return true;
 
174
  }
 
175
  else if (err != DB_SUCCESS &&
 
176
           err != DB_RECORD_NOT_FOUND &&
 
177
           err != DB_END_OF_INDEX)
 
178
  {
 
179
    fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
 
180
  }
 
181
  /* FALLTHROUGH */
 
182
 error_exit:
 
183
  if (tuple != NULL)
 
184
    ib_tuple_delete(tuple);
 
185
  if (*cursor != NULL)
 
186
    ib_cursor_close(*cursor);
 
187
  *cursor= NULL;
 
188
 
 
189
  return false;
 
190
}
 
191
 
 
192
/**
 
193
 * Try to get an item from the database
 
194
 *
 
195
 * @param trx the transaction to use
 
196
 * @param key the key to get
 
197
 * @param nkey the lenght of the key
 
198
 * @return a pointer to the item if I found it in the database
 
199
 */
 
200
static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) 
 
201
{
 
202
  ib_crsr_t cursor= NULL;
 
203
  ib_tpl_t tuple= NULL;
 
204
  struct item* retval= NULL;
 
205
 
 
206
  if (do_locate_item(trx, key, nkey, &cursor)) 
 
207
  {
 
208
    tuple= ib_clust_read_tuple_create(cursor);
 
209
    if (tuple == NULL)
 
210
    {
 
211
      fprintf(stderr, "Failed to create read tuple\n");
 
212
      goto error_exit;
 
213
    }
 
214
    checked(ib_cursor_read_row(cursor, tuple));
 
215
    ib_col_meta_t meta;
 
216
    ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
 
217
    ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
 
218
    ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
 
219
    ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
 
220
    const void *dataptr= ib_col_get_value(tuple, data_col_idx);
 
221
 
 
222
    retval= create_item(key, nkey, dataptr, datalen, 0, 0);
 
223
    if (retval == NULL) 
 
224
    {
 
225
      fprintf(stderr, "Failed to allocate memory\n");
 
226
      goto error_exit;
 
227
    }
 
228
 
 
229
    if (flaglen != 0) 
 
230
    {
 
231
      ib_u32_t val;
 
232
      checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
 
233
      retval->flags= (uint32_t)val;
 
234
    }
 
235
    if (caslen != 0) 
 
236
    {
 
237
      ib_u64_t val;
 
238
      checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
 
239
      retval->cas= (uint64_t)val;
 
240
    }
 
241
    if (explen != 0) 
 
242
    {
 
243
      ib_u32_t val;
 
244
      checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
 
245
      retval->exp= (time_t)val;
 
246
    }
 
247
  }
 
248
 
 
249
  /* Release resources */
 
250
  /* FALLTHROUGH */
 
251
 
 
252
 error_exit:
 
253
  if (tuple != NULL)
 
254
    ib_tuple_delete(tuple);
 
255
 
 
256
  if (cursor != NULL)
 
257
    ib_cursor_close(cursor);
 
258
 
 
259
  return retval;
 
260
}
 
261
 
 
262
/**
 
263
 * Delete an item from the cache
 
264
 * @param trx the transaction to use
 
265
 * @param key the key of the item to delete
 
266
 * @param nkey the length of the key
 
267
 * @return true if we should go ahead and commit the transaction
 
268
 *         or false if we should roll back (if the key didn't exists)
 
269
 */
 
270
static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
 
271
  ib_crsr_t cursor= NULL;
 
272
  bool retval= false;
 
273
 
 
274
  if (do_locate_item(trx, key, nkey, &cursor))
 
275
  {
 
276
    checked(ib_cursor_lock(cursor, IB_LOCK_X));
 
277
    checked(ib_cursor_delete_row(cursor));
 
278
    retval= true;
 
279
  }
 
280
  /* Release resources */
 
281
  /* FALLTHROUGH */
 
282
 
 
283
 error_exit:
 
284
  if (cursor != NULL)
 
285
    ib_cursor_close(cursor);
 
286
 
 
287
  return retval;
 
288
}
 
289
 
 
290
 
 
291
/****************************************************************************
 
292
 * External interface
 
293
 ***************************************************************************/
 
294
 
 
295
/**
 
296
 * Initialize the database storage
 
297
 * @return true if the database was initialized successfully, false otherwise
 
298
 */
 
299
bool initialize_storage(void) 
 
300
{
 
301
  ib_err_t error;
 
302
  ib_id_t tid;
 
303
 
 
304
  checked(ib_init());
 
305
  checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
 
306
  checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
 
307
  checked(ib_cfg_set_bool_on("file_per_table"));
 
308
  checked(ib_startup("barracuda"));
 
309
 
 
310
  /* check to see if the table exists or if we should create the schema */
 
311
  error= ib_table_get_id(tablename, &tid);
 
312
  if (error == DB_TABLE_NOT_FOUND) 
 
313
  {
 
314
    if (!create_schema()) 
 
315
    {
 
316
      return false;
 
317
    }
 
318
  } 
 
319
  else if (error != DB_SUCCESS) 
 
320
  {
 
321
    fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
 
322
    return false;
 
323
  }
 
324
 
 
325
  return true;
 
326
 
 
327
 error_exit:
 
328
  return false;
 
329
}
 
330
 
 
331
/**
 
332
 * Shut down this storage engine
 
333
 */
 
334
void shutdown_storage(void) 
 
335
{
 
336
  checked(ib_shutdown());
 
337
 error_exit:
 
338
  ;
 
339
}
 
340
 
 
341
/**
 
342
 * Store an item in the databse
 
343
 *
 
344
 * @param item the item to store
 
345
 */
 
346
void put_item(struct item* item) 
 
347
{
 
348
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
 
349
  if (do_put_item(transaction, item)) 
 
350
  {
 
351
    ib_err_t error= ib_trx_commit(transaction);
 
352
    if (error != DB_SUCCESS) 
 
353
    {
 
354
      fprintf(stderr, "Failed to store key:\n\t%s\n",
 
355
              ib_strerror(error));
 
356
    }
 
357
  } 
 
358
  else 
 
359
  {
 
360
    ib_err_t error= ib_trx_rollback(transaction);
 
361
    if (error != DB_SUCCESS)
 
362
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
 
363
              ib_strerror(error));
 
364
  }
 
365
}
 
366
 
 
367
/**
 
368
 * Get an item from the engine
 
369
 * @param key the key to grab
 
370
 * @param nkey number of bytes in the key
 
371
 * @return pointer to the item if found
 
372
 */
 
373
struct item* get_item(const void* key, size_t nkey) 
 
374
{
 
375
  ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
 
376
  struct item* ret= do_get_item(transaction, key, nkey);
 
377
  ib_err_t error= ib_trx_rollback(transaction);
 
378
 
 
379
  if (error != DB_SUCCESS)
 
380
    fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
 
381
            ib_strerror(error));
 
382
 
 
383
  return ret;
 
384
}
 
385
 
 
386
/**
 
387
 * Create an item structure and initialize it with the content
 
388
 *
 
389
 * @param key the key for the item
 
390
 * @param nkey the number of bytes in the key
 
391
 * @param data pointer to the value for the item (may be NULL)
 
392
 * @param size the size of the data
 
393
 * @param flags the flags to store with the data
 
394
 * @param exp the expiry time for the item
 
395
 * @return pointer to an initialized item object or NULL if allocation failed
 
396
 */
 
397
struct item* create_item(const void* key, size_t nkey, const void* data,
 
398
                         size_t size, uint32_t flags, time_t exp)
 
399
{
 
400
  struct item* ret= calloc(1, sizeof(*ret));
 
401
  if (ret != NULL)
 
402
  {
 
403
    ret->key= malloc(nkey);
 
404
    if (size > 0)
 
405
    {
 
406
      ret->data= malloc(size);
 
407
    }
 
408
 
 
409
    if (ret->key == NULL || (size > 0 && ret->data == NULL))
 
410
    {
 
411
      free(ret->key);
 
412
      free(ret->data);
 
413
      free(ret);
 
414
      return NULL;
 
415
    }
 
416
 
 
417
    memcpy(ret->key, key, nkey);
 
418
    if (data != NULL)
 
419
    {
 
420
      memcpy(ret->data, data, size);
 
421
    }
 
422
 
 
423
    ret->nkey= nkey;
 
424
    ret->size= size;
 
425
    ret->flags= flags;
 
426
    ret->exp= exp;
 
427
  }
 
428
 
 
429
  return ret;
 
430
}
 
431
 
 
432
/**
 
433
 * Delete an item from the cache
 
434
 * @param key the key of the item to delete
 
435
 * @param nkey the length of the key
 
436
 * @return true if the item was deleted from the cache
 
437
 */
 
438
bool delete_item(const void* key, size_t nkey) {
 
439
  ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
 
440
 
 
441
  bool ret= do_delete_item(transaction, key, nkey);
 
442
 
 
443
  if (ret)
 
444
  {
 
445
    /* object found. commit transaction */
 
446
    ib_err_t error= ib_trx_commit(transaction);
 
447
    if (error != DB_SUCCESS)
 
448
    {
 
449
      fprintf(stderr, "Failed to delete key:\n\t%s\n",
 
450
              ib_strerror(error));
 
451
      ret= false;
 
452
    }
 
453
  }
 
454
  else
 
455
  {
 
456
    ib_err_t error= ib_trx_rollback(transaction);
 
457
    if (error != DB_SUCCESS)
 
458
      fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
 
459
              ib_strerror(error));
 
460
  }
 
461
 
 
462
  return ret;
 
463
}
 
464
 
 
465
/**
 
466
 * Flush the entire cache
 
467
 * @param when when the cache should be flushed (0 == immediately)
 
468
 */
 
469
void flush(uint32_t when __attribute__((unused))) 
 
470
{
 
471
  /* @TODO implement support for when != 0 */
 
472
  ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
 
473
  ib_crsr_t cursor= NULL;
 
474
        ib_err_t err= DB_SUCCESS;
 
475
 
 
476
  checked(ib_cursor_open_table(tablename, transaction, &cursor));
 
477
  checked(ib_cursor_first(cursor));
 
478
  checked(ib_cursor_lock(cursor, IB_LOCK_X));
 
479
 
 
480
  do 
 
481
  {
 
482
    checked(ib_cursor_delete_row(cursor));
 
483
  } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
 
484
 
 
485
  if (err != DB_END_OF_INDEX)
 
486
  {
 
487
    fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
 
488
    goto error_exit;
 
489
  }
 
490
  ib_cursor_close(cursor);
 
491
  cursor= NULL;
 
492
  checked(ib_trx_commit(transaction));
 
493
  return;
 
494
 
 
495
 error_exit:
 
496
  if (cursor != NULL)
 
497
    ib_cursor_close(cursor);
 
498
 
 
499
  ib_err_t error= ib_trx_rollback(transaction);
 
500
  if (error != DB_SUCCESS)
 
501
    fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
 
502
            ib_strerror(error));
 
503
}
 
504
 
 
505
/**
 
506
 * Update the cas ID in the item structure
 
507
 * @param item the item to update
 
508
 */
 
509
void update_cas(struct item* item) 
 
510
{
 
511
  item->cas= ++cas;
 
512
}
 
513
 
 
514
/**
 
515
 * Release all the resources allocated by the item
 
516
 * @param item the item to release
 
517
 */
 
518
void release_item(struct item* item) 
 
519
{
 
520
  free(item->key);
 
521
  free(item->data);
 
522
  free(item);
 
523
}