~vkolesnikov/pbxt/pbxt-preload-test-bug

« back to all changes in this revision

Viewing changes to pbxt/src/xaction_xt.cc

  • Committer: paul-mccullagh
  • Date: 2008-03-10 11:36:34 UTC
  • Revision ID: paul-mccullagh-417ebf175a9c8ee6e5b3777d9e2398e1fb197391
Implemented full durability

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2005 SNAP Innovation GmbH
 
1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
2
2
 *
3
3
 * PrimeBase XT
4
4
 *
21
21
 * H&G2JCtL
22
22
 */
23
23
 
 
24
#include <time.h>
 
25
 
24
26
#include "xt_config.h"
25
27
 
26
28
#include "xaction_xt.h"
29
31
#include "heap_xt.h"
30
32
#include "trace_xt.h"
31
33
#include "myxt_xt.h"
32
 
 
33
 
static XTXactLogPtr xn_get_log_for_writing(XTDatabaseHPtr db, int what_for);
34
 
static XTXactDataPtr xn_get_xact(XTDatabaseHPtr db, xtWord8 xn_id);
35
 
static void xn_sw_start_thread(XTThreadPtr self, XTDatabaseHPtr db);
36
 
static void xn_sw_stop_thread(XTThreadPtr self, XTDatabaseHPtr db);
37
 
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int secs);
 
34
#include "tabcache_xt.h"
 
35
 
 
36
#ifdef DEBUG
 
37
//#define TRACE_VARIATIONS
 
38
#endif
 
39
 
 
40
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs);
38
41
 
39
42
/* ============================================================================================== */
40
43
 
41
 
typedef struct XTXactSeqRead {
42
 
        XTXactLogPtr                    sr_xlog;                        /* The log file to read from. */
43
 
        XTOpenFilePtr                   sr_of;
44
 
        off_t                                   sr_offset;                      /* Offset of the buffer from the beginging of the file. */
45
 
        size_t                                  sr_length;                      /* Length of the data in the buffer. */
46
 
        xtWord1                                 *sr_buffer;
47
 
} XTXactSeqReadRec, *XTXactSeqReadPtr;
48
 
 
49
44
typedef struct XNSWRecItem {
50
 
        xtWord4                                 ri_tab_id;
51
 
        off_t                                   ri_address;
 
45
        xtTableID                               ri_tab_id;
 
46
        xtRecordID                              ri_rec_id;
52
47
} XNSWRecItemRec, *XNSWRecItemPtr;
53
48
 
54
49
typedef struct XNSWToFreeItem {
55
 
        xtWord4                                 ri_tab_id;                      /* If non-zero, then this is the table of the data record to be freed.
 
50
        xtTableID                               ri_tab_id;                      /* If non-zero, then this is the table of the data record to be freed.
56
51
                                                                                                 * If zero, then the transaction below must be freed.
57
52
                                                                                                 */
58
53
        union {
59
 
                off_t                           ri_address;
60
 
                xtWord8                         ri_xact_id;
 
54
                xtRecordID                      ri_rec_id;
 
55
                xtXactID                        ri_xn_id;
61
56
        } x;
62
 
        xtWord8                                 ri_wait_id;                     /* Wait for this transaction to be cleaned (or being cleaned up)
 
57
        xtXactID                                ri_wait_xn_id;                  /* Wait for this transaction to be cleaned (or being cleaned up)
63
58
                                                                                                 * before freeing this resource. */
64
59
} XNSWToFreeItemRec, *XNSWToFreeItemPtr;
65
60
 
66
 
typedef struct XNSweeperState {
67
 
        XTDatabaseHPtr                  ss_db;
68
 
        XTXactSeqReadRec                ss_seqread;
69
 
        XTDataBufferRec                 ss_databuf;
70
 
        u_int                                   ss_call_cnt;
71
 
        XTBasicQueueRec                 ss_to_free;
72
 
} XNSweeperStateRec, *XNSweeperStatePtr;
73
 
 
74
61
/* ----------------------------------------------------------------------
75
62
 * Cleanup Record list
76
63
 */
82
69
        XNSWRecItemPtr  y = (XNSWRecItemPtr) b;
83
70
 
84
71
        if (x->ri_tab_id == y->ri_tab_id) {
85
 
                if (x->ri_address == y->ri_address)
 
72
                if (x->ri_rec_id == y->ri_rec_id)
86
73
                        return 0;
87
 
                if (x->ri_address < y->ri_address)
 
74
                if (x->ri_rec_id < y->ri_rec_id)
88
75
                        return -1;
89
76
                return 1;
90
77
        }
115
102
 * When such a record is encountered, it is added to this list
116
103
 * and cleaned up by the sweeper.
117
104
 */
118
 
static void xn_add_cu_record(XTDatabaseHPtr db, xtWord4 tab_id, off_t address)
 
105
static void xn_add_cu_record(XTDatabaseHPtr db, xtTableID tab_id, xtRecordID rec_id)
119
106
{
120
107
        XNSWRecItemRec  cu_rec;
121
108
 
122
109
        cu_rec.ri_tab_id = tab_id;
123
 
        cu_rec.ri_address = address;
 
110
        cu_rec.ri_rec_id = rec_id;
124
111
 
125
112
        xt_sl_lock(NULL, db->db_sw_cu_list);
126
113
        xt_sl_insert(NULL, db->db_sw_cu_list, &cu_rec, &cu_rec);
130
117
/*
131
118
 * Get the next cleanup record from the end of the list.
132
119
 */
133
 
static xtBool xn_get_cu_record(XTDatabaseHPtr db, xtWord4 *tab_id, off_t *address)
 
120
static xtBool xn_get_cu_record(XTDatabaseHPtr db, xtTableID *tab_id, xtRecordID *rec_id)
134
121
{
135
122
        XNSWRecItemPtr cu_rec;
136
123
 
137
124
        xt_sl_lock(NULL, db->db_sw_cu_list);
138
125
        if ((cu_rec = (XNSWRecItemPtr) xt_sl_last_item(db->db_sw_cu_list))) {
139
126
                *tab_id = cu_rec->ri_tab_id;
140
 
                *address = cu_rec->ri_address;
 
127
                *rec_id = cu_rec->ri_rec_id;
141
128
                xt_sl_set_size(db->db_sw_cu_list, xt_sl_get_size(db->db_sw_cu_list) - 1);
142
129
        }
143
130
        xt_sl_unlock(NULL, db->db_sw_cu_list);
149
136
 */
150
137
 
151
138
typedef struct XNWaitFor {
152
 
        xtWord8                                 wf_waiting;                     /* The transaction of the waiting thread. */
153
 
        xtWord8                                 wf_for_me;                      /* The transaction we are waiting for. */
 
139
        xtXactID                                wf_waiting_xn_id;               /* The transaction of the waiting thread. */
 
140
        xtXactID                                wf_for_me_xn_id;                /* The transaction we are waiting for. */
154
141
} XNWaitForRec, *XNWaitForPtr;
155
142
 
156
143
static int xn_compare_wait_for(XTThreadPtr self, register const void *thunk, register const void *a, register const void *b)
157
144
{
158
145
#pragma unused(self, thunk)
159
 
        xtWord8                 *x = (xtWord8 *) a;
 
146
        xtXactID                *x = (xtXactID *) a;
160
147
        XNWaitForPtr    y = (XNWaitForPtr) b;
161
148
 
162
 
        if (*x == y->wf_waiting)
 
149
        if (*x == y->wf_waiting_xn_id)
163
150
                return 0;
164
 
        if (*x < y->wf_waiting)
 
151
        if (xt_xn_is_before(*x, y->wf_waiting_xn_id))
165
152
                return -1;
166
153
        return 1;
167
154
}
177
164
 * By repeatedly scanning the wait_for list we can find out if a
178
165
 * transaction is waiting for itself.
179
166
 */
180
 
static xtBool xn_detect_deadlock(XTDatabaseHPtr db, xtWord8 waiting, xtWord8 for_me)
 
167
static xtBool xn_detect_deadlock(XTDatabaseHPtr db, xtXactID waiting, xtXactID for_me)
181
168
{
182
169
        XNWaitForPtr wf;
183
170
 
188
175
                }
189
176
                if (!(wf = (XNWaitForPtr) xt_sl_find(NULL, db->db_xn_wait_for, &for_me)))
190
177
                        break;
191
 
                for_me = wf->wf_for_me;
 
178
                for_me = wf->wf_for_me_xn_id;
192
179
        }
193
180
        return FALSE;
194
181
}
200
187
 * Before waiting we make a check for deadlocks. A deadlock occurs
201
188
 * if waiting would introduce a cycle.
202
189
 */
203
 
xtPublic xtBool xt_xn_wait_for_xact(XTThreadPtr self, xtWord8 xn_id)
 
190
xtPublic xtBool xt_xn_wait_for_xact(XTThreadPtr self, xtXactID xn_id, xtBool will_retry)
204
191
{
205
192
        XTDatabaseHPtr          db = self->st_database;
206
193
        XTXactDataPtr           xact;
208
195
 
209
196
        ASSERT(self->st_xact_data);
210
197
 
211
 
        wf.wf_waiting = self->st_xact_data->xd_start_id;
212
 
        wf.wf_for_me = xn_id;
 
198
        wf.wf_waiting_xn_id = self->st_xact_data->xd_start_xn_id;
 
199
        wf.wf_for_me_xn_id = xn_id;
213
200
 
214
 
        xt_mutex_lock(&db->db_xn_wait_lock);
 
201
        xt_lock_mutex_ns(&db->db_xn_wait_lock);
215
202
 
216
203
        for (;;) {
217
 
                if (!(xact = xn_get_xact(db, xn_id)))
 
204
                if (!(xact = xt_xn_get_xact(db, xn_id)))
218
205
                        break;
219
206
 
220
207
                /* This is a dirty read, but it should work! */
221
 
                if (xact->xd_end_id || xact->xd_start_id != xn_id)
 
208
                if ((xact->xd_flags & XT_XN_XAC_ENDED) || xact->xd_start_xn_id != xn_id)
222
209
                        break;
223
210
 
224
 
                if (xn_detect_deadlock(db, wf.wf_waiting, wf.wf_for_me))
225
 
                        goto failed;
226
 
 
227
 
                if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting, &wf))
228
 
                        goto failed;
229
 
 
230
 
                if (xact->xd_end_id || xact->xd_start_id != xn_id)
 
211
                if (xn_detect_deadlock(db, wf.wf_waiting_xn_id, wf.wf_for_me_xn_id))
 
212
                        goto failed;
 
213
 
 
214
                if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id, &wf))
 
215
                        goto failed;
 
216
 
 
217
                if ((xact->xd_flags & XT_XN_XAC_ENDED) || xact->xd_start_xn_id != xn_id)
231
218
                        break;
232
219
 
233
220
                /* Timed wait because it is possible that transaction quits before
234
221
                 * we go to sleep.
235
222
                 */
236
223
                if (!xt_timed_wait_cond(NULL, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 2 * 1000)) {
237
 
                        xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting);
 
224
                        xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
238
225
                        goto failed;
239
226
                }
240
227
 
241
 
                xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting);
 
228
                xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
 
229
                
 
230
                if (will_retry)
 
231
                        break;
242
232
        }
243
233
 
244
 
        xt_mutex_unlock(&db->db_xn_wait_lock);
 
234
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
245
235
        return OK;
246
236
 
247
237
        failed:
248
 
        xt_mutex_unlock(&db->db_xn_wait_lock);
 
238
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
249
239
        return FAILED;
250
240
}
251
241
 
252
242
xtPublic void xt_xn_wakeup_transactions(XTDatabaseHPtr db, xtBool wait_for_sweeper)
253
243
{
254
 
        /*retry:*/
 
244
        /* This flag makes the gap for the race condition
 
245
         * very small.
 
246
         *
 
247
         * However, this possibility still remains because
 
248
         * we do not lock the mutex db_xn_wait_lock here.
 
249
         *
 
250
         * The reason is that it is too expensive.
 
251
         *
 
252
         * In the event that the wakeup is missed the sleeper
 
253
         * wait will timeout eventually.
 
254
         */
 
255
        db->db_sw_wakeup = TRUE;
255
256
        if (!xt_broadcast_cond(NULL, &db->db_xn_wait_cond))
256
257
                xt_log_and_clear_exception_ns();
257
 
        /*
258
 
        if (wait_for_sweeper) {
259
 
                xtBool idle;
260
 
 
261
 
                xt_mutex_lock(&db->db_xn_wait_lock);
262
 
                idle = db->db_sw_idle;
263
 
                xt_mutex_unlock(&db->db_xn_wait_lock);
264
 
                if (idle)
265
 
                        goto retry;
266
 
        }
267
 
        */
268
258
}
269
259
 
270
260
/* ----------------------------------------------------------------------
271
261
 * Utilities
272
262
 */
273
263
 
274
 
static void xn_logname(size_t size, char *path, XTDatabaseHPtr db, xtWord4 log_no)
275
 
{
276
 
#pragma unused(size)
277
 
        char name[50];
278
 
 
279
 
        sprintf(name, "xtlog-%lu.xt", (u_long) log_no);
280
 
        xt_strcpy(PATH_MAX, path, db->db_path);
281
 
        xt_add_dir_char(PATH_MAX, path);
282
 
        xt_strcat(PATH_MAX, path, name);
283
 
}
284
 
 
285
 
static xtBool xn_shared_read(XTOpenFilePtr of, off_t offset, size_t *io_size, void *data)
286
 
{
287
 
        size_t  size = *io_size;
288
 
 
289
 
        return xt_pread_file(of, offset, size, 0, data, io_size);
290
 
}
291
 
 
292
 
static xtBool xn_shared_write(XTOpenFilePtr of, off_t offset, size_t size, void *data)
293
 
{
294
 
        return xt_pwrite_file(of, offset, size, data);
295
 
}
296
 
 
297
 
/* This function ensures that the record at the given offset
298
 
 * is completely in the buffer.
299
 
 */
300
 
static xtBool xn_read_log(XTXactLogPtr xlog, XTOpenFilePtr of, off_t offset, size_t *io_size, void *data)
301
 
{
302
 
        size_t  size = *io_size;
303
 
        size_t  tfer;
304
 
        u_int   boff;
305
 
        xtBool  ok;
306
 
 
307
 
        if (!xlog->xl_offset || offset + size <= xlog->xl_file_size)
308
 
                return xn_shared_read(of, offset, io_size, data);
309
 
 
310
 
        xt_rwlock_rdlock(&xlog->xl_rwlock);
311
 
 
312
 
        /* Check required again, after lock! */
313
 
        if (!xlog->xl_offset || (offset + size <= xlog->xl_file_size)) {
314
 
                xt_rwlock_unlock(&xlog->xl_rwlock);
315
 
 
316
 
                /* Expensive (unlock and lock) but it should not happen often: */
317
 
                ok = xn_shared_read(of, offset, io_size, data);
318
 
                return ok;
319
 
        }
320
 
 
321
 
        if (offset < xlog->xl_file_size) {
322
 
                /* First part in the file, then the rest in the buffer: */
323
 
                tfer = (size_t) (xlog->xl_file_size - offset);
324
 
                boff = size - tfer;
325
 
                if (boff > xlog->xl_offset)
326
 
                        boff = xlog->xl_offset;
327
 
                memcpy(((char *) data) + tfer, xlog->xl_buffer, boff); // Copy the rest
328
 
                xt_rwlock_unlock(&xlog->xl_rwlock);
329
 
 
330
 
                ok = xn_shared_read(of, offset, &tfer, data);
331
 
                *io_size = tfer + boff;
332
 
                return ok;
333
 
        }
334
 
 
335
 
        /* Read completely in buffer: */
336
 
        boff = (u_int) (offset - xlog->xl_file_size);
337
 
        tfer = size;
338
 
        if (tfer > xlog->xl_offset - boff)
339
 
                tfer = xlog->xl_offset - boff;
340
 
        memcpy(data, xlog->xl_buffer + boff, tfer);
341
 
 
342
 
        xt_rwlock_unlock(&xlog->xl_rwlock);
343
 
        *io_size = tfer;
344
 
        return OK;
345
 
}
346
 
 
347
 
static xtBool xn_write_log(XTXactLogPtr xlog, XTOpenFilePtr of, off_t offset, size_t size, void *data)
348
 
{
349
 
        size_t  tfer;
350
 
        u_int   boff;
351
 
        xtBool  ok;
352
 
 
353
 
        if (!xlog->xl_offset || offset + size <= xlog->xl_file_size) {
354
 
                ok = xn_shared_write(of, offset, size, data);
355
 
                return ok;
356
 
        }
357
 
 
358
 
        xt_rwlock_rdlock(&xlog->xl_rwlock);
359
 
 
360
 
        /* Check required again, after lock! */
361
 
        if (!xlog->xl_offset || (offset + size <= xlog->xl_file_size)) {
362
 
                xt_rwlock_unlock(&xlog->xl_rwlock);
363
 
 
364
 
                /* Expensive (unlock and lock) but it should not happen often: */
365
 
                ok = xn_shared_write(of, offset, size, data);
366
 
                return ok;
367
 
        }
368
 
 
369
 
        if (offset < xlog->xl_file_size) {
370
 
                /* First part in the file, then the rest in the buffer: */
371
 
                tfer = (size_t) (xlog->xl_file_size - offset);
372
 
                boff = size - tfer;
373
 
                if (boff > xlog->xl_offset)
374
 
                        goto failed;
375
 
                memcpy(xlog->xl_buffer, ((char *) data) + tfer, boff); // Copy the rest
376
 
                xt_rwlock_unlock(&xlog->xl_rwlock);
377
 
 
378
 
                ok = xn_shared_write(of, offset, tfer, data);
379
 
                return ok;
380
 
        }
381
 
 
382
 
        /* Write completely in buffer: */
383
 
        boff = (u_int) (offset - xlog->xl_file_size);
384
 
        tfer = size;
385
 
        if (tfer > xlog->xl_offset - boff)
386
 
                goto failed;
387
 
        memcpy(xlog->xl_buffer + boff, data, tfer);
388
 
 
389
 
        xt_rwlock_unlock(&xlog->xl_rwlock);
390
 
        return OK;
391
 
 
392
 
        failed:
393
 
        /* Tried to write past the EOF! */
394
 
        xt_rwlock_unlock(&xlog->xl_rwlock);
395
 
        xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(xlog->xl_exfile));
396
 
        return FAILED;
397
 
}
398
 
 
399
 
static xtBool xn_append_log(XTThreadPtr self, size_t size, void *data, off_t *address)
400
 
{
401
 
        register XTXactLogPtr xlog;
402
 
 
403
 
        if (!(xlog = self->st_xact_log)) {
404
 
                if (!(xlog = xn_get_log_for_writing(self->st_database, XT_FOR_USER)))
405
 
                        return FAILED;
406
 
                self->st_xact_log = xlog;
407
 
        }
408
 
 
409
 
        ASSERT_NS(xlog->xl_exfile);
410
 
 
411
 
        if (xlog->xl_offset + size > XT_XACT_LOG_BUFFER_SIZE) {
412
 
                if (!xt_xn_flush_log(xlog))
413
 
                        return FAILED;
414
 
        }
415
 
 
416
 
        ASSERT_NS(size <= XT_DATA_LOG_BUFFER_SIZE);
417
 
 
418
 
        /* The record fits in the buffer: */
419
 
        if (address)
420
 
                *address = xlog->xl_file_size + (off_t) xlog->xl_offset;
421
 
        memcpy(xlog->xl_buffer + xlog->xl_offset, data, size);
422
 
        xlog->xl_offset += size;
423
 
 
424
 
        return OK;
425
 
}
426
 
 
427
264
//#define HIGH_X
428
265
#ifdef HIGH_X
429
266
u_long tot_alloced;
438
275
        tot_alloced--;
