~zulcss/samba/server-dailies-3.0.37

« back to all changes in this revision

Viewing changes to source/tdb/common/transaction.c

  • Committer: Chuck Short
  • Date: 2010-09-28 20:24:01 UTC
  • Revision ID: zulcss@ubuntu.com-20100928202401-tgh438aoatxv3zp3
Initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 /* 
 
2
   Unix SMB/CIFS implementation.
 
3
 
 
4
   trivial database library
 
5
 
 
6
   Copyright (C) Andrew Tridgell              2005
 
7
 
 
8
     ** NOTE! The following LGPL license applies to the tdb
 
9
     ** library. This does NOT imply that all of Samba is released
 
10
     ** under the LGPL
 
11
   
 
12
   This library is free software; you can redistribute it and/or
 
13
   modify it under the terms of the GNU Lesser General Public
 
14
   License as published by the Free Software Foundation; either
 
15
   version 2 of the License, or (at your option) any later version.
 
16
 
 
17
   This library is distributed in the hope that it will be useful,
 
18
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
19
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
20
   Lesser General Public License for more details.
 
21
 
 
22
   You should have received a copy of the GNU Lesser General Public
 
23
   License along with this library; if not, write to the Free Software
 
24
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
25
*/
 
26
 
 
27
#include "tdb_private.h"
 
28
 
 
29
/*
 
30
  transaction design:
 
31
 
 
32
  - only allow a single transaction at a time per database. This makes
 
33
    using the transaction API simpler, as otherwise the caller would
 
34
    have to cope with temporary failures in transactions that conflict
 
35
    with other current transactions
 
36
 
 
37
  - keep the transaction recovery information in the same file as the
 
38
    database, using a special 'transaction recovery' record pointed at
 
39
    by the header. This removes the need for extra journal files as
 
40
    used by some other databases
 
41
 
 
42
  - dynamically allocated the transaction recover record, re-using it
 
43
    for subsequent transactions. If a larger record is needed then
 
44
    tdb_free() the old record to place it on the normal tdb freelist
 
45
    before allocating the new record
 
46
 
 
47
  - during transactions, keep a linked list of writes all that have
 
48
    been performed by intercepting all tdb_write() calls. The hooked
 
49
    transaction versions of tdb_read() and tdb_write() check this
 
50
    linked list and try to use the elements of the list in preference
 
51
    to the real database.
 
52
 
 
53
  - don't allow any locks to be held when a transaction starts,
 
54
    otherwise we can end up with deadlock (plus lack of lock nesting
 
55
    in posix locks would mean the lock is lost)
 
56
 
 
57
  - if the caller gains a lock during the transaction but doesn't
 
58
    release it then fail the commit
 
59
 
 
60
  - allow for nested calls to tdb_transaction_start(), re-using the
 
61
    existing transaction record. If the inner transaction is cancelled
 
62
    then a subsequent commit will fail
 
63
 
 
64
  - keep a mirrored copy of the tdb hash chain heads to allow for the
 
65
    fast hash heads scan on traverse, updating the mirrored copy in
 
66
    the transaction version of tdb_write
 
67
 
 
68
  - allow callers to mix transaction and non-transaction use of tdb,
 
69
    although once a transaction is started then an exclusive lock is
 
70
    gained until the transaction is committed or cancelled
 
71
 
 
72
  - the commit stategy involves first saving away all modified data
 
73
    into a linearised buffer in the transaction recovery area, then
 
74
    marking the transaction recovery area with a magic value to
 
75
    indicate a valid recovery record. In total 4 fsync/msync calls are
 
76
    needed per commit to prevent race conditions. It might be possible
 
77
    to reduce this to 3 or even 2 with some more work.
 
78
 
 
79
  - check for a valid recovery record on open of the tdb, while the
 
80
    global lock is held. Automatically recover from the transaction
 
81
    recovery area if needed, then continue with the open as
 
82
    usual. This allows for smooth crash recovery with no administrator
 
83
    intervention.
 
84
 
 
85
  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
 
86
    still available, but no transaction recovery area is used and no
 
87
    fsync/msync calls are made.
 
88
 
 
89
*/
 
90
 
 
91
int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
 
92
                       int rw_type, int lck_type, int probe, size_t len);
 
93
 
 
94
struct tdb_transaction_el {
 
95
        struct tdb_transaction_el *next, *prev;
 
96
        tdb_off_t offset;
 
97
        tdb_len_t length;
 
98
        unsigned char *data;
 
99
};
 
100
 
 
101
/*
 
102
  hold the context of any current transaction
 
103
*/
 
