~zulcss/samba/server-dailies-3.4

« back to all changes in this revision

Viewing changes to lib/tdb/common/transaction.c

  • Committer: Chuck Short
  • Date: 2010-09-28 20:38:39 UTC
  • Revision ID: zulcss@ubuntu.com-20100928203839-pgjulytsi9ue63x1
Initial version

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 /* 
 
2
   Unix SMB/CIFS implementation.
 
3
 
 
4
   trivial database library
 
5
 
 
6
   Copyright (C) Andrew Tridgell              2005
 
7
 
 
8
     ** NOTE! The following LGPL license applies to the tdb
 
9
     ** library. This does NOT imply that all of Samba is released
 
10
     ** under the LGPL
 
11
   
 
12
   This library is free software; you can redistribute it and/or
 
13
   modify it under the terms of the GNU Lesser General Public
 
14
   License as published by the Free Software Foundation; either
 
15
   version 3 of the License, or (at your option) any later version.
 
16
 
 
17
   This library is distributed in the hope that it will be useful,
 
18
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
19
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
20
   Lesser General Public License for more details.
 
21
 
 
22
   You should have received a copy of the GNU Lesser General Public
 
23
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
 
24
*/
 
25
 
 
26
#include "tdb_private.h"
 
27
 
 
28
/*
 
29
  transaction design:
 
30
 
 
31
  - only allow a single transaction at a time per database. This makes
 
32
    using the transaction API simpler, as otherwise the caller would
 
33
    have to cope with temporary failures in transactions that conflict
 
34
    with other current transactions
 
35
 
 
36
  - keep the transaction recovery information in the same file as the
 
37
    database, using a special 'transaction recovery' record pointed at
 
38
    by the header. This removes the need for extra journal files as
 
39
    used by some other databases
 
40
 
 
41
  - dynamically allocated the transaction recover record, re-using it
 
42
    for subsequent transactions. If a larger record is needed then
 
43
    tdb_free() the old record to place it on the normal tdb freelist
 
44
    before allocating the new record
 
45
 
 
46
  - during transactions, keep a linked list of writes all that have
 
47
    been performed by intercepting all tdb_write() calls. The hooked
 
48
    transaction versions of tdb_read() and tdb_write() check this
 
49
    linked list and try to use the elements of the list in preference
 
50
    to the real database.
 
51
 
 
52
  - don't allow any locks to be held when a transaction starts,
 
53
    otherwise we can end up with deadlock (plus lack of lock nesting
 
54
    in posix locks would mean the lock is lost)
 
55
 
 
56
  - if the caller gains a lock during the transaction but doesn't
 
57
    release it then fail the commit
 
58
 
 
59
  - allow for nested calls to tdb_transaction_start(), re-using the
 
60
    existing transaction record. If the inner transaction is cancelled
 
61
    then a subsequent commit will fail
 
62
 
 
63
  - keep a mirrored copy of the tdb hash chain heads to allow for the
 
64
    fast hash heads scan on traverse, updating the mirrored copy in
 
65
    the transaction version of tdb_write
 
66
 
 
67
  - allow callers to mix transaction and non-transaction use of tdb,
 
68
    although once a transaction is started then an exclusive lock is
 
69
    gained until the transaction is committed or cancelled
 
70
 
 
71
  - the commit stategy involves first saving away all modified data
 
72
    into a linearised buffer in the transaction recovery area, then
 
73
    marking the transaction recovery area with a magic value to
 
74
    indicate a valid recovery record. In total 4 fsync/msync calls are
 
75
    needed per commit to prevent race conditions. It might be possible
 
76
    to reduce this to 3 or even 2 with some more work.
 
77
 
 
78
  - check for a valid recovery record on open of the tdb, while the
 
79
    global lock is held. Automatically recover from the transaction
 
80
    recovery area if needed, then continue with the open as
 
81
    usual. This allows for smooth crash recovery with no administrator
 
82
    intervention.
 
83
 
 
84
  - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
 
85
    still available, but no transaction recovery area is used and no
 
86
    fsync/msync calls are made.
 
87
 
 
88
*/
 
89
 
 
90
 
 
91
/*
 
92
  hold the context of any current transaction
 
93
*/
 
94
struct tdb_transaction {
 
95
        /* we keep a mirrored copy of the tdb hash heads here so
 
96
           tdb_next_hash_chain() can operate efficiently */
 
97
        uint32_t *hash_heads;
 
98
 
 
99
        /* the original io methods - used to do IOs to the real db */
 
100
        const struct tdb_methods *io_methods;
 
101
 
 
102
        /* the list of transaction blocks. When a block is first
 
103
           written to, it gets created in this list */
 
104
        uint8_t **blocks;
 
105
        uint32_t num_blocks;
 
106
        uint32_t block_size;      /* bytes in each block */
 
107
        uint32_t last_block_size; /* number of valid bytes in the last block */
 
108
 
 
109
        /* non-zero when an internal transaction error has
 
110
           occurred. All write operations will then fail until the
 
111
           transaction is ended */
 
112
        int transaction_error;
 
113
 
 
114
        /* when inside a transaction we need to keep track of any
 
115
           nested tdb_transaction_start() calls, as these are allowed,
 
116
           but don't create a new transaction */
 
117
        int nesting;
 
118
 
 
119
        /* old file size before transaction */
 
120
        tdb_len_t old_map_size;
 
121
};
 