439
276
#endif
440
277
        /* This indicates the structure is free: */
441
 
        xact->xd_start_id = 0;
 
278
        xact->xd_start_xn_id = 0;
442
279
        if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end) {
443
280
                /* Put it in the free list: */
444
281
                xact->xd_next_xact = seg->xs_free_list;
445
282
                seg->xs_free_list = xact;
446
283
                return;
447
284
        }
448
 
        xt_sys_free(xact);
 
285
        xt_free_ns(xact);
449
286
}
450
287
 
451
288
/*
459
296
 * This is the number of the last transaction actually
460
297
 * created in memory.
461
298
 *
462
 
 * This means that if you call xn_get_xact() with any
 
299
 * This means that if you call xt_xn_get_xact() with any
463
300
 * number less than or equal to this value, not finding
464
301
 * the transaction means it has already ended!
465
302
 */
466
 
static xtWord8 xn_get_curr_id(XTDatabaseHPtr db)
 
303
xtPublic xtXactID xt_xn_get_curr_id(XTDatabaseHPtr db)
467
304
{
468
305
        int                                             i;
469
 
        xtWord8                                 curr_id = 0;
 
306
        xtXactID                                curr_xn_id;
470
307
        register XTXactSegPtr   seg = db->db_xn_idx;
471
 
        
472
 
        for (i=0; i<XT_XN_NO_OF_SEGMENTS; i++, seg++) {
473
 
                if (curr_id < seg->xt_last_xn_id)
474
 
                        curr_id = seg->xt_last_xn_id;
 
308
 
 
309
        /* Find the highest transaction ID actually created... */
 
310
        curr_xn_id = seg->xt_last_xn_id;
 
311
        seg++;
 
312
        for (i=1; i<XT_XN_NO_OF_SEGMENTS; i++, seg++) {
 
313
                if (xt_xn_is_before(curr_xn_id, seg->xt_last_xn_id))
 
314
                        curr_xn_id = seg->xt_last_xn_id;
475
315
        }
476
 
        return curr_id;
 
316
        return curr_xn_id;
477
317
}
478
318
 
479
 
static XTXactDataPtr xn_add_old_xact(XTDatabaseHPtr db, xtWord8 xn_id)
 
319
xtPublic XTXactDataPtr xt_xn_add_old_xact(XTDatabaseHPtr db, xtXactID xn_id)
480
320
{
481
321
        register XTXactDataPtr  xact;
482
322
        register XTXactSegPtr   seg;
487
327
        hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
488
328
        xact = *hash;
489
329
        while (xact) {
490
 
                if (xact->xd_start_id == xn_id)
 
330
                if (xact->xd_start_xn_id == xn_id)
491
331
                        goto done_ok;
492
332
                xact = xact->xd_next_xact;
493
333
        }
500
340
                 * up...
501
341
                 */
502
342
                db->db_sw_faster = TRUE;
503
 
                if (!(xact = (XTXactDataPtr) xt_sys_malloc(sizeof(XTXactDataRec)))) {
 
343
                if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
504
344
                        xt_rwlock_unlock(&seg->xs_tab_lock);
505
345
                        return NULL;
506
346
                }
509
349
        xact->xd_next_xact = *hash;
510
350
        *hash = xact;
511
351
 
512
 
        xact->xd_start_id = xn_id;
513
 
        xact->xd_end_id = 0;
 
352
        xact->xd_start_xn_id = xn_id;
 
353
        xact->xd_end_xn_id = 0;
 
354
        xact->xd_end_time = 0;
514
355
        xact->xd_begin_log = 0;
 
356
        xact->xd_flags = 0;
515
357
 
516
358
        /* Get the largest transaction id. */
517
 
        if (xn_id > seg->xt_last_xn_id)
 
359
        if (xt_xn_is_before(seg->xt_last_xn_id, xn_id))
518
360
                seg->xt_last_xn_id = xn_id;
519
361
 
520
362
        done_ok:
527
369
        return xact;
528
370
}
529
371
 
530
 
static XTXactDataPtr xn_add_new_xact(XTDatabaseHPtr db, xtWord8 xn_id)
 
372
static XTXactDataPtr xn_add_new_xact(XTDatabaseHPtr db, xtXactID xn_id)
531
373
{
532
374
        register XTXactDataPtr  xact;
533
375
        register XTXactSegPtr   seg;
545
387
                 * up...
546
388
                 */
547
389
                db->db_sw_faster = TRUE;
548
 
                if (!(xact = (XTXactDataPtr) xt_sys_malloc(sizeof(XTXactDataRec)))) {
 
390
                if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
549
391
                        xt_rwlock_unlock(&seg->xs_tab_lock);
550
392
                        return NULL;
551
393
                }
554
396
        xact->xd_next_xact = *hash;
555
397
        *hash = xact;
556
398
 
557
 
        xact->xd_start_id = xn_id;
558
 
        xact->xd_end_id = 0;
 
399
        xact->xd_start_xn_id = xn_id;
 
400
        xact->xd_end_xn_id = 0;
 
401
        xact->xd_end_time = 0;
559
402
        xact->xd_begin_log = 0;
 
403
        xact->xd_flags = 0;
560
404
 
561
405
        seg->xt_last_xn_id = xn_id;
562
406
        xt_rwlock_unlock(&seg->xs_tab_lock);
568
412
        return xact;
569
413
}
570
414
 
571
 
static XTXactDataPtr xn_get_xact(XTDatabaseHPtr db, xtWord8 xn_id)
 
415
xtPublic XTXactDataPtr xt_xn_get_xact(XTDatabaseHPtr db, xtXactID xn_id)
572
416
{
573
417
        register XTXactSegPtr   seg;
574
418
        register XTXactDataPtr  xact;
577
421
        xt_rwlock_rdlock(&seg->xs_tab_lock);
578
422
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
579
423
        while (xact) {
580
 
                if (xact->xd_start_id == xn_id)
 
424
                if (xact->xd_start_xn_id == xn_id)
581
425
                        break;
582
426
                xact = xact->xd_next_xact;
583
427
        }
585
429
        return xact;
586
430
}
587
431
 
588
 
xtPublic XTXactDataPtr xt_xn_get_xact(XTDatabaseHPtr db, xtWord8 xn_id)
589
 
{
590
 
        return xn_get_xact(db, xn_id);
591
 
}
592
 
 
593
432
/*
594
433
 * Delete a transaction, return TRUE if the transaction
595
434
 * was found.
596
435
 */
597
 
static xtBool xn_delete_xact(XTDatabaseHPtr db, xtWord8 xn_id)
 
436
xtPublic xtBool xt_xn_delete_xact(XTDatabaseHPtr db, xtXactID xn_id)
598
437
{
599
438
        XTXactDataPtr   xact, pxact = NULL;
600
439
        XTXactSegPtr    seg;
603
442
        xt_rwlock_wrlock(&seg->xs_tab_lock);
604
443
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
605
444
        while (xact) {
606
 
                if (xact->xd_start_id == xn_id) {
 
445
                if (xact->xd_start_xn_id == xn_id) {
607
446
                        if (pxact)
608
447
                                pxact->xd_next_xact = xact->xd_next_xact;
609
448
                        else
626
465
 
627
466
int                                     check_ram_init_count = 0;
628
467
xt_rwlock_type          check_ram_lock;
629
 
xtWord8                         check_ram_trns[DEBUG_RAM_LIST_SIZE];
 
468
xtXactID                        check_ram_trns[DEBUG_RAM_LIST_SIZE];
630
469
int                                     check_ram_dummy;
631
470
 
632
471
static void check_ram_init(void)
649
488
 
650
489
        xt_rwlock_rdlock(&check_ram_lock);
651
490
        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
652
 
                if (check_ram_trns[i] && check_ram_trns[i] < db->db_xn_min_ram_id) {
 
491
                if (check_ram_trns[i] && xt_xn_is_before(check_ram_trns[i], db->db_xn_min_ram_id)) {
653
492
                        /* This should never happen! */
654
493
                        XTXactDataPtr x_ptr;
655
494
 
656
495
                        check_ram_dummy = 0;
657
496
                        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
658
497
                                if (check_ram_trns[i]) {
659
 
                                        x_ptr = xn_get_xact(db, check_ram_trns[i]);
 
498
                                        x_ptr = xt_xn_get_xact(db, check_ram_trns[i]);
660
499
                                        check_ram_dummy = 1;
661
500
                                }
662
501
                        }
666
505
        xt_rwlock_unlock(&check_ram_lock);
667
506
}
668
507
 
669
 
static void check_ram_add(xtWord8 tn_id)
 
508
static void check_ram_add(xtXactID xn_id)
670
509
{
671
510
        int i;
672
511
        
673
512
        xt_rwlock_wrlock(&check_ram_lock);
674
513
        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
675
514
                if (!check_ram_trns[i]) {
676
 
                        check_ram_trns[i] = tn_id;
 
515
                        check_ram_trns[i] = xn_id;
677
516
                        xt_rwlock_unlock(&check_ram_lock);
678
517
                        return;
679
518
                }
682
521
        printf("DEBUG --- List too small\n");
683
522
}
684
523
 
685
 
static void check_ram_del(xtWord8 tn_id)
 
524
static void check_ram_del(xtXactID xn_id)
686
525
{
687
526
        int i;
688
527
        
689
528
        xt_rwlock_wrlock(&check_ram_lock);
690
529
        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
691
 
                if (check_ram_trns[i] == tn_id) {
 
530
                if (check_ram_trns[i] == xn_id) {
692
531
                        check_ram_trns[i] = 0;
693
532
                        xt_rwlock_unlock(&check_ram_lock);
694
533
                        return;
698
537
}
699
538
#endif
700
539
 
701
 
static xtBool xn_seq_init(XTThreadPtr self, XTXactSeqReadPtr sr, XTXactLogPtr xlog, XTOpenFilePtr of)
702
 
{
703
 
        sr->sr_xlog = xlog;
704
 
        sr->sr_of = of;
705
 
        sr->sr_offset = 0;
706
 
        sr->sr_length = 0;
707
 
        sr->sr_buffer = (xtWord1 *) xt_malloc(self, XT_XACT_LOG_READ_BUFFER_SIZE);
708
 
        return sr->sr_buffer != NULL;
709
 
}
710
 
 
711
 
static void xn_seq_exit(XTThreadPtr self, XTXactSeqReadPtr sr)
712
 
{
713
 
        if (sr->sr_buffer) {
714
 
                xt_free(self, sr->sr_buffer);
715
 
                sr->sr_buffer = NULL;
716
 
        }
717
 
        sr->sr_xlog = NULL;
718
 
        sr->sr_of = NULL;
719
 
        sr->sr_offset = 0;
720
 
        sr->sr_length = 0;
721
 
}
722
 
 
723
 
static xtBool xn_seq_read(XTXactSeqReadPtr sr, off_t offset, XTXactLogBufferDPtr *entry, size_t *space)
724
 
{
725
 
        if (offset >= sr->sr_offset && offset + sizeof(XTXactLogBufferDRec) <= sr->sr_offset + sr->sr_length) {
726
 
                /* Completely in the buffer: */
727
 
                size_t boff;
728
 
 
729
 
                boff = (size_t) (offset - sr->sr_offset);
730
 
                *entry = (XTXactLogBufferDPtr) (sr->sr_buffer + boff);
731
 
                *space = sr->sr_length - boff;
732
 
                return OK;
733
 
        }
734
 
        
735
 
        /* Although the record may be partially in the buffer we don't bother
736
 
         * to preserve bytes already read because the records are so
737
 
         * small <= 11 bytes!
738
 
         */
739
 
        sr->sr_length = XT_XACT_LOG_READ_BUFFER_SIZE;
740
 
        sr->sr_offset = offset;
741
 
        if (!xn_read_log(sr->sr_xlog, sr->sr_of, offset, &sr->sr_length, sr->sr_buffer))
742
 
                return FAILED;
743
 
        *entry = (XTXactLogBufferDPtr) sr->sr_buffer;
744
 
        *space = sr->sr_length;
745
 
        return OK;
746
 
}
747
 
 
748
 
#define XN_MODE_RECOVER                 0
749
 
#define XN_MODE_EXCLUSIVE               1
750
 
#define XN_MODE_SWEEPER                 2
751
 
 
752
 
static xtBool xn_recover_log(XTXactLogPtr xlog, XTOpenFilePtr of)
753
 
{
754
 
        XTDatabaseHPtr          db = xlog->xl_db;
755
 
        XTXactSeqReadRec        sr;
756
 
        off_t                           curr_offset;
757
 
        XTXactLogBufferDPtr     entry;
758
 
        size_t                          space;
759
 
        size_t                          size;
760
 
        xtWord8                         curr_xn_id = 0;
761
 
        xtWord8                         xn_id;
762
 
        XTXactDataPtr           xact;
763
 
        u_int                           active_count = 0;
764
 
        xtWord4                         tab_id;
765
 
 
766
 
        xlog->xl_file_size = xt_seek_eof_file(NULL, of);
767
 
        if (xlog->xl_file_size == -1)
768
 
                return FAILED;
769
 
 
770
 
        if (!xn_seq_init(NULL, &sr, xlog, of))
771
 
                return FAILED;
772
 
 
773
 
        curr_offset = 0;
774
 
 
775
 
        for (;;) {
776
 
                if (!xn_seq_read(&sr, curr_offset, &entry, &space))
777
 
                        goto failed;
778
 
                if (space <= 1)
779
 
                        break;
780
 
                switch (entry->xb.xe_status_1) {
781
 
                        case XT_XN_STATUS_HEADER: {
782
 
                                if (space < offsetof(XTXactLogHeaderDRec, xh_size_4) + 4)
783
 
                                        goto break_loop;
784
 
                                size = XT_GET_DISK_4(entry->xh.xh_size_4);
785
 
                                if (space < size || size < 18)
786
 
                                        goto break_loop;
787
 
                                xn_id = XT_GET_DISK_6(entry->xh.xh_curr_id_6);
788
 
                                if (xn_id > db->db_xn_curr_id)
789
 
                                        db->db_xn_curr_id = xn_id;
790
 
                                tab_id = XT_GET_DISK_4(entry->xh.xh_tab_id_4);
791
 
                                if (tab_id > db->db_curr_tab_id)
792
 
                                        db->db_curr_tab_id = tab_id;
793
 
                                curr_offset += size;
794
 
                                break;
795
 
                        }
796
 
                        case XT_XN_STATUS_BEGIN:
797
 
                        case XT_XN_STATUS_COMMITTED:
798
 
                        case XT_XN_STATUS_ABORTED:
799
 
                                if (space < sizeof(XTXactBeginEntryDRec))
800
 
                                        goto break_loop;
801
 
                                curr_xn_id = XT_GET_DISK_6(entry->xb.xe_xact_id_6);
802
 
                                if (curr_xn_id > db->db_xn_curr_id)
803
 
                                        db->db_xn_curr_id = curr_xn_id;
804
 
 
805
 
                                if (entry->xb.xe_status_1 == XT_XN_STATUS_BEGIN) {
806
 
                                        if (!(xact = xn_add_old_xact(db, curr_xn_id)))
807
 
                                                goto failed;
808
 
 
809
 
                                        xact->xd_begin_log = xlog->xl_number;
810
 
                                        xact->xd_begin_offset = curr_offset;
811
 
                                        xact->xd_committed = FALSE;
812
 
                                        xact->xd_end_id = curr_xn_id;
813
 
                                        active_count++;
814
 
 
815
 
                                        /* This may affect the "minimum RAM transaction": */
816
 
                                        if (!db->db_xn_min_ram_id || curr_xn_id < db->db_xn_min_ram_id)
817
 
                                                db->db_xn_min_ram_id = curr_xn_id;
818
 
                                }
819
 
                                else
820
 
                                        curr_xn_id = 0;
821
 
 
822
 
                                curr_offset += sizeof(XTXactBeginEntryDRec);
823
 
                                break;
824
 
                        case XT_XN_STATUS_COMMIT:
825
 
                        case XT_XN_STATUS_ABORT:
826
 
                                if (space < sizeof(XTXactEndEntryDRec))
827
 
                                        goto break_loop;
828
 
                                xn_id = XT_GET_DISK_6(entry->xe.xe_xend_id_6);
829
 
                                if (curr_xn_id && (xact = xn_get_xact(db, curr_xn_id))) {
830
 
                                        xact->xd_committed = entry->xb.xe_status_1 == XT_XN_STATUS_COMMIT;
831
 
                                        xact->xd_end_id = xn_id;
832
 
                                }
833
 
                                curr_xn_id = 0;
834
 
                                curr_offset += sizeof(XTXactEndEntryDRec);
835
 
                                break;
836
 
                        case XT_XN_STATUS_UPDATE:
837
 
                        case XT_XN_STATUS_INSERT:
838
 
                        case XT_XN_STATUS_DELETE:
839
 
                                if (space < sizeof(XTactUpdateEntryDRec))
840
 
                                        goto break_loop;
841
 
                                curr_offset += sizeof(XTactUpdateEntryDRec);
842
 
                                break;
843
 
                        case XT_XN_STATUS_CURR_IDS:
844
 
                                if (space < sizeof(XTXactCurrEntryDRec))
845
 
                                        goto break_loop;
846
 
 
847
 
                                xn_id = XT_GET_DISK_6(entry->xc.xe_curr_id_6);
848
 
                                if (xn_id > db->db_xn_curr_id) {
849
 
                                        db->db_xn_curr_id = xn_id;
850
 
                                }
851
 
                                tab_id = XT_GET_DISK_4(entry->xc.xe_tab_id_4);
852
 
                                if (tab_id > db->db_curr_tab_id)
853
 
                                        db->db_curr_tab_id = tab_id;
854
 
 
855
 
                                curr_offset += sizeof(XTXactCurrEntryDRec);
856
 
                                break;
857
 
                        default:
858
 
                                goto break_loop;
859
 
                }
860
 
        }
861
 
 
862
 
        break_loop:
863
 
        xn_seq_exit(NULL, &sr);
864
 
        if (xlog->xl_file_size != curr_offset) {
865
 
                xt_registerf(XT_REG_CONTEXT, XT_ERR_XLOG_WAS_CORRUPTED, 0, "Corrupted transaction log, '%s' has been truncated at %lu", xt_file_path(of), (u_long) curr_offset);
866
 
                xt_log_and_clear_exception_ns();
867
 
                xlog->xl_file_size = curr_offset;
868
 
                if (!xt_set_eof_file(NULL, of, xlog->xl_file_size))
869
 
                        return FAILED;
870
 
                xlog->xl_file_size = curr_offset;
871
 
        }
872
 
 
873
 
        if (!active_count && xlog->xl_file_size >= XT_XACT_LOG_ROLLOVER_SIZE)
874
 
                xlog->xl_file_size = 0;
875
 
 
876
 
        if (db->db_xn_curr_id > db->db_xn_max_disk_id)
877
 
                db->db_xn_max_disk_id = db->db_xn_curr_id;
878
 
 
879
 
        xlog->xl_recovered = TRUE;
880
 
        return OK;
881
 
 
882
 
        failed:
883
 
        xn_seq_exit(NULL, &sr);
884
 
        return FAILED;
885
 
}
886
 
 
887
 
static void xn_free_log(XTXactLogPtr xlog)
888
 
{
889
 
        xlog->xl_ref_count--;
890
 
        if (!xlog->xl_ref_count) {
891
 
                if (xlog->xl_sw_file)
892
 
                        xt_close_file_ns(xlog->xl_sw_file);
893
 
 
894
 
                if (xlog->xl_exfile)
895
 
                        xt_close_file_ns(xlog->xl_exfile);
896
 
 
897
 
                xt_free_rwlock(&xlog->xl_rwlock);
898
 
 
899
 
                xt_sys_free(xlog);
900
 
        }
901
 
}
902
 
 
903
 
static void xn_recalc_high_log(XTDatabaseHPtr db)
904
 
{
905
 
        xtWord4 old_high;
906
 
 
907
 
        old_high = db->db_xn_high_log;
908
 
        while (db->db_xn_high_log > 0) {
909
 
                if (!db->db_xn_log_list[db->db_xn_high_log-1] ||
910
 
                        db->db_xn_log_list[db->db_xn_high_log-1]->xl_file_size > 0)
911
 
                        break;
912
 
                db->db_xn_high_log--;
913
 
        }
914
 
        
915
 
        for (u_int i=db->db_xn_high_log; i<old_high; i++) {
916
 
                if (db->db_xn_log_list[i]) {
917
 
                        ASSERT_NS(!db->db_xn_log_list[i]->xl_file_size);
918
 
                        if (!db->db_xn_log_list[i]->xl_file_size) {
919
 
                                xn_free_log(db->db_xn_log_list[i]);
920
 
                                db->db_xn_log_list[i] = NULL;
921
 
                        }
922
 
                }
923
 
        }
924
 
}
925
 
 
926
 
static xtBool xn_create_new_header(XTXactLogPtr xlog, XTOpenFilePtr of, XTDatabaseHPtr db)
927
 
{
928
 
        XTXactLogHeaderDRec     header;
929
 
 
930
 
        memset(&header, 0, sizeof(header));
931
 
        XT_SET_DISK_4(header.xh_magic_4, XT_XN_XACT_LOG_MAGIC);
932
 
        XT_SET_DISK_4(header.xh_size_4, sizeof(XTXactLogHeaderDRec));
933
 
        XT_SET_DISK_6(header.xh_curr_id_6, db->db_xn_curr_id);
934
 
        XT_SET_DISK_4(header.xh_tab_id_4, db->db_curr_tab_id);
935
 
        if (!xt_pwrite_file(of, 0, sizeof(XTXactLogHeaderDRec), &header))
936
 
                return FAILED;
937
 
        xlog->xl_file_size = sizeof(XTXactLogHeaderDRec);
938
 
        if (!xt_set_eof_file(NULL, of, xlog->xl_file_size))
939
 
                return FAILED;
940
 
 
941
 
        if (db->db_xn_curr_id > db->db_xn_max_disk_id)
942
 
                db->db_xn_max_disk_id = db->db_xn_curr_id;
943
 
 
944
 
        return OK;
945
 
}
946
 
 
947
 
static xtBool xn_use_xlog(XTDatabaseHPtr db, XTXactLogPtr *out_xlog, xtWord4 log_no, int mode)
948
 
{
949
 
        XTXactLogPtr            xlog;
950
 
        char                            path[PATH_MAX];
951
 
 
952
 
        xt_mutex_lock(&db->db_xn_log_lock);
953
 
 
954
 
        /* We use offset log_no - 1: */
955
 
        if (log_no > db->db_xn_log_count) {
956
 
                if (!xt_sys_realloc((void **) &db->db_xn_log_list, log_no * sizeof(XTXactLogPtr)))
957
 
                        goto failed;
958
 
                memset(&db->db_xn_log_list[db->db_xn_log_count], 0, (log_no - db->db_xn_log_count) * sizeof(XTXactLogPtr));
959
 
                db->db_xn_log_count = log_no;
960
 
        }
961
 
 
962
 
        if (!(xlog = db->db_xn_log_list[log_no-1])) {
963
 
                /* Load the log: */
964
 
                if (!(xlog = (XTXactLogPtr) xt_sys_malloc(sizeof(XTXactLogRec))))
965
 
                        goto failed;
966
 
                memset(xlog, 0, offsetof(XTXactLogRec, xl_buffer));
967
 
                xlog->xl_db = db;
968
 
                xlog->xl_number = log_no;
969
 
                xlog->xl_ref_count = 1;
970
 
 
971
 
                if (!xt_init_rwlock(NULL, &xlog->xl_rwlock)) {
972
 
                        xt_sys_free(xlog);
973
 
                        goto failed;
974
 
                }
975
 
 
976
 
                db->db_xn_log_list[log_no-1] = xlog; // This reference was counted above
977
 
        }
978
 
 
979
 
        xn_logname(PATH_MAX, path, db, log_no);
980
 
 
981
 
        if (mode == XN_MODE_RECOVER) {
982
 
                if (!xlog->xl_recovered) {
983
 
                        XTOpenFilePtr of;
984
 
 
985
 
                        if (!(of = xt_open_file_ns(path, XT_FS_DEFAULT)))
986
 
                                goto failed;
987
 
 
988
 
                        if (!xn_recover_log(xlog, of)) {
989
 
                                xt_close_file_ns(of);
990
 
                                goto failed;
991
 
                        }
992
 
 
993
 
                        if (!xlog->xl_file_size && !xn_create_new_header(xlog, of, db)) {
994
 
                                xt_close_file_ns(of);
995
 
                                goto failed;
996
 
                        }
997
 
 
998
 
                        xt_close_file_ns(of);
999
 
                }
1000
 
 
1001
 
                xlog = NULL;
1002
 
        }
1003
 
        else if (mode == XN_MODE_SWEEPER) {
1004
 
                if (!xlog->xl_sw_file) {
1005
 
                        if (!(xlog->xl_sw_file = xt_open_file_ns(path, XT_FS_DEFAULT)))
1006
 
                                goto failed;
1007
 
                }
1008
 
                if (!xlog->xl_recovered) {
1009
 
                        if (!xn_recover_log(xlog, xlog->xl_sw_file)) {
1010
 
                                xt_close_file_ns(xlog->xl_sw_file);
1011
 
                                xlog->xl_sw_file = NULL;
1012
 
                                goto failed;
1013
 
                        }
1014
 
                }
1015
 
        }
1016
 
        else {
1017
 
                /* Already open for exclusive use. */
1018
 
                if (xlog->xl_exfile) {
1019
 
                        xlog = NULL;
1020
 
                        goto done_ok;
1021
 
                }
1022
 
 
1023
 
                ASSERT_NS(!xlog->xl_offset);
1024
 
                if (xlog->xl_recovered && xlog->xl_file_size >= XT_XACT_LOG_ROLLOVER_SIZE) {
1025
 
                        xlog = NULL;
1026
 
                        goto done_ok;
1027
 
                }
1028
 
 
1029
 
                if (!(xlog->xl_exfile = xt_open_file_ns(path, XT_FS_CREATE)))
1030
 
                        goto failed;
1031
 
 
1032
 
                if (!xlog->xl_recovered) {
1033
 
                        if (!xn_recover_log(xlog, xlog->xl_exfile))
1034
 
                                goto excl_failed;
1035
 
                }
1036
 
 
1037
 
                if (xlog->xl_file_size == 0) {
1038
 
                        if (!xn_create_new_header(xlog, xlog->xl_exfile, db))
1039
 
                                goto excl_failed;
1040
 
                }
1041
 
                else if (xlog->xl_file_size >= XT_XACT_LOG_ROLLOVER_SIZE) {
1042
 
                        /* The log does not have enough space: */
1043
 
                        xt_close_file_ns(xlog->xl_exfile);
1044
 
                        xlog->xl_exfile = NULL;
1045
 
                        xlog = NULL;
1046
 
                }
1047
 
        }
1048
 
 
1049
 
        done_ok:
1050
 
        if (db->db_xn_log_list[log_no-1] && db->db_xn_log_list[log_no-1]->xl_file_size) {
1051
 
                if (log_no > db->db_xn_high_log)
1052
 
                        db->db_xn_high_log = log_no;
1053
 
        }
1054
 
 
1055
 
        if (out_xlog) {
1056
 
                if (xlog)
1057
 
                        xlog->xl_ref_count++;
1058
 
                *out_xlog = xlog;
1059
 
        }
1060
 
 
1061
 
        xt_mutex_unlock(&db->db_xn_log_lock);
1062
 
        return OK;
1063
 
 
1064
 
        excl_failed:
1065
 
        xt_close_file_ns(xlog->xl_exfile);
1066
 
        xlog->xl_exfile = NULL;
1067
 
 
1068
 
        failed:
1069
 
        xt_mutex_unlock(&db->db_xn_log_lock);
1070
 
        return FAILED;
1071
 
}
1072
 
 
1073
 
/* Get a transaction log for writting. We fill the logs
1074
 
 * at the end first in the hopes of reducing the
1075
 
 * high log.
1076
 
 */
1077
 
static XTXactLogPtr xn_get_log_for_writing(XTDatabaseHPtr db, int what_for)
1078
 
{
1079
 
        xtWord4                 log_no;
1080
 
        XTXactLogPtr    xlog = NULL;
1081
 
        xtWord4                 high;
1082
 
 
1083
 
        high = db->db_xn_high_log;
1084
 
 
1085
 
        if (what_for == XT_FOR_SWEEPER)
1086
 
                log_no = 1;
1087
 
        else {
1088
 
                log_no = db->db_xn_next_log;
1089
 
                if (log_no > high)
1090
 
                        log_no = high;
1091
 
        }
1092
 
 
1093
 
        for (xtWord4 i=0; i<high; i++) {
1094
 
                if (!log_no)
1095
 
                        log_no = high;
1096
 
                if (!xn_use_xlog(db, &xlog, log_no, XN_MODE_EXCLUSIVE))
1097
 
                        return NULL;
1098
 
                if (xlog) {
1099
 
                        ASSERT_NS(xlog->xl_exfile);
1100
 
                        break;
1101
 
                }
1102
 
                log_no--;
1103
 
        }
1104
 
 
1105
 
        if (!xlog) {
1106
 
                log_no = high;
1107
 
                while (!xlog) {
1108
 
                        log_no++;
1109
 
                        if (!xn_use_xlog(db, &xlog, log_no, XN_MODE_EXCLUSIVE))
1110
 
                                return NULL;
1111
 
                }
1112
 
                ASSERT_NS(xlog->xl_exfile);
1113
 
        }
1114
 
 
1115
 
        if (what_for != XT_FOR_SWEEPER)
1116
 
                db->db_xn_next_log = log_no-1;
1117
 
        return xlog;
1118
 
}
1119
 
 
1120
 
xtPublic xtBool xt_xn_flush_log(XTXactLogPtr xlog)
1121
 
{
1122
 
        xtBool ok = TRUE;
1123
 
 
1124
 
        if (xlog->xl_offset) {
1125
 
                xt_rwlock_wrlock(&xlog->xl_rwlock);
1126
 
                if ((ok = xt_pwrite_file(xlog->xl_exfile, xlog->xl_file_size, xlog->xl_offset, xlog->xl_buffer))) {
1127
 
                        /* Locking is to make sure the reader has a consistant view of the length of the file. */
1128
 
                        xlog->xl_file_size += xlog->xl_offset;
1129
 
                        xlog->xl_offset = 0;
1130
 
                }
1131
 
                xt_rwlock_unlock(&xlog->xl_rwlock);
1132
 
        }
1133
 
        return ok;
1134
 
}
1135
 
 
1136
 
static void xn_unlock_xlog(XTXactLogPtr xlog)
1137
 
{
1138
 
        if (xlog->xl_exfile) {
1139
 
                if (!xt_xn_flush_log(xlog))
1140
 
                        xt_log_and_clear_exception_ns();
1141
 
                xt_close_file_ns(xlog->xl_exfile);
1142
 
                xlog->xl_exfile = NULL;
1143
 
        }
1144
 
}
1145
 
 
1146
 
static void xn_release_log(XTXactLogPtr xlog)
1147
 
{
1148
 
        XTDatabaseHPtr db = xlog->xl_db;
1149
 
 
1150
 
        xt_mutex_lock(&db->db_xn_log_lock);
1151
 
        xn_free_log(xlog);
1152
 
        xt_mutex_unlock(&db->db_xn_log_lock);
1153
 
}
1154
 
 
1155
 
static void xn_release_log_w_self(XTThreadPtr self, XTXactLogPtr xlog)
1156
 
{
1157
 
        xn_release_log(xlog);
1158
 
}
1159
 
 
1160
540
/* ----------------------------------------------------------------------
1161
541
 * Init and Exit
1162
542
 */
1164
544
xtPublic void xt_xn_init_db(XTThreadPtr self, XTDatabaseHPtr db)
1165
545
{
1166
546
        XTXactDataPtr   xact;
1167
 
        XTOpenDirPtr    od;
1168
 
        xtWord4                 log_id;
1169
547
        XTXactSegPtr    seg;
1170
548
 
1171
549
#ifdef DEBUG_RAM_LIST
1172
550
        check_ram_init();
1173
551
#endif
1174
552
        xt_init_mutex(self, &db->db_xn_id_lock);
1175
 
        xt_init_mutex(self, &db->db_xn_log_lock);
1176
553
        xt_init_mutex(self, &db->db_xn_wait_lock);
1177
554
        xt_init_cond(self, &db->db_xn_wait_cond);
1178
 
        xt_init_mutex(self, &db->db_sw_tab_lock);
 
555
        xt_init_mutex(self, &db->db_wr_lock);
 
556
        xt_init_cond(self, &db->db_wr_cond);
1179
557
 
1180
 
        /* Pre-alloctate transaction data structures: */
 
558
        /* Pre-allocate transaction data structures: */
1181
559
        db->db_xn_data = (xtWord1 *) xt_malloc(self, sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS);
1182
560
        db->db_xn_data_end = db->db_xn_data + sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS;
1183
561
        xact = (XTXactDataPtr) db->db_xn_data;
1186
564
                xt_init_rwlock(self, &seg->xs_tab_lock);
1187
565
                for (u_int j=0;  j<XT_XN_DATA_ALLOC_COUNT; j++) {
1188
566
                        xact->xd_next_xact = seg->xs_free_list;
1189
 
                         seg->xs_free_list = xact;
 
567
                        seg->xs_free_list = xact;
1190
568
                        xact++;
1191
569
                }
1192
570
        }
1193
571
 
1194
 
        pushsr_(od, xt_dir_close, xt_dir_open(self, db->db_path, "xtlog-*.xt"));
1195
 
        while (xt_dir_next(self, od)) {
1196
 
                log_id = xt_file_name_to_id(xt_dir_name(self, od));
1197
 
                if (!xn_use_xlog(db, NULL, log_id, XN_MODE_RECOVER))
1198
 
                        throw_();
 
572
        /* Initialize the data logs: */
 
573
        db->db_datalogs.dlc_init(self, db); 
 
574
 
 
575
        /* Setup the transaction log: */
 
576
        db->db_xlog.xlog_setup(self, db, xt_db_log_file_threshold, xt_db_log_buffer_size, xt_db_transaction_buffer_size);
 
577
 
 
578
        db->db_xn_end_time = 1;
 
579
 
 
580
        /* Initializing the restart file, also does
 
581
         * recovery. This returns the log position after recovery.
 
582
         *
 
583
         * This is the log position where the writer thread will
 
584
         * begin. The writer thread writes changes to the database that
 
585
         * have been flushed to the log.
 
586
         */
 
587
        xt_xres_init(self, db);
 
588
 
 
589
        /* Initialize the "last transaction in memory", by default
 
590
         * this is the current transaction ID, which is the ID
 
591
         * of the last transaction.
 
592
         */
 
593
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
 
594
                seg = &db->db_xn_idx[i];
 
595
                xt_init_rwlock(self, &seg->xs_tab_lock);
 
596
                seg->xt_last_xn_id = db->db_xn_curr_id;
1199
597
        }
1200
 
        freer_();
1201
 
 
1202
 
        /* This is our "safety gap". If I did not have this gap then
1203
 
         * I would have to write a log record every time I issue
1204
 
         * a new transaction ID to ensure that a transaction ID
1205
 
         * is NEVER re-used.
1206
 
         *
1207
 
         * With a safety gap I only have to ensure that the
1208
 
         * highest transaction number on disk is within
1209
 
         * the gap when compared to the current transaction
1210
 
         * ID.
1211
 
         *
1212
 
         * This basically means I lose a few transaction IDs
1213
 
         * every time I restart, but this is a small price to
1214
 
         * pay. 
1215
 
         */  
1216
 
        db->db_xn_curr_id += XT_TN_NUMBER_INCREMENT;
1217
 
 
1218
 
        if (!db->db_xn_min_ram_id)
1219
 
                /* This is true because if no transaction was placed in RAM then
1220
 
                 * the next transaction in RAM will have the next ID: */
1221
 
                db->db_xn_min_ram_id = db->db_xn_curr_id + 1;
1222
598
 
1223
599
        /*
1224
600
         * The next transaction to clean is the lowest transaction
1238
614
#else
1239
615
        db->db_sw_cu_list = xt_new_sortedlist(self, sizeof(XNSWRecItemRec), 100, 50, xn_compare_cu_item, db, xn_free_cu_item, TRUE, FALSE);
1240
616
#endif
1241
 
 
1242
 
        xn_sw_start_thread(self, db);
1243
617
}
1244
618
 
1245
619
xtPublic void xt_xn_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
1251
625
        printf("=========> MAX TXs IN RAM: %lu\n", in_ram_max);
1252
626
#endif
1253
627
 
1254
 
        xt_quit_xn_daemons(self, db);
 
628
        xt_stop_sweeper(self, db);      // Should be done already!
 
629
        xt_stop_writer(self, db);       // Should be done already!
 
630
 
 
631
        xt_xres_exit(self, db);
 
632
        db->db_xlog.xlog_exit(self);
 
633
 
 
634
        db->db_datalogs.dlc_exit(self); 
1255
635
 
1256
636
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
1257
637
                XTXactSegPtr    seg;
1283
663
                db->db_xn_data_end = NULL;
1284
664
        }
1285
665
 
1286
 
        xt_free_mutex(&db->db_sw_tab_lock);
 
666
        xt_free_cond(&db->db_wr_cond);
 
667
        xt_free_mutex(&db->db_wr_lock);
1287
668
        xt_free_cond(&db->db_xn_wait_cond);
1288
669
        xt_free_mutex(&db->db_xn_wait_lock);
1289
 
        xt_free_mutex(&db->db_xn_log_lock);
1290
670
        xt_free_mutex(&db->db_xn_id_lock);
1291
671
#ifdef DEBUG_RAM_LIST
1292
672
        check_ram_free();
1293
673
#endif
1294
674
}
1295
675
 
1296
 
xtPublic void xt_quit_xn_daemons(XTThreadPtr self, XTDatabaseHPtr db)
1297
 
{
1298
 
        xn_sw_stop_thread(self, db);
1299
 
 
1300
 
        /* I have moved this here from xt_xn_exit_db(), to ensure that
1301
 
         * all log files are closed before a database is deleted.
1302
 
         *
1303
 
         * A database is deleted when the last PBXT table is removed.
1304
 
         */
1305
 
        if (db->db_xn_log_list) {
1306
 
                for (xtWord4 i=0; i<db->db_xn_log_count; i++) {
1307
 
                        if (db->db_xn_log_list[i]) {
1308
 
                                xn_release_log(db->db_xn_log_list[i]);
1309
 
                                db->db_xn_log_list[i] = NULL;
1310
 
                        }
1311
 
                }
1312
 
                xt_sys_free(db->db_xn_log_list);
1313
 
                db->db_xn_log_list = NULL;
1314
 
                db->db_xn_log_count = 0;
1315
 
        }
1316
 
}
1317
 
 
1318
676
xtPublic void xt_xn_init_thread(XTThreadPtr self, int what_for)
1319
677
{
1320
 
        XTXactLogPtr xlog;
 
678
        ASSERT(self->st_database);
1321
679
 
1322
 
        ASSERT(!self->st_xact_log && self->st_database);
1323
 
        /* The garbage collector does not need a log,
1324
 
         * and we will delay opening a log for the user
1325
 
         * until he actually performs an update transaction.
1326
 
         */
1327
 
        if (what_for != XT_FOR_GARBAGE_COLLECTOR && what_for != XT_FOR_USER) {
1328
 
                if (!(xlog = xn_get_log_for_writing(self->st_database, what_for)))
1329
 
                        throw_();
1330
 
                self->st_xact_log = xlog;
 
680
        if (!xt_init_row_lock_list(&self->st_lock_list))
 
681
                xt_throw(self);
 
682
        switch (what_for) {
 
683
                case XT_FOR_COMPACTOR:
 
684
                        self->st_xact_buf.xbuf_init(self, xt_db_transaction_buffer_size, FALSE);
 
685
                        self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
 
686
                        break;
 
687
                case XT_FOR_WRITER:
 
688
                        /* The writer does not need a transaction buffer. */
 
689
                        self->st_xact_buf.xbuf_init(self, 0, FALSE);
 
690
                        self->st_dlog_buf.dlb_init(self->st_database, 0);
 
691
                        break;
 
692
                case XT_FOR_SWEEPER:
 
693
                        self->st_xact_buf.xbuf_init(self, xt_db_transaction_buffer_size, FALSE);
 
694
                        self->st_dlog_buf.dlb_init(self->st_database, 0);
 
695
                        break;
 
696
                case XT_FOR_USER:
 
697
                        self->st_xact_buf.xbuf_init(self, xt_db_transaction_buffer_size, TRUE);
 
698
                        self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
 
699
                        break;
1331
700
        }
1332
701
}
1333
702
 
1335
704
{
1336
705
        if (self->st_xact_data)
1337
706
                xt_xn_rollback(self);
1338
 
        if (self->st_xact_log) {
1339
 
                xn_unlock_xlog(self->st_xact_log);
1340
 
                xn_release_log(self->st_xact_log);
1341
 
                self->st_xact_log = NULL;
1342
 
        }
 
707
        self->st_xact_buf.xbuf_exit(self);
 
708
        self->st_dlog_buf.dlb_exit(self);
 
709
        xt_exit_row_lock_list(&self->st_lock_list);
1343
710
}
1344
711
 
1345
712
/* ----------------------------------------------------------------------
1349
716
xtPublic xtBool xt_xn_begin(XTThreadPtr self)
1350
717
{
1351
718
        XTDatabaseHPtr  db = self->st_database;
1352
 
        xtWord8                 xn_id;
 
719
        xtXactID                xn_id;
1353
720
 
1354
721
        ASSERT(!self->st_xact_data);
1355
722
 
1356
723
        xt_mutex_spinlock(&db->db_xn_id_lock);
1357
724
        xn_id = ++db->db_xn_curr_id;
1358
 
        xt_mutex_unlock(&db->db_xn_id_lock);
 
725
        xt_unlock_mutex_ns(&db->db_xn_id_lock);
1359
726
 
1360
727
#ifdef HIGH_X
1361
 
        if ((xn_id - db->db_xn_to_clean_id) > not_clean_max)
 
728
        if (xt_xn_is_before(not_clean_max, xn_id - db->db_xn_to_clean_id))
1362
729
                not_clean_max = xn_id - db->db_xn_to_clean_id;
1363
 
        if ((xn_id - db->db_xn_min_ram_id) > in_ram_max)
 
730
        if (xt_xn_is_before(in_ram_max, xn_id - db->db_xn_min_ram_id))
1364
731
                in_ram_max = xn_id - db->db_xn_min_ram_id;
1365
732
#endif
1366
733
        /* [*] This is the gap between incrementing the ID,
1367
734
         * and creating the transaction in memory.
1368
 
         * See xn_get_curr_id().
 
735
         * See xt_xn_get_curr_id().
1369
736
         */
1370
737
 
1371
738
        if (!(self->st_xact_data = xn_add_new_xact(db, xn_id)))
1372
739
                return FAILED;
1373
 
        XT_DEBUG_TRACE(("BEGIN %p tx=%d\n", self, (int) self->st_xact_data->xd_start_id));
1374
 
        return OK;
1375
 
}
1376
 
 
1377
 
static xtBool xn_flush_log_and_rollover(XTThreadPtr self, int what_for)
1378
 
{
1379
 
        register XTXactLogPtr xlog, new_xlog;
1380
 
 
1381
 
        xlog = self->st_xact_log;
1382
 
        ASSERT_NS(xlog);
1383
 
 
1384
 
        if (!xt_xn_flush_log(xlog))
1385
 
                return FAILED;
1386
 
 
1387
 
        if (xlog->xl_file_size + xlog->xl_offset >= XT_XACT_LOG_ROLLOVER_SIZE) {
1388
 
                /* Switch the log, before we complete the commit. */
1389
 
 
1390
 
                if (!(new_xlog = xn_get_log_for_writing(self->st_database, what_for)))
1391
 
                        return FAILED;
1392
 
                xn_unlock_xlog(xlog);
1393
 
                xn_release_log(xlog);
1394
 
                self->st_xact_log = new_xlog;
1395
 
        }
 
740
        self->st_xact_buf.xbuf_data_written = FALSE;
 
741
        
 
742
        /* All transactions that committed before or at this time
 
743
         * are this one are visible: */
 
744
        self->st_visible_time = db->db_xn_end_time;
 
745
 
 
746
        XT_DEBUG_TRACE(("BEGIN %p tx=%d\n", self, (int) self->st_xact_data->xd_start_xn_id));
 
747
#ifdef TRACE_VARIATIONS
 
748
        xt_trace("%s begin: T%d\n", self->t_name, (int) self->st_xact_data->xd_start_xn_id);
 
749
#endif
 
750
#ifdef PBXT_TRACE_STAT
 
751
        xt_tracef_query(self, "BEGIN T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
 
752
#endif
1396
753
        return OK;
1397
754
}
1398
755
 
1403
760
        ASSERT(self->st_xact_data);
1404
761
        if ((xact = self->st_xact_data)) {
1405
762
                XTDatabaseHPtr  db = self->st_database;
1406
 
                xtWord8                 end_id = db->db_xn_curr_id;
1407
 
                xtWord8                 xn_id = xact->xd_start_id;
1408
 
 
1409
 
                if (xact->xd_begin_log) {
 
763
                xtXactID                end_id = db->db_xn_curr_id;
 
764
                xtXactID                xn_id = xact->xd_start_xn_id;
 
765
                xtWord4                 end_seq = ++db->db_xn_end_time;
 
766
                
 
767
                if (self->st_xact_buf.xbuf_data_written) {
1410
768
                        /* The transaction wrote something: */
1411
769
                        XTXactEndEntryDRec      entry;
 
770
                        xtWord4                         sum;
1412
771
 
 
772
                        sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(end_id);
1413
773
                        entry.xe_status_1 = status;
1414
 
                        XT_SET_DISK_6(entry.xe_xend_id_6, end_id);      
1415
 
                        if (!xn_append_log(self, sizeof(XTXactEndEntryDRec), &entry, NULL))
1416
 
                                return FAILED;
1417
 
 
1418
 
                        if (!xn_flush_log_and_rollover(self, XT_FOR_USER))
1419
 
                                return FAILED;
1420
 
 
1421
 
                        xact->xd_committed = (status == XT_XN_STATUS_COMMIT);
1422
 
                        xact->xd_end_id = end_id;
1423
 
 
1424
 
                        /* Maintain the highest transaction ID on disk: */
1425
 
                        if (xn_id > db->db_xn_max_disk_id)
1426
 
                                db->db_xn_max_disk_id = xn_id;
 
774
                        entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
 
775
                        XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
 
776
                        XT_SET_DISK_4(entry.xe_xend_id_4, end_id);
 
777
 
 
778
                        /* Flush the data log: */
 
779
                        if (!self->st_dlog_buf.dlb_flush_log(TRUE))
 
780
                                return FAILED;
 
781
 
 
782
                        /* Write and flush the transaction log: */
 
783
                        if (!self->st_xact_buf.xbuf_log_data(self, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE))
 
784
                                return FAILED;
 
785
 
 
786
                        /* Drop locks if you have any: */
 
787
                        self->st_lock_list.xt_remove_all_locks(db);
 
788
 
 
789
                        xact->xd_end_time = end_seq;
 
790
                        xact->xd_end_xn_id = end_id;
 
791
 
 
792
                        /* Setting this flag completes the transaction (do it last)! */
 
793
                        if (status == XT_LOG_ENT_COMMIT)
 
794
                                xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
 
795
                        else
 
796
                                xact->xd_flags |= XT_XN_XAC_ENDED;
1427
797
                }
1428
798
                else {
1429
799
                        /* Read-only transaction can be removed, immediately */
1430
 
 
1431
 
                        xact->xd_end_id = end_id;
1432
 
                        if (xn_delete_xact(db, xn_id)) {
 
800
                        xact->xd_end_time = end_seq;
 
801
                        xact->xd_end_xn_id = end_id;
 
802
 
 
803
                        /* Drop locks if you have any: */
 
804
                        self->st_lock_list.xt_remove_all_locks(db);
 
805
 
 
806
                        if (xt_xn_delete_xact(db, xn_id)) {
1433
807
                                if (db->db_xn_min_ram_id == xn_id)
1434
808
                                        db->db_xn_min_ram_id = xn_id+1;
1435
809
                        }
1447
821
 
1448
822
xtPublic xtBool xt_xn_commit(XTThreadPtr self)
1449
823
{
1450
 
        XT_DEBUG_TRACE(("COMMIT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_id));
1451
 
        return xn_end_xact(self, XT_XN_STATUS_COMMIT);
 
824
#ifdef PBXT_TRACE_STAT
 
825
        xt_tracef_query(self, "COMMIT T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
 
826
#endif
 
827
#ifdef TRACE_VARIATIONS
 
828
        xt_trace("%s commit: T%d\n", self->t_name, (int) self->st_xact_data->xd_start_xn_id);
 
829
#endif
 
830
        XT_DEBUG_TRACE(("COMMIT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_xn_id));
 
831
        return xn_end_xact(self, XT_LOG_ENT_COMMIT);
1452
832
}
1453
833
 
1454
834
xtPublic xtBool xt_xn_rollback(XTThreadPtr self)
1455
835
{
1456
 
        XT_DEBUG_TRACE(("ABORT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_id));
1457
 
        return xn_end_xact(self, XT_XN_STATUS_ABORT);
1458
 
}
1459
 
 
1460
 
/* Return TRUE if records written by the given transaction are
1461
 
 * visible to the thread (i.e. the transaction of the thread).
1462
 
 */ 
1463
 
xtPublic xtBool xt_xn_visible(XTOpenTablePtr ot, xtWord8 xn_id, off_t address, xtBool *mine)
1464
 
{
1465
 
        register XTThreadPtr    self = ot->ot_thread;
1466
 
        register XTXactDataPtr  xact;
1467
 
 
1468
 
        /* NOTE: If a transaction is not in RAM, then it is considered aborted.
1469
 
         * This means that we can only remove a transaction from memory when
1470
 
         * all transactions that were running when we started cleanup have
1471
 
         * completed.
1472
 
         * Only these transactions may have read something that has been
1473
 
         * changed by the sweeper in the meantime.
1474
 
         * For example a transaction may fill its buffer when doing a
1475
 
         * sequential read. A record may be cleaned by the sweeper that
1476
 
         * is in this buffer, but the reader already has an old copy of
1477
 
         * the data. Then the sweeper removes the transaction
1478
 
         * from RAM. The reader will then consider this record
1479
 
         * invalid.
1480
 
         */ 
1481
 
        if (xn_id < self->st_database->db_xn_min_ram_id) {
1482
 
                /* This record is not clean, and the transaction is not in
1483
 
                 * RAM. This means it has been missed, so clean it up.
1484
 
                 */
1485
 
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1486
 
                return FALSE;
1487
 
        }
1488
 
        if (xn_id == self->st_xact_data->xd_start_id) {
1489
 
                if (mine)
1490
 
                        *mine = TRUE;
1491
 
                return TRUE;
1492
 
        }
1493
 
        if (xn_id > self->st_xact_data->xd_start_id)
1494
 
                /* This record was written after this transaction
1495
 
                 * started (is not visible).
1496
 
                 */
1497
 
                return FALSE;
1498
 
        if (!(xact = xn_get_xact(self->st_database, xn_id))) {
1499
 
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1500
 
                return FALSE;
1501
 
        }
1502
 
        if (!xact->xd_end_id || xact->xd_end_id >= self->st_xact_data->xd_start_id)
1503
 
                /* Either this transaction has not yet ended, or this
1504
 
                 * record was written by a transaction that ended
1505
 
                 * after the reading transaction started!
1506
 
                 * So this record is not visible!
1507
 
                 */ 
1508
 
                return FALSE;
1509
 
        /* Visible if the transaction was committed: */
1510
 
        return xact->xd_committed;
1511
 
}
1512
 
 
1513
 
/*
1514
 
 * Return TRUE if the record has been committed.
1515
 
 */
1516
 
xtPublic xtBool xt_xn_committed(XTOpenTablePtr ot, xtWord8 xn_id, off_t address, xtBool *mine)
1517
 
{
1518
 
        register XTThreadPtr    self = ot->ot_thread;
1519
 
        register XTXactDataPtr  xact;
1520
 
 
1521
 
        if (xn_id < self->st_database->db_xn_min_ram_id) {
1522
 
                /* This record is not clean, and the transaction is not in
1523
 
                 * RAM. This means it has been missed, so clean it up.
1524
 
                 */
1525
 
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1526
 
                return FALSE;
1527
 
        }
1528
 
        if (xn_id == self->st_xact_data->xd_start_id) {
1529
 
                if (mine)
1530
 
                        *mine = TRUE;
1531
 
                return TRUE;
1532
 
        }
1533
 
        if (!(xact = xn_get_xact(self->st_database, xn_id))) {
1534
 
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1535
 
                return FALSE;
1536
 
        }
1537
 
        if (!xact->xd_end_id)
1538
 
                /* This transaction has not yet ended: */
1539
 
                return FALSE;
1540
 
        /* TRUE if the record was committed: */
1541
 
        return xact->xd_committed;
1542
 
}
1543
 
 
1544
 
/*
1545
 
 * Return TRUE if the transaction is committed, or may be
1546
 
 * committed in the future.
1547
 
 *
1548
 
 * if used, 'wait' must be initialized to FALSE!
1549
 
 *
1550
 
 * It will be set to TRUE if the transaction has not yet ended.
1551
 
 * Return FALSE if the transaction was aborted.
1552
 
 */
1553
 
xtPublic xtBool xt_xn_may_commit(XTOpenTablePtr ot, xtWord8 xn_id, off_t address, xtBool *mine, xtBool *wait)
1554
 
{
1555
 
        register XTThreadPtr    self = ot->ot_thread;
1556
 
        register XTXactDataPtr  xact;
1557
 
 
1558
 
        if (xn_id < self->st_database->db_xn_min_ram_id) {
1559
 
                /* Not in RAM, rollback done: */
1560
 
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1561
 
                return FALSE;
1562
 
        }
1563
 
        if (xn_id == self->st_xact_data->xd_start_id) {
1564
 
                if (mine)
1565
 
                        *mine = TRUE;
1566
 
                return TRUE;
1567
 
        }
1568
 
        if (!(xact = xn_get_xact(self->st_database, xn_id))) {
1569
 
                /* Not in RAM, rollback done: */
1570
 
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1571
 
                return FALSE;
1572
 
        }
1573
 
        if (!xact->xd_end_id) {
 
836
#ifdef PBXT_TRACE_STAT
 
837
        xt_tracef_query(self, "ABORT T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
 
838
#endif
 
839
#ifdef TRACE_VARIATIONS
 
840
        xt_trace("%s abort: T%d\n", self->t_name, (int) self->st_xact_data->xd_start_xn_id);
 
841
#endif
 
842
        XT_DEBUG_TRACE(("ABORT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_xn_id));
 
843
        return xn_end_xact(self, XT_LOG_ENT_ABORT);
 
844
}
 
845
 
 
846
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
 
847
{
 
848
        XTXactNewTabEntryDRec   entry;
 
849
 
 
850
        entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
 
851
        entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
 
852
        XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
 
853
        return self->st_xact_buf.xbuf_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE);
 
854
}
 
855
 
 
856
/*
 
857
 * XT_XN_ABORTED - Transaction was aborted.
 
858
 * XT_XN_MY_UPDATE - The record was updated by me.
 
859
 * XT_XN_OTHER_UPDATE - The record was updated by someone else.
 
860
 * XT_XN_COMMITTED - The transaction was committed.
 
861
 */
 
862
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id)
 
863
{
 
864
        register XTThreadPtr    self = ot->ot_thread;
 
865
        register XTXactDataPtr  xact;
 
866
 
 
867
        if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
 
868
                /* Not in RAM, rollback done: */
 
869
//*DBG*/xt_dump_trace();
 
870
//*DBG*/xt_dump_xlogs(self->st_database);
 
871
//*DBG*/xt_check_table(self, ot);
 
872
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
873
                return XT_XN_ABORTED;
 
874
        }
 
875
        if (xn_id == self->st_xact_data->xd_start_xn_id)
 
876
                return XT_XN_MY_UPDATE;
 
877
        if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
 
878
                /* Not in RAM, rollback done: */
 
879
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
880
                return XT_XN_ABORTED;
 
881
        }
 
882
        if (!(xact->xd_flags & XT_XN_XAC_ENDED))
1574
883
                /* Transaction not ended, may be visible. */
1575
 
                if (wait)
1576
 
                        *wait = TRUE;
1577
 
                return TRUE;
1578
 
        }
 
884
                return XT_XN_OTHER_UPDATE;
1579
885
        /* Visible if the transaction was committed: */
1580
 
        return xact->xd_committed;
1581
 
}
1582
 
 
1583
 
/* ----------------------------------------------------------------------
1584
 
 * Log writing
1585
 
 */
1586
 
 
1587
 
static xtBool xn_log_begin(XTThreadPtr self, register XTXactDataPtr xact)
1588
 
{
1589
 
        XTXactBeginEntryDRec    begin;
1590
 
 
1591
 
        begin.xe_status_1 = XT_XN_STATUS_BEGIN;
1592
 
        XT_SET_DISK_6(begin.xe_xact_id_6, xact->xd_start_id);
1593
 
        return xn_append_log(self, sizeof(XTXactBeginEntryDRec), &begin, &xact->xd_begin_offset);
1594
 
}
1595
 
 
1596
 
/*
1597
 
 * This function makes sure that this transaction is less than the
1598
 
 * next transaction number that will be used on restart.
1599
 
 *
1600
 
 * The next number to be used on restart is the highest
1601
 
 * number on disk plus a certain constant:
1602
 
 * XT_TN_NUMBER_INCREMENT (>= 1).
1603
 
 *
1604
 
 * The function must be called before a transaction ID is written
1605
 
 * to a record on disk.
1606
 
 *
1607
 
 * It ensures that the record can be correctly identified
1608
 
 * as invalid (i.e. rolled back). A record is rolled back
1609
 
 * if it has a transaction ID that was not committed. Any
1610
 
 * record that is not clean, but has an ID smaller than
1611
 
 * all transactions in RAM is considered invalid. 
1612
 
 *
1613
 
 * If this is not the case, we write this transaction number
1614
 
 * to disk!
1615
 
 */
1616
 
xtPublic xtBool xt_xn_log_begin(XTOpenTablePtr ot)
1617
 
{
1618
 
        register XTXactDataPtr  xact;
1619
 
 
1620
 
        if (!(xact = ot->ot_thread->st_xact_data)) {
1621
 
                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
1622
 
                return FAILED;
1623
 
        }
1624
 
        if (!xact->xd_begin_log) {
1625
 
                register XTDatabaseHPtr db;
1626
 
 
1627
 
                db = ot->ot_table->tab_db;
1628
 
                if (xact->xd_start_id > db->db_xn_max_disk_id + XT_TN_NUMBER_INCREMENT) {
1629
 
                        if (!xn_log_begin(ot->ot_thread, xact))
1630
 
                                return FAILED;
1631
 
                        ASSERT_NS(ot->ot_thread->st_xact_log);
1632
 
 
1633
 
                        xact->xd_begin_log = ot->ot_thread->st_xact_log->xl_number;
1634
 
                        if (!xt_xn_flush_log(ot->ot_thread->st_xact_log))
1635
 
                                return FAILED;
1636
 
                        if (xact->xd_start_id > db->db_xn_max_disk_id)
1637
 
                                db->db_xn_max_disk_id = xact->xd_start_id;
1638
 
                }
1639
 
        }
1640
 
        return OK;
1641
 
}
1642
 
 
1643
 
xtPublic xtBool xt_xn_log_update(XTOpenTablePtr ot, off_t record, u_int status, u_int rec_type)
1644
 
{
1645
 
        XTactUpdateEntryDRec    entry;
1646
 
        register XTXactDataPtr  xact;
1647
 
 
1648
 
        xact = ot->ot_thread->st_xact_data;
1649
 
        if (!xact->xd_begin_log) {
1650
 
                if (!xn_log_begin(ot->ot_thread, xact))
1651
 
                        return FAILED;
1652
 
                ASSERT_NS(ot->ot_thread->st_xact_log);
1653
 
                xact->xd_begin_log = ot->ot_thread->st_xact_log->xl_number;
1654
 
        }
1655
 
 
1656
 
        entry.xe_status_1 = (xtWord1) status;
1657
 
        entry.xe_rec_type_1 = (xtWord1) rec_type;
1658
 
        XT_SET_DISK_4(entry.xe_tab_id_4, ot->ot_table->tab_id);
1659
 
        XT_SET_DISK_6(entry.xe_record_6, record);
1660
 
        return xn_append_log(ot->ot_thread, sizeof(XTactUpdateEntryDRec), &entry, NULL);
1661
 
}
1662
 
 
1663
 
xtPublic xtBool xt_xn_log_ids(XTThreadPtr self, XTDatabaseHPtr db)
1664
 
{
1665
 
        XTXactCurrEntryDRec     entry;
1666
 
 
1667
 
        /* Before we delete a log, we have to preserve the current IDs: */
1668
 
        entry.xe_status_1 = XT_XN_STATUS_CURR_IDS;
1669
 
        XT_SET_DISK_6(entry.xe_curr_id_6, db->db_xn_curr_id);
1670
 
        XT_SET_DISK_4(entry.xe_tab_id_4, db->db_curr_tab_id);
1671
 
        if (!xn_append_log(self, sizeof(XTXactCurrEntryDRec), &entry, NULL))
1672
 
                return FAILED;
1673
 
 
1674
 
        if (!xn_flush_log_and_rollover(self, XT_FOR_SWEEPER))
1675
 
                return FAILED;
1676
 
 
1677
 
        if (db->db_xn_curr_id > db->db_xn_max_disk_id)
1678
 
                db->db_xn_max_disk_id = db->db_xn_curr_id;
1679
 
        return OK;
1680
 
}
1681
 
 
1682
 
/* ----------------------------------------------------------------------
1683
 
 * Sweeper process
1684
 
 */
1685
 
 
1686
 
static void xn_sw_free_tables(XTThreadPtr self, XTDatabaseHPtr db)
1687
 
{
1688
 
        XTSWTablePtr st, tmp_st;
1689
 
 
1690
 
        if ((st = db->db_sw_tables)) {
1691
 
                do {
1692
 
                        tmp_st = st->st_less_ru;
1693
 
                        if (st->st_table) {
1694
 
                                xt_flush_table(st->st_table);
1695
 
                                xt_close_table(st->st_table);
1696
 
                        }
1697
 
                        xt_free(self, st);
1698
 
                        st = tmp_st;
1699
 
                } while (st != db->db_sw_tables);
1700
 
        }
1701
 
        db->db_sw_tab_count = 0;
1702
 
        db->db_sw_tables = NULL;
1703
 
}
1704
 
 
1705
 
static void xn_sw_flush_tables(XTThreadPtr self, XTDatabaseHPtr db)
1706
 
{
1707
 
        XTSWTablePtr st;
1708
 
 
1709
 
        if ((st = db->db_sw_tables)) {
1710
 
                do {
1711
 
                        if (st->st_table && st->st_dirty) {
1712
 
                                if (!xt_flush_table(st->st_table))
1713
 
                                        throw_();
1714
 
                                st->st_dirty = FALSE;
1715
 
                        }
1716
 
                        st = st->st_less_ru;
1717
 
                } while (st != db->db_sw_tables);
1718
 
        }
1719
 
}
1720
 
 
1721
 
xtPublic void xt_sw_lock_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
1722
 
{
1723
 
        /* A deadlock may occur if the calling thread
1724
 
         * has a transaction!
1725
 
         *
1726
 
         * I have this situation in which this occurs
1727
 
         * without a deadlock:
1728
 
         *
1729
 
         * create table t2 select * from t1;
1730
 
         *
1731
 
         * This query creates a transaction to write the data
1732
 
         * to the table. It then closes the table as usual
1733
 
         * after the create.
1734
 
         *
1735
 
         * Unfortunately I can't remember how the deadlock
1736
 
         * can occur...
1737
 
         */
1738
 
        //ASSERT(!self->st_xact_data);
1739
 
 
1740
 
        xt_lock_mutex(self, &db->db_sw_tab_lock);
1741
 
        pushr_(xt_unlock_mutex, &db->db_sw_tab_lock);
1742
 
        xn_sw_free_tables(self, db);
1743
 
        popr_(); // xt_unlock_mutex(db->db_sw_tab_lock)
1744
 
}
1745
 
 
1746
 
xtPublic void xt_sw_unlock_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
1747
 
{
1748
 
        xt_unlock_mutex(self, &db->db_sw_tab_lock);
1749
 
}
1750
 
 
1751
 
/*
1752
 
 * If we return NULL, set *have_dic to TRUE if the table cannot
1753
 
 * be found.
1754
 
 */
1755
 
static int xn_sw_open_table(XTThreadPtr self, XTOpenTablePtr *ot, XTDatabaseHPtr db, xtWord4 tab_id)
1756
 
{
1757
 
        XTSWTablePtr    st;
1758
 
        XTTableHPtr             tab;
1759
 
        int                             r;
1760
 
 
1761
 
        if ((st = db->db_sw_tables)) {
1762
 
                do {
1763
 
                        if (st->st_tab_id == tab_id) {
1764
 
                                /* Make this table the most recently used: */
1765
 
                                if (st != db->db_sw_tables) {
1766
 
                                        /* Remove: */
1767
 
                                        st->st_less_ru->st_more_ru = st->st_more_ru;
1768
 
                                        st->st_more_ru->st_less_ru = st->st_less_ru;
1769
 
                                        
1770
 
                                        /* Add to the front: */
1771
 
                                        st->st_less_ru = db->db_sw_tables;
1772
 
                                        st->st_more_ru = db->db_sw_tables->st_more_ru;
1773
 
                                        st->st_less_ru->st_more_ru = st;
1774
 
                                        st->st_more_ru->st_less_ru = st;
1775
 
                                        db->db_sw_tables = st;
1776
 
                                }
1777
 
                                st->st_dirty = TRUE;
1778
 
                                *ot = st->st_table;
1779
 
                                return XT_TAB_OK;
1780
 
                        }
1781
 
                        st = st->st_less_ru;
1782
 
                } while (st != db->db_sw_tables);
1783
 
        }
1784
 
 
1785
 
        if ((r = xt_use_table_by_id(self, &tab, db, tab_id)))
1786
 
                return r;
1787
 
 
1788
 
        pushr_(xt_heap_release, tab);
1789
 
        /*
1790
 
         * Only if a table has been opened by MySQL will we
1791
 
         * have key types and charset information we need
1792
 
         * to compare index keys!
1793
 
         */
1794
 
 
1795
 
        /* Add it to the list: */
1796
 
        st = (XTSWTablePtr) xt_calloc(self, sizeof(XTSWTableRec));
1797
 
        if (!(*ot = xt_open_table(tab))) {
1798
 
                xt_free(self, st);
1799
 
                throw_();
1800
 
        }
1801
 
        db->db_sw_tab_count++;
1802
 
        st->st_tab_id = tab_id;
1803
 
        st->st_table = *ot;
1804
 
        st->st_dirty = TRUE;
1805
 
        if (db->db_sw_tables) {
1806
 
                st->st_less_ru = db->db_sw_tables;
1807
 
                st->st_more_ru = db->db_sw_tables->st_more_ru;
1808
 
                st->st_less_ru->st_more_ru = st;
1809
 
                st->st_more_ru->st_less_ru = st;
1810
 
                db->db_sw_tables = st;
1811
 
                
1812
 
                /* Too many on the open list: */
1813
 
                if (db->db_sw_tab_count > XT_SW_MAX_OPEN_TABLES) {
1814
 
                        /* Remove the least recently used: */
1815
 
                        st = db->db_sw_tables->st_more_ru;
1816
 
 
1817
 
                        st->st_less_ru->st_more_ru = st->st_more_ru;
1818
 
                        st->st_more_ru->st_less_ru = st->st_less_ru;
1819
 
                        
1820
 
                        db->db_sw_tab_count--;
1821
 
                        xt_flush_table(st->st_table);
1822
 
                        xt_close_table(st->st_table);
1823
 
                        xt_free(self, st);
1824
 
                }
1825
 
        }
1826
 
        else {
1827
 
                db->db_sw_tables = st;
1828
 
                st->st_less_ru = st;
1829
 
                st->st_more_ru = st;
1830
 
        }
1831
 
        freer_(); // xt_heap_release(tab)
1832
 
        
1833
 
        return XT_TAB_OK;
 
886
        if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
 
887
                if (!xt_xn_is_before(self->st_visible_time, xact->xd_end_time))  // was self->st_visible_time >= xact->xd_end_time
 
888
                        return XT_XN_VISIBLE;
 
889
                return XT_XN_NOT_VISIBLE;
 
890
        }
 
891
        return XT_XN_ABORTED;
 
892
}
 
893
 
 
894
/* ----------------------------------------------------------------------
 
895
 * S W E E P E R    P R O C E S S
 
896
 */
 
897
 
 
898
typedef struct XNSweeperState {
 
899
        XTDatabaseHPtr                  ss_db;
 
900
        XTXactSeqReadRec                ss_seqread;
 
901
        XTDataBufferRec                 ss_databuf;
 
902
        u_int                                   ss_call_cnt;
 
903
        XTBasicQueueRec                 ss_to_free;
 
904
        xtBool                                  ss_flush_pending;
 
905
        XTOpenTablePtr                  ss_ot;
 
906
} XNSweeperStateRec, *XNSweeperStatePtr;
 
907
 
 
908
static XTOpenTablePtr xn_sw_get_open_table(XTThreadPtr self, XNSweeperStatePtr ss, xtTableID tab_id, int *r)
 
909
{
 
910
        if (ss->ss_ot) {
 
911
                if (ss->ss_ot->ot_table->tab_id == tab_id)
 
912
                        return ss->ss_ot;
 
913
 
 
914
                /* Flush the table indexes: */
 
915
                /* This is now done by the checkpointer thread...
 
916
                 * see xres_cp_flush_indices()
 
917
                if (!xt_flush_table_index(ss->ss_ot))
 
918
                        xt_throw(self);
 
919
                */
 
920
 
 
921
                xt_db_return_table_to_pool(self, ss->ss_ot);
 
922
                ss->ss_ot = NULL;
 
923
        }
 
924
 
 
925
        if (!ss->ss_ot) {
 
926
                if (!(ss->ss_ot = xt_db_open_pool_table(self, ss->ss_db, tab_id, r, TRUE)))
 
927
                        return NULL;
 
928
        }
 
929
 
 
930
        return ss->ss_ot;
 
931
}
 
932
 
 
933
static void xn_sw_close_open_table(XTThreadPtr self, XNSweeperStatePtr ss)
 
934
{
 
935
        if (ss->ss_ot) {
 
936
                /* Flush the table indexes: */
 
937
                /* This is now done by the checkpointer thread
 
938
                 * see xres_cp_flush_indices()
 
939
                if (!xt_flush_table_index(ss->ss_ot))
 
940
                        xt_throw(self);
 
941
                */
 
942
 
 
943
                xt_db_return_table_to_pool(self, ss->ss_ot);
 
944
                ss->ss_ot = NULL;
 
945
        }
1834
946
}
1835
947
 
1836
948
/*
1860
972
 * Service the "to free" queue. Return TRUE if something was
1861
973
 * freed.
1862
974
 */
1863
 
static xtBool xn_sw_service_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtBool lock, xtBool in_cleanup)
 
975
static xtBool xn_sw_service_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtBool in_cleanup)
1864
976
{
1865
977
        XTDatabaseHPtr          db = ss->ss_db;
1866
978
        XNSWToFreeItemPtr       free_item;
1867
979
        xtBool                          something_freed = FALSE;
1868
 
        xtWord4                         tab_id;
 
980
        xtTableID                       tab_id;
1869
981
        XTOpenTablePtr          ot = NULL;
1870
982
 
1871
983
        if ((free_item = (XNSWToFreeItemPtr) xt_bq_get(&ss->ss_to_free))) {
1872
 
                if (lock) {
1873
 
                        xt_lock_mutex(self, &db->db_sw_tab_lock);
1874
 
                        pushr_(xt_unlock_mutex, &db->db_sw_tab_lock);
1875
 
                }
1876
 
 
1877
984
                do {
1878
985
                        xn_sw_could_go_faster(self, db);
1879
986
 
1880
 
                        if (db->db_xn_min_run_id < db->db_xn_to_clean_id) {
 
987
                        if (xt_xn_is_before(db->db_xn_min_run_id, db->db_xn_to_clean_id)) {
1881
988
                                if (in_cleanup)
1882
989
                                        /* If we are currently in cleanup, then
1883
990
                                         * the transaction currently being cleaned
1891
998
                        /* Before we can free this resource, we must wait
1892
999
                         * until the transaction is clean!
1893
1000
                         */
1894
 
                        if (free_item->ri_wait_id >= db->db_xn_min_run_id) {
 
1001
                        if (!xt_xn_is_before(free_item->ri_wait_xn_id, db->db_xn_min_run_id)) { // was >=
1895
1002
                                /* Now we have to check to see if we cannot move
1896
1003
                                 * the minimum run ID forward.
1897
1004
                                 */
1898
1005
                                XTXactDataPtr   xact;
1899
 
                                xtWord8                 tmp_id; // Protect against concurrent update!
1900
 
                                xtWord8                 xn_curr_id = xn_get_curr_id(db);
 
1006
                                xtXactID                tmp_xn_id; // Protect against concurrent update!
 
1007
                                xtXactID                xn_curr_xn_id = xt_xn_get_curr_id(db);
1901
1008
 
1902
1009
                                for (;;) {
1903
 
                                        tmp_id = db->db_xn_min_run_id;
1904
 
                                        if (tmp_id > xn_curr_id)
 
1010
                                        tmp_xn_id = db->db_xn_min_run_id;
 
1011
                                        if (xt_xn_is_before(xn_curr_xn_id, tmp_xn_id))
1905
1012
                                                break;
1906
 
                                        if ((xact = xn_get_xact(db, tmp_id)) && !xact->xd_end_id)
 
1013
                                        if ((xact = xt_xn_get_xact(db, tmp_xn_id)) && !(xact->xd_flags & XT_XN_XAC_ENDED))
1907
1014
                                                /* The transaction is still running... */
1908
1015
                                                break;
1909
 
                                        db->db_xn_min_run_id = tmp_id+1;
 
1016
                                        db->db_xn_min_run_id = tmp_xn_id+1;
1910
1017
                                }
1911
 
                                if (free_item->ri_wait_id >= db->db_xn_min_run_id)
 
1018
                                if (!xt_xn_is_before(free_item->ri_wait_xn_id, db->db_xn_min_run_id))
1912
1019
                                        break;
1913
1020
                        }
1914
1021
 
1915
1022
                        /* The transaction is clean, I can free the resource... */
1916
1023
                        if ((tab_id = free_item->ri_tab_id)) {
1917
1024
                                /* Free the data record: */
1918
 
                                if (!ot || tab_id != ot->ot_table->tab_id) {
1919
 
                                        switch (xn_sw_open_table(self, &ot, db, tab_id)) {
1920
 
                                                case XT_TAB_NO_DICTIONARY:
1921
 
                                                        xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NO_DICTIONARY, (u_long) tab_id);
1922
 
                                                        break;
1923
 
                                                case XT_TAB_POOL_CLOSED:
1924
 
                                                        xt_throw_ulxterr(XT_CONTEXT, XT_ERR_TABLE_LOCKED, (u_long) tab_id);
1925
 
                                                        break;
1926
 
                                        }
1927
 
                                }
1928
 
                                if (ot) {
1929
 
                                        if (!xt_tab_free_record(ot, free_item->x.ri_address))
 
1025
                                if ((ot = xn_sw_get_open_table(self, ss, tab_id, NULL))) {
 
1026
                                        ss->ss_flush_pending = TRUE;
 
1027
                                        if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_FREED, free_item->x.ri_rec_id, FALSE))
1930
1028
                                                throw_();
1931
1029
                                }
1932
1030
                        }
1933
1031
                        else {
1934
 
 
1935
1032
                                /* Free the transaction record in memory: */
1936
 
                                xn_delete_xact(db, free_item->x.ri_xact_id);
 
1033
                                xt_xn_delete_xact(db, free_item->x.ri_xn_id);
1937
1034
 
1938
1035
                                /* Recalculate the minimum memory transaction: */
1939
 
                                ASSERT(free_item->x.ri_xact_id >= db->db_xn_min_ram_id);
 
1036
                                ASSERT(!xt_xn_is_before(free_item->x.ri_xn_id, db->db_xn_min_ram_id));
1940
1037
                                
1941
 
                                if (db->db_xn_min_ram_id == free_item->x.ri_xact_id) {
1942
 
                                        db->db_xn_min_ram_id = free_item->x.ri_xact_id+1;
 
1038
                                if (db->db_xn_min_ram_id == free_item->x.ri_xn_id) {
 
1039
                                        db->db_xn_min_ram_id = free_item->x.ri_xn_id+1;
1943
1040
                                }
1944
1041
                                else {
1945
 
                                        xtWord8 xn_id;
1946
 
                                        xtWord8 xn_curr_id = xn_get_curr_id(db);
 
1042
                                        xtXactID xn_id;
 
1043
                                        xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);
1947
1044
 
1948
 
                                        while (db->db_xn_min_ram_id <= xn_curr_id) {
 
1045
                                        while (!xt_xn_is_before(xn_curr_xn_id, db->db_xn_min_ram_id)) { // was db->db_xn_min_ram_id <= xn_curr_xn_id
1949
1046
                                                /* db_xn_min_ram_id may be changed, by some other process! */
1950
1047
                                                xn_id = db->db_xn_min_ram_id;
1951
 
                                                if (xn_get_xact(db, xn_id))
 
1048
                                                if (xt_xn_get_xact(db, xn_id))
1952
1049
                                                        break;
1953
1050
                                                db->db_xn_min_ram_id = xn_id+1;
1954
1051
                                        }
1957
1054
 
1958
1055
                        xt_bq_next(&ss->ss_to_free);
1959
1056
                } while ((free_item = (XNSWToFreeItemPtr) xt_bq_get(&ss->ss_to_free)));
1960
 
 
1961
 
                xn_sw_flush_tables(self, db);
1962
 
 
1963
 
                if (lock)
1964
 
                        freer_(); // xt_unlock_mutex(db->db_sw_tab_lock)
1965
1057
        }
1966
1058
 
1967
1059
        return something_freed;
1973
1065
 * with all transactions that started before it! 
1974
1066
 *
1975
1067
 * The reason for this is that a sequential scan or some
1976
 
 * other operation may read a committed record which is not longer
 
1068
 * other operation may read a committed record which is no longer
1977
1069
 * valid because it is no longer the latest variation (the first
1978
1070
 * variation reachable from the row pointer).
1979
1071
 *
1987
1079
 * Without re-reading the record the sequential
1988
1080
 * scan or other read will find it on the variation list, and
1989
1081
 * return the record data as if valid!
 
1082
 *
 
1083
 * ------------ 2008-01-03
 
1084
 *
 
1085
 * An example of this is:
 
1086
 *
 
1087
 * Assume we have 3 records.
 
1088
 * The 3rd record is deleted, and committed.
 
1089
 * Before cleanup can be performed
 
1090
 * a sequential scan takes a copy of the records.
 
1091
 *
 
1092
 * Now assume a new insert is done before
 
1093
 * the sequential scan gets to the 3rd record.
 
1094
 *
 
1095
 * The insert allocates the 3rd row and 3rd record
 
1096
 * again.
 
1097
 *
 
1098
 * Now, when the sequential scan gets to the old copy of the 3rd record,
 
1099
 * this is valid because the row points to this record again.
 
1100
 *
 
1101
 * HOWEVER! I have now changed the sequential scan so that it accesses
 
1102
 * the records from the cache, without making a copy.
 
1103
 *
 
1104
 * This means that this problem cannot occur because the sequential scan
 
1105
 * always reads the current data from the cache.
 
1106
 *
 
1107
 * There is also no race condition (although no lock is taken), because
 
1108
 * the record is written before the row (see here [(5)]).
 
1109
 *
 
1110
 * This means that the row does not point to the record before the
 
1111
 * record has been modified.
 
1112
 *
 
1113
 * Once the record has been modified then the sequential scan will see
 
1114
 * that the record belongs to a new transaction.
 
1115
 *
 
1116
 * If the row pointer was set before the record updated then a race
 
1117
 * condition would exist when the sequential scan reads the record
 
1118
 * after the insert has updated the row pointer but before it has
 
1119
 * changed the record.
 
1120
 *
 
1121
 * AS A RESULT:
 
1122
 *
 
1123
 * I believe I can remove the delayed free record!
 
1124
 *
 
1125
 * This means I can combine the REMOVE and FREE operations.
 
1126
 *
 
1127
 * This is good because this takes care of the problem
 
1128
 * that records are lost when:
 
1129
 *
 
1130
 * The server crashes when the delayed free list still has items on it.
 
1131
 * AND
 
1132
 * The transaction that freed the records has been cleaned, and this
 
1133
 * fact has been committed to the log.
 
1134
 *
 
1135
 * So I have removed the delay here: [(6)]
1990
1136
 */
1991
 
static void xn_sw_add_record_to_free(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, off_t address)
 
1137
static void xn_sw_add_record_to_free(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id)
1992
1138
{
1993
1139
        XNSWToFreeItemRec free_item;
1994
1140
 
1998
1144
                 * when the queue overflows!
1999
1145
                 */
2000
1146
                if ((ss->ss_call_cnt % XT_TN_MAX_TO_FREE_CHECK) == 0)
2001
 
                        xn_sw_service_to_free(self, ss, FALSE, TRUE);
 
1147
                        xn_sw_service_to_free(self, ss, FALSE);
2002
1148
                ss->ss_call_cnt++;
2003
1149
        }
2004
1150
 
2005
 
        free_item.ri_wait_id = ss->ss_db->db_xn_curr_id;
 
1151
        free_item.ri_wait_xn_id = ss->ss_db->db_xn_curr_id;
2006
1152
        free_item.ri_tab_id = ot->ot_table->tab_id;
2007
 
        free_item.x.ri_address = address;
 
1153
        free_item.x.ri_rec_id = rec_id;
2008
1154
 
2009
1155
#ifdef DEBUG
2010
1156
        XNSWToFreeItemPtr free_list = (XNSWToFreeItemPtr) ss->ss_to_free.bq_data;
2011
1157
        for (u_int i=ss->ss_to_free.bq_back; i<ss->ss_to_free.bq_front; i++) {
2012
 
                if (free_list[i].ri_tab_id == ot->ot_table->tab_id && free_list[i].x.ri_address == address)
 
1158
                if (free_list[i].ri_tab_id == ot->ot_table->tab_id && free_list[i].x.ri_rec_id == rec_id)
2013
1159
                        ASSERT(FALSE);
2014
1160
        }
2015
1161
#endif
2023
1169
 * currently running transactions have ended. This is because
2024
1170
 * sequential and index scans have copies of old data.
2025
1171
 *
2026
 
 * In the old data a record may not be indicated has cleaned. Such
2027
 
 * a record is considered invalid of the transaction is not in RAM.
 
1172
 * In the old data a record may not be indicated as cleaned. Such
 
1173
 * a record is considered invalid if the transaction is not in RAM.
2028
1174
 *
2029
1175
 * GOTCHA:
2030
1176
 *
2031
 
 * And this problem is demostrated by the following example
 
1177
 * And this problem is demonstrated by the following example
2032
1178
 * which was derived from flush_table.test.
2033
1179
 *
2034
1180
 * Each handler command below is a separate transaction.
2062
1208
 * }
2063
1209
 * 
2064
1210
 */
2065
 
static void xn_sw_add_xact_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtWord8 xn_id)
 
1211
static void xn_sw_add_xact_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtXactID xn_id)
2066
1212
{
2067
1213
        XNSWToFreeItemRec free_item;
2068
1214
 
2075
1221
                        /* GOTCHA: This call was not locking the sweeper,
2076
1222
                         * this could cause failure, of course:
2077
1223
                         */
2078
 
                        xn_sw_service_to_free(self, ss, TRUE, TRUE);
 
1224
                        xn_sw_service_to_free(self, ss, TRUE);
2079
1225
                ss->ss_call_cnt++;
2080
1226
        }
2081
1227
 
2082
 
        free_item.ri_wait_id = ss->ss_db->db_xn_curr_id;
 
1228
        free_item.ri_wait_xn_id = ss->ss_db->db_xn_curr_id;
2083
1229
        free_item.ri_tab_id = 0;
2084
 
        free_item.x.ri_xact_id = xn_id;
 
1230
        free_item.x.ri_xn_id = xn_id;
2085
1231
 
2086
1232
        xt_bq_add(self, &ss->ss_to_free, &free_item);
2087
1233
}
2088
1234
 
2089
 
static void xt_sw_delete_variations(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, off_t address)
 
1235
static void xt_sw_delete_variations(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtXactID xn_id)
2090
1236
{
2091
 
        off_t prev_var;
 
1237
        xtRecordID prev_var_rec_id;
2092
1238
 
2093
 
        while (address) {
2094
 
                switch (xt_tab_remove_record(ot, address, ss->ss_databuf.db_data, &prev_var, FALSE)) {
 
1239
        while (rec_id) {
 
1240
                switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, FALSE, row_id, xn_id)) {
2095
1241
                        case XT_ERR:
2096
1242
                                throw_();
2097
1243
                                return;
2098
1244
                        case TRUE:
2099
 
                                xn_sw_add_record_to_free(self, ss, ot, address);
2100
1245
                                break;
2101
1246
                }
2102
 
                address = prev_var;
 
1247
                rec_id = prev_var_rec_id;
2103
1248
        }
2104
1249
}
2105
1250
 
2106
 
static void xt_sw_delete_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, off_t address, xtBool clean_delete)
 
1251
static void xt_sw_delete_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtBool clean_delete, xtRowID row_id, xtXactID xn_id)
2107
1252
{
2108
 
        off_t prev_var;
 
1253
        xtRecordID prev_var_rec_id;
2109
1254
 
2110
 
        switch (xt_tab_remove_record(ot, address, ss->ss_databuf.db_data, &prev_var, clean_delete)) {
 
1255
        switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, clean_delete, row_id, xn_id)) {
2111
1256
                case XT_ERR:
2112
1257
                        throw_();
2113
1258
                        return;
2114
1259
                case TRUE:
2115
 
                        xn_sw_add_record_to_free(self, ss, ot, address);
2116
1260
                        break;
2117
1261
                case FALSE:
2118
1262
                        break;
2128
1272
/*
2129
1273
 * Read the record to be cleaned. Return TRUE if the cleanup has already been done.
2130
1274
 */
2131
 
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, XTXactDataPtr xact, off_t address, u_int rec_type, XTTabRecHeadDPtr rec_head)
 
1275
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtXactID xn_id, u_int rec_type, u_int stat_id, xtRowID row_id, XTTabRecHeadDPtr rec_head)
2132
1276
{
2133
 
        if (!xt_tab_get_data(ot, address, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head, NULL))
 
1277
        if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head))
2134
1278
                throw_();
2135
1279
 
2136
1280
        if (rec_type == XN_FORCE_CLEANUP) {
2137
 
                if (rec_head->tr_rec_type_1 == XT_TAB_STATUS_FREED)
 
1281
                if (XT_REC_IS_FREE(rec_head->tr_rec_type_1))
2138
1282
                        return TRUE;
2139
1283
        }
2140
1284
        else {
 
1285
                /* Transaction must match: */
 
1286
                if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
 
1287
                        return TRUE;
 
1288
 
2141
1289
                /* Record header must match expected value from
2142
 
                 * log in order to cleanup!
 
1290
                 * log; otherwise the clean has been done, or is not required.
 
1291
                 *
 
1292
                 * For example, it is not required if a record
 
1293
                 * has been overwritten in a transaction.
2143
1294
                 */
2144
 
                if (rec_head->tr_rec_type_1 != rec_type)
 
1295
                if (rec_head->tr_rec_type_1 != rec_type ||
 
1296
                        rec_head->tr_stat_id_1 != stat_id)
2145
1297
                        return TRUE;
2146
1298
 
2147
 
                /* Transaction must match: */
2148
 
                if (XT_GET_DISK_6(rec_head->tr_xact_id_6) != xact->xd_start_id)
 
1299
                /* Row must match: */
 
1300
                if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
2149
1301
                        return TRUE;
2150
1302
        }
2151
1303
 
2152
1304
        return FALSE;
2153
1305
}
2154
1306
 
 
1307
static void xn_sw_clean_indices(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_data, xtWord1 *rec_buffer)
 
1308
{
 
1309
        XTTableHPtr     tab = ot->ot_table;
 
1310
        u_int           cols_req;
 
1311
        XTIndexPtr      *ind;
 
1312
 
 
1313
        if (!tab->tab_dic.dic_key_count)
 
1314
                return;
 
1315
 
 
1316
        cols_req = tab->tab_dic.dic_ind_cols_req;
 
1317
        if (XT_REC_IS_FIXED(rec_data[0]))
 
1318
                rec_buffer = rec_data + XT_REC_FIX_HEADER_SIZE;
 
1319
        else {
 
1320
                if (XT_REC_IS_VARIABLE(rec_data[0])) {
 
1321
                        if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buffer, cols_req))
 
1322
                                goto failed;
 
1323
                }
 
1324
                else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
 
1325
                        ASSERT(cols_req);
 
1326
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
1327
                                if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buffer, cols_req))
 
1328
                                        goto failed;
 
1329
                        }
 
1330
                        else {
 
1331
                                if (rec_data != ot->ot_row_rbuffer)
 
1332
                                        memcpy(ot->ot_row_rbuffer, rec_data, tab->tab_dic.dic_rec_size);
 
1333
                                if (!xt_tab_load_ext_data(ot, rec_id, rec_buffer, cols_req))
 
1334
                                        goto failed;
 
1335
                        }
 
1336
                }
 
1337
                else
 
1338
                        /* This is possible, the record has already been cleaned up. */
 
1339
                        return;
 
1340
        }
 
1341
 
 
1342
        ind = tab->tab_dic.dic_keys;
 
1343
        for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
 
1344
                if (!xt_idx_update_row_id(ot, *ind, rec_id, row_id, rec_buffer))
 
1345
                        xt_log_and_clear_exception_ns();
 
1346
        }
 
1347
        return;
 
1348
        
 
1349
        failed:
 
1350
        xt_log_and_clear_exception_ns();
 
1351
}
 
1352
 
2155
1353
/*
2156
1354
 * Return TRUE if the cleanup was done. FAILED if cleanup could not be done
2157
1355
 * because dictionary information is not available.
2158
1356
 */
2159
 
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtWord4 tab_id, off_t address, u_int status, u_int rec_type)
 
1357
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtTableID tab_id, xtRecordID rec_id, u_int status, u_int rec_type, u_int stat_id, xtRowID row_id, xtWord1 *rec_buf)
2160
1358
{
2161
1359
        XTOpenTablePtr          ot;
2162
1360
        XTTableHPtr                     tab;
2163
1361
        XTTabRecHeadDRec        rec_head;
2164
 
        off_t                           after_rec;
2165
 
        xtWord4                         row_id;
 
1362
        xtRecordID                      after_rec_id;
 
1363
        xtXactID                        xn_id;
 
1364
        int                                     r;
2166
1365
 
2167
 
        switch (xn_sw_open_table(self, &ot, ss->ss_db, tab_id)) {
2168
 
                case XT_TAB_NOT_FOUND:
2169
 
                        /* The table no longer exists, consider cleanup done: */
2170
 
                        return TRUE;
2171
 
                case XT_TAB_NO_DICTIONARY:
2172
 
                case XT_TAB_POOL_CLOSED:
2173
 
                        return FALSE;
 
1366
        if (!(ot = xn_sw_get_open_table(self, ss, tab_id, &r))) {
 
1367
                /* The table no longer exists, consider cleanup done: */
 
1368
                switch (r) {
 
1369
                        case XT_TAB_NOT_FOUND:
 
1370
                                break;
 
1371
                        case XT_TAB_NO_DICTIONARY:
 
1372
                        case XT_TAB_POOL_CLOSED:
 
1373
                                return FALSE;
 
1374
                }
 
1375
                return TRUE;
2174
1376
        }
2175
1377
 
2176
1378
        tab = ot->ot_table;
2178
1380
        /* Make sure the buffer is large enough! */
2179
1381
        xt_db_set_size(self, &ss->ss_databuf, (size_t) tab->tab_dic.dic_buf_size);
2180
1382
 
2181
 
        if (xact->xd_committed) {
 
1383
        xn_id = xact->xd_start_xn_id;
 
1384
        if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
2182
1385
                /* The transaction has been committed. Clean the record and
2183
1386
                 * remove variations no longer in use.
2184
1387
                 */
2185
1388
                switch (status) {
2186
 
                        case XT_XN_STATUS_UPDATE:
2187
 
                                if (xn_sw_cleanup_done(self, ot, xact, address,  rec_type, &rec_head))
2188
 
                                        return OK;
2189
 
                                after_rec = (off_t) XT_GET_DISK_6(rec_head.tr_prev_var_6);
2190
 
                                xt_sw_delete_variations(self, ss, ot, after_rec);
 
1389
                        case XT_LOG_ENT_REC_MODIFIED:
 
1390
                        case XT_LOG_ENT_UPDATE:
 
1391
                        case XT_LOG_ENT_UPDATE_FL:
 
1392
                        case XT_LOG_ENT_UPDATE_BG:
 
1393
                        case XT_LOG_ENT_UPDATE_FL_BG:
 
1394
                                if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
 
1395
                                        goto done_ok;
 
1396
                                after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
 
1397
                                xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
2191
1398
                                rec_head.tr_rec_type_1 |= XT_TAB_STATUS_CLEANED_BIT;
2192
 
                                XT_SET_NULL_DISK_6(rec_head.tr_prev_var_6);
2193
 
                                if (!xt_tab_put_data(ot, address, offsetof(XTTabRecHeadDRec, tr_prev_var_6) + 6, (xtWord1 *) &rec_head))
 
1399
                                XT_SET_NULL_DISK_4(rec_head.tr_prev_rec_id_4);
 
1400
                                if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED, 0, rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head))
2194
1401
                                        throw_();
 
1402
                                xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2195
1403
                                break;
2196
 
                        case XT_XN_STATUS_INSERT:
 
1404
                        case XT_LOG_ENT_INSERT:
 
1405
                        case XT_LOG_ENT_INSERT_FL:
 
1406
                        case XT_LOG_ENT_INSERT_BG:
 
1407
                        case XT_LOG_ENT_INSERT_FL_BG:
2197
1408
                                /* In the case of insert, we avoid reading the record into cache! */
2198
1409
                                rec_head.tr_rec_type_1 = rec_type | XT_TAB_STATUS_CLEANED_BIT;
2199
 
                                if (!xt_tab_put_data(ot, address + offsetof(XTTabRecHeadDRec, tr_rec_type_1), 1, (xtWord1 *) &rec_head.tr_rec_type_1))
 
1410
                                if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED_1, 0, rec_id, 1, (xtWord1 *) &rec_head.tr_rec_type_1))
2200
1411
                                        throw_();
 
1412
                                xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2201
1413
                                break;
2202
 
                        case XT_XN_STATUS_DELETE:
2203
 
                                if (xn_sw_cleanup_done(self, ot, xact, address,  rec_type, &rec_head))
2204
 
                                        return OK;
2205
 
                                after_rec = (off_t) XT_GET_DISK_6(rec_head.tr_prev_var_6);
2206
 
                                xt_sw_delete_variations(self, ss, ot, after_rec);
2207
 
                                xt_sw_delete_variation(self, ss, ot, address, TRUE);
2208
 
                                if ((row_id = XT_GET_DISK_4(rec_head.tr_row_id_4))) {
 
1414
                        case XT_LOG_ENT_DELETE:
 
1415
                        case XT_LOG_ENT_DELETE_FL:
 
1416
                        case XT_LOG_ENT_DELETE_BG:
 
1417
                        case XT_LOG_ENT_DELETE_FL_BG:
 
1418
                                if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
 
1419
                                        goto done_ok;
 
1420
                                after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
 
1421
                                xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
 
1422
                                xt_sw_delete_variation(self, ss, ot, rec_id, TRUE, row_id, xn_id);
 
1423
                                if (row_id) {
2209
1424
                                        if (!xt_tab_free_row(ot, tab, row_id))
2210
1425
                                                throw_();
2211
1426
                                }
2217
1432
                 * variation list. If this means the list is empty, then remove
2218
1433
                 * the record as well.
2219
1434
                 */
2220
 
                off_t                           first_rec, next_rec, prev_rec;
 
1435
                xtRecordID                      first_rec_id, next_rec_id, prev_rec_id;
2221
1436
                XTTabRecHeadDRec        prev_rec_head;
2222
1437
 
2223
 
                if (xn_sw_cleanup_done(self, ot, xact, address, rec_type, &rec_head))
2224
 
                        return OK;
2225
 
 
2226
 
                if (!(row_id = XT_GET_DISK_4(rec_head.tr_row_id_4)))
2227
 
                        return OK;
2228
 
                after_rec = (off_t) XT_GET_DISK_6(rec_head.tr_prev_var_6);
2229
 
 
2230
 
                /* Delete the extended record and index entries: */
2231
 
                xt_sw_delete_variation(self, ss, ot, address, FALSE);
 
1438
                if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
 
1439
                        goto done_ok;
 
1440
 
 
1441
                if (!row_id)
 
1442
                        row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
 
1443
                after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2232
1444
 
2233
1445
                /* Now remove the record from the variation list,
2234
1446
                 * (if it is still on the list).
2235
1447
                 */
2236
 
                xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
1448
                xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2237
1449
 
2238
1450
                /* Find the variation before the variation we wish to remove: */
2239
 
                if (!(xt_tab_get_row(ot, row_id, &first_rec)))
 
1451
                if (!(xt_tab_get_row(ot, row_id, &first_rec_id)))
2240
1452
                        goto failed;
2241
 
                prev_rec = 0;
2242
 
                next_rec = first_rec;
2243
 
                while (next_rec != address) {
2244
 
                        if (!next_rec)
 
1453
                prev_rec_id = 0;
 
1454
                next_rec_id = first_rec_id;
 
1455
                while (next_rec_id != rec_id) {
 
1456
                        if (!next_rec_id) {
2245
1457
                                /* The record was not found in the list (we are done) */
 
1458
                                xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2246
1459
                                goto unlink_done;
2247
 
                        if (!xt_tab_get_data(ot, next_rec, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head, NULL))
2248
 
                                goto failed;
2249
 
                        prev_rec = next_rec;
2250
 
                        next_rec = XT_GET_DISK_6(prev_rec_head.tr_prev_var_6);
 
1460
                        }
 
1461
                        if (!xt_tab_get_rec_data(ot, next_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head)) {
 
1462
                                xt_log_and_clear_exception(self);
 
1463
                                break;
 
1464
                        }
 
1465
                        prev_rec_id = next_rec_id;
 
1466
                        next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
2251
1467
                }
2252
1468
 
2253
 
                /* Delete everything except the record. */
2254
 
                if (prev_rec) {
2255
 
                        /* Unlink the deleted variation: */
2256
 
                        XT_SET_DISK_6(prev_rec_head.tr_prev_var_6, after_rec);
2257
 
                        if (!xt_tab_put_data(ot, prev_rec + offsetof(XTTabRecHeadDRec, tr_prev_var_6), 6, (xtWord1 *) &prev_rec_head.tr_prev_var_6))
2258
 
                                goto failed;
2259
 
                }
2260
 
                else {
2261
 
                        /* Variation to be removed is at the front of the list. */
2262
 
                        ASSERT(address == first_rec);
2263
 
                        if (after_rec) {
2264
 
                                /* Unlink the deleted variation, from the front of the list: */
2265
 
                                if (!xt_tab_set_row(ot, row_id, after_rec, TRUE))
 
1469
                if (next_rec_id == rec_id) {
 
1470
                        /* The record was found on the list: */
 
1471
                        if (prev_rec_id) {
 
1472
                                /* Unlink the deleted variation:
 
1473
                                 * I have found the following sequence:
 
1474
                                 *
 
1475
                                 * 17933 in use  1906112
 
1476
                                 * 1906112 delete      xact=2901   row=17933 prev=2419240
 
1477
                                 * 2419240 delete      xact=2899   row=17933 prev=2153360
 
1478
                                 * 2153360 record-X C  xact=2599   row=17933 prev=0 Xlog=151 Xoff=16824 Xsiz=100
 
1479
                                 *
 
1480
                                 * Despite the following facts which should prevent chains from
 
1481
                                 * forming:
 
1482
                                 *
 
1483
                                 * --- Only one transaction can modify a row
 
1484
                                 * at any one time. So it is not possible for a new change
 
1485
                                 * to be linked onto an uncommitted change.
 
1486
                                 * 
 
1487
                                 * --- Transactions that modify the same row
 
1488
                                 * twice do not allocate a new record for each change.
 
1489
                                 *
 
1490
                                 * -- A change that has been
 
1491
                                 * rolled back will not be linked onto. Instead
 
1492
                                 * the new transaction will link to the last
 
1493
                                 * committed record.
 
1494
                                 *
 
1495
                                 * So if the sweeper is slow in doing its job
 
1496
                                 * we can have the situation that a number of records
 
1497
                                 * can refer to the last committed record of the
 
1498
                                 * row.
 
1499
                                 *
 
1500
                                 * Only one will be referenced by the row pointer.
 
1501
                                 *
 
1502
                                 * The others will all have been rolled back.
 
1503
                                 * This occurs over here: [(4)]
 
1504
                                 */
 
1505
                                XT_SET_DISK_4(prev_rec_head.tr_prev_rec_id_4, after_rec_id);
 
1506
                                if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_UNLINKED, 0, prev_rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &prev_rec_head))
2266
1507
                                        goto failed;
2267
1508
                        }
2268
1509
                        else {
2269
 
                                /* No more variations, remove the row: */
2270
 
                                if (!xt_tab_free_row(ot, tab, row_id))
2271
 
                                        goto failed;
 
1510
                                /* Variation to be removed at the front of the list. */
 
1511
                                ASSERT(rec_id == first_rec_id);
 
1512
                                if (after_rec_id) {
 
1513
                                        /* Unlink the deleted variation, from the front of the list: */
 
1514
                                        if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_SET, row_id, after_rec_id))
 
1515
                                                goto failed;
 
1516
                                }
 
1517
                                else {
 
1518
                                        /* No more variations, remove the row: */
 
1519
                                        if (!xt_tab_free_row(ot, tab, row_id))
 
1520
                                                goto failed;
 
1521
                                }
2272
1522
                        }
2273
1523
                }
2274
 
        
 
1524
 
 
1525
                xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
1526
 
 
1527
                /* Note: even when not found on the row list, the record must still
 
1528
                 * be freed.
 
1529
                 *
 
1530
                 * There might be an exception to this, but there are very definite
 
1531
                 * cases where this is required, for example when an unreferenced
 
1532
                 * record is found and added to the clean up list xn_add_cu_record().
 
1533
                 */
 
1534
 
2275
1535
                unlink_done:
2276
 
                xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
1536
                /* Delete the extended record and index entries:
 
1537
                 *
 
1538
                 * NOTE! This must be done after we have released the row lock. Because
 
1539
                 * a thread that does a duplicate check locks the index, and then
 
1540
                 * check whether a row is valid, and can deadlock with
 
1541
                 * code that locks a row, then an index!
 
1542
                 *
 
1543
                 * However, this should all be OK, because the variation has been removed from the
 
1544
                 * row variation list at this stage, and now just need to be deleted.
 
1545
                 */
 
1546
                xt_sw_delete_variation(self, ss, ot, rec_id, FALSE, row_id, xn_id);
2277
1547
        }