104
struct tdb_transaction {
 
105
        /* we keep a mirrored copy of the tdb hash heads here so
 
106
           tdb_next_hash_chain() can operate efficiently */
 
107
        u32 *hash_heads;
 
108
 
 
109
        /* the original io methods - used to do IOs to the real db */
 
110
        const struct tdb_methods *io_methods;
 
111
 
 
112
        /* the list of transaction elements. We use a doubly linked
 
113
           list with a last pointer to allow us to keep the list
 
114
           ordered, with first element at the front of the list. It
 
115
           needs to be doubly linked as the read/write traversals need
 
116
           to be backwards, while the commit needs to be forwards */
 
117
        struct tdb_transaction_el *elements, *elements_last;
 
118
 
 
119
        /* non-zero when an internal transaction error has
 
120
           occurred. All write operations will then fail until the
 
121
           transaction is ended */
 
122
        int transaction_error;
 
123
 
 
124
        /* when inside a transaction we need to keep track of any
 
125
           nested tdb_transaction_start() calls, as these are allowed,
 
126
           but don't create a new transaction */
 
127
        int nesting;
 
128
 
 
129
        /* old file size before transaction */
 
130
        tdb_len_t old_map_size;
 
131
};
 
132
 
 
133
 
 
134
/*
 
135
  read while in a transaction. We need to check first if the data is in our list
 
136
  of transaction elements, then if not do a real read
 
137
*/
 
138
static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
 
139
                            tdb_len_t len, int cv)
 
140
{
 
141
        struct tdb_transaction_el *el;
 
142
 
 
143
        /* we need to walk the list backwards to get the most recent data */
 
144
        for (el=tdb->transaction->elements_last;el;el=el->prev) {
 
145
                tdb_len_t partial;
 
146
 
 
147
                if (off+len <= el->offset) {
 
148
                        continue;
 
149
                }
 
150
                if (off >= el->offset + el->length) {
 
151
                        continue;
 
152
                }
 
153
 
 
154
                /* an overlapping read - needs to be split into up to
 
155
                   2 reads and a memcpy */
 
156
                if (off < el->offset) {
 
157
                        partial = el->offset - off;
 
158
                        if (transaction_read(tdb, off, buf, partial, cv) != 0) {
 
159
                                goto fail;
 
160
                        }
 
161
                        len -= partial;
 
162
                        off += partial;
 
163
                        buf = (void *)(partial + (char *)buf);
 
164
                }
 
165
                if (off + len <= el->offset + el->length) {
 
166
                        partial = len;
 
167
                } else {
 
168
                        partial = el->offset + el->length - off;
 
169
                }
 
170
                memcpy(buf, el->data + (off - el->offset), partial);
 
171
                if (cv) {
 
172
                        tdb_convert(buf, len);
 
173
                }
 
174
                len -= partial;
 
175
                off += partial;
 
176
                buf = (void *)(partial + (char *)buf);
 
177
                
 
178
                if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
 
179
                        goto fail;
 
180
                }
 
181
 
 
182
                return 0;
 
183
        }
 
184
 
 
185
        /* its not in the transaction elements - do a real read */
 
186
        return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
 
187
 
 
188
fail:
 
189
        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
 
190
        tdb->ecode = TDB_ERR_IO;
 
191
        tdb->transaction->transaction_error = 1;
 
192
        return -1;
 
193
}
 
194
 
 
195
 
 
196
/*
 
197
  write while in a transaction
 
198
*/
 
199
static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
 
200
                             const void *buf, tdb_len_t len)
 
201
{
 
202
        struct tdb_transaction_el *el, *best_el=NULL;
 
203
 
 
204
        if (len == 0) {
 
205
                return 0;
 
206
        }
 
207
        
 
208
        /* if the write is to a hash head, then update the transaction
 
209
           hash heads */
 
210
        if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
 
211
            off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
 
212
                u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
 
213
                memcpy(&tdb->transaction->hash_heads[chain], buf, len);
 
214
        }
 
215
 
 
216
        /* first see if we can replace an existing entry */
 
217
        for (el=tdb->transaction->elements_last;el;el=el->prev) {
 
218
                tdb_len_t partial;
 
219
 
 
220
                if (best_el == NULL && off == el->offset+el->length) {
 
221
                        best_el = el;
 
222
                }
 
223
 
 
224
                if (off+len <= el->offset) {
 
225
                        continue;
 
226
                }
 
227
                if (off >= el->offset + el->length) {
 
228
                        continue;
 
229
                }
 
230
 
 
231
                /* an overlapping write - needs to be split into up to
 
232
                   2 writes and a memcpy */
 
233
                if (off < el->offset) {
 
234
                        partial = el->offset - off;
 
235
                        if (transaction_write(tdb, off, buf, partial) != 0) {
 
236
                                goto fail;
 
237
                        }
 
238
                        len -= partial;
 
239
                        off += partial;
 
240
                        buf = (const void *)(partial + (const char *)buf);
 
241
                }
 
242
                if (off + len <= el->offset + el->length) {
 
243
                        partial = len;
 
244
                } else {
 
245
                        partial = el->offset + el->length - off;
 
246
                }
 
247
                memcpy(el->data + (off - el->offset), buf, partial);
 
248
                len -= partial;
 
249
                off += partial;
 
250
                buf = (const void *)(partial + (const char *)buf);
 
251
                
 
252
                if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
 
253
                        goto fail;
 
254
                }
 
