2
Unix SMB/CIFS implementation.
4
trivial database library
6
Copyright (C) Andrew Tridgell 2005
8
** NOTE! The following LGPL license applies to the tdb
9
** library. This does NOT imply that all of Samba is released
12
This library is free software; you can redistribute it and/or
13
modify it under the terms of the GNU Lesser General Public
14
License as published by the Free Software Foundation; either
15
version 3 of the License, or (at your option) any later version.
17
This library is distributed in the hope that it will be useful,
18
but WITHOUT ANY WARRANTY; without even the implied warranty of
19
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20
Lesser General Public License for more details.
22
You should have received a copy of the GNU Lesser General Public
23
License along with this library; if not, see <http://www.gnu.org/licenses/>.
26
#include "tdb_private.h"
31
- only allow a single transaction at a time per database. This makes
32
using the transaction API simpler, as otherwise the caller would
33
have to cope with temporary failures in transactions that conflict
34
with other current transactions
36
- keep the transaction recovery information in the same file as the
37
database, using a special 'transaction recovery' record pointed at
38
by the header. This removes the need for extra journal files as
39
used by some other databases
41
- dynamically allocate the transaction recovery record, re-using it
42
for subsequent transactions. If a larger record is needed then
43
tdb_free() the old record to place it on the normal tdb freelist
44
before allocating the new record
46
- during transactions, keep a linked list of all writes that have
47
been performed by intercepting all tdb_write() calls. The hooked
48
transaction versions of tdb_read() and tdb_write() check this
49
linked list and try to use the elements of the list in preference to the real database
52
- don't allow any locks to be held when a transaction starts,
53
otherwise we can end up with deadlock (plus lack of lock nesting
54
in posix locks would mean the lock is lost)
56
- if the caller gains a lock during the transaction but doesn't
57
release it then fail the commit
59
- allow for nested calls to tdb_transaction_start(), re-using the
60
existing transaction record. If the inner transaction is cancelled
61
then a subsequent commit will fail
63
- keep a mirrored copy of the tdb hash chain heads to allow for the
64
fast hash heads scan on traverse, updating the mirrored copy in
65
the transaction version of tdb_write
67
- allow callers to mix transaction and non-transaction use of tdb,
68
although once a transaction is started then an exclusive lock is
69
gained until the transaction is committed or cancelled
71
- the commit strategy involves first saving away all modified data
72
into a linearised buffer in the transaction recovery area, then
73
marking the transaction recovery area with a magic value to
74
indicate a valid recovery record. In total 4 fsync/msync calls are
75
needed per commit to prevent race conditions. It might be possible
76
to reduce this to 3 or even 2 with some more work.
78
- check for a valid recovery record on open of the tdb, while the
79
global lock is held. Automatically recover from the transaction
80
recovery area if needed, then continue with the open as
81
usual. This allows for smooth crash recovery with no administrator intervention.
84
- if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85
still available, but no transaction recovery area is used and no
86
fsync/msync calls are made.
92
hold the context of any current transaction
94
struct tdb_transaction {
95
/* we keep a mirrored copy of the tdb hash heads here so
96
tdb_next_hash_chain() can operate efficiently */
99
/* the original io methods - used to do IOs to the real db */
100
const struct tdb_methods *io_methods;
102
/* the list of transaction blocks. When a block is first
103
written to, it gets created in this list */
106
uint32_t block_size; /* bytes in each block */
107
uint32_t last_block_size; /* number of valid bytes in the last block */
109
/* non-zero when an internal transaction error has
110
occurred. All write operations will then fail until the
111
transaction is ended */
112
int transaction_error;
114
/* when inside a transaction we need to keep track of any
115
nested tdb_transaction_start() calls, as these are allowed,
116
but don't create a new transaction */
119
/* old file size before transaction */
120
tdb_len_t old_map_size;
125
read while in a transaction. We need to check first if the data is in our list
126
of transaction elements, then if not do a real read
128
static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129
tdb_len_t len, int cv)
133
/* break it down into block sized ops */
134
while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135
tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136
if (transaction_read(tdb, off, buf, len2, cv) != 0) {
141
buf = (void *)(len2 + (char *)buf);
148
blk = off / tdb->transaction->block_size;
150
/* see if we have it in the block list */
151
if (tdb->transaction->num_blocks <= blk ||
152
tdb->transaction->blocks[blk] == NULL) {
153
/* nope, do a real read */
154
if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
160
/* it is in the block list. Now check for the last block */
161
if (blk == tdb->transaction->num_blocks-1) {
162
if (len > tdb->transaction->last_block_size) {
167
/* now copy it out of this block */
168
memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
170
tdb_convert(buf, len);
175
TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176
tdb->ecode = TDB_ERR_IO;
177
tdb->transaction->transaction_error = 1;
183
write while in a transaction
185
static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
186
const void *buf, tdb_len_t len)
190
/* if the write is to a hash head, then update the transaction
192
if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193
off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194
uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195
memcpy(&tdb->transaction->hash_heads[chain], buf, len);
198
/* break it up into block sized chunks */
199
while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200
tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201
if (transaction_write(tdb, off, buf, len2) != 0) {
207
buf = (const void *)(len2 + (const char *)buf);
215
blk = off / tdb->transaction->block_size;
216
off = off % tdb->transaction->block_size;
218
if (tdb->transaction->num_blocks <= blk) {
219
uint8_t **new_blocks;
220
/* expand the blocks array */
221
if (tdb->transaction->blocks == NULL) {
222
new_blocks = (uint8_t **)malloc(
223
(blk+1)*sizeof(uint8_t *));
225
new_blocks = (uint8_t **)realloc(
226
tdb->transaction->blocks,
227
(blk+1)*sizeof(uint8_t *));
229
if (new_blocks == NULL) {
230
tdb->ecode = TDB_ERR_OOM;
233
memset(&new_blocks[tdb->transaction->num_blocks], 0,
234
(1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235
tdb->transaction->blocks = new_blocks;
236
tdb->transaction->num_blocks = blk+1;
237
tdb->transaction->last_block_size = 0;
240
/* allocate and fill a block? */
241
if (tdb->transaction->blocks[blk] == NULL) {
242
tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
243
if (tdb->transaction->blocks[blk] == NULL) {
244
tdb->ecode = TDB_ERR_OOM;
245
tdb->transaction->transaction_error = 1;
248
if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
249
tdb_len_t len2 = tdb->transaction->block_size;
250
if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
251
len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
253
if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
254
tdb->transaction->blocks[blk],
256
SAFE_FREE(tdb->transaction->blocks[blk]);
257
tdb->ecode = TDB_ERR_IO;
260
if (blk == tdb->transaction->num_blocks-1) {
261
tdb->transaction->last_block_size = len2;
266
/* overwrite part of an existing block */
268
memset(tdb->transaction->blocks[blk] + off, 0, len);
270
memcpy(tdb->transaction->blocks[blk] + off, buf, len);
272
if (blk == tdb->transaction->num_blocks-1) {
273
if (len + off > tdb->transaction->last_block_size) {
274
tdb->transaction->last_block_size = len + off;
281
TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
282
(blk*tdb->transaction->block_size) + off, len));
283
tdb->transaction->transaction_error = 1;
289
write while in a transaction - this varient never expands the transaction blocks, it only
290
updates existing blocks. This means it cannot change the recovery size
292
static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
293
const void *buf, tdb_len_t len)
297
/* break it up into block sized chunks */
298
while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
299
tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
300
if (transaction_write_existing(tdb, off, buf, len2) != 0) {
306
buf = (const void *)(len2 + (const char *)buf);
314
blk = off / tdb->transaction->block_size;
315
off = off % tdb->transaction->block_size;
317
if (tdb->transaction->num_blocks <= blk ||
318
tdb->transaction->blocks[blk] == NULL) {
322
if (blk == tdb->transaction->num_blocks-1 &&
323
off + len > tdb->transaction->last_block_size) {
324
if (off >= tdb->transaction->last_block_size) {
327
len = tdb->transaction->last_block_size - off;
330
/* overwrite part of an existing block */
331
memcpy(tdb->transaction->blocks[blk] + off, buf, len);
338
accelerated hash chain head search, using the cached hash heads
340
static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
343
for (;h < tdb->header.hash_size;h++) {
344
/* the +1 takes account of the freelist */
345
if (0 != tdb->transaction->hash_heads[h+1]) {
353
out of bounds check during a transaction
355
static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
357
if (len <= tdb->map_size) {
360
return TDB_ERRCODE(TDB_ERR_IO, -1);
364
transaction version of tdb_expand().
366
static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
369
/* add a write to the transaction elements, so subsequent
370
reads see the zero data */
371
if (transaction_write(tdb, size, NULL, addition) != 0) {
379
brlock during a transaction - ignore them
381
static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
382
int rw_type, int lck_type, int probe, size_t len)
387
static const struct tdb_methods transaction_methods = {
390
transaction_next_hash_chain,
392
transaction_expand_file,
398
start a tdb transaction. No token is returned, as only a single
399
transaction is allowed to be pending per tdb_context
401
int tdb_transaction_start(struct tdb_context *tdb)
403
/* some sanity checks */
404
if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
405
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
406
tdb->ecode = TDB_ERR_EINVAL;
410
/* cope with nested tdb_transaction_start() calls */
411
if (tdb->transaction != NULL) {
412
tdb->transaction->nesting++;
413
TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
414
tdb->transaction->nesting));
418
if (tdb->num_locks != 0 || tdb->global_lock.count) {
419
/* the caller must not have any locks when starting a
420
transaction as otherwise we'll be screwed by lack
421
of nested locks in posix */
422
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
423
tdb->ecode = TDB_ERR_LOCK;
427
if (tdb->travlocks.next != NULL) {
428
/* you cannot use transactions inside a traverse (although you can use
429
traverse inside a transaction) as otherwise you can end up with
431
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
432
tdb->ecode = TDB_ERR_LOCK;
436
tdb->transaction = (struct tdb_transaction *)
437
calloc(sizeof(struct tdb_transaction), 1);
438
if (tdb->transaction == NULL) {
439
tdb->ecode = TDB_ERR_OOM;
443
/* a page at a time seems like a reasonable compromise between compactness and efficiency */
444
tdb->transaction->block_size = tdb->page_size;
446
/* get the transaction write lock. This is a blocking lock. As
447
discussed with Volker, there are a number of ways we could
448
make this async, which we will probably do in the future */
449
if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
450
SAFE_FREE(tdb->transaction->blocks);
451
SAFE_FREE(tdb->transaction);
455
/* get a read lock from the freelist to the end of file. This
456
is upgraded to a write lock during the commit */
457
if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
458
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
459
tdb->ecode = TDB_ERR_LOCK;
463
/* setup a copy of the hash table heads so the hash scan in
464
traverse can be fast */
465
tdb->transaction->hash_heads = (uint32_t *)
466
calloc(tdb->header.hash_size+1, sizeof(uint32_t));
467
if (tdb->transaction->hash_heads == NULL) {
468
tdb->ecode = TDB_ERR_OOM;
471
if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
472
TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
473
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
474
tdb->ecode = TDB_ERR_IO;
478
/* make sure we know about any file expansions already done by
480
tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
481
tdb->transaction->old_map_size = tdb->map_size;
483
/* finally hook the io methods, replacing them with
484
transaction specific methods */
485
tdb->transaction->io_methods = tdb->methods;
486
tdb->methods = &transaction_methods;
491
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
492
tdb_transaction_unlock(tdb);
493
SAFE_FREE(tdb->transaction->blocks);
494
SAFE_FREE(tdb->transaction->hash_heads);
495
SAFE_FREE(tdb->transaction);
501
cancel the current transaction
503
int tdb_transaction_cancel(struct tdb_context *tdb)
507
if (tdb->transaction == NULL) {
508
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
512
if (tdb->transaction->nesting != 0) {
513
tdb->transaction->transaction_error = 1;
514
tdb->transaction->nesting--;
518
tdb->map_size = tdb->transaction->old_map_size;
520
/* free all the transaction blocks */
521
for (i=0;i<tdb->transaction->num_blocks;i++) {
522
if (tdb->transaction->blocks[i] != NULL) {
523
free(tdb->transaction->blocks[i]);
526
SAFE_FREE(tdb->transaction->blocks);
528
/* remove any global lock created during the transaction */
529
if (tdb->global_lock.count != 0) {
530
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
531
tdb->global_lock.count = 0;
534
/* remove any locks created during the transaction */
535
if (tdb->num_locks != 0) {
536
for (i=0;i<tdb->num_lockrecs;i++) {
537
tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
538
F_UNLCK,F_SETLKW, 0, 1);
541
tdb->num_lockrecs = 0;
542
SAFE_FREE(tdb->lockrecs);
545
/* restore the normal io methods */
546
tdb->methods = tdb->transaction->io_methods;
548
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
549
tdb_transaction_unlock(tdb);
550
SAFE_FREE(tdb->transaction->hash_heads);
551
SAFE_FREE(tdb->transaction);
559
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
561
if (fsync(tdb->fd) != 0) {
562
tdb->ecode = TDB_ERR_IO;
563
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
568
tdb_off_t moffset = offset & ~(tdb->page_size-1);
569
if (msync(moffset + (char *)tdb->map_ptr,
570
length + (offset - moffset), MS_SYNC) != 0) {
571
tdb->ecode = TDB_ERR_IO;
572
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
583
work out how much space the linearised recovery data will consume
585
static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
587
tdb_len_t recovery_size = 0;
590
recovery_size = sizeof(uint32_t);
591
for (i=0;i<tdb->transaction->num_blocks;i++) {
592
if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
595
if (tdb->transaction->blocks[i] == NULL) {
598
recovery_size += 2*sizeof(tdb_off_t);
599
if (i == tdb->transaction->num_blocks-1) {
600
recovery_size += tdb->transaction->last_block_size;
602
recovery_size += tdb->transaction->block_size;
606
return recovery_size;
610
allocate the recovery area, or use an existing recovery area if it is
613
static int tdb_recovery_allocate(struct tdb_context *tdb,
614
tdb_len_t *recovery_size,
615
tdb_off_t *recovery_offset,
616
tdb_len_t *recovery_max_size)
618
struct list_struct rec;
619
const struct tdb_methods *methods = tdb->transaction->io_methods;
620
tdb_off_t recovery_head;
622
if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
623
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
629
if (recovery_head != 0 &&
630
methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
631
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
635
*recovery_size = tdb_recovery_size(tdb);
637
if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
638
/* it fits in the existing area */
639
*recovery_max_size = rec.rec_len;
640
*recovery_offset = recovery_head;
644
/* we need to free up the old recovery area, then allocate a
645
new one at the end of the file. Note that we cannot use
646
tdb_allocate() to allocate the new one as that might return
647
us an area that is being currently used (as of the start of
649
if (recovery_head != 0) {
650
if (tdb_free(tdb, recovery_head, &rec) == -1) {
651
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
656
/* the tdb_free() call might have increased the recovery size */
657
*recovery_size = tdb_recovery_size(tdb);
659
/* round up to a multiple of page size */
660
*recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
661
*recovery_offset = tdb->map_size;
662
recovery_head = *recovery_offset;
664
if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
665
(tdb->map_size - tdb->transaction->old_map_size) +
666
sizeof(rec) + *recovery_max_size) == -1) {
667
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
671
/* remap the file (if using mmap) */
672
methods->tdb_oob(tdb, tdb->map_size + 1, 1);
674
/* we have to reset the old map size so that we don't try to expand the file
675
again in the transaction commit, which would destroy the recovery area */
676
tdb->transaction->old_map_size = tdb->map_size;
678
/* write the recovery header offset and sync - we can sync without a race here
679
as the magic ptr in the recovery record has not been set */
680
CONVERT(recovery_head);
681
if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
682
&recovery_head, sizeof(tdb_off_t)) == -1) {
683
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
686
if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
687
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
696
setup the recovery data that will be used on a crash during commit
698
static int transaction_setup_recovery(struct tdb_context *tdb,
699
tdb_off_t *magic_offset)
701
tdb_len_t recovery_size;
702
unsigned char *data, *p;
703
const struct tdb_methods *methods = tdb->transaction->io_methods;
704
struct list_struct *rec;
705
tdb_off_t recovery_offset, recovery_max_size;
706
tdb_off_t old_map_size = tdb->transaction->old_map_size;
707
uint32_t magic, tailer;
711
check that the recovery area has enough space
713
if (tdb_recovery_allocate(tdb, &recovery_size,
714
&recovery_offset, &recovery_max_size) == -1) {
718
data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
720
tdb->ecode = TDB_ERR_OOM;
724
rec = (struct list_struct *)data;
725
memset(rec, 0, sizeof(*rec));
728
rec->data_len = recovery_size;
729
rec->rec_len = recovery_max_size;
730
rec->key_len = old_map_size;
733
/* build the recovery data into a single blob to allow us to do a single
734
large write, which should be more efficient */
735
p = data + sizeof(*rec);
736
for (i=0;i<tdb->transaction->num_blocks;i++) {
740
if (tdb->transaction->blocks[i] == NULL) {
744
offset = i * tdb->transaction->block_size;
745
length = tdb->transaction->block_size;
746
if (i == tdb->transaction->num_blocks-1) {
747
length = tdb->transaction->last_block_size;
750
if (offset >= old_map_size) {
753
if (offset + length > tdb->transaction->old_map_size) {
754
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
756
tdb->ecode = TDB_ERR_CORRUPT;
759
memcpy(p, &offset, 4);
760
memcpy(p+4, &length, 4);
764
/* the recovery area contains the old data, not the
765
new data, so we have to call the original tdb_read
767
if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
769
tdb->ecode = TDB_ERR_IO;
776
tailer = sizeof(*rec) + recovery_max_size;
777
memcpy(p, &tailer, 4);
780
/* write the recovery data to the recovery area */
781
if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
782
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
784
tdb->ecode = TDB_ERR_IO;
787
if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
788
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
790
tdb->ecode = TDB_ERR_IO;
794
/* as we don't have ordered writes, we have to sync the recovery
795
data before we update the magic to indicate that the recovery
797
if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
804
magic = TDB_RECOVERY_MAGIC;
807
*magic_offset = recovery_offset + offsetof(struct list_struct, magic);
809
if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
810
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
811
tdb->ecode = TDB_ERR_IO;
814
if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
815
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
816
tdb->ecode = TDB_ERR_IO;
820
/* ensure the recovery magic marker is on disk */
821
if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
829
commit the current transaction
831
int tdb_transaction_commit(struct tdb_context *tdb)
833
const struct tdb_methods *methods;
834
tdb_off_t magic_offset = 0;
838
if (tdb->transaction == NULL) {
839
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
843
if (tdb->transaction->transaction_error) {
844
tdb->ecode = TDB_ERR_IO;
845
tdb_transaction_cancel(tdb);
846
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
851
if (tdb->transaction->nesting != 0) {
852
tdb->transaction->nesting--;
856
/* check for a null transaction */
857
if (tdb->transaction->blocks == NULL) {
858
tdb_transaction_cancel(tdb);
862
methods = tdb->transaction->io_methods;
864
/* if there are any locks pending then the caller has not
865
nested their locks properly, so fail the transaction */
866
if (tdb->num_locks || tdb->global_lock.count) {
867
tdb->ecode = TDB_ERR_LOCK;
868
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
869
tdb_transaction_cancel(tdb);
873
/* upgrade the main transaction lock region to a write lock */
874
if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
875
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
876
tdb->ecode = TDB_ERR_LOCK;
877
tdb_transaction_cancel(tdb);
881
/* get the global lock - this prevents new users attaching to the database
883
if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
884
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
885
tdb->ecode = TDB_ERR_LOCK;
886
tdb_transaction_cancel(tdb);
890
if (!(tdb->flags & TDB_NOSYNC)) {
891
/* write the recovery data to the end of the file */
892
if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
893
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
894
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
895
tdb_transaction_cancel(tdb);
900
/* expand the file to the new size if needed */
901
if (tdb->map_size != tdb->transaction->old_map_size) {
902
if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
904
tdb->transaction->old_map_size) == -1) {
905
tdb->ecode = TDB_ERR_IO;
906
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
907
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
908
tdb_transaction_cancel(tdb);
911
tdb->map_size = tdb->transaction->old_map_size;
912
methods->tdb_oob(tdb, tdb->map_size + 1, 1);
915
/* perform all the writes */
916
for (i=0;i<tdb->transaction->num_blocks;i++) {
920
if (tdb->transaction->blocks[i] == NULL) {
924
offset = i * tdb->transaction->block_size;
925
length = tdb->transaction->block_size;
926
if (i == tdb->transaction->num_blocks-1) {
927
length = tdb->transaction->last_block_size;
930
if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
931
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
933
/* we've overwritten part of the data and
934
possibly expanded the file, so we need to
935
run the crash recovery code */
936
tdb->methods = methods;
937
tdb_transaction_recover(tdb);
939
tdb_transaction_cancel(tdb);
940
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
942
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
945
SAFE_FREE(tdb->transaction->blocks[i]);
948
SAFE_FREE(tdb->transaction->blocks);
949
tdb->transaction->num_blocks = 0;
951
if (!(tdb->flags & TDB_NOSYNC)) {
952
/* ensure the new data is on disk */
953
if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
957
/* remove the recovery marker */
958
if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
959
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
963
/* ensure the recovery marker has been removed on disk */
964
if (transaction_sync(tdb, magic_offset, 4) == -1) {
969
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
972
TODO: maybe write to some dummy hdr field, or write to magic
973
offset without mmap, before the last sync, instead of the
977
/* on some systems (like Linux 2.6.x) changes via mmap/msync
978
don't change the mtime of the file, this means the file may
979
not be backed up (as tdb rounding to block sizes means that
980
file size changes are quite rare too). The following forces
981
mtime changes when a transaction completes */
983
utime(tdb->name, NULL);
986
/* use a transaction cancel to free memory and remove the
988
tdb_transaction_cancel(tdb);
995
recover from an aborted transaction. Must be called with exclusive
996
database write access already established (including the global
997
lock to prevent new processes attaching)
999
int tdb_transaction_recover(struct tdb_context *tdb)
1001
tdb_off_t recovery_head, recovery_eof;
1002
unsigned char *data, *p;
1004
struct list_struct rec;
1006
/* find the recovery area */
1007
if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1008
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1009
tdb->ecode = TDB_ERR_IO;
1013
if (recovery_head == 0) {
1014
/* we have never allocated a recovery record */
1018
/* read the recovery record */
1019
if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1020
sizeof(rec), DOCONV()) == -1) {
1021
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1022
tdb->ecode = TDB_ERR_IO;
1026
if (rec.magic != TDB_RECOVERY_MAGIC) {
1027
/* there is no valid recovery data */
1031
if (tdb->read_only) {
1032
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1033
tdb->ecode = TDB_ERR_CORRUPT;
1037
recovery_eof = rec.key_len;
1039
data = (unsigned char *)malloc(rec.data_len);
1041
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1042
tdb->ecode = TDB_ERR_OOM;
1046
/* read the full recovery data */
1047
if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1048
rec.data_len, 0) == -1) {
1049
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1050
tdb->ecode = TDB_ERR_IO;
1054
/* recover the file data */
1056
while (p+8 < data + rec.data_len) {
1062
memcpy(&len, p+4, 4);
1064
if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1066
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1067
tdb->ecode = TDB_ERR_IO;
1075
if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1076
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1077
tdb->ecode = TDB_ERR_IO;
1081
/* if the recovery area is after the recovered eof then remove it */
1082
if (recovery_eof <= recovery_head) {
1083
if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1084
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1085
tdb->ecode = TDB_ERR_IO;
1090
/* remove the recovery magic */
1091
if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1093
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1094
tdb->ecode = TDB_ERR_IO;
1098
/* reduce the file size to the old size */
1100
if (ftruncate(tdb->fd, recovery_eof) != 0) {
1101
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1102
tdb->ecode = TDB_ERR_IO;
1105
tdb->map_size = recovery_eof;
1108
if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1109
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1110
tdb->ecode = TDB_ERR_IO;
1114
TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",