2278
1548
 
 
1549
        done_ok:
2279
1550
        return OK;
2280
1551
 
2281
1552
        failed:
2282
 
        xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
1553
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2283
1554
        throw_();
2284
1555
        return FAILED;
2285
1556
}
2290
1561
 * If the transaction was committed then we remove older variations.
2291
1562
 * If a delete was committed this can lead to the row being removed.
2292
1563
 *
2293
 
 * After a transaction has been cleans it can be removed from RAM.
 
1564
 * After a transaction has been cleaned it can be removed from RAM.
2294
1565
 * If this was the last transaction in a log, and the log has reached
2295
1566
 * threshold, and the log is no longer in exclusive use, then the log
2296
1567
 * can be deleted.
2300
1571
static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact)
2301
1572
{
2302
1573
        XTDatabaseHPtr          db = ss->ss_db;
2303
 
        XTXactLogPtr            xlog;
2304
 
        off_t                           curr_offset = xact->xd_begin_offset;
2305
 
        off_t                           begin_offset = 0;
2306
 
        XTXactLogBufferDPtr     entry;
2307
 
        size_t                          space;
2308
 
        size_t                          size;
2309
 
        xtWord4                         tab_id;
2310
 
        off_t                           rec_address;
 
1574
        XTXactLogBufferDPtr     record;
 
1575
        xtTableID                       tab_id;
 
1576
        xtRecordID                      rec_id;
2311
1577
        xtBool                          cleanup_complete = FALSE;
2312
 
 
2313
 
        if (!xn_use_xlog(db, &xlog, xact->xd_begin_log, XN_MODE_SWEEPER))
2314
 
                throw_();
2315
 
        pushr_(xn_release_log_w_self, xlog);
2316
 
 
2317
 
        /* As long as we are in the same log, there is no need to re-read: */
2318
 
        if (ss->ss_seqread.sr_xlog != xlog) {
2319
 
                ss->ss_seqread.sr_xlog = xlog;
2320
 
                ss->ss_seqread.sr_offset = 0;
2321
 
                ss->ss_seqread.sr_length = 0;
2322
 
        }
2323
 
        ss->ss_seqread.sr_of = xlog->xl_sw_file;
2324
 
 
2325
 
        xt_lock_mutex(self, &db->db_sw_tab_lock);
2326
 
        pushr_(xt_unlock_mutex, &db->db_sw_tab_lock);
 
1578
        xtXactID                        xn_id;
 
1579
        xtRowID                         row_id;
 
1580
 
 
1581
        if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, xact->xd_begin_log, xact->xd_begin_offset, FALSE))
 