122
 
 
123
 
 
124
/*
 
125
  read while in a transaction. We need to check first if the data is in our list
 
126
  of transaction elements, then if not do a real read
 
127
*/
 
128
static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
 
129
                            tdb_len_t len, int cv)
 
130
{
 
131
        uint32_t blk;
 
132
 
 
133
        /* break it down into block sized ops */
 
134
        while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
 
135
                tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
 
136
                if (transaction_read(tdb, off, buf, len2, cv) != 0) {
 
137
                        return -1;
 
138
                }
 
139
                len -= len2;
 
140
                off += len2;
 
141
                buf = (void *)(len2 + (char *)buf);
 
142
        }
 
143
 
 
144
        if (len == 0) {
 
145
                return 0;
 
146
        }
 
147
 
 
148
        blk = off / tdb->transaction->block_size;
 
149
 
 
150
        /* see if we have it in the block list */
 
151
        if (tdb->transaction->num_blocks <= blk ||
 
152
            tdb->transaction->blocks[blk] == NULL) {
 
153
                /* nope, do a real read */
 
154
                if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
 
155
                        goto fail;
 
156
                }
 
157
                return 0;
 
158
        }
 
159
 
 
160
        /* it is in the block list. Now check for the last block */
 
161
        if (blk == tdb->transaction->num_blocks-1) {
 
162
                if (len > tdb->transaction->last_block_size) {
 
163
                        goto fail;
 
164
                }
 
165
        }
 
166
        
 
167
        /* now copy it out of this block */
 
168
        memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
 
169
        if (cv) {
 
170
                tdb_convert(buf, len);
 
171
        }
 
172
        return 0;
 
173
 
 
174
fail:
 
175
        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
 
176
        tdb->ecode = TDB_ERR_IO;
 
177
        tdb->transaction->transaction_error = 1;
 
178
        return -1;
 
179
}
 
180
 
 
181
 
 
182
/*
 
183
  write while in a transaction
 
184
*/
 
185
static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
 
186
                             const void *buf, tdb_len_t len)
 
187
{
 
188
        uint32_t blk;
 
189
 
 
190
        /* if the write is to a hash head, then update the transaction
 
191
           hash heads */
 
192
        if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
 
193
            off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
 
194
                uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
 
195
                memcpy(&tdb->transaction->hash_heads[chain], buf, len);
 
196
        }
 
197
 
 
198
        /* break it up into block sized chunks */
 
199
        while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
 
200
                tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
 
201
                if (transaction_write(tdb, off, buf, len2) != 0) {
 
202
                        return -1;
 
203
                }
 
204
                len -= len2;
 
205
                off += len2;
 
206
                if (buf != NULL) {
 
207
                        buf = (const void *)(len2 + (const char *)buf);
 
208
                }
 
209
        }
 
210
 
 
211
        if (len == 0) {
 
212
                return 0;
 
213
        }
 
214
 
 
215
        blk = off / tdb->transaction->block_size;
 
216
        off = off % tdb->transaction->block_size;
 
217
 
 
218
        if (tdb->transaction->num_blocks <= blk) {
 
219
                uint8_t **new_blocks;
 
220
                /* expand the blocks array */
 
221
                if (tdb->transaction->blocks == NULL) {
 
222
                        new_blocks = (uint8_t **)malloc(
 
223
                                (blk+1)*sizeof(uint8_t *));
 
224
                } else {
 
225
                        new_blocks = (uint8_t **)realloc(
 
226
                                tdb->transaction->blocks,
 
227
                                (blk+1)*sizeof(uint8_t *));
 
228
                }
 
229
                if (new_blocks == NULL) {
 
230
                        tdb->ecode = TDB_ERR_OOM;
 
231
                        goto fail;
 
232
                }
 
233
                memset(&new_blocks[tdb->transaction->num_blocks], 0, 
 
234
                       (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
 
235
                tdb->transaction->blocks = new_blocks;
 
236
                tdb->transaction->num_blocks = blk+1;
 
237
                tdb->transaction->last_block_size = 0;
 
238
        }
 
239
 
 
240
        /* allocate and fill a block? */
 
241
        if (tdb->transaction->blocks[blk] == NULL) {
 
242
                tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
 
243
                if (tdb->transaction->blocks[blk] == NULL) {
 
244
                        tdb->ecode = TDB_ERR_OOM;
 
245
                        tdb->transaction->transaction_error = 1;
 
246
                        return -1;                      
 
247
                }
 
248
                if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
 
249
                        tdb_len_t len2 = tdb->transaction->block_size;
 
250
                        if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
 
251
                                len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
 
252
                        }
 
253
                        if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
 
254
                                                                   tdb->transaction->blocks[blk], 
 
255
                                                                   len2, 0) != 0) {
 
256
                                SAFE_FREE(tdb->transaction->blocks[blk]);                               
 
257
                                tdb->ecode = TDB_ERR_IO;
 
258
                                goto fail;
 
259
                        }
 
