~ubuntu-branches/ubuntu/trusty/glusterfs/trusty

« back to all changes in this revision

Viewing changes to xlators/storage/bdb/src/bdb-ll.c

  • Committer: Bazaar Package Importer
  • Author(s): Patrick Matthäi
  • Date: 2010-02-09 18:53:10 UTC
  • mfrom: (1.2.4 upstream) (4.1.5 sid)
  • Revision ID: james.westby@ubuntu.com-20100209185310-ww8p82lsbosorg2u
* New upstream release.
* Uploading to unstable.
* Bump Standards-Version to 3.8.4 (no changes needed).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
  Copyright (c) 2008-2009 Gluster, Inc. <http://www.gluster.com>
3
 
  This file is part of GlusterFS.
4
 
 
5
 
  GlusterFS is free software; you can redistribute it and/or modify
6
 
  it under the terms of the GNU General Public License as published
7
 
  by the Free Software Foundation; either version 3 of the License,
8
 
  or (at your option) any later version.
9
 
 
10
 
  GlusterFS is distributed in the hope that it will be useful, but
11
 
  WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13
 
  General Public License for more details.
14
 
 
15
 
  You should have received a copy of the GNU General Public License
16
 
  along with this program.  If not, see
17
 
  <http://www.gnu.org/licenses/>.
18
 
*/
19
 
 
20
 
#include <libgen.h>
21
 
#include "bdb.h"
22
 
#include <list.h>
23
 
#include "hashfn.h"
24
 
/*
25
 
 * implement the procedures to interact with bdb */
26
 
 
27
 
/****************************************************************
28
 
 *
29
 
 * General wrappers and utility procedures for bdb xlator
30
 
 *
31
 
 ****************************************************************/
32
 
 
33
 
ino_t
34
 
bdb_inode_transform (ino_t parent,
35
 
                     const char *name,
36
 
                     size_t namelen)
37
 
{
38
 
        ino_t               ino = -1;
39
 
        uint64_t            hash = 0;
40
 
 
41
 
        hash = gf_dm_hashfn (name, namelen);
42
 
 
43
 
        ino = (((parent << 32) | 0x00000000ffffffffULL)
44
 
               & (hash | 0xffffffff00000000ULL));
45
 
 
46
 
        return ino;
47
 
}
48
 
 
49
 
static int
50
 
bdb_generate_secondary_hash (DB *secondary,
51
 
                             const DBT *pkey,
52
 
                             const DBT *data,
53
 
                             DBT *skey)
54
 
{
55
 
        char *primary = NULL;
56
 
        uint32_t *hash = NULL;
57
 
 
58
 
        primary = pkey->data;
59
 
 
60
 
        hash = calloc (1, sizeof (uint32_t));
61
 
 
62
 
        *hash = gf_dm_hashfn (primary, pkey->size);
63
 
 
64
 
        skey->data = hash;
65
 
        skey->size = sizeof (hash);
66
 
        skey->flags = DB_DBT_APPMALLOC;
67
 
 
68
 
        return 0;
69
 
}
70
 
 
71
 
/***********************************************************
72
 
 *
73
 
 *  bdb storage database utilities
74
 
 *
75
 
 **********************************************************/
76
 
 
77
 
/*
78
 
 * bdb_db_open - opens a storage db.
79
 
 *
80
 
 * @ctx: context specific to the directory for which we are supposed to open db
81
 
 *
82
 
 * see, if we have empty slots to open a db.
83
 
 *      if (no-empty-slots), then prune open dbs and close as many as possible
84
 
 *      if (empty-slot-available), tika muchkonDu db open maaDu
85
 
 *
86
 
 */
87
 
static int
88
 
bdb_db_open (bctx_t *bctx)
89
 
{
90
 
        DB *primary   = NULL;
91
 
        DB *secondary = NULL;
92
 
        int32_t ret = -1;
93
 
        bctx_table_t *table = NULL;
94
 
 
95
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
96
 
 
97
 
        table = bctx->table;
98
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", table, out);
99
 
 
100
 
        /* we have to do the following, we can't deny someone of db_open ;) */
101
 
        ret = db_create (&primary, table->dbenv, 0);
102
 
        if (ret < 0) {
103
 
                gf_log ("bdb-ll", GF_LOG_DEBUG,
104
 
                        "_BDB_DB_OPEN %s: %s (failed to create database object"
105
 
                        " for primary database)",
106
 
                        bctx->directory, db_strerror (ret));
107
 
                ret = -ENOMEM;
108
 
                goto out;
109
 
        }
110
 
 
111
 
        if (table->page_size) {
112
 
                ret = primary->set_pagesize (primary,
113
 
                                             table->page_size);
114
 
                if (ret < 0) {
115
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
116
 
                                "_BDB_DB_OPEN %s: %s (failed to set page-size "
117
 
                                "to %"PRIu64")",
118
 
                                bctx->directory, db_strerror (ret),
119
 
                                table->page_size);
120
 
                } else {
121
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
122
 
                                "_BDB_DB_OPEN %s: page-size set to %"PRIu64,
123
 
                                bctx->directory, table->page_size);
124
 
                }
125
 
        }
126
 
 
127
 
        ret = primary->open (primary, NULL, bctx->db_path, "primary",
128
 
                             table->access_mode, table->dbflags, 0);
129
 
        if (ret < 0) {
130
 
                gf_log ("bdb-ll", GF_LOG_ERROR,
131
 
                        "_BDB_DB_OPEN %s: %s "
132
 
                        "(failed to open primary database)",
133
 
                        bctx->directory, db_strerror (ret));
134
 
                ret = -1;
135
 
                goto cleanup;
136
 
        }
137
 
 
138
 
        ret = db_create (&secondary, table->dbenv, 0);
139
 
        if (ret < 0) {
140
 
                gf_log ("bdb-ll", GF_LOG_DEBUG,
141
 
                        "_BDB_DB_OPEN %s: %s (failed to create database object"
142
 
                        " for secondary database)",
143
 
                        bctx->directory, db_strerror (ret));
144
 
                ret = -ENOMEM;
145
 
                goto cleanup;
146
 
        }
147
 
 
148
 
        ret = secondary->open (secondary, NULL, bctx->db_path, "secondary",
149
 
                               table->access_mode, table->dbflags, 0);
150
 
        if (ret != 0 ) {
151
 
                gf_log ("bdb-ll", GF_LOG_ERROR,
152
 
                        "_BDB_DB_OPEN %s: %s "
153
 
                        "(failed to open secondary database)",
154
 
                        bctx->directory, db_strerror (ret));
155
 
                ret = -1;
156
 
                goto cleanup;
157
 
        }
158
 
 
159
 
        ret = primary->associate (primary, NULL, secondary,
160
 
                                  bdb_generate_secondary_hash,
161
 
#ifdef DB_IMMUTABLE_KEY
162
 
                                  DB_IMMUTABLE_KEY);
163
 
#else
164
 
                                  0);
165
 
#endif
166
 
        if (ret != 0 ) {
167
 
                gf_log ("bdb-ll", GF_LOG_ERROR,
168
 
                        "_BDB_DB_OPEN %s: %s "
169
 
                        "(failed to associate primary database with "
170
 
                        "secondary database)",
171
 
                        bctx->directory, db_strerror (ret));
172
 
                ret = -1;
173
 
                goto cleanup;
174
 
        }
175
 
 
176
 
out:
177
 
        bctx->primary = primary;
178
 
        bctx->secondary = secondary;
179
 
 
180
 
        return ret;
181
 
cleanup:
182
 
        if (primary)
183
 
                primary->close (primary, 0);
184
 
        if (secondary)
185
 
                secondary->close (secondary, 0);