1582
                xt_throw(self);
2327
1583
 
2328
1584
        for (;;) {
2329
1585
                if (self->t_quit)
2331
1587
 
2332
1588
                xn_sw_could_go_faster(self, db);
2333
1589
 
2334
 
                if (!xn_seq_read(&ss->ss_seqread, curr_offset, &entry, &space))
2335
 
                        throw_();
2336
 
                if (space <= 1)
 
1590
                if (!db->db_xlog.xlog_seq_next(&ss->ss_seqread, &record, FALSE))
 
1591
                        xt_throw(self);
 
1592
                if (!record)
2337
1593
                        break;
2338
 
                switch (entry->xb.xe_status_1) {
2339
 
                        case XT_XN_STATUS_HEADER:
2340
 
                                if (space < sizeof(XTXactLogHeaderDRec))
2341
 
                                        goto break_loop;
2342
 
                                size = XT_GET_DISK_4(entry->xh.xh_size_4);
2343
 
                                if (space < size)
2344
 
                                        goto break_loop;
2345
 
                                curr_offset += size;
2346
 
                                break;
2347
 
                        case XT_XN_STATUS_BEGIN:
2348
 
                                if (space < sizeof(XTXactBeginEntryDRec))
2349
 
                                        goto break_loop;
2350
 
                                if (curr_offset != xact->xd_begin_offset)
2351
 
                                        goto break_loop;
2352
 
                                begin_offset = curr_offset;
2353
 
                                curr_offset += sizeof(XTXactBeginEntryDRec);
2354
 
                                break;
2355
 
                        case XT_XN_STATUS_COMMITTED:
2356
 
                        case XT_XN_STATUS_ABORTED:
2357
 
                                goto break_loop;
2358
 
                        case XT_XN_STATUS_COMMIT:
2359
 
                        case XT_XN_STATUS_ABORT:
2360
 
                                if (space < sizeof(XTXactEndEntryDRec))
2361
 
                                        goto break_loop;
2362
 
                                curr_offset += sizeof(XTXactEndEntryDRec);
2363
 
                                goto break_loop;
2364
 
                        case XT_XN_STATUS_UPDATE:
2365
 
                        case XT_XN_STATUS_INSERT:
2366
 
                        case XT_XN_STATUS_DELETE:
2367
 
                                if (space < sizeof(XTactUpdateEntryDRec))
2368
 
                                        goto break_loop;
2369
 
                                tab_id = XT_GET_DISK_4(entry->xu.xe_tab_id_4);
2370
 
                                rec_address = XT_GET_DISK_6(entry->xu.xe_record_6);
2371
 
                                if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_address, entry->xu.xe_status_1, entry->xu.xe_rec_type_1))