260
                        if (blk == tdb->transaction->num_blocks-1) {
 
261
                                tdb->transaction->last_block_size = len2;
 
262
                        }                       
 
263
                }
 
264
        }
 
265
        
 
266
        /* overwrite part of an existing block */
 
267
        if (buf == NULL) {
 
268
                memset(tdb->transaction->blocks[blk] + off, 0, len);
 
269
        } else {
 
270
                memcpy(tdb->transaction->blocks[blk] + off, buf, len);
 
271
        }
 
272
        if (blk == tdb->transaction->num_blocks-1) {
 
273
                if (len + off > tdb->transaction->last_block_size) {
 
274
                        tdb->transaction->last_block_size = len + off;
 
275
                }
 
276
        }
 
277
 
 
278
        return 0;
 
279
 
 
280
fail:
 
281
        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
 
282
                 (blk*tdb->transaction->block_size) + off, len));
 
283
        tdb->transaction->transaction_error = 1;
 
284
        return -1;
 
285
}
 
286
 
 
287
 
 
288
/*
 
289
  write while in a transaction - this varient never expands the transaction blocks, it only
 
290
  updates existing blocks. This means it cannot change the recovery size
 
291
*/
 
292
static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
 
293
                                      const void *buf, tdb_len_t len)
 
294
{
 
295
        uint32_t blk;
 
296
 
 
297
        /* break it up into block sized chunks */
 
298
        while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
 
299
                tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
 
300
                if (transaction_write_existing(tdb, off, buf, len2) != 0) {
 
301
                        return -1;
 
302
                }
 
303
                len -= len2;
 
304
                off += len2;
 
305
                if (buf != NULL) {
 
306
                        buf = (const void *)(len2 + (const char *)buf);
 
307
                }
 
308
        }
 
309
 
 
310
        if (len == 0) {
 
311
                return 0;
 
312
        }
 
313
 
 
314
        blk = off / tdb->transaction->block_size;
 
315
        off = off % tdb->transaction->block_size;
 
316
 
 
317
        if (tdb->transaction->num_blocks <= blk ||
 
318
            tdb->transaction->blocks[blk] == NULL) {
 
319
                return 0;
 
320
        }
 
321
 
 
322
        if (blk == tdb->transaction->num_blocks-1 &&
 
323
            off + len > tdb->transaction->last_block_size) {
 
324
                if (off >= tdb->transaction->last_block_size) {
 
325
                        return 0;
 
326
                }
 
327
                len = tdb->transaction->last_block_size - off;
 
328
        }
 
329
 
 
330
        /* overwrite part of an existing block */
 
331
        memcpy(tdb->transaction->blocks[blk] + off, buf, len);
 
332
 
 
333
        return 0;
 
334
}
 
335
 
 
336
 
 
337
/*
 
338
  accelerated hash chain head search, using the cached hash heads
 
339
*/
 
340
static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
 
341
{
 
342
        uint32_t h = *chain;
 
343
        for (;h < tdb->header.hash_size;h++) {
 
344
                /* the +1 takes account of the freelist */
 
345
                if (0 != tdb->transaction->hash_heads[h+1]) {
 
346
                        break;
 
347
                }
 
348
        }
 
349
        (*chain) = h;
 
350
}
 
351
 
 
352
/*
 
353
  out of bounds check during a transaction
 
354
*/
 
355
static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
 
356
{
 
357
        if (len <= tdb->map_size) {
 
358
                return 0;
 
359
        }
 
360
        return TDB_ERRCODE(TDB_ERR_IO, -1);
 
361
}
 
362
 
 
363
/*
 
364
  transaction version of tdb_expand().
 
365
*/
 
366
static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
 
367
                                   tdb_off_t addition)
 
368
{
 
369
        /* add a write to the transaction elements, so subsequent
 
370
           reads see the zero data */
 
371
        if (transaction_write(tdb, size, NULL, addition) != 0) {
 
372
                return -1;
 
373
        }
 
374
 
 
375
        return 0;
 
376
}
 
377
 
 
378
/*
 
379
  brlock during a transaction - ignore them
 
380
*/
 
381
static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
 
382
                              int rw_type, int lck_type, int probe, size_t len)
 
383
{
 
384
        return 0;
 
385
}
 
386
 
 
387
static const struct tdb_methods transaction_methods = {
 
388
        transaction_read,
 
389
        transaction_write,
 
390
        transaction_next_hash_chain,
 
391
        transaction_oob,
 
392
        transaction_expand_file,
 
393
        transaction_brlock
 
394
};
 
395
 
 
396
 
 
397
/*
 
398
  start a tdb transaction. No token is returned, as only a single
 
399
  transaction is allowed to be pending per tdb_context
 
400
*/
 
401
int tdb_transaction_start(struct tdb_context *tdb)
 
402
{
 
403
        /* some sanity checks */
 
404
        if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
 
405
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
 
406
                tdb->ecode = TDB_ERR_EINVAL;
 
407
                return -1;
 
408
        }
 
409
 
 
410
        /* cope with nested tdb_transaction_start() calls */
 