186
 
 
187
 
        return ret;
188
 
}
189
 
 
190
 
int32_t
191
 
bdb_cursor_close (bctx_t *bctx,
192
 
                  DBC *cursorp)
193
 
{
194
 
        int32_t ret = -1;
195
 
 
196
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
197
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", cursorp, out);
198
 
 
199
 
        LOCK (&bctx->lock);
200
 
        {
201
 
#ifdef HAVE_BDB_CURSOR_GET
202
 
                ret = cursorp->close (cursorp);
203
 
#else
204
 
                ret = cursorp->c_close (cursorp);
205
 
#endif
206
 
                if (ret < 0) {
207
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
208
 
                                "_BDB_CURSOR_CLOSE %s: %s "
209
 
                                "(failed to close database cursor)",
210
 
                                bctx->directory, db_strerror (ret));
211
 
                }
212
 
        }
213
 
        UNLOCK (&bctx->lock);
214
 
 
215
 
out:
216
 
        return ret;
217
 
}
218
 
 
219
 
 
220
 
int32_t
221
 
bdb_cursor_open (bctx_t *bctx,
222
 
                 DBC **cursorpp)
223
 
{
224
 
        int32_t ret = -1;
225
 
 
226
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
227
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", cursorpp, out);
228
 
 
229
 
        LOCK (&bctx->lock);
230
 
        {
231
 
                if (bctx->secondary) {
232
 
                        /* do nothing, just continue */
233
 
                        ret = 0;
234
 
                } else {
235
 
                        ret = bdb_db_open (bctx);
236
 
                        if (ret < 0) {
237
 
                                gf_log ("bdb-ll", GF_LOG_DEBUG,
238
 
                                        "_BDB_CURSOR_OPEN %s: ENOMEM "
239
 
                                        "(failed to open secondary database)",
240
 
                                        bctx->directory);
241
 
                                ret = -ENOMEM;
242
 
                        } else {
243
 
                                ret = 0;
244
 
                        }
245
 
                }
246
 
 
247
 
                if (ret == 0) {
248
 
                        /* all set, open cursor */
249
 
                        ret = bctx->secondary->cursor (bctx->secondary,
250
 
                                                       NULL, cursorpp, 0);
251
 
                        if (ret < 0) {
252
 
                                gf_log ("bdb-ll", GF_LOG_DEBUG,
253
 
                                        "_BDB_CURSOR_OPEN %s: %s "
254
 
                                        "(failed to open a cursor to database)",
255
 
                                        bctx->directory, db_strerror (ret));
256
 
                        }
257
 
                }
258
 
        }
259
 
        UNLOCK (&bctx->lock);
260
 
 
261
 
out:
262
 
        return ret;
263
 
}
264
 
 
265
 
 
266
 
/* cache related */
267
 
static bdb_cache_t *
268
 
bdb_cache_lookup (bctx_t *bctx,
269
 
                  char *path)
270
 
{
271
 
        bdb_cache_t *bcache = NULL;
272
 
        bdb_cache_t *trav   = NULL;
273
 
        char        *key    = NULL;
274
 
 
275
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
276
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", path, out);
277
 
 
278
 
        MAKE_KEY_FROM_PATH (key, path);
279
 
 
280
 
        LOCK (&bctx->lock);
281
 
        {
282
 
                list_for_each_entry (trav, &bctx->c_list, c_list) {
283
 
                        if (!strcmp (trav->key, key)){
284
 
                                bcache = trav;
285
 
                                break;
286
 
                        }
287
 
                }
288
 
        }
289
 
        UNLOCK (&bctx->lock);
290
 
 
291
 
out:
292
 
        return bcache;
293
 
}
294
 
 
295
 
static int32_t
296
 
bdb_cache_insert (bctx_t *bctx,
297
 
                  DBT *key,
298
 
                  DBT *data)
299
 
{
300
 
        bdb_cache_t *bcache = NULL;
301
 
        int32_t ret = -1;
302
 
 
303
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
304
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", key, out);
305
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", data, out);
306
 
 
307
 
        LOCK (&bctx->lock);
308
 
        {
309
 
                if (bctx->c_count > 5) {
310
 
                        /* most of the times, we enter here */
311
 
                        /* FIXME: ugly, not supposed to disect any of the
312
 
                         * 'struct list_head' directly */
313
 
                        if (!list_empty (&bctx->c_list)) {
314
 
                                bcache = list_entry (bctx->c_list.prev,
315
 
                                                     bdb_cache_t, c_list);
316
 
                                list_del_init (&bcache->c_list);
317
 
                        }
318
 
                        if (bcache->key) {
319
 
                                free (bcache->key);
320
 
                                bcache->key = calloc (key->size + 1,
321
 
                                                      sizeof (char));
322
 
                                GF_VALIDATE_OR_GOTO ("bdb-ll",
323
 
                                                     bcache->key, unlock);
324
 
                                memcpy (bcache->key, (char *)key->data,
325
 
                                        key->size);
326
 
                        } else {
327
 
                                /* should never come here */
328
 
                                gf_log ("bdb-ll", GF_LOG_DEBUG,
329
 
                                        "_BDB_CACHE_INSERT %s (%s) "
330
 
                                        "(found a cache entry with empty key)",
331
 
                                        bctx->directory, (char *)key->data);
332
 
                        } /* if(bcache->key)...else */
333
 
                        if (bcache->data) {
334
 
                                free (bcache->data);
335
 
                                bcache->data = memdup (data->data, data->size);
336
 
                                GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data,
337
 
                                                     unlock);
338
 
                                bcache->size = data->size;
339
 
                        } else {
340
 
                                /* should never come here */
341
 
                                gf_log ("bdb-ll", GF_LOG_CRITICAL,
342
 
                                        "_BDB_CACHE_INSERT %s (%s) "
343
 
                                        "(found a cache entry with no data)",
344
 
                                        bctx->directory, (char *)key->data);
345
 
                        } /* if(bcache->data)...else */
346
 
                        list_add (&bcache->c_list, &bctx->c_list);
347
 
                        ret = 0;
348
 
                } else {
349
 
                        /* we will be entering here very rarely */
350
 
                        bcache = CALLOC (1, sizeof (*bcache));
351
 
                        GF_VALIDATE_OR_GOTO ("bdb-ll", bcache, unlock);
352
 
 
353
 
                        bcache->key = calloc (key->size + 1, sizeof (char));
354
 
                        GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock);
355
 
                        memcpy (bcache->key, key->data, key->size);
356
 
 
357
 
                        bcache->data = memdup (data->data, data->size);
358
 
                        GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock);
359
 
 
360
 
                        bcache->size = data->size;
361
 
                        list_add (&bcache->c_list, &bctx->c_list);
362
 
                        bctx->c_count++;
363
 
                        ret = 0;
364
 
                } /* if(private->c_count < 5)...else */
365
 
        }
366
 
unlock:
367
 
        UNLOCK (&bctx->lock);
368
 
out:
369
 
        return ret;
370
 
}
371
 
 
372
 
static int32_t
373
 
bdb_cache_delete (bctx_t *bctx,
374
 
                  const char *key)
375
 
{
376
 
        bdb_cache_t *bcache = NULL;
377
 
        bdb_cache_t *trav   = NULL;
378
 
 
379
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
380
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", key, out);
381
 
 
382
 
        LOCK (&bctx->lock);
383
 
        {
384
 
                list_for_each_entry (trav, &bctx->c_list, c_list) {
385
 
                        if (!strcmp (trav->key, key)){
386
 
                                bctx->c_count--;
387
 
                                bcache = trav;
388
 
                                break;
389
 
                        }
390
 
                }
391
 
 
392
 
                if (bcache) {
393
 
                        list_del_init (&bcache->c_list);
394
 
                        free (bcache->key);
395
 
                        free (bcache->data);
396
 
                        free (bcache);
397
 
                }
398
 
        }
399
 
        UNLOCK (&bctx->lock);
400
 
 
401
 
out:
402
 
        return 0;
403
 
}
404
 
 
405
 