2372
 
                                        goto stop_cleanup;
2373
 
                                curr_offset += sizeof(XTactUpdateEntryDRec);
2374
 
                                break;
2375
 
                        case XT_XN_STATUS_CURR_IDS:
2376
 
                                curr_offset += sizeof(XTXactCurrEntryDRec);
 
1594
                switch (record->xh.xh_status_1) {
 
1595
                        case XT_LOG_ENT_NEW_LOG:
 
1596
                                if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, FALSE))
 
1597
                                        xt_throw(self);
 
1598
                                break;
 
1599
                        case XT_LOG_ENT_COMMIT:
 
1600
                        case XT_LOG_ENT_ABORT:
 
1601
                                xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
 
1602
                                if (xn_id == xact->xd_start_xn_id)
 
1603
                                        goto break_loop;
 
1604
                                break;
 
1605
                        case XT_LOG_ENT_REC_MODIFIED:
 
1606
                        case XT_LOG_ENT_UPDATE:
 
1607
                        case XT_LOG_ENT_INSERT:
 
1608
                        case XT_LOG_ENT_DELETE:
 
1609
                        case XT_LOG_ENT_UPDATE_BG:
 
1610
                        case XT_LOG_ENT_INSERT_BG:
 
1611
                        case XT_LOG_ENT_DELETE_BG:
 