255
 
 
256
                return 0;
 
257
        }
 
258
 
 
259
        /* see if we can append the new entry to an existing entry */
 
260
        if (best_el && best_el->offset + best_el->length == off && 
 
261
            (off+len < tdb->transaction->old_map_size ||
 
262
             off > tdb->transaction->old_map_size)) {
 
263
                unsigned char *data = best_el->data;
 
264
                el = best_el;
 
265
                el->data = (unsigned char *)realloc(el->data,
 
266
                                                    el->length + len);
 
267
                if (el->data == NULL) {
 
268
                        tdb->ecode = TDB_ERR_OOM;
 
269
                        tdb->transaction->transaction_error = 1;
 
270
                        el->data = data;
 
271
                        return -1;
 
272
                }
 
273
                if (buf) {
 
274
                        memcpy(el->data + el->length, buf, len);
 
275
                } else {
 
276
                        memset(el->data + el->length, TDB_PAD_BYTE, len);
 
277
                }
 
278
                el->length += len;
 
279
                return 0;
 
280
        }
 
281
 
 
282
        /* add a new entry at the end of the list */
 
283
        el = (struct tdb_transaction_el *)malloc(sizeof(*el));
 
284
        if (el == NULL) {
 
285
                tdb->ecode = TDB_ERR_OOM;
 
286
                tdb->transaction->transaction_error = 1;                
 
287
                return -1;
 
288
        }
 
289
        el->next = NULL;
 
290
        el->prev = tdb->transaction->elements_last;
 
291
        el->offset = off;
 
292
        el->length = len;
 
293
        el->data = (unsigned char *)malloc(len);
 
294
        if (el->data == NULL) {
 
295
                free(el);
 
296
                tdb->ecode = TDB_ERR_OOM;
 
297
                tdb->transaction->transaction_error = 1;                
 
298
                return -1;
 
299
        }
 
300
        if (buf) {
 
301
                memcpy(el->data, buf, len);
 
302
        } else {
 
303
                memset(el->data, TDB_PAD_BYTE, len);
 
304
        }
 
305
        if (el->prev) {
 
306
                el->prev->next = el;
 
307
        } else {
 
308
                tdb->transaction->elements = el;
 
309
        }
 
310
        tdb->transaction->elements_last = el;
 
311
        return 0;
 
312
 
 
313
fail:
 
314
        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
 
315
        tdb->ecode = TDB_ERR_IO;
 
316
        tdb->transaction->transaction_error = 1;
 
317
        return -1;
 
318
}
 
319
 
 
320
/*
 
321
  accelerated hash chain head search, using the cached hash heads
 
322
*/
 
323
static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
 
324
{
 
325
        u32 h = *chain;
 
326
        for (;h < tdb->header.hash_size;h++) {
 
327
                /* the +1 takes account of the freelist */
 
328
                if (0 != tdb->transaction->hash_heads[h+1]) {
 
329
                        break;
 
330
                }
 
331
        }
 
332
        (*chain) = h;
 
333
}
 
334
 
 
335
/*
 
336
  out of bounds check during a transaction
 
337
*/
 
338
static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
 
339
{
 
340
        if (len <= tdb->map_size) {
 
341
                return 0;
 
342
        }
 
343
        return TDB_ERRCODE(TDB_ERR_IO, -1);
 
344
}
 
345
 
 
346
/*
 
347
  transaction version of tdb_expand().
 
348
*/
 
349
static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
 
350
                                   tdb_off_t addition)
 
351
{
 
352
        /* add a write to the transaction elements, so subsequent
 
353
           reads see the zero data */
 
354
        if (transaction_write(tdb, size, NULL, addition) != 0) {
 
355
                return -1;
 
356
        }
 
357
 
 
358
        return 0;
 
359
}
 
360
 
 
361
/*
 
362
  brlock during a transaction - ignore them
 
363
*/
 
364
int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
 
365
                       int rw_type, int lck_type, int probe, size_t len)
 
366
{
 
367
        return 0;
 
368
}
 
369
 
 
370
static const struct tdb_methods transaction_methods = {
 
371
        transaction_read,
 
372
        transaction_write,
 
373
        transaction_next_hash_chain,
 
374
        transaction_oob,
 
375
        transaction_expand_file,
 
376
        transaction_brlock
 
377
};
 
378
 
 
379
 
 
380
/*
 
381
  start a tdb transaction. No token is returned, as only a single
 
382
  transaction is allowed to be pending per tdb_context
 
383
*/
 