void *
406
 
bdb_db_stat (bctx_t *bctx,
407
 
             DB_TXN *txnid,
408
 
             uint32_t flags)
409
 
{
410
 
        DB     *storage = NULL;
411
 
        void   *stat    = NULL;
412
 
        int32_t ret     = -1;
413
 
 
414
 
        LOCK (&bctx->lock);
415
 
        {
416
 
                if (bctx->primary == NULL) {
417
 
                        ret = bdb_db_open (bctx);
418
 
                        storage = bctx->primary;
419
 
                } else {
420
 
                        /* we are just fine, lets continue */
421
 
                        storage = bctx->primary;
422
 
                } /* if(bctx->dbp==NULL)...else */
423
 
        }
424
 
        UNLOCK (&bctx->lock);
425
 
 
426
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", storage, out);
427
 
 
428
 
        ret = storage->stat (storage, txnid, &stat, flags);
429
 
 
430
 
        if (ret < 0) {
431
 
                gf_log ("bdb-ll", GF_LOG_DEBUG,
432
 
                        "_BDB_DB_STAT %s: %s "
433
 
                        "(failed to do stat database)",
434
 
                        bctx->directory, db_strerror (ret));
435
 
        }
436
 
out:
437
 
        return stat;
438
 
 
439
 
}
440
 
 
441
 
/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the
442
 
 *  corresponding db file.
443
 
 *
444
 
 * @bctx: bctx_t * corresponding to the parent directory of @path. (should
445
 
 *  always be a valid bctx).  bdb_storage_get should never be called if
446
 
 *  @bctx = NULL.
447
 
 * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction
448
 
 *  or a valid DB_TXN *, when embedded in an explicit transaction.
449
 
 * @path: path of the file to read from (translated to a database key using
450
 
 *  MAKE_KEY_FROM_PATH)
451
 
 * @buf: char ** - pointer to a pointer to char. a read buffer is created in
452
 
 *  this procedure and pointer to the buffer is passed through @buf to the
453
 
 *  caller.
454
 
 * @size: size of the file content to be read.
455
 
 * @offset: offset from which the file content to be read.
456
 
 *
457
 
 * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL
458
 
 *  (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by
459
 
 *  bdb_table_prune()).
460
 
 *
461
 
 * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then
462
 
 *  bdb_storage_get first looks up the cache for key/value pair. if
463
 
 *  bdb_lookup_cache fails, then only DB->get() is called. also,  inserts a
464
 
 *  newly read key/value pair to cache through bdb_insert_to_cache.
465
 
 *
466
 
 * return: 'number of bytes read' on success or -1 on error.
467
 
 *
468
 
 * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb
469
 
 *  xlator's internal cache.
470
 
 */
471
 
static int32_t
472
 
bdb_db_get (bctx_t *bctx,
473
 
            DB_TXN *txnid,
474
 
            const char *path,
475
 
            char *buf,
476
 
            size_t size,
477
 
            off_t offset)
478
 
{
479
 
        DB          *storage    = NULL;
480
 
        DBT          key        = {0,};
481
 
        DBT          value      = {0,};
482
 
        int32_t      ret        = -1;
483
 
        size_t       copy_size  = 0;
484
 
        char        *key_string = NULL;
485
 
        bdb_cache_t *bcache     = NULL;
486
 
        int32_t      db_flags   = 0;
487
 
        uint8_t      need_break = 0;
488
 
        int32_t      retries    = 1;
489
 
 
490
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out);
491
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", path, out);
492
 
 
493
 
        MAKE_KEY_FROM_PATH (key_string, path);
494
 
 
495
 
        if (bctx->cache &&
496
 
            ((bcache = bdb_cache_lookup (bctx, key_string)) != NULL)) {
497
 
                if (buf) {
498
 
                        copy_size = ((bcache->size - offset) < size)?
499
 
                                (bcache->size - offset) : size;
500
 
 
501
 
                        memcpy (buf, (bcache->data + offset), copy_size);
502
 
                        ret = copy_size;
503
 
                } else {
504
 
                        ret = bcache->size;
505
 
                }
506
 
                
507
 
                goto out;
508
 
        } 
509
 
 
510
 
        LOCK (&bctx->lock);
511
 
        {
512
 
                if (bctx->primary == NULL) {
513
 
                        ret = bdb_db_open (bctx);
514
 
                        storage = bctx->primary;
515
 
                } else {
516
 
                        /* we are just fine, lets continue */
517
 
                        storage = bctx->primary;
518
 
                } /* if(bctx->dbp==NULL)...else */
519
 
        }
520
 
        UNLOCK (&bctx->lock);
521
 
 
522
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", storage, out);
523
 
 
524
 
        key.data = (char *)key_string;
525
 
        key.size = strlen (key_string);
526
 
        key.flags = DB_DBT_USERMEM;
527
 
 
528
 
        if (bctx->cache){
529
 
                value.flags = DB_DBT_MALLOC;
530
 
        } else {
531
 
                if (size) {
532
 
                        value.data  = buf;
533
 
                        value.ulen  = size;
534
 
                        value.flags = DB_DBT_USERMEM | DB_DBT_PARTIAL;
535
 
                } else {
536
 
                        value.flags = DB_DBT_MALLOC;
537
 
                }
538
 
                value.dlen = size;
539
 
                value.doff = offset;
540
 
        }
541
 
 
542
 
        do {
543
 
                /* TODO: we prefer to give our own buffer to value.data
544
 
                 * and ask bdb to fill in it */
545
 
                ret = storage->get (storage, txnid, &key, &value,
546
 
                                    db_flags);
547
 
 
548
 
                if (ret == DB_NOTFOUND) {
549
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
550
 
                                "_BDB_DB_GET %s - %s: ENOENT"
551
 
                                "(specified key not found in database)",
552
 
                                bctx->directory, key_string);
553
 
                        ret = -1;
554
 
                        need_break = 1;
555
 
                } else if (ret == DB_LOCK_DEADLOCK) {
556
 
                        retries++;
557
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
558
 
                                "_BDB_DB_GET %s - %s"
559
 
                                "(deadlock detected, retrying for %d "
560
 
                                "time)",
561
 
                                bctx->directory, key_string, retries);
562
 
                } else if (ret == 0) {
563
 
                        /* successfully read data, lets set everything
564
 
                         * in place and return */
565
 
                        if (bctx->cache) {
566
 
                                if (buf) {
567
 
                                        copy_size = ((value.size - offset) < size) ?
568
 
                                                (value.size - offset) : size;
569
 
 
570
 
                                        memcpy (buf, (value.data + offset),
571
 
                                                copy_size);
572
 
                                        ret = copy_size;
573
 
                                }
574
 
 
575
 
                                bdb_cache_insert (bctx, &key, &value);
576
 
                        } else {
577
 
                                ret = value.size;
578
 
                        }
579
 
 
580
 
                        if (size == 0)
581
 
                                free (value.data);
582
 
 
583
 
                        need_break = 1;
584
 
                } else {
585
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
586
 
                                "_BDB_DB_GET %s - %s: %s"
587
 
                                "(failed to retrieve specified key from"
588
 
                                " database)",
589
 
                                bctx->directory, key_string,
590
 
                                db_strerror (ret));
591
 
                        ret = -1;
592
 
                        need_break = 1;