1612
                                xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
 
1613
                                if (xn_id != xact->xd_start_xn_id)
 
1614
                                        break;
 
1615
                                tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
 
1616
                                rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
 
1617
                                row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
 
1618
                                if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xu.xu_status_1, record->xu.xu_rec_type_1, record->xu.xu_stat_id_1, row_id, &record->xu.xu_rec_type_1))
 
1619
                                        goto stop_cleanup;
 
1620
                                break;
 
1621
                        case XT_LOG_ENT_UPDATE_FL:
 
1622
                        case XT_LOG_ENT_INSERT_FL:
 
1623
                        case XT_LOG_ENT_DELETE_FL:
 
1624
                        case XT_LOG_ENT_UPDATE_FL_BG:
 
1625
                        case XT_LOG_ENT_INSERT_FL_BG:
 
1626
                        case XT_LOG_ENT_DELETE_FL_BG:
 
1627
                                xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
 
1628
                                if (xn_id != xact->xd_start_xn_id)
 
1629
                                        break;
 
1630
                                tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
 
1631
                                rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
 
1632
                                row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
 
1633
                                if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xf.xf_status_1, record->xf.xf_rec_type_1, record->xf.xf_stat_id_1, row_id, &record->xf.xf_rec_type_1))
 
1634
                                        goto stop_cleanup;