384
int tdb_transaction_start(struct tdb_context *tdb)
 
385
{
 
386
        /* some sanity checks */
 
387
        if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
 
388
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
 
389
                tdb->ecode = TDB_ERR_EINVAL;
 
390
                return -1;
 
391
        }
 
392
 
 
393
        /* cope with nested tdb_transaction_start() calls */
 
394
        if (tdb->transaction != NULL) {
 
395
                tdb->transaction->nesting++;
 
396
                TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
 
397
                         tdb->transaction->nesting));
 
398
                return 0;
 
399
        }
 
400
 
 
401
        if (tdb->num_locks != 0 || tdb->global_lock.count) {
 
402
                /* the caller must not have any locks when starting a
 
403
                   transaction as otherwise we'll be screwed by lack
 
404
                   of nested locks in posix */
 
405
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
 
406
                tdb->ecode = TDB_ERR_LOCK;
 
407
                return -1;
 
408
        }
 
409
 
 
410
        if (tdb->travlocks.next != NULL) {
 
411
                /* you cannot use transactions inside a traverse (although you can use
 
412
                   traverse inside a transaction) as otherwise you can end up with
 
413
                   deadlock */
 
414
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
 
415
                tdb->ecode = TDB_ERR_LOCK;
 
416
                return -1;
 
417
        }
 
418
 
 
419
        tdb->transaction = (struct tdb_transaction *)
 
420
                calloc(sizeof(struct tdb_transaction), 1);
 
421
        if (tdb->transaction == NULL) {
 
422
                tdb->ecode = TDB_ERR_OOM;
 
423
                return -1;
 
424
        }
 
425
 
 
426
        /* get the transaction write lock. This is a blocking lock. As
 
427
           discussed with Volker, there are a number of ways we could
 
428
           make this async, which we will probably do in the future */
 
429
        if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
 
430
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
 
431
                tdb->ecode = TDB_ERR_LOCK;
 
432
                SAFE_FREE(tdb->transaction);
 
433
                return -1;
 
434
        }
 
435
        
 
436
        /* get a read lock from the freelist to the end of file. This
 
437
           is upgraded to a write lock during the commit */
 
438
        if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
 
439
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
 
440
                tdb->ecode = TDB_ERR_LOCK;
 
441
                goto fail;
 
442
        }
 
443
 
 
444
        /* setup a copy of the hash table heads so the hash scan in
 
445
           traverse can be fast */
 
446
        tdb->transaction->hash_heads = (u32 *)
 
447
                calloc(tdb->header.hash_size+1, sizeof(u32));
 
448
        if (tdb->transaction->hash_heads == NULL) {
 
449
                tdb->ecode = TDB_ERR_OOM;
 
450
                goto fail;
 
451
        }
 
452
        if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
 
453
                                   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
 
454
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
 
455
                tdb->ecode = TDB_ERR_IO;
 
456
                goto fail;
 
457
        }
 
458
 
 
459
        /* make sure we know about any file expansions already done by
 
460
           anyone else */
 
461
        tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
462
        tdb->transaction->old_map_size = tdb->map_size;
 
463
 
 
464
        /* finally hook the io methods, replacing them with
 
465
           transaction specific methods */
 
466
        tdb->transaction->io_methods = tdb->methods;
 
467
        tdb->methods = &transaction_methods;
 
468
 
 
469
        /* by calling this transaction write here, we ensure that we don't grow the
 
470
           transaction linked list due to hash table updates */
 
471
        if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
 
472
                              TDB_HASHTABLE_SIZE(tdb)) != 0) {
 
473
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
 
474
                tdb->ecode = TDB_ERR_IO;
 
475
                goto fail;
 
476
        }
 
477
 
 
478
        return 0;
 
479
        
 
480
fail:
 
481
        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
 
482
        tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
483
        SAFE_FREE(tdb->transaction->hash_heads);
 
484
        SAFE_FREE(tdb->transaction);
 
485
        return -1;
 
486
}
 
487
 
 
488
 
 
489
/*
 
490
  cancel the current transaction
 
491
*/
 
492
int tdb_transaction_cancel(struct tdb_context *tdb)
 