593
 
                }
594
 
        } while (!need_break);
595
 
 
596
 
out:
597
 
        return ret;
598
 
}/* bdb_db_get */
599
 
 
600
 
/* TODO: handle errors here and log. propogate only the errno to caller */
601
 
int32_t
602
 
bdb_db_fread (struct bdb_fd *bfd, char *buf, size_t size, off_t offset)
603
 
{
604
 
        return bdb_db_get (bfd->ctx, NULL, bfd->key, buf, size, offset);
605
 
}
606
 
 
607
 
int32_t
608
 
bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **bufp)
609
 
{
610
 
        char *buf = NULL;
611
 
        size_t size = 0;
612
 
        int64_t ret = 0;
613
 
 
614
 
        ret = bdb_db_get (bctx, NULL, key, NULL, 0, 0);
615
 
        size = ret;
616
 
 
617
 
        if (bufp) {
618
 
                buf = calloc (size, sizeof (char));
619
 
                *bufp = buf;
620
 
                ret = bdb_db_get (bctx, NULL, key, buf, size, 0);
621
 
        }
622
 
 
623
 
        return ret; 
624
 
}
625
 
 
626
 
/* bdb_storage_put - insert a key/value specified to the corresponding DB.
627
 
 *
628
 
 * @bctx: bctx_t * corresponding to the parent directory of @path.
629
 
 *        (should always be a valid bctx). bdb_storage_put should never be
630
 
 *         called if @bctx = NULL.
631
 
 * @txnid: NULL if bdb_storage_put is not embedded in an explicit transaction
632
 
 *         or a valid DB_TXN *, when embedded in an explicit transaction.
633
 
 * @key_string: key of the database entry.
634
 
 * @buf: pointer to the buffer data to be written as data for @key_string.
635
 
 * @size: size of @buf.
636
 
 * @offset: offset in the key's data to be modified with provided data.
637
 
 * @flags: valid flags are BDB_TRUNCATE_RECORD (to reduce the data of
638
 
 *         @key_string to 0 size).
639
 
 *
640
 
 * NOTE: bdb_storage_put tries to open DB, if @bctx->dbp == NULL
641
 
 *      (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by
642
 
 *       bdb_table_prune()).
643
 
 *
644
 
 * NOTE: bdb_storage_put deletes the key/value from bdb xlator's internal cache.
645
 
 *
646
 
 * return: 0 on success or -1 on error.
647
 
 *
648
 
 * also see: bdb_cache_delete for details on how a cached key/value pair is
649
 
 * removed.
650
 
 */
651
 
static int32_t
652
 
bdb_db_put (bctx_t *bctx,
653
 
            DB_TXN *txnid,
654
 
            const char *key_string,
655
 
            const char *buf,
656
 
            size_t size,
657
 
            off_t offset,
658
 
            int32_t flags)
659
 
{
660
 
        DB     *storage = NULL;
661
 
        DBT     key = {0,}, value = {0,};
662
 
        int32_t ret = -1;
663
 
        int32_t db_flags = DB_AUTO_COMMIT;
664
 
        uint8_t need_break = 0;
665
 
        int32_t retries = 1;
666
 
 
667
 
        LOCK (&bctx->lock);
668
 
        {
669
 
                if (bctx->primary == NULL) {
670
 
                        ret = bdb_db_open (bctx);
671
 
                        storage = bctx->primary;
672
 
                } else {
673
 
                        /* we are just fine, lets continue */
674
 
                        storage = bctx->primary;
675
 
                }
676
 
        }
677
 
        UNLOCK (&bctx->lock);
678
 
 
679
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", storage, out);
680
 
 
681
 
        if (bctx->cache) {
682
 
                ret = bdb_cache_delete (bctx, (char *)key_string);
683
 
                GF_VALIDATE_OR_GOTO ("bdb-ll", (ret == 0), out);
684
 
        }
685
 
 
686
 
        key.data = (void *)key_string;
687
 
        key.size = strlen (key_string);
688
 
 
689
 
        /* NOTE: bdb lets us expand the file, suppose value.size > value.len,
690
 
         * then value.len bytes from value.doff offset and value.size bytes
691
 
         * will be written from value.doff and data from
692
 
         * value.doff + value.dlen will be pushed value.doff + value.size
693
 
         */
694
 
        value.data = (void *)buf;
695
 
 
696
 
        if (flags & BDB_TRUNCATE_RECORD) {
697
 
                value.size = size;
698
 
                value.doff = 0;
699
 
                value.dlen = offset;
700
 
        } else {
701
 
                value.size = size;
702
 
                value.dlen = size;
703
 
                value.doff = offset;
704
 
        }
705
 
        value.flags = DB_DBT_PARTIAL;
706
 
        if (buf == NULL && size == 0)
707
 
                /* truncate called us */
708
 
                value.flags = 0;
709
 
 
710
 
        do {
711
 
                ret = storage->put (storage, txnid, &key, &value, db_flags);
712
 
                if (ret == DB_LOCK_DEADLOCK) {
713
 
                        retries++;
714
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
715
 
                                "_BDB_DB_PUT %s - %s"
716
 
                                "(deadlock detected, retying for %d time)",
717
 
                                bctx->directory, key_string, retries);
718
 
                } else if (ret) {
719
 
                        /* write failed */
720
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
721
 
                                "_BDB_DB_PUT %s - %s: %s"
722
 
                                "(failed to put specified entry into database)",
723
 
                                bctx->directory, key_string, db_strerror (ret));
724
 
                        need_break = 1;
725
 
                } else {
726
 
                        /* successfully wrote */
727
 
                        ret = 0;
728
 
                        need_break = 1;
729
 
                }
730
 
        } while (!need_break);
731
 
out:
732
 
        return ret;
733
 
}/* bdb_db_put */
734
 
 
735
 
int32_t
736
 
bdb_db_icreate (struct bdb_ctx *bctx, const char *key)
737
 
{
738
 
        return bdb_db_put (bctx, NULL, key, NULL, 0, 0, 0);
739
 
}
740
 
 
741
 
/* TODO: handle errors here and log. propogate only the errno to caller */
742
 
int32_t
743
 
bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset)
744
 
{
745
 
        return bdb_db_put (bfd->ctx, NULL, bfd->key, buf, size, offset, 0);
746
 
}
747
 
 
748
 
/* TODO: handle errors here and log. propogate only the errno to caller */
749
 
int32_t
750
 
bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size)
751
 
{
752
 
        return bdb_db_put (bctx, NULL, key, buf, size, 0, 0);
753
 
}
754
 
 
755
 
int32_t
756
 
bdb_db_itruncate (struct bdb_ctx *bctx, const char *key)
757
 
{
758
 
        return bdb_db_put (bctx, NULL, key, NULL, 0, 1, 0);
759
 
}
760
 
 
761
 
/* bdb_storage_del - delete a key/value pair corresponding to @path from
762
 
 *  corresponding db file.
763
 
 *
764
 
 * @bctx: bctx_t * corresponding to the parent directory of @path.
765
 
 *       (should always be a valid bctx). bdb_storage_del should never be called
766
 
 *       if @bctx = NULL.
767
 
 * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction
768
 
 *   or a valid DB_TXN *, when embedded in an explicit transaction.
769
 
 * @path: path to the file, whose key/value pair has to be deleted.
770
 
 *
771
 
 * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL
772
 
 *  (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by
773
 
 *  bdb_table_prune()).
774
 
 *
775
 
 * return: 0 on success or -1 on error.
776
 
 */
777
 
static int32_t
778
 
bdb_db_del (bctx_t *bctx,
779
 
            DB_TXN *txnid,
780
 
            const char *key_string)
781
 