411
        if (tdb->transaction != NULL) {
 
412
                tdb->transaction->nesting++;
 
413
                TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
 
414
                         tdb->transaction->nesting));
 
415
                return 0;
 
416
        }
 
417
 
 
418
        if (tdb->num_locks != 0 || tdb->global_lock.count) {
 
419
                /* the caller must not have any locks when starting a
 
420
                   transaction as otherwise we'll be screwed by lack
 
421
                   of nested locks in posix */
 
422
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
 
423
                tdb->ecode = TDB_ERR_LOCK;
 
424
                return -1;
 
425
        }
 
426
 
 
427
        if (tdb->travlocks.next != NULL) {
 
428
                /* you cannot use transactions inside a traverse (although you can use
 
429
                   traverse inside a transaction) as otherwise you can end up with
 
430
                   deadlock */
 
431
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
 
432
                tdb->ecode = TDB_ERR_LOCK;
 
433
                return -1;
 
434
        }
 
435
 
 
436
        tdb->transaction = (struct tdb_transaction *)
 
437
                calloc(sizeof(struct tdb_transaction), 1);
 
438
        if (tdb->transaction == NULL) {
 
439
                tdb->ecode = TDB_ERR_OOM;
 
440
                return -1;
 
441
        }
 
442
 
 
443
        /* a page at a time seems like a reasonable compromise between compactness and efficiency */
 
444
        tdb->transaction->block_size = tdb->page_size;
 
445
 
 
446
        /* get the transaction write lock. This is a blocking lock. As
 
447
           discussed with Volker, there are a number of ways we could
 
448
           make this async, which we will probably do in the future */
 
449
        if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
 
450
                SAFE_FREE(tdb->transaction->blocks);
 
451
                SAFE_FREE(tdb->transaction);
 
452
                return -1;
 
453
        }
 
454
        
 
455
        /* get a read lock from the freelist to the end of file. This
 
456
           is upgraded to a write lock during the commit */
 
457
        if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
 
458
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
 
459
                tdb->ecode = TDB_ERR_LOCK;
 
460
                goto fail;
 
461
        }
 
462
 
 
463
        /* setup a copy of the hash table heads so the hash scan in
 
464
           traverse can be fast */
 
465
        tdb->transaction->hash_heads = (uint32_t *)
 
466
                calloc(tdb->header.hash_size+1, sizeof(uint32_t));
 
467
        if (tdb->transaction->hash_heads == NULL) {
 
468
                tdb->ecode = TDB_ERR_OOM;
 
469
                goto fail;
 
470
        }
 
471
        if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
 
472
                                   TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
 
473
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
 
474
                tdb->ecode = TDB_ERR_IO;
 
475
                goto fail;
 
476
        }
 
477
 
 
478
        /* make sure we know about any file expansions already done by
 
479
           anyone else */
 
480
        tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
481
        tdb->transaction->old_map_size = tdb->map_size;
 
482
 
 
483
        /* finally hook the io methods, replacing them with
 
484
           transaction specific methods */
 
485
        tdb->transaction->io_methods = tdb->methods;
 
486
        tdb->methods = &transaction_methods;
 
487
 
 
488
        return 0;
 
489
        
 
490
fail:
 
491
        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
 
492
        tdb_transaction_unlock(tdb);
 
493
        SAFE_FREE(tdb->transaction->blocks);
 
494
        SAFE_FREE(tdb->transaction->hash_heads);
 
495
        SAFE_FREE(tdb->transaction);
 
496
        return -1;
 
497
}
 
498
 
 
499
 
 
500
/*
 
501
  cancel the current transaction
 
502
*/
 
503
int tdb_transaction_cancel(struct tdb_context *tdb)
 
504
{       
 
505
        int i;
 
506
 
 
507
        if (tdb->transaction == NULL) {
 
508
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
 
509
                return -1;
 
510
        }
 
511
 
 
512
        if (tdb->transaction->nesting != 0) {
 
513
                tdb->transaction->transaction_error = 1;
 
514
                tdb->transaction->nesting--;
 
515
                return 0;
 
516
        }               
 
517
 
 
518
        tdb->map_size = tdb->transaction->old_map_size;
 
519
 
 
520
        /* free all the transaction blocks */
 
521
        for (i=0;i<tdb->transaction->num_blocks;i++) {
 
522
                if (tdb->transaction->blocks[i] != NULL) {
 
523
                        free(tdb->transaction->blocks[i]);
 
524
                }
 
525
        }
 
526
        SAFE_FREE(tdb->transaction->blocks);
 
527
 
 
528
        /* remove any global lock created during the transaction */
 
529
        if (tdb->global_lock.count != 0) {
 
530
                tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
 
531
                tdb->global_lock.count = 0;
 
532
        }
 
533
 
 
534
        /* remove any locks created during the transaction */
 
535
        if (tdb->num_locks != 0) {
 
536
                for (i=0;i<tdb->num_lockrecs;i++) {
 
537
                        tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
 
538
                                   F_UNLCK,F_SETLKW, 0, 1);
 
539
                }
 
540
                tdb->num_locks = 0;
 
541
                tdb->num_lockrecs = 0;
 
542
                SAFE_FREE(tdb->lockrecs);
 
543
        }
 