493
{       
 
494
        if (tdb->transaction == NULL) {
 
495
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
 
496
                return -1;
 
497
        }
 
498
 
 
499
        if (tdb->transaction->nesting != 0) {
 
500
                tdb->transaction->transaction_error = 1;
 
501
                tdb->transaction->nesting--;
 
502
                return 0;
 
503
        }               
 
504
 
 
505
        tdb->map_size = tdb->transaction->old_map_size;
 
506
 
 
507
        /* free all the transaction elements */
 
508
        while (tdb->transaction->elements) {
 
509
                struct tdb_transaction_el *el = tdb->transaction->elements;
 
510
                tdb->transaction->elements = el->next;
 
511
                free(el->data);
 
512
                free(el);
 
513
        }
 
514
 
 
515
        /* remove any global lock created during the transaction */
 
516
        if (tdb->global_lock.count != 0) {
 
517
                tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
 
518
                tdb->global_lock.count = 0;
 
519
        }
 
520
 
 
521
        /* remove any locks created during the transaction */
 
522
        if (tdb->num_locks != 0) {
 
523
                int i;
 
524
                for (i=0;i<tdb->num_lockrecs;i++) {
 
525
                        tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
 
526
                                   F_UNLCK,F_SETLKW, 0, 1);
 
527
                }
 
528
                tdb->num_locks = 0;
 
529
                tdb->num_lockrecs = 0;
 
530
                SAFE_FREE(tdb->lockrecs);
 
531
        }
 
532
 
 
533
        /* restore the normal io methods */
 
534
        tdb->methods = tdb->transaction->io_methods;
 
535
 
 
536
        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
 
537
        tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
538
        SAFE_FREE(tdb->transaction->hash_heads);
 
539
        SAFE_FREE(tdb->transaction);
 
540
        
 
541
        return 0;
 
542
}
 
543
 
 
544
/*
 
545
  sync to disk
 
546
*/
 
547
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
 
548
{       
 
549
        if (fsync(tdb->fd) != 0) {
 
550
                tdb->ecode = TDB_ERR_IO;
 
551
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
 
552
                return -1;
 
553
        }
 
554
#ifdef HAVE_MMAP
 
555
        if (tdb->map_ptr) {
 
556
                tdb_off_t moffset = offset & ~(tdb->page_size-1);
 
557
                if (msync(moffset + (char *)tdb->map_ptr, 
 
558
                          length + (offset - moffset), MS_SYNC) != 0) {
 
559
                        tdb->ecode = TDB_ERR_IO;
 
560
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
 
561
                                 strerror(errno)));
 
562
                        return -1;
 
563
                }
 
564
        }
 
565
#endif
 
566
        return 0;
 
567
}
 
568
 
 
569
 
 
570
/*
 
571
  work out how much space the linearised recovery data will consume
 
572
*/
 
573
static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
 
574
{
 
575
        struct tdb_transaction_el *el;
 
576
        tdb_len_t recovery_size = 0;
 
577
 
 
578
        recovery_size = sizeof(u32);
 
579
        for (el=tdb->transaction->elements;el;el=el->next) {
 
580
                if (el->offset >= tdb->transaction->old_map_size) {
 
581
                        continue;
 
582
                }
 
583
                recovery_size += 2*sizeof(tdb_off_t) + el->length;
 
584
        }
 
585
 
 
586
        return recovery_size;
 
587
}
 
588
 
 
589
/*
 
590
  allocate the recovery area, or use an existing recovery area if it is
 
591
  large enough
 
592
*/
 
593
static int tdb_recovery_allocate(struct tdb_context *tdb, 
 
594
                                 tdb_len_t *recovery_size,
 
595
                                 tdb_off_t *recovery_offset,
 
596
                                 tdb_len_t *recovery_max_size)
 
597
{
 
598
        struct list_struct rec;
 
599
        const struct tdb_methods *methods = tdb->transaction->io_methods;
 
600
        tdb_off_t recovery_head;
 
601
 
 
602
        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
 
603
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
 
604
                return -1;
 
605
        }
 
606
 
 
607
        rec.rec_len = 0;
 
608
 
 
609
        if (recovery_head != 0 && 
 
610
            methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
 
611
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
 
612
                return -1;
 
613
        }
 
614
 
 
615
        *recovery_size = tdb_recovery_size(tdb);
 
616
 
 
617
        if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
 
618
                /* it fits in the existing area */
 
619
                *recovery_max_size = rec.rec_len;
 
620
                *recovery_offset = recovery_head;
 
621
                return 0;
 
622
        }
 
623
 
 
624
        /* we need to free up the old recovery area, then allocate a
 
625
           new one at the end of the file. Note that we cannot use
 
626
           tdb_allocate() to allocate the new one as that might return
 
627
           us an area that is being currently used (as of the start of
 
628
           the transaction) */
 
629
        if (recovery_head != 0) {
 
630
                if (tdb_free(tdb, recovery_head, &rec) == -1) {
 
631
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
 
632
                        return -1;
 
633
                }
 
634
        }
 
635
 
 
636
        /* the tdb_free() call might have increased the recovery size */
 
637
        *recovery_size = tdb_recovery_size(tdb);
 
638
 
 
639
        /* round up to a multiple of page size */
 
640
        *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
 
641
        *recovery_offset = tdb->map_size;
 
642
        recovery_head = *recovery_offset;
 
643
 
 
644
        if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
 