{
782
 
        DB     *storage    = NULL;
783
 
        DBT     key        = {0,};
784
 
        int32_t ret        = -1;
785
 
        int32_t db_flags   = 0;
786
 
        uint8_t need_break = 0;
787
 
        int32_t retries    = 1;
788
 
 
789
 
        LOCK (&bctx->lock);
790
 
        {
791
 
                if (bctx->primary == NULL) {
792
 
                        ret = bdb_db_open (bctx);
793
 
                        storage = bctx->primary;
794
 
                } else {
795
 
                        /* we are just fine, lets continue */
796
 
                        storage = bctx->primary;
797
 
                }
798
 
        }
799
 
        UNLOCK (&bctx->lock);
800
 
 
801
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", storage, out);
802
 
 
803
 
        ret = bdb_cache_delete (bctx, key_string);
804
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", (ret == 0), out);
805
 
 
806
 
        key.data = (char *)key_string;
807
 
        key.size = strlen (key_string);
808
 
        key.flags = DB_DBT_USERMEM;
809
 
 
810
 
        do {
811
 
                ret = storage->del (storage, txnid, &key, db_flags);
812
 
 
813
 
                if (ret == DB_NOTFOUND) {
814
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
815
 
                                "_BDB_DB_DEL %s - %s: ENOENT"
816
 
                                "(failed to delete entry, could not be "
817
 
                                "found in the database)",
818
 
                                bctx->directory, key_string);
819
 
                        need_break = 1;
820
 
                } else if (ret == DB_LOCK_DEADLOCK) {
821
 
                        retries++;
822
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
823
 
                                "_BDB_DB_DEL %s - %s"
824
 
                                "(deadlock detected, retying for %d time)",
825
 
                                bctx->directory, key_string, retries);
826
 
                } else if (ret == 0) {
827
 
                        /* successfully deleted the entry */
828
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
829
 
                                "_BDB_DB_DEL %s - %s"
830
 
                                "(successfully deleted entry from database)",
831
 
                                bctx->directory, key_string);
832
 
                        ret = 0;
833
 
                        need_break = 1;
834
 
                } else {
835
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
836
 
                                "_BDB_DB_DEL %s - %s: %s"
837
 
                                "(failed to delete entry from database)",
838
 
                                bctx->directory, key_string, db_strerror (ret));
839
 
                        ret = -1;
840
 
                        need_break = 1;
841
 
                }
842
 
        } while (!need_break);
843
 
out:
844
 
        return ret;
845
 
}
846
 
 
847
 
int32_t
848
 
bdb_db_iremove (bctx_t *bctx,
849
 
                const char *key)
850
 
{
851
 
        return bdb_db_del (bctx, NULL, key);
852
 
}
853
 
 
854
 
/* NOTE: bdb version compatibility wrapper */
855
 
int32_t
856
 
bdb_cursor_get (DBC *cursorp,
857
 
                DBT *sec, DBT *pri,
858
 
                DBT *val,
859
 
                int32_t flags)
860
 
{
861
 
        int32_t ret = -1;
862
 
 
863
 
        GF_VALIDATE_OR_GOTO ("bdb-ll", cursorp, out);
864
 
 
865
 
#ifdef HAVE_BDB_CURSOR_GET
866
 
        ret = cursorp->pget (cursorp, sec, pri, val, flags);
867
 
#else
868
 
        ret = cursorp->c_pget (cursorp, sec, pri, val, flags);
869
 
#endif
870
 
        if ((ret != 0)  && (ret != DB_NOTFOUND)) {
871
 
                gf_log ("bdb-ll", GF_LOG_DEBUG,
872
 
                        "_BDB_CURSOR_GET: %s"
873
 
                        "(failed to retrieve entry from database cursor)",
874
 
                        db_strerror (ret));
875
 
        }
876
 
 
877
 
out:
878
 
        return ret;
879
 
}/* bdb_cursor_get */
880
 
 
881
 
int32_t
882
 
bdb_dirent_size (DBT *key)
883
 
{
884
 
        return ALIGN (24 /* FIX MEEEE!!! */ + key->size);
885
 
}
886
 
 
887
 
 
888
 
 
889
 
/* bdb_dbenv_init - initialize DB_ENV
890
 
 *
891
 
 *  initialization includes:
892
 
 *   1. opening DB_ENV (db_env_create(), DB_ENV->open()).
893
 
 *      NOTE: see private->envflags for flags used.
894
 
 *   2. DB_ENV->set_lg_dir - set log directory to be used for storing log files
895
 
 *     (log files are the files in which transaction logs are written by db).
896
 
 *   3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically
897
 
 *      clear the unwanted log files (flushed at each checkpoint).
898
 
 *   4. DB_ENV->set_errfile - set errfile to be used by db to report detailed
899
 
 *      error logs. used only for debbuging purpose.
900
 
 *
901
 
 * return: returns a valid DB_ENV * on success or NULL on error.
902
 
 *
903
 
 */
904
 
static DB_ENV *
905
 
bdb_dbenv_init (xlator_t *this,
906
 
                char *directory)
907
 
{
908
 
        /* Create a DB environment */
909
 
        DB_ENV        *dbenv       = NULL;
910
 
        int32_t        ret         = 0;
911
 
        bdb_private_t *private     = NULL;
912
 
        int32_t        fatal_flags = 0;
913
 
 
914
 
        VALIDATE_OR_GOTO (this, err);
915
 
        VALIDATE_OR_GOTO (directory, err);
916
 
 
917
 
        private = this->private;
918
 
        VALIDATE_OR_GOTO (private, err);
919
 
 
920
 
        ret = db_env_create (&dbenv, 0);
921
 
        VALIDATE_OR_GOTO ((ret == 0), err);
922
 
 
923
 
        /* NOTE: set_errpfx returns 'void' */
924
 
        dbenv->set_errpfx(dbenv, this->name);
925
 
 
926
 
        ret = dbenv->set_lk_detect (dbenv, DB_LOCK_DEFAULT);
927
 
        VALIDATE_OR_GOTO ((ret == 0), err);
928
 
 
929
 
        ret = dbenv->open(dbenv, directory,
930
 
                          private->envflags,
931
 
                          S_IRUSR | S_IWUSR);
932
 
        if ((ret != 0) && (ret != DB_RUNRECOVERY)) {
933
 
                gf_log (this->name, GF_LOG_CRITICAL,
934
 
                        "failed to join Berkeley DB environment at %s: %s."
935
 
                        "please run manual recovery and retry running "
936
 
                        "glusterfs",
937
 
                        directory, db_strerror (ret));
938
 
                dbenv = NULL;
939
 
                goto err;
940
 
        } else if (ret == DB_RUNRECOVERY) {
941
 
                fatal_flags = ((private->envflags & (~DB_RECOVER))
942
 
                               | DB_RECOVER_FATAL);
943
 
                ret = dbenv->open(dbenv, directory, fatal_flags,
944
 
                                  S_IRUSR | S_IWUSR);
945
 
                if (ret != 0) {
946
 
                        gf_log (this->name, GF_LOG_CRITICAL,
947
 
                                "failed to join Berkeley DB environment in "
948
 
                                "recovery mode at %s: %s. please run manual "
949
 
                                "recovery and retry running glusterfs",
950
 
                                directory, db_strerror (ret));
951
 
                        dbenv = NULL;
952
 
                        goto err;
953
 
                }
954
 
        }
955
 
 
956
 
        ret = 0;
957
 
 
958
 
#if ((DB_VERSION_MAJOR > 4) || \
959
 
    (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 7))