2377
1635
                                break;
2378
1636
                        default:
2379
 
                                goto break_loop;
 
1637
                                break;
2380
1638
                }
2381
1639
        }
2382
1640
        break_loop:
2383
1641
        cleanup_complete = TRUE;
2384
1642
 
2385
1643
        stop_cleanup:
2386
 
        xn_sw_flush_tables(self, db);
2387
 
 
2388
 
        freer_(); // xt_unlock_mutex(db->db_sw_tab_lock)
2389
 
 
2390
 
        if (!cleanup_complete) {
2391
 
                freer_(); // xn_release_log_w_self(xlog)
 
1644
        if (!cleanup_complete)
2392
1645
                return FAILED;
2393
 
        }
2394
1646
 
2395
1647
        /* Write the log to indicate the transaction has been cleaned: */
2396
 
        if (begin_offset) {
2397
 
                xtWord1 status = xact->xd_committed ? XT_XN_STATUS_COMMITTED : XT_XN_STATUS_ABORTED;
2398
 
 
2399
 
                if (!xn_write_log(xlog, xlog->xl_sw_file, begin_offset + offsetof(XTXactBeginEntryDRec, xe_status_1), 1, &status))
2400
 
                        throw_();
2401
 
        }
2402
 
 
2403
 
        xn_sw_add_xact_to_free(self, ss, xact->xd_start_id);
2404
 
 
2405
 
        if (!self->t_quit && xlog->xl_file_size + xlog->xl_offset >= XT_XACT_LOG_ROLLOVER_SIZE && !xlog->xl_exfile) {
2406
 
                ASSERT_NS(xlog->xl_offset == 0);
2407
 
                /* Check if we have just cleaned up the last transaction in this log: */
2408
 
                if (curr_offset >= xlog->xl_file_size + xlog->xl_offset) {
2409
 
                        char                            path[PATH_MAX];
2410
 
 
2411
 
                        if (!xt_xn_log_ids(self, db))
2412
 
                                throw_();
2413
 
 
2414
 
                        if (xlog->xl_sw_file) {
2415
 
                                xt_close_file_ns(xlog->xl_sw_file);
2416
 
                                xlog->xl_sw_file = NULL;
2417
 
                        }
2418
 
 
2419
 
                        xn_logname(PATH_MAX, path, xlog->xl_db, xlog->xl_number);
2420
 
                        if (!xt_fs_delete(NULL, path))
2421
 
                                xt_log_and_clear_exception_ns();
2422
 
 
2423
 
                        xlog->xl_recovered = FALSE;
2424
 
                        xlog->xl_file_size = 0;
2425
 
                        xlog->xl_offset = 0;
2426
 
 
2427
 
                        if (xlog->xl_number <= db->db_xn_high_log) {
2428
 
                                /* Recalculate the high log: */
2429
 
                                xt_mutex_lock(&db->db_xn_log_lock);
2430
 
                                xn_recalc_high_log(db);
2431
 
                                xt_mutex_unlock(&db->db_xn_log_lock);
2432
 
                        }
2433
 
                }
2434
 
        }
2435
 
 
2436
 
        freer_(); // xn_release_log_w_self(xlog)
 
1648
        XTXactCleanupEntryDRec cu;
 
1649
 
 
1650
        cu.xc_status_1 = XT_LOG_ENT_CLEANUP;
 
1651
        cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
 
1652
        XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);
 
1653
 
 
1654
        if (!self->st_xact_buf.xbuf_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, FALSE))
 
1655
                return FAILED;
 
1656
 
 
1657
        ss->ss_flush_pending = TRUE;
 
1658
 
 
1659
        xn_sw_add_xact_to_free(self, ss, xact->xd_start_xn_id);
2437
1660
        return OK;
2438
1661
}
2439
1662
 