645
                                     (tdb->map_size - tdb->transaction->old_map_size) +
 
646
                                     sizeof(rec) + *recovery_max_size) == -1) {
 
647
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
 
648
                return -1;
 
649
        }
 
650
 
 
651
        /* remap the file (if using mmap) */
 
652
        methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
653
 
 
654
        /* we have to reset the old map size so that we don't try to expand the file
 
655
           again in the transaction commit, which would destroy the recovery area */
 
656
        tdb->transaction->old_map_size = tdb->map_size;
 
657
 
 
658
        /* write the recovery header offset and sync - we can sync without a race here
 
659
           as the magic ptr in the recovery record has not been set */
 
660
        CONVERT(recovery_head);
 
661
        if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
 
662
                               &recovery_head, sizeof(tdb_off_t)) == -1) {
 
663
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
 
664
                return -1;
 
665
        }
 
666
 
 
667
        return 0;
 
668
}
 
669
 
 
670
 
 
671
/*
 
672
  setup the recovery data that will be used on a crash during commit
 
673
*/
 
674
static int transaction_setup_recovery(struct tdb_context *tdb, 
 
675
                                      tdb_off_t *magic_offset)
 
676
{
 
677
        struct tdb_transaction_el *el;
 
678
        tdb_len_t recovery_size;
 
679
        unsigned char *data, *p;
 
680
        const struct tdb_methods *methods = tdb->transaction->io_methods;
 
681
        struct list_struct *rec;
 
682
        tdb_off_t recovery_offset, recovery_max_size;
 
683
        tdb_off_t old_map_size = tdb->transaction->old_map_size;
 
684
        u32 magic, tailer;
 
685
 
 
686
        /*
 
687
          check that the recovery area has enough space
 
688
        */
 
689
        if (tdb_recovery_allocate(tdb, &recovery_size, 
 
690
                                  &recovery_offset, &recovery_max_size) == -1) {
 
691
                return -1;
 
692
        }
 
693
 
 
694
        data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
 
695
        if (data == NULL) {
 
696
                tdb->ecode = TDB_ERR_OOM;
 
697
                return -1;
 
698
        }
 
699
 
 
700
        rec = (struct list_struct *)data;
 
701
        memset(rec, 0, sizeof(*rec));
 
702
 
 
703
        rec->magic    = 0;
 
704
        rec->data_len = recovery_size;
 
705
        rec->rec_len  = recovery_max_size;
 
706
        rec->key_len  = old_map_size;
 
707
        CONVERT(rec);
 
708
 
 
709
        /* build the recovery data into a single blob to allow us to do a single
 
710
           large write, which should be more efficient */
 
711
        p = data + sizeof(*rec);
 
712
        for (el=tdb->transaction->elements;el;el=el->next) {
 
713
                if (el->offset >= old_map_size) {
 
714
                        continue;
 
715
                }
 
716
                if (el->offset + el->length > tdb->transaction->old_map_size) {
 
717
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
 
718
                        free(data);
 
719
                        tdb->ecode = TDB_ERR_CORRUPT;
 
720
                        return -1;
 
721
                }
 
722
                memcpy(p, &el->offset, 4);
 
723
                memcpy(p+4, &el->length, 4);
 
724
                if (DOCONV()) {
 
725
                        tdb_convert(p, 8);
 
726
                }
 
727
                /* the recovery area contains the old data, not the
 
728
                   new data, so we have to call the original tdb_read
 
729
                   method to get it */
 
730
                if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
 
731
                        free(data);
 
732
                        tdb->ecode = TDB_ERR_IO;
 
733
                        return -1;
 
734
                }
 
735
                p += 8 + el->length;
 
736
        }
 
737
 
 
738
        /* and the tailer */
 
739
        tailer = sizeof(*rec) + recovery_max_size;
 
740
        memcpy(p, &tailer, 4);
 
741
        CONVERT(p);
 
742
 
 
743
        /* write the recovery data to the recovery area */
 
744
        if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
 
745
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
 
746
                free(data);
 
747
                tdb->ecode = TDB_ERR_IO;
 
748
                return -1;
 
749
        }
 
750
 
 
751
        /* as we don't have ordered writes, we have to sync the recovery
 
752
           data before we update the magic to indicate that the recovery
 
753
           data is present */
 
754
        if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
 
755
                free(data);
 
756
                return -1;
 
757
        }
 
758
 
 
759
        free(data);
 
760
 
 
761
        magic = TDB_RECOVERY_MAGIC;
 
762
        CONVERT(magic);
 
763
 
 
764
        *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
 
765
 
 
766
        if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
 
767
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
 
768
                tdb->ecode = TDB_ERR_IO;
 
769
                return -1;
 
770
        }
 
771
 
 
772
        /* ensure the recovery magic marker is on disk */
 
773
        if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
 
774
                return -1;
 
