2
Unix SMB/CIFS implementation.
4
trivial database library
6
Copyright (C) Andrew Tridgell 2005
8
** NOTE! The following LGPL license applies to the tdb
9
** library. This does NOT imply that all of Samba is released
12
This library is free software; you can redistribute it and/or
13
modify it under the terms of the GNU Lesser General Public
14
License as published by the Free Software Foundation; either
15
version 2 of the License, or (at your option) any later version.
17
This library is distributed in the hope that it will be useful,
18
but WITHOUT ANY WARRANTY; without even the implied warranty of
19
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20
Lesser General Public License for more details.
22
You should have received a copy of the GNU Lesser General Public
23
License along with this library; if not, write to the Free Software
24
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27
#include "tdb_private.h"
32
- only allow a single transaction at a time per database. This makes
33
using the transaction API simpler, as otherwise the caller would
34
have to cope with temporary failures in transactions that conflict
35
with other current transactions
37
- keep the transaction recovery information in the same file as the
38
database, using a special 'transaction recovery' record pointed at
39
by the header. This removes the need for extra journal files as
40
used by some other databases
42
- dynamically allocate the transaction recovery record, re-using it
43
for subsequent transactions. If a larger record is needed then
44
tdb_free() the old record to place it on the normal tdb freelist
45
before allocating the new record
47
- during transactions, keep a linked list of all writes that have
48
been performed by intercepting all tdb_write() calls. The hooked
49
transaction versions of tdb_read() and tdb_write() check this
50
linked list and try to use the elements of the list in preference to the real database
53
- don't allow any locks to be held when a transaction starts,
54
otherwise we can end up with deadlock (plus lack of lock nesting
55
in posix locks would mean the lock is lost)
57
- if the caller gains a lock during the transaction but doesn't
58
release it then fail the commit
60
- allow for nested calls to tdb_transaction_start(), re-using the
61
existing transaction record. If the inner transaction is cancelled
62
then a subsequent commit will fail
64
- keep a mirrored copy of the tdb hash chain heads to allow for the
65
fast hash heads scan on traverse, updating the mirrored copy in
66
the transaction version of tdb_write
68
- allow callers to mix transaction and non-transaction use of tdb,
69
although once a transaction is started then an exclusive lock is
70
gained until the transaction is committed or cancelled
72
- the commit strategy involves first saving away all modified data
73
into a linearised buffer in the transaction recovery area, then
74
marking the transaction recovery area with a magic value to
75
indicate a valid recovery record. In total 4 fsync/msync calls are
76
needed per commit to prevent race conditions. It might be possible
77
to reduce this to 3 or even 2 with some more work.
79
- check for a valid recovery record on open of the tdb, while the
80
global lock is held. Automatically recover from the transaction
81
recovery area if needed, then continue with the open as
82
usual. This allows for smooth crash recovery with no administrator intervention
85
- if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86
still available, but no transaction recovery area is used and no
87
fsync/msync calls are made.
91
int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
92
int rw_type, int lck_type, int probe, size_t len);
94
struct tdb_transaction_el {
95
struct tdb_transaction_el *next, *prev;
102
hold the context of any current transaction
104
struct tdb_transaction {
105
/* we keep a mirrored copy of the tdb hash heads here so
106
tdb_next_hash_chain() can operate efficiently */
109
/* the original io methods - used to do IOs to the real db */
110
const struct tdb_methods *io_methods;
112
/* the list of transaction elements. We use a doubly linked
113
list with a last pointer to allow us to keep the list
114
ordered, with first element at the front of the list. It
115
needs to be doubly linked as the read/write traversals need
116
to be backwards, while the commit needs to be forwards */
117
struct tdb_transaction_el *elements, *elements_last;
119
/* non-zero when an internal transaction error has
120
occurred. All write operations will then fail until the
121
transaction is ended */
122
int transaction_error;
124
/* when inside a transaction we need to keep track of any
125
nested tdb_transaction_start() calls, as these are allowed,
126
but don't create a new transaction */
129
/* old file size before transaction */
130
tdb_len_t old_map_size;
135
read while in a transaction. We need to check first if the data is in our list
136
of transaction elements, then if not do a real read
138
static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
139
tdb_len_t len, int cv)
141
struct tdb_transaction_el *el;
143
/* we need to walk the list backwards to get the most recent data */
144
for (el=tdb->transaction->elements_last;el;el=el->prev) {
147
if (off+len <= el->offset) {
150
if (off >= el->offset + el->length) {
154
/* an overlapping read - needs to be split into up to
155
2 reads and a memcpy */
156
if (off < el->offset) {
157
partial = el->offset - off;
158
if (transaction_read(tdb, off, buf, partial, cv) != 0) {
163
buf = (void *)(partial + (char *)buf);
165
if (off + len <= el->offset + el->length) {
168
partial = el->offset + el->length - off;
170
memcpy(buf, el->data + (off - el->offset), partial);
172
tdb_convert(buf, len);
176
buf = (void *)(partial + (char *)buf);
178
if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
185
/* it's not in the transaction elements - do a real read */
186
return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
189
TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
190
tdb->ecode = TDB_ERR_IO;
191
tdb->transaction->transaction_error = 1;
197
write while in a transaction
199
static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
200
const void *buf, tdb_len_t len)
202
struct tdb_transaction_el *el, *best_el=NULL;
208
/* if the write is to a hash head, then update the transaction
210
if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
211
off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
212
u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
213
memcpy(&tdb->transaction->hash_heads[chain], buf, len);
216
/* first see if we can replace an existing entry */
217
for (el=tdb->transaction->elements_last;el;el=el->prev) {
220
if (best_el == NULL && off == el->offset+el->length) {
224
if (off+len <= el->offset) {
227
if (off >= el->offset + el->length) {
231
/* an overlapping write - needs to be split into up to
232
2 writes and a memcpy */
233
if (off < el->offset) {
234
partial = el->offset - off;
235
if (transaction_write(tdb, off, buf, partial) != 0) {
240
buf = (const void *)(partial + (const char *)buf);
242
if (off + len <= el->offset + el->length) {
245
partial = el->offset + el->length - off;
247
memcpy(el->data + (off - el->offset), buf, partial);
250
buf = (const void *)(partial + (const char *)buf);
252
if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
259
/* see if we can append the new entry to an existing entry */
260
if (best_el && best_el->offset + best_el->length == off &&
261
(off+len < tdb->transaction->old_map_size ||
262
off > tdb->transaction->old_map_size)) {
263
unsigned char *data = best_el->data;
265
el->data = (unsigned char *)realloc(el->data,
267
if (el->data == NULL) {
268
tdb->ecode = TDB_ERR_OOM;
269
tdb->transaction->transaction_error = 1;
274
memcpy(el->data + el->length, buf, len);
276
memset(el->data + el->length, TDB_PAD_BYTE, len);
282
/* add a new entry at the end of the list */
283
el = (struct tdb_transaction_el *)malloc(sizeof(*el));
285
tdb->ecode = TDB_ERR_OOM;
286
tdb->transaction->transaction_error = 1;
290
el->prev = tdb->transaction->elements_last;
293
el->data = (unsigned char *)malloc(len);
294
if (el->data == NULL) {
296
tdb->ecode = TDB_ERR_OOM;
297
tdb->transaction->transaction_error = 1;
301
memcpy(el->data, buf, len);
303
memset(el->data, TDB_PAD_BYTE, len);
308
tdb->transaction->elements = el;
310
tdb->transaction->elements_last = el;
314
TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
315
tdb->ecode = TDB_ERR_IO;
316
tdb->transaction->transaction_error = 1;
321
accelerated hash chain head search, using the cached hash heads
323
static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
326
for (;h < tdb->header.hash_size;h++) {
327
/* the +1 takes account of the freelist */
328
if (0 != tdb->transaction->hash_heads[h+1]) {
336
out of bounds check during a transaction
338
static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
340
if (len <= tdb->map_size) {
343
return TDB_ERRCODE(TDB_ERR_IO, -1);
347
transaction version of tdb_expand().
349
static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
352
/* add a write to the transaction elements, so subsequent
353
reads see the zero data */
354
if (transaction_write(tdb, size, NULL, addition) != 0) {
362
brlock during a transaction - ignore them
364
int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
365
int rw_type, int lck_type, int probe, size_t len)
370
static const struct tdb_methods transaction_methods = {
373
transaction_next_hash_chain,
375
transaction_expand_file,
381
start a tdb transaction. No token is returned, as only a single
382
transaction is allowed to be pending per tdb_context
384
int tdb_transaction_start(struct tdb_context *tdb)
386
/* some sanity checks */
387
if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
388
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
389
tdb->ecode = TDB_ERR_EINVAL;
393
/* cope with nested tdb_transaction_start() calls */
394
if (tdb->transaction != NULL) {
395
tdb->transaction->nesting++;
396
TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
397
tdb->transaction->nesting));
401
if (tdb->num_locks != 0 || tdb->global_lock.count) {
402
/* the caller must not have any locks when starting a
403
transaction as otherwise we'll be screwed by lack
404
of nested locks in posix */
405
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
406
tdb->ecode = TDB_ERR_LOCK;
410
if (tdb->travlocks.next != NULL) {
411
/* you cannot use transactions inside a traverse (although you can use
412
traverse inside a transaction) as otherwise you can end up with deadlock
414
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
415
tdb->ecode = TDB_ERR_LOCK;
419
tdb->transaction = (struct tdb_transaction *)
420
calloc(sizeof(struct tdb_transaction), 1);
421
if (tdb->transaction == NULL) {
422
tdb->ecode = TDB_ERR_OOM;
426
/* get the transaction write lock. This is a blocking lock. As
427
discussed with Volker, there are a number of ways we could
428
make this async, which we will probably do in the future */
429
if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
430
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
431
tdb->ecode = TDB_ERR_LOCK;
432
SAFE_FREE(tdb->transaction);
436
/* get a read lock from the freelist to the end of file. This
437
is upgraded to a write lock during the commit */
438
if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
439
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
440
tdb->ecode = TDB_ERR_LOCK;
444
/* setup a copy of the hash table heads so the hash scan in
445
traverse can be fast */
446
tdb->transaction->hash_heads = (u32 *)
447
calloc(tdb->header.hash_size+1, sizeof(u32));
448
if (tdb->transaction->hash_heads == NULL) {
449
tdb->ecode = TDB_ERR_OOM;
452
if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
453
TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
454
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
455
tdb->ecode = TDB_ERR_IO;
459
/* make sure we know about any file expansions already done by
461
tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
462
tdb->transaction->old_map_size = tdb->map_size;
464
/* finally hook the io methods, replacing them with
465
transaction specific methods */
466
tdb->transaction->io_methods = tdb->methods;
467
tdb->methods = &transaction_methods;
469
/* by calling this transaction write here, we ensure that we don't grow the
470
transaction linked list due to hash table updates */
471
if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
472
TDB_HASHTABLE_SIZE(tdb)) != 0) {
473
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
474
tdb->ecode = TDB_ERR_IO;
481
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
482
tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
483
SAFE_FREE(tdb->transaction->hash_heads);
484
SAFE_FREE(tdb->transaction);
490
cancel the current transaction
492
int tdb_transaction_cancel(struct tdb_context *tdb)
494
if (tdb->transaction == NULL) {
495
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
499
if (tdb->transaction->nesting != 0) {
500
tdb->transaction->transaction_error = 1;
501
tdb->transaction->nesting--;
505
tdb->map_size = tdb->transaction->old_map_size;
507
/* free all the transaction elements */
508
while (tdb->transaction->elements) {
509
struct tdb_transaction_el *el = tdb->transaction->elements;
510
tdb->transaction->elements = el->next;
515
/* remove any global lock created during the transaction */
516
if (tdb->global_lock.count != 0) {
517
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
518
tdb->global_lock.count = 0;
521
/* remove any locks created during the transaction */
522
if (tdb->num_locks != 0) {
524
for (i=0;i<tdb->num_lockrecs;i++) {
525
tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
526
F_UNLCK,F_SETLKW, 0, 1);
529
tdb->num_lockrecs = 0;
530
SAFE_FREE(tdb->lockrecs);
533
/* restore the normal io methods */
534
tdb->methods = tdb->transaction->io_methods;
536
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
537
tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
538
SAFE_FREE(tdb->transaction->hash_heads);
539
SAFE_FREE(tdb->transaction);
547
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
549
if (fsync(tdb->fd) != 0) {
550
tdb->ecode = TDB_ERR_IO;
551
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
556
tdb_off_t moffset = offset & ~(tdb->page_size-1);
557
if (msync(moffset + (char *)tdb->map_ptr,
558
length + (offset - moffset), MS_SYNC) != 0) {
559
tdb->ecode = TDB_ERR_IO;
560
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
571
work out how much space the linearised recovery data will consume
573
static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
575
struct tdb_transaction_el *el;
576
tdb_len_t recovery_size = 0;
578
recovery_size = sizeof(u32);
579
for (el=tdb->transaction->elements;el;el=el->next) {
580
if (el->offset >= tdb->transaction->old_map_size) {
583
recovery_size += 2*sizeof(tdb_off_t) + el->length;
586
return recovery_size;
590
allocate the recovery area, or use an existing recovery area if it is large enough
593
static int tdb_recovery_allocate(struct tdb_context *tdb,
594
tdb_len_t *recovery_size,
595
tdb_off_t *recovery_offset,
596
tdb_len_t *recovery_max_size)
598
struct list_struct rec;
599
const struct tdb_methods *methods = tdb->transaction->io_methods;
600
tdb_off_t recovery_head;
602
if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
603
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
609
if (recovery_head != 0 &&
610
methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
611
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
615
*recovery_size = tdb_recovery_size(tdb);
617
if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
618
/* it fits in the existing area */
619
*recovery_max_size = rec.rec_len;
620
*recovery_offset = recovery_head;
624
/* we need to free up the old recovery area, then allocate a
625
new one at the end of the file. Note that we cannot use
626
tdb_allocate() to allocate the new one as that might return
627
us an area that is being currently used (as of the start of
629
if (recovery_head != 0) {
630
if (tdb_free(tdb, recovery_head, &rec) == -1) {
631
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
636
/* the tdb_free() call might have increased the recovery size */
637
*recovery_size = tdb_recovery_size(tdb);
639
/* round up to a multiple of page size */
640
*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
641
*recovery_offset = tdb->map_size;
642
recovery_head = *recovery_offset;
644
if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
645
(tdb->map_size - tdb->transaction->old_map_size) +
646
sizeof(rec) + *recovery_max_size) == -1) {
647
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
651
/* remap the file (if using mmap) */
652
methods->tdb_oob(tdb, tdb->map_size + 1, 1);
654
/* we have to reset the old map size so that we don't try to expand the file
655
again in the transaction commit, which would destroy the recovery area */
656
tdb->transaction->old_map_size = tdb->map_size;
658
/* write the recovery header offset and sync - we can sync without a race here
659
as the magic ptr in the recovery record has not been set */
660
CONVERT(recovery_head);
661
if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
662
&recovery_head, sizeof(tdb_off_t)) == -1) {
663
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
672
setup the recovery data that will be used on a crash during commit
674
static int transaction_setup_recovery(struct tdb_context *tdb,
675
tdb_off_t *magic_offset)
677
struct tdb_transaction_el *el;
678
tdb_len_t recovery_size;
679
unsigned char *data, *p;
680
const struct tdb_methods *methods = tdb->transaction->io_methods;
681
struct list_struct *rec;
682
tdb_off_t recovery_offset, recovery_max_size;
683
tdb_off_t old_map_size = tdb->transaction->old_map_size;
687
check that the recovery area has enough space
689
if (tdb_recovery_allocate(tdb, &recovery_size,
690
&recovery_offset, &recovery_max_size) == -1) {
694
data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
696
tdb->ecode = TDB_ERR_OOM;
700
rec = (struct list_struct *)data;
701
memset(rec, 0, sizeof(*rec));
704
rec->data_len = recovery_size;
705
rec->rec_len = recovery_max_size;
706
rec->key_len = old_map_size;
709
/* build the recovery data into a single blob to allow us to do a single
710
large write, which should be more efficient */
711
p = data + sizeof(*rec);
712
for (el=tdb->transaction->elements;el;el=el->next) {
713
if (el->offset >= old_map_size) {
716
if (el->offset + el->length > tdb->transaction->old_map_size) {
717
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
719
tdb->ecode = TDB_ERR_CORRUPT;
722
memcpy(p, &el->offset, 4);
723
memcpy(p+4, &el->length, 4);
727
/* the recovery area contains the old data, not the
728
new data, so we have to call the original tdb_read
730
if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
732
tdb->ecode = TDB_ERR_IO;
739
tailer = sizeof(*rec) + recovery_max_size;
740
memcpy(p, &tailer, 4);
743
/* write the recovery data to the recovery area */
744
if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
745
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
747
tdb->ecode = TDB_ERR_IO;
751
/* as we don't have ordered writes, we have to sync the recovery
752
data before we update the magic to indicate that the recovery
754
if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
761
magic = TDB_RECOVERY_MAGIC;
764
*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
766
if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
767
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
768
tdb->ecode = TDB_ERR_IO;
772
/* ensure the recovery magic marker is on disk */
773
if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
781
commit the current transaction
783
int tdb_transaction_commit(struct tdb_context *tdb)
785
const struct tdb_methods *methods;
786
tdb_off_t magic_offset = 0;
789
if (tdb->transaction == NULL) {
790
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
794
if (tdb->transaction->transaction_error) {
795
tdb->ecode = TDB_ERR_IO;
796
tdb_transaction_cancel(tdb);
797
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
801
if (tdb->transaction->nesting != 0) {
802
tdb->transaction->nesting--;
806
/* check for a null transaction */
807
if (tdb->transaction->elements == NULL) {
808
tdb_transaction_cancel(tdb);
812
methods = tdb->transaction->io_methods;
814
/* if there are any locks pending then the caller has not
815
nested their locks properly, so fail the transaction */
816
if (tdb->num_locks || tdb->global_lock.count) {
817
tdb->ecode = TDB_ERR_LOCK;
818
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
819
tdb_transaction_cancel(tdb);
823
/* upgrade the main transaction lock region to a write lock */
824
if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
825
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
826
tdb->ecode = TDB_ERR_LOCK;
827
tdb_transaction_cancel(tdb);
831
/* get the global lock - this prevents new users attaching to the database
833
if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
834
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
835
tdb->ecode = TDB_ERR_LOCK;
836
tdb_transaction_cancel(tdb);
840
if (!(tdb->flags & TDB_NOSYNC)) {
841
/* write the recovery data to the end of the file */
842
if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
843
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
844
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
845
tdb_transaction_cancel(tdb);
850
/* expand the file to the new size if needed */
851
if (tdb->map_size != tdb->transaction->old_map_size) {
852
if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
854
tdb->transaction->old_map_size) == -1) {
855
tdb->ecode = TDB_ERR_IO;
856
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
857
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
858
tdb_transaction_cancel(tdb);
861
tdb->map_size = tdb->transaction->old_map_size;
862
methods->tdb_oob(tdb, tdb->map_size + 1, 1);
865
/* perform all the writes */
866
while (tdb->transaction->elements) {
867
struct tdb_transaction_el *el = tdb->transaction->elements;
869
if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
870
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
872
/* we've overwritten part of the data and
873
possibly expanded the file, so we need to
874
run the crash recovery code */
875
tdb->methods = methods;
876
tdb_transaction_recover(tdb);
878
tdb_transaction_cancel(tdb);
879
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
881
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
884
tdb->transaction->elements = el->next;
889
if (!(tdb->flags & TDB_NOSYNC)) {
890
/* ensure the new data is on disk */
891
if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
895
/* remove the recovery marker */
896
if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
897
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
901
/* ensure the recovery marker has been removed on disk */
902
if (transaction_sync(tdb, magic_offset, 4) == -1) {
907
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
910
TODO: maybe write to some dummy hdr field, or write to magic
911
offset without mmap, before the last sync, instead of the
915
/* on some systems (like Linux 2.6.x) changes via mmap/msync
916
don't change the mtime of the file, this means the file may
917
not be backed up (as tdb rounding to block sizes means that
918
file size changes are quite rare too). The following forces
919
mtime changes when a transaction completes */
921
utime(tdb->name, NULL);
924
/* use a transaction cancel to free memory and remove the
926
tdb_transaction_cancel(tdb);
932
recover from an aborted transaction. Must be called with exclusive
933
database write access already established (including the global
934
lock to prevent new processes attaching)
936
int tdb_transaction_recover(struct tdb_context *tdb)
938
tdb_off_t recovery_head, recovery_eof;
939
unsigned char *data, *p;
941
struct list_struct rec;
943
/* find the recovery area */
944
if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
945
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
946
tdb->ecode = TDB_ERR_IO;
950
if (recovery_head == 0) {
951
/* we have never allocated a recovery record */
955
/* read the recovery record */
956
if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
957
sizeof(rec), DOCONV()) == -1) {
958
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
959
tdb->ecode = TDB_ERR_IO;
963
if (rec.magic != TDB_RECOVERY_MAGIC) {
964
/* there is no valid recovery data */
968
if (tdb->read_only) {
969
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
970
tdb->ecode = TDB_ERR_CORRUPT;
974
recovery_eof = rec.key_len;
976
data = (unsigned char *)malloc(rec.data_len);
978
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
979
tdb->ecode = TDB_ERR_OOM;
983
/* read the full recovery data */
984
if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
985
rec.data_len, 0) == -1) {
986
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
987
tdb->ecode = TDB_ERR_IO;
991
/* recover the file data */
993
while (p+8 < data + rec.data_len) {
999
memcpy(&len, p+4, 4);
1001
if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1003
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1004
tdb->ecode = TDB_ERR_IO;
1012
if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1013
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1014
tdb->ecode = TDB_ERR_IO;
1018
/* if the recovery area is after the recovered eof then remove it */
1019
if (recovery_eof <= recovery_head) {
1020
if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1021
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1022
tdb->ecode = TDB_ERR_IO;
1027
/* remove the recovery magic */
1028
if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1030
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1031
tdb->ecode = TDB_ERR_IO;
1035
/* reduce the file size to the old size */
1037
if (ftruncate(tdb->fd, recovery_eof) != 0) {
1038
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1039
tdb->ecode = TDB_ERR_IO;
1042
tdb->map_size = recovery_eof;
1045
if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1046
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1047
tdb->ecode = TDB_ERR_IO;
1051
TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",