544
 
 
545
        /* restore the normal io methods */
 
546
        tdb->methods = tdb->transaction->io_methods;
 
547
 
 
548
        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
 
549
        tdb_transaction_unlock(tdb);
 
550
        SAFE_FREE(tdb->transaction->hash_heads);
 
551
        SAFE_FREE(tdb->transaction);
 
552
        
 
553
        return 0;
 
554
}
 
555
 
 
556
/*
 
557
  sync to disk
 
558
*/
 
559
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
 
560
{       
 
561
        if (fsync(tdb->fd) != 0) {
 
562
                tdb->ecode = TDB_ERR_IO;
 
563
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
 
564
                return -1;
 
565
        }
 
566
#ifdef HAVE_MMAP
 
567
        if (tdb->map_ptr) {
 
568
                tdb_off_t moffset = offset & ~(tdb->page_size-1);
 
569
                if (msync(moffset + (char *)tdb->map_ptr, 
 
570
                          length + (offset - moffset), MS_SYNC) != 0) {
 
571
                        tdb->ecode = TDB_ERR_IO;
 
572
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
 
573
                                 strerror(errno)));
 
574
                        return -1;
 
575
                }
 
576
        }
 
577
#endif
 
578
        return 0;
 
579
}
 
580
 
 
581
 
 
582
/*
 
583
  work out how much space the linearised recovery data will consume
 
584
*/
 
585
static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
 
586
{
 
587
        tdb_len_t recovery_size = 0;
 
588
        int i;
 
589
 
 
590
        recovery_size = sizeof(uint32_t);
 
591
        for (i=0;i<tdb->transaction->num_blocks;i++) {
 
592
                if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
 
593
                        break;
 
594
                }
 
595
                if (tdb->transaction->blocks[i] == NULL) {
 
596
                        continue;
 
597
                }
 
598
                recovery_size += 2*sizeof(tdb_off_t);
 
599
                if (i == tdb->transaction->num_blocks-1) {
 
600
                        recovery_size += tdb->transaction->last_block_size;
 
601
                } else {
 
602
                        recovery_size += tdb->transaction->block_size;
 
603
                }
 
604
        }       
 
605
 
 
606
        return recovery_size;
 
607
}
 
608
 
 
609
/*
 
610
  allocate the recovery area, or use an existing recovery area if it is
 
611
  large enough
 
612
*/
 
613
static int tdb_recovery_allocate(struct tdb_context *tdb, 
 
614
                                 tdb_len_t *recovery_size,
 
615
                                 tdb_off_t *recovery_offset,
 
616
                                 tdb_len_t *recovery_max_size)
 
617
{
 
618
        struct list_struct rec;
 
619
        const struct tdb_methods *methods = tdb->transaction->io_methods;
 
620
        tdb_off_t recovery_head;
 
621
 
 
622
        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
 
623
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
 
624
                return -1;
 
625
        }
 
626
 
 
627
        rec.rec_len = 0;
 
628
 
 
629
        if (recovery_head != 0 && 
 
630
            methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
 
631
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
 
632
                return -1;
 
633
        }
 
634
 
 
635
        *recovery_size = tdb_recovery_size(tdb);
 
636
 
 
637
        if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
 
638
                /* it fits in the existing area */
 
639
                *recovery_max_size = rec.rec_len;
 
640
                *recovery_offset = recovery_head;
 
641
                return 0;
 
642
        }
 
643
 
 
644
        /* we need to free up the old recovery area, then allocate a
 
645
           new one at the end of the file. Note that we cannot use
 
646
           tdb_allocate() to allocate the new one as that might return
 
647
           us an area that is being currently used (as of the start of
 
648
           the transaction) */
 
649
        if (recovery_head != 0) {
 
650
                if (tdb_free(tdb, recovery_head, &rec) == -1) {
 
651
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
 
652
                        return -1;
 
653
                }
 
654
        }
 
655
 
 
656
        /* the tdb_free() call might have increased the recovery size */
 
657
        *recovery_size = tdb_recovery_size(tdb);
 
658
 
 
659
        /* round up to a multiple of page size */
 
660
        *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
 
661
        *recovery_offset = tdb->map_size;
 
662
        recovery_head = *recovery_offset;
 
663
 
 
664
        if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
 
665
                                     (tdb->map_size - tdb->transaction->old_map_size) +
 
666
                                     sizeof(rec) + *recovery_max_size) == -1) {
 
667
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
 
668
                return -1;
 
669
        }
 
670
 
 
671
        /* remap the file (if using mmap) */
 
672
        methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
673
 
 
674
        /* we have to reset the old map size so that we don't try to expand the file
 
675
           again in the transaction commit, which would destroy the recovery area */
 
676
        tdb->transaction->old_map_size = tdb->map_size;
 
677
 
 
678
        /* write the recovery header offset and sync - we can sync without a race here
 
679
           as the magic ptr in the recovery record has not been set */
 
680
        CONVERT(recovery_head);
 
681
        if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
 