960
 
        if (private->log_auto_remove) {
961
 
                ret = dbenv->log_set_config (dbenv, DB_LOG_AUTO_REMOVE, 1);
962
 
        } else {
963
 
                ret = dbenv->log_set_config (dbenv, DB_LOG_AUTO_REMOVE, 0);
964
 
        }
965
 
#else
966
 
        if (private->log_auto_remove) {
967
 
                ret = dbenv->set_flags (dbenv, DB_LOG_AUTOREMOVE, 1);
968
 
        } else {
969
 
                ret = dbenv->set_flags (dbenv, DB_LOG_AUTOREMOVE, 0);
970
 
        }
971
 
#endif
972
 
        if (ret < 0) {
973
 
                gf_log ("bdb-ll", GF_LOG_ERROR,
974
 
                        "autoremoval of transactional log files could not be "
975
 
                        "configured (%s). you may have to do a manual "
976
 
                        "monitoring of transactional log files and remove "
977
 
                        "periodically.",
978
 
                        db_strerror (ret));
979
 
                goto err;
980
 
        }
981
 
 
982
 
        if (private->transaction) {
983
 
                ret = dbenv->set_flags(dbenv, DB_AUTO_COMMIT, 1);
984
 
 
985
 
                if (ret != 0) {
986
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
987
 
                                "configuration of auto-commit failed for "
988
 
                                "database environment at %s. none of the "
989
 
                                "operations will be embedded in transaction "
990
 
                                "unless explicitly done so.",
991
 
                                db_strerror (ret));
992
 
                        goto err;
993
 
                }
994
 
 
995
 
                if (private->txn_timeout) {
996
 
                        ret = dbenv->set_timeout (dbenv, private->txn_timeout,
997
 
                                                  DB_SET_TXN_TIMEOUT);
998
 
                        if (ret != 0) {
999
 
                                gf_log ("bdb-ll", GF_LOG_ERROR,
1000
 
                                        "could not configure Berkeley DB "
1001
 
                                        "transaction timeout to %d (%s). please"
1002
 
                                        " review 'option transaction-timeout %d"
1003
 
                                        "' option.",
1004
 
                                        private->txn_timeout,
1005
 
                                        db_strerror (ret),
1006
 
                                        private->txn_timeout);
1007
 
                                goto err;
1008
 
                        }
1009
 
                }
1010
 
 
1011
 
                if (private->lock_timeout) {
1012
 
                        ret = dbenv->set_timeout(dbenv,
1013
 
                                                 private->txn_timeout,
1014
 
                                                 DB_SET_LOCK_TIMEOUT);
1015
 
                        if (ret < 0) {
1016
 
                                gf_log ("bdb-ll", GF_LOG_ERROR,
1017
 
                                        "could not configure Berkeley DB "
1018
 
                                        "lock timeout to %d (%s). please"
1019
 
                                        " review 'option lock-timeout %d"
1020
 
                                        "' option.",
1021
 
                                        private->lock_timeout,
1022
 
                                        db_strerror (ret),
1023
 
                                        private->lock_timeout);
1024
 
                                goto err;
1025
 
                        }
1026
 
                }
1027
 
 
1028
 
                ret = dbenv->set_lg_dir (dbenv, private->logdir);
1029
 
                if (ret < 0) {
1030
 
                        gf_log ("bdb-ll", GF_LOG_ERROR,
1031
 
                                "failed to configure libdb transaction log "
1032
 
                                "directory at %s. please review the "
1033
 
                                "'option logdir %s' option.",
1034
 
                                db_strerror (ret), private->logdir);
1035
 
                        goto err;
1036
 
                }
1037
 
        }
1038
 
 
1039
 
        if (private->errfile) {
1040
 
                private->errfp = fopen (private->errfile, "a+");
1041
 
                if (private->errfp) {
1042
 
                        dbenv->set_errfile (dbenv, private->errfp);
1043
 
                } else {
1044
 
                        gf_log ("bdb-ll", GF_LOG_ERROR,
1045
 
                                "failed to open error logging file for "
1046
 
                                "libdb (Berkeley DB) internal logging (%s)."
1047
 
                                "please review the 'option errfile %s' option.",
1048
 
                                strerror (errno), private->errfile);
1049
 
                        goto err;
1050
 
                }
1051
 
        }
1052
 
 
1053
 
        return dbenv;
1054
 
err:
1055
 
        if (dbenv) {
1056
 
                dbenv->close (dbenv, 0);
1057
 
        }
1058
 
 
1059
 
        return NULL;
1060
 
}
1061
 
 
1062
 
#define BDB_ENV(this) ((((struct bdb_private *)this->private)->b_table)->dbenv)
1063
 
 
1064
 
/* bdb_checkpoint - during transactional usage, db does not directly write the
1065
 
 *  data to db files, instead db writes a 'log' (similar to a journal entry)
1066
 
 *  into a log file. db normally clears the log files during opening of an
1067
 
 *  environment. since we expect a filesystem server to run for a pretty long
1068
 
 *  duration and flushing 'log's during dbenv->open would prove very costly, if
1069
 
 *  we accumulate the log entries for one complete run of glusterfs server. to
1070
 
 *  flush the logs frequently, db provides a mechanism called 'checkpointing'.
1071
 
 *  when we do a checkpoint, db flushes the logs to disk (writes changes to db
1072
 
 *  files) and we can also clear the accumulated log files after checkpointing.
1073
 
 *  NOTE: removing unwanted log files is not part of dbenv->txn_checkpoint()
1074
 
 *  call.
1075
 
 *
1076
 
 * @data: xlator_t of the current instance of bdb xlator.
1077
 
 *
1078
 
 *  bdb_checkpoint is called in a different thread from the main glusterfs
1079
 
 *  thread. bdb xlator creates the checkpoint thread after successfully opening
1080
 
 *  the db environment.
1081
 
 *  NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem
1082
 
 *  thread.
1083
 
 *
1084
 
 *  db environment checkpointing frequency is controlled by
1085
 
 *  'option checkpoint-timeout <time-in-seconds>' in volfile.
1086
 
 *
1087
 
 * NOTE: checkpointing thread is started only if 'option transaction on'
1088
 
 *      specified in volfile. checkpointing is not valid for non-transactional
1089
 
 *      environments.
1090
 
 *
1091
 
 */
1092
 
static void *
1093
 
bdb_checkpoint (void *data)
1094
 
{
1095
 
        xlator_t *this = NULL;
1096
 
        struct bdb_private *private = NULL;
1097
 
        DB_ENV *dbenv = NULL;
1098
 
        int32_t ret = 0;
1099
 
        uint32_t active = 0;
1100
 
 
1101
 
        this = (xlator_t *) data;
1102
 
        dbenv = BDB_ENV(this);
1103
 
        private = this->private;
1104
 
 
1105
 
        for (;;sleep (private->checkpoint_interval)) {
1106
 
                LOCK (&private->active_lock);
1107
 
                active = private->active;
1108
 
                UNLOCK (&private->active_lock);
1109
 
 
1110
 
                if (active) {
1111
 
                        ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0);
1112
 
                        if (ret) {
1113
 
                                gf_log ("bdb-ll", GF_LOG_DEBUG,
1114
 
                                        "_BDB_CHECKPOINT: %s"
1115
 
                                        "(failed to checkpoint environment)",
1116
 
                                        db_strerror (ret));
1117
 
                        } else {
1118
 
                                gf_log ("bdb-ll", GF_LOG_DEBUG,
1119
 
                                        "_BDB_CHECKPOINT: successfully "
1120
 
                                        "checkpointed");
1121
 
                        }
1122
 
                } else {
1123
 
                        ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0);
1124
 
                        if (ret) {
1125
 
                                gf_log ("bdb-ll", GF_LOG_ERROR,
1126
 
                                        "_BDB_CHECKPOINT: %s"
1127
 
                                        "(final checkpointing failed. might "
1128
 
                                        "need to run recovery tool manually on "
1129
 
                                        "next usage of this database "
1130
 
                                        "environment)",
1131
 
                                        db_strerror (ret));
1132
 
                        } else {
1133
 
                                gf_log ("bdb-ll", GF_LOG_DEBUG,
1134
 
                                        "_BDB_CHECKPOINT: final successfully "
1135
 
                                        "checkpointed");
1136
 
                        }
1137
 
                        break;
1138
 
                }