775
        }
 
776
 
 
777
        return 0;
 
778
}
 
779
 
 
780
/*
 
781
  commit the current transaction
 
782
*/
 
783
int tdb_transaction_commit(struct tdb_context *tdb)
 
784
{       
 
785
        const struct tdb_methods *methods;
 
786
        tdb_off_t magic_offset = 0;
 
787
        u32 zero = 0;
 
788
 
 
789
        if (tdb->transaction == NULL) {
 
790
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
 
791
                return -1;
 
792
        }
 
793
 
 
794
        if (tdb->transaction->transaction_error) {
 
795
                tdb->ecode = TDB_ERR_IO;
 
796
                tdb_transaction_cancel(tdb);
 
797
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
 
798
                return -1;
 
799
        }
 
800
 
 
801
        if (tdb->transaction->nesting != 0) {
 
802
                tdb->transaction->nesting--;
 
803
                return 0;
 
804
        }               
 
805
 
 
806
        /* check for a null transaction */
 
807
        if (tdb->transaction->elements == NULL) {
 
808
                tdb_transaction_cancel(tdb);
 
809
                return 0;
 
810
        }
 
811
 
 
812
        methods = tdb->transaction->io_methods;
 
813
        
 
814
        /* if there are any locks pending then the caller has not
 
815
           nested their locks properly, so fail the transaction */
 
816
        if (tdb->num_locks || tdb->global_lock.count) {
 
817
                tdb->ecode = TDB_ERR_LOCK;
 
818
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
 
819
                tdb_transaction_cancel(tdb);
 
820
                return -1;
 
821
        }
 
822
 
 
823
        /* upgrade the main transaction lock region to a write lock */
 
824
        if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
 
825
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
 
826
                tdb->ecode = TDB_ERR_LOCK;
 
827
                tdb_transaction_cancel(tdb);
 
828
                return -1;
 
829
        }
 
830
 
 
831
        /* get the global lock - this prevents new users attaching to the database
 
832
           during the commit */
 
833
        if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
 
834
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
 
835
                tdb->ecode = TDB_ERR_LOCK;
 
836
                tdb_transaction_cancel(tdb);
 
837
                return -1;
 
838
        }
 
839
 
 
840
        if (!(tdb->flags & TDB_NOSYNC)) {
 
841
                /* write the recovery data to the end of the file */
 
842
                if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
 
843
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
 
844
                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
845
                        tdb_transaction_cancel(tdb);
 
846
                        return -1;
 
847
                }
 
848
        }
 
849
 
 
850
        /* expand the file to the new size if needed */
 
851
        if (tdb->map_size != tdb->transaction->old_map_size) {
 
852
                if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
 
853
                                             tdb->map_size - 
 
854
                                             tdb->transaction->old_map_size) == -1) {
 
855
                        tdb->ecode = TDB_ERR_IO;
 
856
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
 
857
                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
858
                        tdb_transaction_cancel(tdb);
 
859
                        return -1;
 
860
                }
 
861
                tdb->map_size = tdb->transaction->old_map_size;
 
862
                methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
863
        }
 
864
 
 
865
        /* perform all the writes */
 
866
        while (tdb->transaction->elements) {
 
867
                struct tdb_transaction_el *el = tdb->transaction->elements;
 
868
 
 
869
                if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
 
870
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
 
871
                        
 
872
                        /* we've overwritten part of the data and
 
873
                           possibly expanded the file, so we need to
 
874
                           run the crash recovery code */
 
875
                        tdb->methods = methods;
 
876
                        tdb_transaction_recover(tdb); 
 
877
 
 
878
                        tdb_transaction_cancel(tdb);
 
879
                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
880
 
 
881
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
 
882
                        return -1;
 
883
                }
 
884
                tdb->transaction->elements = el->next;
 
885
                free(el->data); 
 
886
                free(el);
 
887
        } 
 
888
 
 
889
        if (!(tdb->flags & TDB_NOSYNC)) {
 
890
                /* ensure the new data is on disk */
 
891
                if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
 
892
                        return -1;
 
893
                }
 
894
 
 
895
                /* remove the recovery marker */
 
896
                if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
 
897
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
 
898
                        return -1;
 
899
                }
 
900
 
 
901
                /* ensure the recovery marker has been removed on disk */
 
902
                if (transaction_sync(tdb, magic_offset, 4) == -1) {
 
903
                        return -1;
 
904
                }
 
905
        }
 
906
 
 
907
        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
908
 
 
909
        /*
 
910
          TODO: maybe write to some dummy hdr field, or write to magic
 
911
          offset without mmap, before the last sync, instead of the
 
912
          utime() call
 
913
        */
 