682
                               &recovery_head, sizeof(tdb_off_t)) == -1) {
 
683
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
 
684
                return -1;
 
685
        }
 
686
        if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
 
687
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
 
688
                return -1;
 
689
        }
 
690
 
 
691
        return 0;
 
692
}
 
693
 
 
694
 
 
695
/*
 
696
  setup the recovery data that will be used on a crash during commit
 
697
*/
 
698
static int transaction_setup_recovery(struct tdb_context *tdb, 
 
699
                                      tdb_off_t *magic_offset)
 
700
{
 
701
        tdb_len_t recovery_size;
 
702
        unsigned char *data, *p;
 
703
        const struct tdb_methods *methods = tdb->transaction->io_methods;
 
704
        struct list_struct *rec;
 
705
        tdb_off_t recovery_offset, recovery_max_size;
 
706
        tdb_off_t old_map_size = tdb->transaction->old_map_size;
 
707
        uint32_t magic, tailer;
 
708
        int i;
 
709
 
 
710
        /*
 
711
          check that the recovery area has enough space
 
712
        */
 
713
        if (tdb_recovery_allocate(tdb, &recovery_size, 
 
714
                                  &recovery_offset, &recovery_max_size) == -1) {
 
715
                return -1;
 
716
        }
 
717
 
 
718
        data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
 
719
        if (data == NULL) {
 
720
                tdb->ecode = TDB_ERR_OOM;
 
721
                return -1;
 
722
        }
 
723
 
 
724
        rec = (struct list_struct *)data;
 
725
        memset(rec, 0, sizeof(*rec));
 
726
 
 
727
        rec->magic    = 0;
 
728
        rec->data_len = recovery_size;
 
729
        rec->rec_len  = recovery_max_size;
 
730
        rec->key_len  = old_map_size;
 
731
        CONVERT(rec);
 
732
 
 
733
        /* build the recovery data into a single blob to allow us to do a single
 
734
           large write, which should be more efficient */
 
735
        p = data + sizeof(*rec);
 
736
        for (i=0;i<tdb->transaction->num_blocks;i++) {
 
737
                tdb_off_t offset;
 
738
                tdb_len_t length;
 
739
 
 
740
                if (tdb->transaction->blocks[i] == NULL) {
 
741
                        continue;
 
742
                }
 
743
 
 
744
                offset = i * tdb->transaction->block_size;
 
745
                length = tdb->transaction->block_size;
 
746
                if (i == tdb->transaction->num_blocks-1) {
 
747
                        length = tdb->transaction->last_block_size;
 
748
                }
 
749
                
 
750
                if (offset >= old_map_size) {
 
751
                        continue;
 
752
                }
 
753
                if (offset + length > tdb->transaction->old_map_size) {
 
754
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
 
755
                        free(data);
 
756
                        tdb->ecode = TDB_ERR_CORRUPT;
 
757
                        return -1;
 
758
                }
 
759
                memcpy(p, &offset, 4);
 
760
                memcpy(p+4, &length, 4);
 
761
                if (DOCONV()) {
 
762
                        tdb_convert(p, 8);
 
763
                }
 
764
                /* the recovery area contains the old data, not the
 
765
                   new data, so we have to call the original tdb_read
 
766
                   method to get it */
 
767
                if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
 
768
                        free(data);
 
769
                        tdb->ecode = TDB_ERR_IO;
 
770
                        return -1;
 
771
                }
 
772
                p += 8 + length;
 
773
        }
 
774
 
 
775
        /* and the tailer */
 
776
        tailer = sizeof(*rec) + recovery_max_size;
 
777
        memcpy(p, &tailer, 4);
 
778
        CONVERT(p);
 
779
 
 
780
        /* write the recovery data to the recovery area */
 
781
        if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
 
782
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
 
783
                free(data);
 
784
                tdb->ecode = TDB_ERR_IO;
 
785
                return -1;
 
786
        }
 
787
        if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
 
788
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
 
789
                free(data);
 
790
                tdb->ecode = TDB_ERR_IO;
 
791
                return -1;
 
792
        }
 
793
 
 
794
        /* as we don't have ordered writes, we have to sync the recovery
 
795
           data before we update the magic to indicate that the recovery
 
796
           data is present */
 
797
        if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
 
798
                free(data);
 
799
                return -1;
 
800
        }
 
801
 
 
802
        free(data);
 
803
 
 
804
        magic = TDB_RECOVERY_MAGIC;
 
805
        CONVERT(magic);
 
806
 
 
807
        *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
 
808
 
 
809
        if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
 
810
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
 
811
                tdb->ecode = TDB_ERR_IO;
 
812
                return -1;
 
813
        }
 
814
        if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
 
815
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
 
816
                tdb->ecode = TDB_ERR_IO;
 
817
                return -1;
 
818
        }
 
819
 
 
820
        /* ensure the recovery magic marker is on disk */
 
821
        if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
 
822
                return -1;
 
823
        }
 
824
 
 
825
        return 0;
 
826
}
 
827
 
 
828
/*
 
829
  commit the current transaction
 
830
*/
 
831
int tdb_transaction_commit(struct tdb_context *tdb)
 