1139
 
        }
1140
 
 
1141
 
        return NULL;
1142
 
}
1143
 
 
1144
 
 
1145
 
/* bdb_db_init - initialize bdb xlator
1146
 
 *
1147
 
 * reads the options from @options dictionary and sets appropriate values in
1148
 
 * @this->private. also initializes DB_ENV.
1149
 
 *
1150
 
 * return: 0 on success or -1 on error
1151
 
 * (with logging the error through gf_log()).
1152
 
 */
1153
 
int
1154
 
bdb_db_init (xlator_t *this,
1155
 
             dict_t *options)
1156
 
{
1157
 
        /* create a db entry for root */
1158
 
        int32_t        op_ret  = 0;
1159
 
        bdb_private_t *private = NULL;
1160
 
        bctx_table_t  *table = NULL;
1161
 
 
1162
 
        char *checkpoint_interval_str = NULL;
1163
 
        char *page_size_str           = NULL;
1164
 
        char *lru_limit_str           = NULL;
1165
 
        char *timeout_str             = NULL;
1166
 
        char *access_mode             = NULL;
1167
 
        char *endptr    = NULL;
1168
 
        char *errfile   = NULL;
1169
 
        char *directory = NULL;
1170
 
        char *logdir    = NULL;
1171
 
        char *mode      = NULL;
1172
 
        char *mode_str  = NULL;
1173
 
        int   ret = -1;
1174
 
        int   idx = 0;
1175
 
        struct stat stbuf = {0,};
1176
 
 
1177
 
        private = this->private;
1178
 
 
1179
 
        /* cache is always on */
1180
 
        private->cache = ON;
1181
 
 
1182
 
        ret = dict_get_str (options, "access-mode", &access_mode);
1183
 
        if ((ret == 0)
1184
 
            && (!strcmp (access_mode, "btree"))) {
1185
 
                gf_log (this->name, GF_LOG_DEBUG,
1186
 
                        "using BTREE access mode to access libdb "
1187
 
                        "(Berkeley DB)");
1188
 
                private->access_mode = DB_BTREE;
1189
 
        } else {
1190
 
                gf_log (this->name, GF_LOG_DEBUG,
1191
 
                        "using HASH access mode to access libdb (Berkeley DB)");
1192
 
                private->access_mode = DB_HASH;
1193
 
        }
1194
 
 
1195
 
        ret = dict_get_str (options, "mode", &mode);
1196
 
        if ((ret == 0)
1197
 
            && (!strcmp (mode, "cache"))) {
1198
 
                gf_log (this->name, GF_LOG_DEBUG,
1199
 
                        "cache data mode selected for 'storage/bdb'. filesystem"
1200
 
                        " operations are not transactionally protected and "
1201
 
                        "system crash does not guarantee recoverability of "
1202
 
                        "data");
1203
 
                private->envflags = DB_CREATE | DB_INIT_LOG |
1204
 
                        DB_INIT_MPOOL | DB_THREAD;
1205
 
                private->dbflags = DB_CREATE | DB_THREAD;
1206
 
                private->transaction = OFF;
1207
 
        } else {
1208
 
                gf_log (this->name, GF_LOG_DEBUG,
1209
 
                        "persistent data mode selected for 'storage/bdb'. each"
1210
 
                        "filesystem operation is guaranteed to be Berkeley DB "
1211
 
                        "transaction protected.");
1212
 
                private->transaction = ON;
1213
 
                private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
1214
 
                        DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD;
1215
 
                private->dbflags = DB_CREATE | DB_THREAD;
1216
 
 
1217
 
 
1218
 
                ret = dict_get_str (options, "lock-timeout", &timeout_str);
1219
 
 
1220
 
                if (ret == 0) {
1221
 
                        ret = gf_string2time (timeout_str,
1222
 
                                              &private->lock_timeout);
1223
 
 
1224
 
                        if (private->lock_timeout > 4260000) {
1225
 
                                /* db allows us to DB_SET_LOCK_TIMEOUT to be
1226
 
                                 * set to a maximum of 71 mins
1227
 
                                 * (4260000 milliseconds) */
1228
 
                                gf_log (this->name, GF_LOG_DEBUG,
1229
 
                                        "Berkeley DB lock-timeout parameter "
1230
 
                                        "(%d) is out of range. please specify"
1231
 
                                        " a valid timeout value for "
1232
 
                                        "lock-timeout and retry.",
1233
 
                                        private->lock_timeout);
1234
 
                                goto err;
1235
 
                        }
1236
 
                }
1237
 
                ret = dict_get_str (options, "transaction-timeout",
1238
 
                                    &timeout_str);
1239
 
                if (ret == 0) {
1240
 
                        ret = gf_string2time (timeout_str,
1241
 
                                              &private->txn_timeout);
1242
 
 
1243
 
                        if (private->txn_timeout > 4260000) {
1244
 
                                /* db allows us to DB_SET_TXN_TIMEOUT to be set
1245
 
                                 * to a maximum of 71 mins
1246
 
                                 * (4260000 milliseconds) */
1247
 
                                gf_log (this->name, GF_LOG_DEBUG,
1248
 
                                        "Berkeley DB lock-timeout parameter "
1249
 
                                        "(%d) is out of range. please specify"
1250
 
                                        " a valid timeout value for "
1251
 
                                        "lock-timeout and retry.",
1252
 
                                        private->lock_timeout);
1253
 
                                goto err;
1254
 
                        }
1255
 
                }
1256
 
 
1257
 
                private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL;
1258
 
                ret = dict_get_str (options, "checkpoint-interval",
1259
 
                                    &checkpoint_interval_str);
1260
 
                if (ret == 0) {
1261
 
                        ret = gf_string2time (checkpoint_interval_str,
1262
 
                                              &private->checkpoint_interval);
1263
 
 
1264
 
                        if (ret < 0) {
1265
 
                                gf_log (this->name, GF_LOG_DEBUG,
1266
 
                                        "'%"PRIu32"' is not a valid parameter "
1267
 
                                        "for checkpoint-interval option. "
1268
 
                                        "please specify a valid "
1269
 
                                        "checkpoint-interval and retry",
1270
 
                                        private->checkpoint_interval);
1271
 
                                goto err;
1272
 
                        }
1273
 
                }
1274
 
        }
1275
 
 
1276
 
        ret = dict_get_str (options, "file-mode", &mode_str);
1277
 
        if (ret == 0) {
1278
 
                private->file_mode = strtol (mode_str, &endptr, 8);
1279
 
 
1280
 
                if ((*endptr) ||
1281
 
                    (!IS_VALID_FILE_MODE(private->file_mode))) {
1282
 
                        gf_log (this->name, GF_LOG_DEBUG,
1283
 
                                "'%o' is not a valid parameter for file-mode "
1284
 
                                "option. please specify a valid parameter for "
1285
 
                                "file-mode and retry.",
1286
 
                                private->file_mode);
1287
 
                        goto err;
1288
 
                }
1289
 
        } else {
1290
 
                private->file_mode = DEFAULT_FILE_MODE;
1291
 
        }
1292
 
        private->symlink_mode = private->file_mode | S_IFLNK;
1293
 
        private->file_mode = private->file_mode | S_IFREG;
1294
 
 
1295
 
        ret = dict_get_str (options, "dir-mode", &mode_str);
1296
 
        if (ret == 0) {
1297
 
                private->dir_mode = strtol (mode_str, &endptr, 8);
1298
 
                if ((*endptr) ||
1299
 
                    (!IS_VALID_FILE_MODE(private->dir_mode))) {
1300
 
                        gf_log (this->name, GF_LOG_DEBUG,
1301
 
                                "'%o' is not a valid parameter for dir-mode "
1302
 
                                "option. please specify a valid parameter for "
1303
 
                                "dir-mode and retry.",
1304
 
                                private->dir_mode);
1305
 
                        goto err;
1306
 
                }
1307
 
        } else {
1308
 
                private->dir_mode = DEFAULT_DIR_MODE;
1309
 
        }
1310
 
 
1311
 
        private->dir_mode = private->dir_mode | S_IFDIR;
1312
 
 
1313
 
        table = CALLOC (1, sizeof (*table));
1314
 
        if (table == NULL) {
1315
 
                gf_log ("bdb-ll", GF_LOG_CRITICAL,
1316
 
                        "memory allocation for 'storage/bdb' internal "
1317
 
                        "context table failed.");
1318
 
                goto err;
1319
 
        }
1320
 
 
1321
 
        INIT_LIST_HEAD(&(table->b_lru));
1322
 
        INIT_LIST_HEAD(&(table->active));
1323
 
        INIT_LIST_HEAD(&(table->purge));
1324
 
 
1325
 
        LOCK_INIT (&table->lock);
1326
 
        LOCK_INIT (&table->checkpoint_lock);
1327
 
 
1328
 
        table->transaction = private->transaction;
1329
 
        table->access_mode = private->access_mode;
1330
 
        table->dbflags = private->dbflags;
1331
 
        table->this    = this;
1332
 
 
1333
 
        ret = dict_get_str (options, "lru-limit",
1334
 
                            &lru_limit_str);
1335
 
 
1336
 
        /* TODO: set max lockers and max txns to accomodate
1337
 
         * for more than lru_limit */
1338
 
        if (ret == 0) {
1339
 
                ret = gf_string2uint32 (lru_limit_str,
1340
 
                                        &table->lru_limit);
1341
 
                gf_log ("bdb-ll", GF_LOG_DEBUG,
1342
 
                        "setting lru limit of 'storage/bdb' internal context"
1343
 
                        "table to %d. maximum of %d unused databases can be "
1344
 
                        "open at any given point of time.",
1345
 
                        table->lru_limit, table->lru_limit);
1346
 
        } else {
1347
 
                table->lru_limit = BDB_DEFAULT_LRU_LIMIT;
1348
 
        }
1349
 
 
1350
 
        ret = dict_get_str (options, "page-size",
1351
 
                            &page_size_str);
1352
 
 
1353
 
        if (ret == 0) {
1354
 
                ret = gf_string2bytesize (page_size_str,
1355
 
                                          &table->page_size);
1356
 
                if (ret < 0) {
1357
 
                        gf_log ("bdb-ll", GF_LOG_ERROR,
1358
 
                                "\"%s\" is an invalid parameter to "
1359
 
                                "\"option page-size\". please specify a valid "
1360
 
                                "size and retry.",
1361
 
                                page_size_str);
1362
 
                        goto err;
1363
 
                }
1364
 
 
1365
 
                if (!PAGE_SIZE_IN_RANGE(table->page_size)) {
1366
 
                        gf_log ("bdb-ll", GF_LOG_ERROR,
1367
 
                                "\"%s\" is out of range for Berkeley DB "
1368
 
                                "page-size. allowed page-size range is %d to "
1369
 
                                "%d. please specify a page-size value in the "
1370
 
                                "range and retry.",
1371
 
                                page_size_str, BDB_LL_PAGE_SIZE_MIN,
1372
 
                                BDB_LL_PAGE_SIZE_MAX);
1373
 
                        goto err;
1374
 
                }
1375
 
        } else {
1376
 
                table->page_size = BDB_LL_PAGE_SIZE_DEFAULT;
1377
 
        }
1378
 
 
1379
 
        table->hash_size = BDB_DEFAULT_HASH_SIZE;
1380
 
        table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE,
1381
 
                                sizeof (struct list_head));