914
 
 
915
        /* on some systems (like Linux 2.6.x) changes via mmap/msync
 
916
           don't change the mtime of the file, this means the file may
 
917
           not be backed up (as tdb rounding to block sizes means that
 
918
           file size changes are quite rare too). The following forces
 
919
           mtime changes when a transaction completes */
 
920
#ifdef HAVE_UTIME
 
921
        utime(tdb->name, NULL);
 
922
#endif
 
923
 
 
924
        /* use a transaction cancel to free memory and remove the
 
925
           transaction locks */
 
926
        tdb_transaction_cancel(tdb);
 
927
        return 0;
 
928
}
 
929
 
 
930
 
 
931
/*
 
932
  recover from an aborted transaction. Must be called with exclusive
 
933
  database write access already established (including the global
 
934
  lock to prevent new processes attaching)
 
935
*/
 
936
int tdb_transaction_recover(struct tdb_context *tdb)
 
937
{
 
938
        tdb_off_t recovery_head, recovery_eof;
 
939
        unsigned char *data, *p;
 
940
        u32 zero = 0;
 
941
        struct list_struct rec;
 
942
 
 
943
        /* find the recovery area */
 
944
        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
 
945
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
 
946
                tdb->ecode = TDB_ERR_IO;
 
947
                return -1;
 
948
        }
 
949
 
 
950
        if (recovery_head == 0) {
 
951
                /* we have never allocated a recovery record */
 
952
                return 0;
 
953
        }
 
954
 
 
955
        /* read the recovery record */
 
956
        if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
 
957
                                   sizeof(rec), DOCONV()) == -1) {
 
958
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
 
959
                tdb->ecode = TDB_ERR_IO;
 
960
                return -1;
 
961
        }
 
962
 
 
963
        if (rec.magic != TDB_RECOVERY_MAGIC) {
 
964
                /* there is no valid recovery data */
 
965
                return 0;
 
966
        }
 
967
 
 
968
        if (tdb->read_only) {
 
969
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
 
970
                tdb->ecode = TDB_ERR_CORRUPT;
 
971
                return -1;
 
972
        }
 
973
 
 
974
        recovery_eof = rec.key_len;
 
975
 
 
976
        data = (unsigned char *)malloc(rec.data_len);
 
977
        if (data == NULL) {
 
978
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
 
979
                tdb->ecode = TDB_ERR_OOM;
 
980
                return -1;
 
981
        }
 
982
 
 
983
        /* read the full recovery data */
 
984
        if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
 
985
                                   rec.data_len, 0) == -1) {
 
986
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
 
987
                tdb->ecode = TDB_ERR_IO;
 
988
                return -1;
 
989
        }
 
990
 
 
991
        /* recover the file data */
 
992
        p = data;
 
993
        while (p+8 < data + rec.data_len) {
 
994
                u32 ofs, len;
 
995
                if (DOCONV()) {
 
996
                        tdb_convert(p, 8);
 
997
                }
 
998
                memcpy(&ofs, p, 4);
 
999
                memcpy(&len, p+4, 4);
 
1000
 
 
1001
                if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
 
1002
                        free(data);
 
1003
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
 
1004
                        tdb->ecode = TDB_ERR_IO;
 
1005
                        return -1;
 
1006
                }
 
1007
                p += 8 + len;
 
1008
        }
 
1009
 
 
1010
        free(data);
 
1011
 
 
1012
        if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
 
1013
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
 
1014
                tdb->ecode = TDB_ERR_IO;
 
1015
                return -1;
 
1016
        }
 
1017
 
 
1018
        /* if the recovery area is after the recovered eof then remove it */
 
1019
        if (recovery_eof <= recovery_head) {
 
1020
                if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
 
1021
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
 
1022
                        tdb->ecode = TDB_ERR_IO;
 
1023
                        return -1;                      
 
1024
                }
 
1025
        }
 
1026
 
 
1027
        /* remove the recovery magic */
 
1028
        if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
 
1029
                          &zero) == -1) {
 
1030
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
 
1031
                tdb->ecode = TDB_ERR_IO;
 
1032
                return -1;                      
 
1033
        }
 
1034
        
 
1035
        /* reduce the file size to the old size */
 
1036
        tdb_munmap(tdb);
 
1037
        if (ftruncate(tdb->fd, recovery_eof) != 0) {
 
1038
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
 
1039
                tdb->ecode = TDB_ERR_IO;
 
1040
                return -1;                      
 
1041
        }
 
1042
        tdb->map_size = recovery_eof;
 
1043
        tdb_mmap(tdb);
 
1044
 
 
1045
        if (transaction_sync(tdb, 0, recovery_eof) == -1) {
 
1046
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
 
1047
                tdb->ecode = TDB_ERR_IO;
 
1048
                return -1;
 
1049
        }
 
1050
 
 
1051
        TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
 
1052
                 recovery_eof));
 
1053
 
 
1054
        /* all done */
 
1055
        return 0;
 
1056
}