832
{       
 
833
        const struct tdb_methods *methods;
 
834
        tdb_off_t magic_offset = 0;
 
835
        uint32_t zero = 0;
 
836
        int i;
 
837
 
 
838
        if (tdb->transaction == NULL) {
 
839
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
 
840
                return -1;
 
841
        }
 
842
 
 
843
        if (tdb->transaction->transaction_error) {
 
844
                tdb->ecode = TDB_ERR_IO;
 
845
                tdb_transaction_cancel(tdb);
 
846
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
 
847
                return -1;
 
848
        }
 
849
 
 
850
 
 
851
        if (tdb->transaction->nesting != 0) {
 
852
                tdb->transaction->nesting--;
 
853
                return 0;
 
854
        }               
 
855
 
 
856
        /* check for a null transaction */
 
857
        if (tdb->transaction->blocks == NULL) {
 
858
                tdb_transaction_cancel(tdb);
 
859
                return 0;
 
860
        }
 
861
 
 
862
        methods = tdb->transaction->io_methods;
 
863
        
 
864
        /* if there are any locks pending then the caller has not
 
865
           nested their locks properly, so fail the transaction */
 
866
        if (tdb->num_locks || tdb->global_lock.count) {
 
867
                tdb->ecode = TDB_ERR_LOCK;
 
868
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
 
869
                tdb_transaction_cancel(tdb);
 
870
                return -1;
 
871
        }
 
872
 
 
873
        /* upgrade the main transaction lock region to a write lock */
 
874
        if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
 
875
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
 
876
                tdb->ecode = TDB_ERR_LOCK;
 
877
                tdb_transaction_cancel(tdb);
 
878
                return -1;
 
879
        }
 
880
 
 
881
        /* get the global lock - this prevents new users attaching to the database
 
882
           during the commit */
 
883
        if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
 
884
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
 
885
                tdb->ecode = TDB_ERR_LOCK;
 
886
                tdb_transaction_cancel(tdb);
 
887
                return -1;
 
888
        }
 
889
 
 
890
        if (!(tdb->flags & TDB_NOSYNC)) {
 
891
                /* write the recovery data to the end of the file */
 
892
                if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
 
893
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
 
894
                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
895
                        tdb_transaction_cancel(tdb);
 
896
                        return -1;
 
897
                }
 
898
        }
 
899
 
 
900
        /* expand the file to the new size if needed */
 
901
        if (tdb->map_size != tdb->transaction->old_map_size) {
 
902
                if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
 
903
                                             tdb->map_size - 
 
904
                                             tdb->transaction->old_map_size) == -1) {
 
905
                        tdb->ecode = TDB_ERR_IO;
 
906
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
 
907
                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
908
                        tdb_transaction_cancel(tdb);
 
909
                        return -1;
 
910
                }
 
911
                tdb->map_size = tdb->transaction->old_map_size;
 
912
                methods->tdb_oob(tdb, tdb->map_size + 1, 1);
 
913
        }
 
914
 
 
915
        /* perform all the writes */
 
916
        for (i=0;i<tdb->transaction->num_blocks;i++) {
 
917
                tdb_off_t offset;
 
918
                tdb_len_t length;
 
919
 
 
920
                if (tdb->transaction->blocks[i] == NULL) {
 
921
                        continue;
 
922
                }
 
923
 
 
924
                offset = i * tdb->transaction->block_size;
 
925
                length = tdb->transaction->block_size;
 
926
                if (i == tdb->transaction->num_blocks-1) {
 
927
                        length = tdb->transaction->last_block_size;
 
928
                }
 
929
 
 
930
                if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
 
931
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
 
932
                        
 
933
                        /* we've overwritten part of the data and
 
934
                           possibly expanded the file, so we need to
 
935
                           run the crash recovery code */
 
936
                        tdb->methods = methods;
 
937
                        tdb_transaction_recover(tdb); 
 
938
 
 
939
                        tdb_transaction_cancel(tdb);
 
940
                        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
941
 
 
942
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
 
943
                        return -1;
 
944
                }
 
945
                SAFE_FREE(tdb->transaction->blocks[i]);
 
946
        } 
 
947
 
 
948
        SAFE_FREE(tdb->transaction->blocks);
 
949
        tdb->transaction->num_blocks = 0;
 
950
 
 
951
        if (!(tdb->flags & TDB_NOSYNC)) {
 
952
                /* ensure the new data is on disk */
 
953
                if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
 
954
                        return -1;
 
955
                }
 
956
 
 
957
                /* remove the recovery marker */
 
958
                if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
 
959
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
 
960
                        return -1;
 
961
                }
 
962
 
 
963
                /* ensure the recovery marker has been removed on disk */
 
964
                if (transaction_sync(tdb, magic_offset, 4) == -1) {
 
965
                        return -1;
 
966
                }
 
967
        }
 
968
 
 
969
        tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
 
970
 
 
971
        /*
 
972
          TODO: maybe write to some dummy hdr field, or write to magic
 
973
          offset without mmap, before the last sync, instead of the
 
974
          utime() call
 
975
        */
 