1382
 
 
1383
 
        for (idx = 0; idx < table->hash_size; idx++)
1384
 
                INIT_LIST_HEAD(&(table->b_hash[idx]));
1385
 
 
1386
 
        private->b_table = table;
1387
 
 
1388
 
        ret = dict_get_str (options, "errfile", &errfile);
1389
 
        if (ret == 0) {
1390
 
                private->errfile = strdup (errfile);
1391
 
                gf_log (this->name, GF_LOG_DEBUG,
1392
 
                        "using %s as error logging file for libdb (Berkeley DB "
1393
 
                        "library) internal logging.", private->errfile);
1394
 
        }
1395
 
 
1396
 
        ret = dict_get_str (options, "directory", &directory);
1397
 
 
1398
 
        if (ret == 0) {
1399
 
                ret = dict_get_str (options, "logdir", &logdir);
1400
 
 
1401
 
                if (ret < 0) {
1402
 
                        gf_log ("bdb-ll", GF_LOG_DEBUG,
1403
 
                                "using the database environment home "
1404
 
                                "directory (%s) itself as transaction log "
1405
 
                                "directory", directory);
1406
 
                        private->logdir = strdup (directory);
1407
 
 
1408
 
                } else {
1409
 
                        private->logdir = strdup (logdir);
1410
 
 
1411
 
                        op_ret = stat (private->logdir, &stbuf);
1412
 
                        if ((op_ret != 0)
1413
 
                            || (!S_ISDIR (stbuf.st_mode))) {
1414
 
                                gf_log ("bdb-ll", GF_LOG_ERROR,
1415
 
                                        "specified logdir %s does not exist. "
1416
 
                                        "please provide a valid existing "
1417
 
                                        "directory as parameter to 'option "
1418
 
                                        "logdir'",
1419
 
                                        private->logdir);
1420
 
                                goto err;
1421
 
                        }
1422
 
                }
1423
 
 
1424
 
                private->b_table->dbenv = bdb_dbenv_init (this, directory);
1425
 
                if (private->b_table->dbenv == NULL) {
1426
 
                        gf_log ("bdb-ll", GF_LOG_ERROR,
1427
 
                                "initialization of database environment "
1428
 
                                "failed");
1429
 
                        goto err;
1430
 
                } else {
1431
 
                        if (private->transaction) {
1432
 
                                /* all well, start the checkpointing thread */
1433
 
                                LOCK_INIT (&private->active_lock);
1434
 
 
1435
 
                                LOCK (&private->active_lock);
1436
 
                                {
1437
 
                                        private->active = 1;
1438
 
                                }
1439
 
                                UNLOCK (&private->active_lock);
1440
 
                                pthread_create (&private->checkpoint_thread,
1441
 
                                                NULL, bdb_checkpoint, this);
1442
 
                        }
1443
 
                }
1444
 
        }
1445
 
 
1446
 
        return op_ret;
1447
 
err:
1448
 
        if (table) {
1449
 
                FREE (table->b_hash);
1450
 
                FREE (table);
1451
 
        }
1452
 
        if (private) {
1453
 
                if (private->errfile)
1454
 
                        FREE (private->errfile);
1455
 
 
1456
 
                if (private->logdir)
1457
 
                        FREE (private->logdir);
1458
 
        }
1459
 
 
1460
 
        return -1;
1461
 
}