2443
1666
static xtBool xn_sw_cleanup_records(XTThreadPtr self, XNSweeperStatePtr ss)
2444
1667
{
2445
1668
        XTDatabaseHPtr  db = ss->ss_db;
2446
 
        xtWord4                 tab_id;
2447
 
        off_t                   address;
 
1669
        xtTableID               tab_id = 0;
 
1670
        xtRecordID              rec_id = 0;
2448
1671
        XTXactDataRec   xact;
2449
1672
 
2450
1673
        if (!xt_sl_get_size(db->db_sw_cu_list))
2451
1674
                return FALSE;
2452
1675
 
2453
 
        xt_lock_mutex(self, &db->db_sw_tab_lock);
2454
 
        pushr_(xt_unlock_mutex, &db->db_sw_tab_lock);
2455
 
 
2456
 
        xact.xd_start_id = 0;
2457
 
        xact.xd_end_id = 0;
 
1676
        xact.xd_start_xn_id = 0;
 
1677
        xact.xd_end_xn_id = 0;
 
1678
        xact.xd_end_time = 0;
2458
1679
        xact.xd_begin_log = 0;
2459
1680
        xact.xd_begin_offset = 0;
2460
 
        xact.xd_committed = FALSE;
2461
 
        while (!self->t_quit && xn_get_cu_record(db, &tab_id, &address)) {
 
1681
        xact.xd_flags = 0;
 
1682
        while (!self->t_quit && xn_get_cu_record(db, &tab_id, &rec_id)) {
2462
1683
                xn_sw_could_go_faster(self, db);
2463
1684
 
2464
 
                if (!xn_sw_cleanup_variation(self, ss, &xact, tab_id, address, 0, XN_FORCE_CLEANUP)) {
2465
 
                        xn_add_cu_record(db, tab_id, address);
 
1685
                ss->ss_flush_pending = TRUE;
 
1686
                if (!xn_sw_cleanup_variation(self, ss, &xact, tab_id, rec_id, 0, XN_FORCE_CLEANUP, 0, 0, NULL)) {
 
1687
                        xn_add_cu_record(db, tab_id, rec_id);
2466
1688
                        break;
2467
1689
                }
2468
1690
        }
2469
1691
 
2470
 
        xn_sw_service_to_free(self, ss, FALSE, FALSE);
2471
 
 
2472
 
        freer_(); // xt_unlock_mutex(db->db_sw_tab_lock)
 
1692
        xn_sw_service_to_free(self, ss, FALSE);
2473
1693
        
2474
1694
        return TRUE;
2475
1695
}
2476
1696
 
2477
1697
static void xn_free_sw_state(XTThreadPtr self, XNSweeperStatePtr ss)
2478
1698
{
2479
 
        xn_seq_exit(self, &ss->ss_seqread);
 
1699
        xn_sw_close_open_table(self, ss);
 
1700
        if (ss->ss_db)
 
1701
                ss->ss_db->db_xlog.xlog_seq_exit(&ss->ss_seqread);
2480
1702
        xt_db_set_size(self, &ss->ss_databuf, 0);
2481
1703
        xt_bq_set_size(self, &ss->ss_to_free, 0);
2482
1704
}
2491
1713
 
2492
1714
        alloczr_(ss, xn_free_sw_state, sizeof(XNSweeperStateRec), XNSweeperStatePtr);
2493
1715
        ss->ss_db = db;
2494
 
        xn_seq_init(self, &ss->ss_seqread, NULL, NULL);
 
1716
 
 
1717
        if (!db->db_xlog.xlog_seq_init(&ss->ss_seqread, xt_db_transaction_buffer_size))
 
1718
                xt_throw(self);
 
1719
 
2495
1720
        ss->ss_to_free.bq_item_size = sizeof(XNSWToFreeItemRec);
2496
1721
        ss->ss_to_free.bq_max_waste = XT_TN_MAX_TO_FREE_WASTE;
2497
1722
        ss->ss_to_free.bq_item_inc = XT_TN_MAX_TO_FREE_INC;
2498
1723
        ss->ss_call_cnt = 0;
 
1724
        ss->ss_flush_pending = FALSE;
2499
1725
 
2500
1726
        while (!self->t_quit) {
2501
 
                while (!self->t_quit && db->db_xn_to_clean_id <= xn_get_curr_id(db)) {
 
1727
                while (!self->t_quit) {
 
1728
                        /* We are just about to check the condition for sleeping,
 
1729
                         * so if the condition for sleeping holds, then we will
 
1730
                         * exit the loop and sleep.
 
1731
                         *
 
1732
                         * We will then sleep if nobody sets the flag before we
 
1733
                         * actually do sleep!
 
1734
                         */
 
1735
                        db->db_sw_wakeup = FALSE;
 
1736
                        if (xt_xn_is_before(xt_xn_get_curr_id(db), db->db_xn_to_clean_id))
 
1737
                                break;
2502
1738
                        xn_sw_could_go_faster(self, db);
2503
1739
 
2504
 
                        if ((xact = xn_get_xact(db, db->db_xn_to_clean_id))) {
2505
 
                                xtWord8 xn_id;
 
1740
                        if ((xact = xt_xn_get_xact(db, db->db_xn_to_clean_id))) {
 
1741
                                xtXactID xn_id;
2506
1742
 
2507
 
                                if (!xact->xd_end_id)
 
1743
                                if (!(xact->xd_flags & XT_XN_XAC_ENDED))
2508
1744
                                        /* Transaction has not yet committed. */
2509
1745
                                        goto sleep;
2510
1746
 
2511
1747
                                /* Check if we can cleanup the transaction.
2512
1748
                                 * We do this by checking to see if there is any running
2513
 
                                 * transaction which start before the end if this transaction.
 
1749
                                 * transaction which start before the end of this transaction.
2514
1750
                                 */
2515
 
                                xn_id = xact->xd_start_id;
2516
 
                                while (xn_id < xact->xd_end_id) {
 
1751
                                xn_id = xact->xd_start_xn_id;
 
1752
                                while (xt_xn_is_before(xn_id, xact->xd_end_xn_id)) {
2517
1753
                                        xn_id++;
2518
 
                                        if ((xact2 = xn_get_xact(db, xn_id))) {
2519
 
                                                if (!xact2->xd_end_id)
 
1754
                                        if ((xact2 = xt_xn_get_xact(db, xn_id))) {
 
1755
                                                if (!(xact2->xd_flags & XT_XN_XAC_ENDED))
2520
1756
                                                        /* A transaction was started before the end of
2521
1757
                                                         * the transaction we wish to sweep, and this
2522
1758
                                                         * transaction has not committed, the we have to
2527
1763
                                }
2528
1764
                                
2529
1765
                                /* Can cleanup the transaction, and move to the next. */
2530
 
                                if (xact->xd_begin_log) {
2531
 
                                        if (!xn_sw_cleanup_xact(self, ss, xact))
 
1766
                                if (xact->xd_flags & XT_XN_XAC_LOGGED) {
 
1767
                                        if (!xn_sw_cleanup_xact(self, ss, xact)) {
2532
1768
                                                /* We failed to clean (try again later)... */
2533
1769
                                                goto sleep;
 
1770
                                        }
2534
1771
                                }
2535
1772
                                else {
2536
1773
                                        /* This was a read-only transaction, it is safe to
2538
1775
                                         * (should not be necessary because RO transactions
2539
1776
                                         * do this themselves):
2540
1777
                                         */
2541
 
                                        if (xn_delete_xact(db, db->db_xn_to_clean_id)) {
 
1778
                                        if (xt_xn_delete_xact(db, db->db_xn_to_clean_id)) {
2542
1779
                                                if (db->db_xn_min_ram_id == db->db_xn_to_clean_id)
2543
1780
                                                        db->db_xn_min_ram_id = db->db_xn_to_clean_id+1;
2544
1781
                                        }
2553
1790
                if (xn_sw_cleanup_records(self, ss))
2554
1791
                        continue;
2555
1792
                        
2556
 
                if (xn_sw_service_to_free(self, ss, TRUE, FALSE))
 
1793
                if (xn_sw_service_to_free(self, ss, TRUE))
2557
1794
                        continue;
2558
1795
 
 
1796
                xn_sw_close_open_table(self, ss);
 
1797
 
2559
1798
                xn_sw_could_go_slower(self, db);
2560
1799
 
2561
1800
                /* Shrink the free list, if it is empty, and larger then
2566
1805
                                xt_bq_set_size(self, &ss->ss_to_free, XT_TN_MAX_TO_FREE);
2567
1806
                }
2568
1807
 
2569
 
                xn_sw_wait_for_xact(self, db, 120); // Poll every 2 minutes (we may miss the signal)
 
1808
                if (ss->ss_flush_pending) {
 
1809
                        time_t then, now;
 
1810
 
 
1811
                        /* If nothing happens in the next 2.4 seconds, then
 
1812
                         * we will flush the log.
 
1813
                         */
 
1814
                        then = time(NULL);
 
1815
                        xn_sw_wait_for_xact(self, db, 240);
 
1816
 
 
1817
                        /* Flush pending means we have written something to the log.
 
1818
                         *
 
1819
                         * if so we flush the log so that the writer will also do
 
1820
                         * its work!
 
1821
                         *
 
1822
                         * This will lead to the freeer continuing if it is waiting.
 
1823
                         */
 
1824
                        now = time(NULL);
 
1825
                        if (now >= then + 2) {
 
1826
                                if (!self->st_xact_buf.xbuf_flush_log(self, TRUE))
 
1827
                                        xt_throw(self);
 
1828
                                ss->ss_flush_pending = FALSE;
 
1829
                        }
 
1830
                }
 
1831
                else
 
1832
                        /* If no flush is pending, we can wait much longer: */
 
1833
                        xn_sw_wait_for_xact(self, db, 12000);
 
1834
        }
 
1835
 
 
1836
        if (ss->ss_flush_pending) {
 
1837
                self->st_xact_buf.xbuf_flush_log(self, TRUE);
 
1838
                ss->ss_flush_pending = FALSE;
2570
1839
        }
2571
1840
 
2572
1841
        freer_(); // xn_free_sw_state(ss)
2573
1842
}
2574
1843
 
2575
 
static void sw_close_tables(XTThreadPtr self, XTDatabaseHPtr db)
2576
 
{
2577
 
        xt_lock_mutex(self, &db->db_sw_tab_lock);
2578
 
        pushr_(xt_unlock_mutex, &db->db_sw_tab_lock);
2579
 
        xn_sw_free_tables(self, db);
2580
 
        freer_(); // xt_unlock_mutex(db->db_sw_tab_lock)
2581
 
}
2582
 
 
2583
1844
static void *xn_sw_run_thread(XTThreadPtr self)
2584
1845
{
2585
1846
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
2614
1875
                }
2615
1876
                catch_(a) {
2616
1877
                        /* This error is "normal"! */
2617
 
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY)
 
1878
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
 
1879
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
1880
                                self->t_exception.e_sys_err == SIGTERM))
2618
1881
                                xt_log_and_clear_exception(self);
2619
 
                        sw_close_tables(self, db);
2620
1882
                }
2621
1883
                cont_(a);
2622
1884
 
2636
1898
                        count--;
2637
1899
                }
2638
1900
        }
2639
 
        sw_close_tables(self, db);
2640
1901
        return NULL;
2641
1902
}
2642
1903
 
2653
1914
}
2654
1915
 
2655
1916
/* Wait for a transaction to quit: */
2656
 
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int secs)
 
1917
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs)
2657
1918
{
2658
1919
        xt_lock_mutex(self, &db->db_xn_wait_lock);
2659
1920
        pushr_(xt_unlock_mutex, &db->db_xn_wait_lock);
2660
1921
        db->db_sw_idle = TRUE;
2661
 
        if (!self->t_quit)
2662
 
                xt_timed_wait_cond(self, &db->db_xn_wait_cond, &db->db_xn_wait_lock, secs * 1000);
 
1922
        if (!self->t_quit && !db->db_sw_wakeup)
 
1923
                xt_timed_wait_cond(self, &db->db_xn_wait_cond, &db->db_xn_wait_lock, hsecs * 10);
2663
1924
        db->db_sw_idle = FALSE;
 
1925
        db->db_sw_check_count++;
2664
1926
        freer_(); // xt_unlock_mutex(&db->db_xn_wait_lock)
2665
1927
}
2666
1928
 
2667
 
static void xn_sw_start_thread(XTThreadPtr self, XTDatabaseHPtr db)
 
1929
xtPublic void xt_start_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
2668
1930
{
2669
1931
        char name[PATH_MAX];
2670
1932
 
2675
1937
        xt_run_thread(self, db->db_sw_thread, xn_sw_run_thread);
2676
1938
}
2677
1939
 
2678
 
static void xn_sw_stop_thread(XTThreadPtr self, XTDatabaseHPtr db)
 
1940
/*
 * Wait for the sweeper thread of the given database to catch up.
 *
 * Does nothing if the database has no sweeper thread (db->db_sw_thread
 * is NULL). Otherwise this polls, in 1/100th second steps, for as long
 * as the current transaction ID is not before db->db_xn_to_clean_id.
 * On each pass, if the sweeper is idle, the wait condition is broadcast
 * to wake it.
 *
 * The wait is bounded by wall-clock time measured from entry:
 *  - after 2 seconds a "Waiting for ..." message is logged (once),
 *  - after 10 seconds the wait is aborted with a log message.
 *
 * A failure to broadcast the condition is logged and also ends the wait.
 */
xtPublic void xt_wait_for_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
{
	time_t	then, now;
	xtBool	message = FALSE;
	xtBool	signal_failed;

	if (db->db_sw_thread) {
		then = time(NULL);
		while (!xt_xn_is_before(xt_xn_get_curr_id(db), db->db_xn_to_clean_id)) { // was db->db_xn_to_clean_id <= xt_xn_get_curr_id(db)
			signal_failed = FALSE;

			xt_lock_mutex(self, &db->db_xn_wait_lock);
			pushr_(xt_unlock_mutex, &db->db_xn_wait_lock);
			if (db->db_sw_idle) {
				if (!xt_broadcast_cond(NULL, &db->db_xn_wait_cond)) {
					xt_log_and_clear_exception_ns();
					/* Do not leave the loop here: the mutex pushed
					 * above must be released by freer_() first.
					 */
					signal_failed = TRUE;
				}
			}
			freer_(); // xt_unlock_mutex(&db->db_xn_wait_lock)
			if (signal_failed)
				break;

			xt_sleep_100th_second(1);

			/* Timeouts are relative to the time we started waiting: */
			now = time(NULL);
			if (now >= then + 10) {
				xt_logf(XT_NT_INFO, "Aborting wait for '%s' sweeper\n", db->db_name);
				break;
			}
			if (now >= then + 2) {
				if (!message) {
					message = TRUE;
					xt_logf(XT_NT_INFO, "Waiting for '%s' sweeper...\n", db->db_name);
				}
			}
		}
	}
}
 
1972
 
 
1973
xtPublic void xt_stop_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
2679
1974
{
2680
1975
        XTThreadPtr thr_sw;
2681
1976
 
2690
1985
                        thread = thr_sw->t_pthread;
2691
1986
 
2692
1987
                        /* Make sure the thread quits when woken up. */
2693
 
                        thr_sw->t_quit = TRUE;
 
1988
                        xt_terminate_thread(self, thr_sw);
 
1989
 
2694
1990
                        xt_xn_wakeup_transactions(db, TRUE);
2695
1991
        
2696
1992
                        freer_(); // xt_unlock_mutex(&db->db_xn_wait_lock)
2717
2013
        }
2718
2014
}
2719
2015
 
 
2016
#ifdef XXXXXXXXXXXX
 
2017
/* Return TRUE if records written by the given transaction are
 
2018
 * visible to the thread (i.e. the transaction of the thread).
 
2019
 */ 
 
2020
xtPublic xtBool xt_xn_visible(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *mine)
 
2021
{
 
2022
        register XTThreadPtr    self = ot->ot_thread;
 
2023
        register XTXactDataPtr  xact;
 
2024
 
 
2025
        /* NOTE: If a transaction is not in RAM, then it is considered aborted.
 
2026
         * This means that we can only remove a transaction from memory when
 
2027
         * all transactions that were running when we started cleanup have
 
2028
         * completed.
 
2029
         * Only these transactions may have read something that has been
 
2030
         * changed by the sweeper in the meantime.
 
2031
         * For example a transaction may fill its buffer when doing a
 
2032
         * sequential read. A record may be cleaned by the sweeper that
 
2033
         * is in this buffer, but the reader already has a old copy of
 
2034
         * the data. Then the sweeper removes the transaction
 
2035
         * from RAM. The reader will then consider this record
 
2036
         * invalid.
 
2037
         */ 
 
2038
        if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
 
2039
                /* This record is not clean, and the transaction is not in
 
2040
                 * RAM. This means it has be missed, so clean it up.
 
2041
                 */
 
2042
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
2043
                return FALSE;
 
2044
        }
 
2045
        if (xn_id == self->st_xact_data->xd_start_xn_id) {
 
2046
                if (mine)
 
2047
                        *mine = TRUE;
 
2048
                return TRUE;
 
2049
        }
 
2050
        if (xt_xn_is_before(self->st_xact_data->xd_start_xn_id, xn_id))
 
2051
                /* This record is written after the this transaction
 
2052
                 * started (is not visible).
 
2053
                 */
 
2054
                return FALSE;
 
2055
        if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
 
2056
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
2057
                return FALSE;
 
2058
        }
 
2059
        if (!(xact->xd_flags & XT_XN_XAC_ENDED) || !xt_xn_is_before(xact->xd_end_xn_id, self->st_xact_data->xd_start_xn_id)) // was >=
 
2060
                /* Either this transaction has not yet ended, or this
 
2061
                 * record was written by a transaction that ended
 
2062
                 * after the reading transaction started!
 
2063
                 * So this record is not visible!
 
2064
                 */ 
 
2065
                return FALSE;
 
2066
        /* Visible if the transaction was committed: */
 
2067
        return xact->xd_flags & XT_XN_XAC_COMMITTED;
 
2068
}
 
2069
 
 
2070
/*
 
2071
 * Return TRUE if the record has been commited.
 
2072
 */
 
2073
xtPublic xtBool xt_xn_committed(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *mine)
 
2074
{
 
2075
        register XTThreadPtr    self = ot->ot_thread;
 
2076
        register XTXactDataPtr  xact;
 
2077
 
 
2078
        if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
 
2079
                /* This record is not clean, and the transaction is not in
 
2080
                 * RAM. This means it has be missed, so clean it up.
 
2081
                 */
 
2082
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
2083
                return FALSE;
 
2084
        }
 
2085
        if (xn_id == self->st_xact_data->xd_start_xn_id) {
 
2086
                if (mine)
 
2087
                        *mine = TRUE;
 
2088
                return TRUE;
 
2089
        }
 
2090
        if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
 
2091
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
2092
                return FALSE;
 
2093
        }
 
2094
        if (!(xact->xd_flags & XT_XN_XAC_ENDED))
 
2095
                /* Either this transaction has not yet ended, or */ 
 
2096
                return FALSE;
 
2097
        /* TRUE if the record was committed: */
 
2098
        return xact->xd_flags & XT_XN_XAC_COMMITTED;
 
2099
}
 
2100
 
 
2101
/*
 
2102
 * Return TRUE of the transaction is committed, or may be
 
2103
 * committed in the future.
 
2104
 *
 
2105
 * if used, 'wait' must be initialized to FALSE!
 
2106
 *
 
2107
 * It will be set to TRUE if the transaction has not yet ended.
 
2108
 * Return FALSE of the transaction was aborted.
 
2109
 *
 
2110
 * Return TRUE if the transaction was committed 
 
2111
 */
 
2112
xtPublic xtBool xt_xn_maybe_committed(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *mine, xtBool *wait)
 
2113
{
 
2114
        register XTThreadPtr    self = ot->ot_thread;
 
2115
        register XTXactDataPtr  xact;
 
2116
 
 
2117
        if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
 
2118
                /* Not in RAM, rollback done: */
 
2119
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
2120
                return FALSE;
 
2121
        }
 
2122
        if (xn_id == self->st_xact_data->xd_start_xn_id) {
 
2123
                if (mine)
 
2124
                        *mine = TRUE;
 
2125
                return TRUE;
 
2126
        }
 
2127
        if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
 
2128
                /* Not in RAM, rollback done: */
 
2129
                xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
 
2130
                return FALSE;
 
2131
        }
 
2132
        if (!(xact->xd_flags & XT_XN_XAC_ENDED)) {
 
2133
                /* Transaction not ended, may be visible. */
 
2134
                if (wait)
 
2135
                        *wait = TRUE;
 
2136
                return TRUE;
 
2137
        }
 
2138
        /* Visible if the transaction was committed: */
 
2139
        return xact->xd_flags & XT_XN_XAC_COMMITTED;
 
2140
}
 
2141
 
 
2142
#endif
 
2143