976
 
 
977
        /* on some systems (like Linux 2.6.x) changes via mmap/msync
 
978
           don't change the mtime of the file, this means the file may
 
979
           not be backed up (as tdb rounding to block sizes means that
 
980
           file size changes are quite rare too). The following forces
 
981
           mtime changes when a transaction completes */
 
982
#ifdef HAVE_UTIME
 
983
        utime(tdb->name, NULL);
 
984
#endif
 
985
 
 
986
        /* use a transaction cancel to free memory and remove the
 
987
           transaction locks */
 
988
        tdb_transaction_cancel(tdb);
 
989
 
 
990
        return 0;
 
991
}
 
992
 
 
993
 
 
994
/*
 
995
  recover from an aborted transaction. Must be called with exclusive
 
996
  database write access already established (including the global
 
997
  lock to prevent new processes attaching)
 
998
*/
 
999
int tdb_transaction_recover(struct tdb_context *tdb)
 
1000
{
 
1001
        tdb_off_t recovery_head, recovery_eof;
 
1002
        unsigned char *data, *p;
 
1003
        uint32_t zero = 0;
 
1004
        struct list_struct rec;
 
1005
 
 
1006
        /* find the recovery area */
 
1007
        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
 
1008
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
 
1009
                tdb->ecode = TDB_ERR_IO;
 
1010
                return -1;
 
1011
        }
 
1012
 
 
1013
        if (recovery_head == 0) {
 
1014
                /* we have never allocated a recovery record */
 
1015
                return 0;
 
1016
        }
 
1017
 
 
1018
        /* read the recovery record */
 
1019
        if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
 
1020
                                   sizeof(rec), DOCONV()) == -1) {
 
1021
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
 
1022
                tdb->ecode = TDB_ERR_IO;
 
1023
                return -1;
 
1024
        }
 
1025
 
 
1026
        if (rec.magic != TDB_RECOVERY_MAGIC) {
 
1027
                /* there is no valid recovery data */
 
1028
                return 0;
 
1029
        }
 
1030
 
 
1031
        if (tdb->read_only) {
 
1032
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
 
1033
                tdb->ecode = TDB_ERR_CORRUPT;
 
1034
                return -1;
 
1035
        }
 
1036
 
 
1037
        recovery_eof = rec.key_len;
 
1038
 
 
1039
        data = (unsigned char *)malloc(rec.data_len);
 
1040
        if (data == NULL) {
 
1041
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
 
1042
                tdb->ecode = TDB_ERR_OOM;
 
1043
                return -1;
 
1044
        }
 
1045
 
 
1046
        /* read the full recovery data */
 
1047
        if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
 
1048
                                   rec.data_len, 0) == -1) {
 
1049
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
 
1050
                tdb->ecode = TDB_ERR_IO;
 
1051
                return -1;
 
1052
        }
 
1053
 
 
1054
        /* recover the file data */
 
1055
        p = data;
 
1056
        while (p+8 < data + rec.data_len) {
 
1057
                uint32_t ofs, len;
 
1058
                if (DOCONV()) {
 
1059
                        tdb_convert(p, 8);
 
1060
                }
 
1061
                memcpy(&ofs, p, 4);
 
1062
                memcpy(&len, p+4, 4);
 
1063
 
 
1064
                if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
 
1065
                        free(data);
 
1066
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
 
1067
                        tdb->ecode = TDB_ERR_IO;
 
1068
                        return -1;
 
1069
                }
 
1070
                p += 8 + len;
 
1071
        }
 
1072
 
 
1073
        free(data);
 
1074
 
 
1075
        if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
 
1076
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
 
1077
                tdb->ecode = TDB_ERR_IO;
 
1078
                return -1;
 
1079
        }
 
1080
 
 
1081
        /* if the recovery area is after the recovered eof then remove it */
 
1082
        if (recovery_eof <= recovery_head) {
 
1083
                if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
 
1084
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
 
1085
                        tdb->ecode = TDB_ERR_IO;
 
1086
                        return -1;                      
 
1087
                }
 
1088
        }
 
1089
 
 
1090
        /* remove the recovery magic */
 
1091
        if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
 
1092
                          &zero) == -1) {
 
1093
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
 
1094
                tdb->ecode = TDB_ERR_IO;
 
1095
                return -1;                      
 
1096
        }
 
1097
        
 
1098
        /* reduce the file size to the old size */
 
1099
        tdb_munmap(tdb);
 
1100
        if (ftruncate(tdb->fd, recovery_eof) != 0) {
 
1101
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
 
1102
                tdb->ecode = TDB_ERR_IO;
 
1103
                return -1;                      
 
1104
        }
 
1105
        tdb->map_size = recovery_eof;
 
1106
        tdb_mmap(tdb);
 
1107
 
 
1108
        if (transaction_sync(tdb, 0, recovery_eof) == -1) {
 
1109
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
 
1110
                tdb->ecode = TDB_ERR_IO;
 
1111
                return -1;
 
1112
        }
 
1113
 
 
1114
        TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
 
1115
                 recovery_eof));
 
1116
 
 
1117
        /* all done */
 
1118
        return 0;
 
1119
}