~posulliv/drizzle/optimizer-style-cleanup

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/xaction_xt.cc

  • Committer: Padraig O'Sullivan
  • Date: 2010-04-17 01:38:47 UTC
  • mfrom: (1237.9.238 bad-staging)
  • Revision ID: osullivan.padraig@gmail.com-20100417013847-ibjioqsfbmf5yg4g
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
 
2
 *
 
3
 * PrimeBase XT
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2005-04-10   Paul McCullagh
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
 
 
24
#include "xt_config.h"
 
25
 
 
26
#ifdef DRIZZLED
 
27
#include <bitset>
 
28
#endif
 
29
 
 
30
#include <time.h>
 
31
#include <signal.h>
 
32
 
 
33
#include "xaction_xt.h"
 
34
#include "database_xt.h"
 
35
#include "strutil_xt.h"
 
36
#include "heap_xt.h"
 
37
#include "trace_xt.h"
 
38
#include "myxt_xt.h"
 
39
#include "tabcache_xt.h"
 
40
 
 
41
#ifdef DEBUG
 
42
//#define TRACE_WAIT_FOR
 
43
//#define TRACE_VARIATIONS
 
44
//#define TRACE_SWEEPER_ACTIVITY
 
45
 
 
46
/* Enable to trace the statements executed by the engine: */
 
47
//#define TRACE_STATEMENTS
 
48
#endif
 
49
 
 
50
#if defined(TRACE_STATEMENTS) || defined(TRACE_VARIATIONS)
 
51
#define TRACE_TRANSACTION
 
52
#endif
 
53
 
 
54
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs);
 
55
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtXactID *end, xtThreadID *thd_id);
 
56
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr);
 
57
 
 
58
/* ============================================================================================== */
 
59
 
 
60
typedef struct XNSWRecItem {
 
61
        xtTableID                               ri_tab_id;
 
62
        xtRecordID                              ri_rec_id;
 
63
} XNSWRecItemRec, *XNSWRecItemPtr;
 
64
 
 
65
typedef struct XNSWToFreeItem {
 
66
        xtTableID                               ri_tab_id;                      /* If non-zero, then this is the table of the data record to be freed.
 
67
                                                                                                 * If zero, then this free the transaction below must be freed.
 
68
                                                                                                 */
 
69
        union {
 
70
                xtRecordID                      ri_rec_id;
 
71
                xtXactID                        ri_xn_id;
 
72
        } x;
 
73
        xtXactID                                ri_wait_xn_id;          /* Wait for this transaction to be cleaned (or being cleaned up)
 
74
                                                                                                 * before freeing this resource. */
 
75
} XNSWToFreeItemRec, *XNSWToFreeItemPtr;
 
76
 
 
77
/* ----------------------------------------------------------------------
 
78
 * WAIT FOR TRANSACTIONS
 
79
 */
 
80
 
 
81
typedef struct XNWaitFor {
 
82
        xtXactID                                wf_waiting_xn_id;               /* The transaction of the waiting thread. */
 
83
        xtXactID                                wf_for_me_xn_id;                /* The transaction we are waiting for. */
 
84
} XNWaitForRec, *XNWaitForPtr;
 
85
 
 
86
static int xn_compare_wait_for(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
 
87
{
 
88
        xtXactID                *x = (xtXactID *) a;
 
89
        XNWaitForPtr    y = (XNWaitForPtr) b;
 
90
 
 
91
        if (*x == y->wf_waiting_xn_id)
 
92
                return 0;
 
93
        if (xt_xn_is_before(*x, y->wf_waiting_xn_id))
 
94
                return -1;
 
95
        return 1;
 
96
}
 
97
 
 
98
/*
 * Item-free callback that intentionally does nothing.
 * NOTE(review): presumably registered for the wait-for list because its
 * items are caller-owned XNWaitForRec structures (the callers in this file
 * insert the address of a local variable) -- confirm where the list is created.
 */
static void xn_free_wait_for(XTThreadPtr XT_UNUSED(self), void *XT_UNUSED(thunk), void *XT_UNUSED(item))
 
99
{
 
100
}
 
101
 
 
102
/*
 
103
 * A deadlock occurs when a transaction is waiting for itself!
 
104
 * For example A is waiting for B which is waiting for A.
 
105
 * By repeatedly scanning the wait_for list we can find out if a
 
106
 * transaction is waiting for itself.
 
107
 */
 
108
static xtBool xn_detect_deadlock(XTDatabaseHPtr db, xtXactID waiting, xtXactID for_me)
 
109
{
 
110
        XNWaitForPtr wf;
 
111
 
 
112
        for (;;) {
 
113
                if (waiting == for_me) {
 
114
#ifdef TRACE_WAIT_FOR
 
115
                        for (u_int i=0; i<xt_sl_get_size(db->db_xn_wait_for); i++) {
 
116
                                wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
 
117
                                xt_trace("T%lu --> T%lu\n", (u_long) wf->wf_waiting_xn_id, (u_long) wf->wf_for_me_xn_id);
 
118
                        }
 
119
                        xt_ttracef(xt_get_self(), "DEADLOCK\n");
 
120
                        xt_dump_trace();
 
121
#endif
 
122
                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_DEADLOCK);
 
123
                        return TRUE;
 
124
                }
 
125
                if (!(wf = (XNWaitForPtr) xt_sl_find(NULL, db->db_xn_wait_for, &for_me)))
 
126
                        break;
 
127
                for_me = wf->wf_for_me_xn_id;
 
128
        }
 
129
        return FALSE;
 
130
}
 
131
 
 
132
#ifdef XT_USE_SPINLOCK_WAIT_FOR
 
133
 
 
134
#if defined(XT_MAC) || defined(XT_WIN)
 
135
#define WAIT_SPIN_COUNT                 10
 
136
#else
 
137
#define WAIT_SPIN_COUNT                 50
 
138
#endif
 
139
 
 
140
/* Should not be required, but we wait for a second,
 
141
 * just in case the wakeup is missed!
 
142
 */
 
143
#ifdef DEBUG
 
144
#define WAIT_FOR_XACT_TIME              30000
 
145
#else
 
146
#define WAIT_FOR_XACT_TIME              1000
 
147
#endif
 
148
 
 
149
static xtBool xn_add_to_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
 
150
{
 
151
        /* If we are waiting for a transaction to end, 
 
152
         * put this thread on the wait list...
 
153
         *
 
154
         * As long as the temporary lock is removed
 
155
         * or turned into a permanent lock before
 
156
         * a thread waits again, all should be OK!
 
157
         */
 
158
        xt_spinlock_lock(&db->db_xn_wait_spinlock);
 
159
 
 
160
#ifdef TRACE_WAIT_FOR
 
161
        xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
 
162
#endif
 
163
        /* Check for a deadlock: */
 
164
        if (xn_detect_deadlock(db, wf->wf_waiting_xn_id, wf->wf_for_me_xn_id))
 
165
                goto failed;
 
166
 
 
167
        /* We will wait for this transaction... */
 
168
        db->db_xn_wait_count++;
 
169
        if (thread->st_xact_writer)
 
170
                db->db_xn_writer_wait_count++;
 
171
 
 
172
        if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id, wf)) {
 
173
                db->db_xn_wait_count--;
 
174
                goto failed;
 
175
        }
 
176
 
 
177
        xt_spinlock_unlock(&db->db_xn_wait_spinlock);
 
178
        return OK;
 
179
 
 
180
        failed:
 
181
        xt_spinlock_unlock(&db->db_xn_wait_spinlock);
 
182
        return FAILED;
 
183
}
 
184
 
 
185
inline void xn_remove_from_wait_for(XTDatabaseHPtr db, XNWaitForPtr wf, XTThreadPtr thread)
 
186
{
 
187
        xt_spinlock_lock(&db->db_xn_wait_spinlock);
 
188
 
 
189
        xt_sl_delete(NULL, db->db_xn_wait_for, &wf->wf_waiting_xn_id);
 
190
        db->db_xn_wait_count--;
 
191
        if (thread->st_xact_writer)
 
192
                db->db_xn_writer_wait_count--;
 
193
 
 
194
#ifdef TRACE_WAIT_FOR
 
195
        xt_ttracef(thread, "T%lu -wait-> T%lu FAILED\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) wait_xn_id);
 
196
#endif
 
197
        xt_spinlock_unlock(&db->db_xn_wait_spinlock);
 
198
}
 
199
 
 
200
/* Wait for a transaction to terminate or a lock to be granted.
 
201
 *
 
202
 * If term_req is TRUE, then the termination of the transaction is required
 
203
 * before continuing.
 
204
 *
 
205
 * If pw_func is set then this function will not return before this call has
 
206
 * succeeded.
 
207
 *
 
208
 * This function returns FAILED on error.
 
209
 */
 
210
/*
 * Parameters:
 *   thread  The waiting thread. It must have a current transaction
 *           (st_xact_data is asserted); that transaction's start ID is
 *           used as the "waiting" side of the wait-for entry.
 *   xw      If not NULL, wait until transaction xw->xw_xn_id has the
 *           XT_XN_XAC_ENDED flag set, or its start ID no longer matches.
 *   lw      If not NULL, first wait until lw->lw_curr_lock becomes
 *           XT_NO_LOCK. If the lock was granted and lw->lw_row_updated is
 *           set, xw is replaced by a wait on lw->lw_updating_xn_id.
 *
 * Both phases first spin (xt_critical_wait()) and then block on the
 * thread's own condition variable with a timeout of WAIT_FOR_XACT_TIME.
 * loop_count is shared, so at most WAIT_SPIN_COUNT spins are done in total.
 *
 * Returns FAILED if xn_add_to_wait_for() fails (deadlock or insert error)
 * or if the lock state becomes XT_LOCK_ERR; otherwise OK.
 */
xtPublic xtBool xt_xn_wait_for_xact(XTThreadPtr thread, XTXactWaitPtr xw, XTLockWaitPtr lw)
 
211
{
 
212
        XTDatabaseHPtr          db = thread->st_database;
 
213
        XNWaitForRec            wf;
 
214
        int                                     flags = 0;
 
215
        xtXactID                        start = 0;
 
216
        XTXactDataPtr           wait_xact_ptr;
 
217
        /* TRUE while wf is registered in db->db_xn_wait_for for the xw wait. */
        xtBool                          on_wait_list = FALSE;
 
218
        XTXactWaitRec           xw_new;
 
219
        /* Spin budget, shared by the lock wait and the transaction wait. */
        u_int                           loop_count = 0;
 
220
        XTWaitThreadPtr         my_wt;
 
221

 
222
        ASSERT_NS(thread->st_xact_data);
 
223
        thread->st_statistics.st_wait_for_xact++;
 
224

 
225
        wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
 
226

 
227
        if (lw) {
 
228
                /* If we are here, then the lw structure is on the wait
                 * queue for the given lock.
                 */
 
231
                xtXactID locking_xn_id;
 
232
                
 
233
                /* (Re-)register against the transaction currently recorded in lw->lw_xn_id. */
                wait_for_locker:
 
234
                locking_xn_id = lw->lw_xn_id;
 
235
                wf.wf_for_me_xn_id = lw->lw_xn_id;
 
236
                if (!xn_add_to_wait_for(db, &wf, thread)) {
 
237
                        lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
 
238
                        return FAILED;
 
239
                }
 
240

 
241
                /* The spinning version: poll the lock state without sleeping. */
                while (loop_count < WAIT_SPIN_COUNT) {
 
242
                        loop_count++;
 
243

 
244
                        switch (lw->lw_curr_lock) {
 
245
                                case XT_LOCK_ERR:
 
246
                                        xn_remove_from_wait_for(db, &wf, thread);
 
247
                                        return FAILED;
 
248
                                case XT_NO_LOCK:
 
249
                                        /* Got the lock: */
 
250
                                        /* Check if we must also wait for the transaction: */
 
251
                                        if (lw->lw_row_updated) {
 
252
                                                /* This will override the xw passed in.
                                                 * The reason is, because we are actually waiting
                                                 * for a lock, and the lock owner may have changed
                                                 * while we were waiting for the lock.
                                                 */
 
257
                                                xw_new.xw_xn_id = lw->lw_updating_xn_id;
 
258
                                                xw = &xw_new;
 
259
                                        }
 
260
                                        if (xw) {
 
261
                                                /* Keep the existing wait-for entry if it already
                                                 * names the transaction we must now wait for.
                                                 */
                                                if (wf.wf_for_me_xn_id == xw->xw_xn_id)
 
262
                                                        on_wait_list = TRUE;
 
263
                                                else
 
264
                                                        xn_remove_from_wait_for(db, &wf, thread);
 
265
                                                goto wait_for_xact;
 
266
                                        }
 
267
                                        xn_remove_from_wait_for(db, &wf, thread);
 
268
                                        return OK;
 
269
                                case XT_TEMP_LOCK:
 
270
                                case XT_PERM_LOCK:
 
271
                                        if (locking_xn_id != lw->lw_xn_id) {
 
272
                                                /* Change the transaction that we are waiting for: */
 
273
                                                xn_remove_from_wait_for(db, &wf, thread);
 
274
                                                goto wait_for_locker;
 
275
                                        }
 
276
                                        break;
 
277
                        }
 
278

 
279
                        xt_critical_wait();
 
280
                }
 
281

 
282

 
283
                /* The non-spinning version... */
 
284
                wait_for_locker_no_spin:
 
285
                /* Fetch this thread's own wait structure from the thread array. */
                THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
 
286
                my_wt = xt_thr_array[thread->t_id].td_waiting;
 
287
                THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
 
288
                xt_lock_mutex_ns(&my_wt->wt_lock);
 
289

 
290
                /* Same state machine as the spin loop, but wt_lock is held while
                 * the state is checked and is released on every exit path.
                 */
                for (;;) {
 
291
                        switch (lw->lw_curr_lock) {
 
292
                                case XT_LOCK_ERR:
 
293
                                        xt_unlock_mutex_ns(&my_wt->wt_lock);
 
294
                                        xn_remove_from_wait_for(db, &wf, thread);
 
295
                                        return FAILED;
 
296
                                case XT_NO_LOCK:
 
297
                                        xt_unlock_mutex_ns(&my_wt->wt_lock);
 
298
                                        if (lw->lw_row_updated) {
 
299
                                                xw_new.xw_xn_id = lw->lw_updating_xn_id;
 
300
                                                xw = &xw_new;
 
301
                                        }
 
302
                                        if (xw) {
 
303
                                                if (wf.wf_for_me_xn_id == xw->xw_xn_id)
 
304
                                                        on_wait_list = TRUE;
 
305
                                                else
 
306
                                                        xn_remove_from_wait_for(db, &wf, thread);
 
307
                                                goto wait_for_xact;
 
308
                                        }
 
309
                                        xn_remove_from_wait_for(db, &wf, thread);
 
310
                                        return OK;
 
311
                                case XT_TEMP_LOCK:
 
312
                                case XT_PERM_LOCK:
 
313
                                        if (locking_xn_id != lw->lw_xn_id) {
 
314
                                                /* Change the transaction that we are waiting for: */
 
315
                                                xt_unlock_mutex_ns(&my_wt->wt_lock);
 
316
                                                xn_remove_from_wait_for(db, &wf, thread);
 
317
                                                locking_xn_id = lw->lw_xn_id;
 
318
                                                wf.wf_for_me_xn_id = lw->lw_xn_id;
 
319
                                                if (!xn_add_to_wait_for(db, &wf, thread)) {
 
320
                                                        lw->lw_ot->ot_table->tab_locks.xt_cancel_temp_lock(lw);
 
321
                                                        return FAILED;
 
322
                                                }
 
323
                                                goto wait_for_locker_no_spin;
 
324
                                        }
 
325
                                        break;
 
326
                        }
 
327

 
328
                        /* Timed wait, so the state is re-checked even if a wakeup is missed. */
                        xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
 
329
                }
 
330

 
331
                /* Unreachable
                xt_unlock_mutex_ns(&my_wt->wt_lock);
                */
 
334
        }
 
335

 
336
        if (xw) {
 
337
                xtThreadID              tn_thd_id;
 
338

 
339
                /* Also entered by goto from the lock-wait code above. */
                wait_for_xact:
 
340
                wf.wf_for_me_xn_id = xw->xw_xn_id;
 
341

 
342
                if (!xn_get_xact_pointer(db, xw->xw_xn_id, &wait_xact_ptr))
 
343
                        /* The transaction was not found... */
 
344
                        goto wait_done;
 
345

 
346
                if (wait_xact_ptr) {
 
347
                        /* This is a dirty read, but it should work! */
 
348
                        flags = wait_xact_ptr->xd_flags;
 
349
                        start = wait_xact_ptr->xd_start_xn_id;
 
350
                        tn_thd_id = wait_xact_ptr->xd_thread_id;
 
351
                }
 
352
                else {
 
353
                        tn_thd_id = 0;
 
354
                        /* No direct pointer available: look the details up, and
                         * treat a failed lookup as "transaction has ended".
                         */
                        if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, &tn_thd_id))
 
355
                                flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
 
356
                }
 
357

 
358
                if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
 
359
                        /* The transaction has terminated! */
 
360
                        goto wait_done;
 
361

 
362
                /* Tell the thread we are waiting for it: */
 
363
                xt_add_to_wakeup_list(thread->t_id, tn_thd_id);
 
364

 
365
                if (!on_wait_list) {
 
366
                        if (!xn_add_to_wait_for(db, &wf, thread))
 
367
                                return FAILED;
 
368
                        on_wait_list = TRUE;
 
369
                }
 
370

 
371
                /* The spinning version: */
 
372
                while (loop_count < WAIT_SPIN_COUNT) {
 
373
                        loop_count++;
 
374

 
375
                        xt_critical_wait();
 
376

 
377
                        if (wait_xact_ptr) {
 
378
                                /* This is a dirty read, but it should work! */
 
379
                                flags = wait_xact_ptr->xd_flags;
 
380
                                start = wait_xact_ptr->xd_start_xn_id;
 
381
                        }
 
382
                        else {
 
383
                                if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
 
384
                                        flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
 
385
                        }
 
386

 
387
                        if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
 
388
                                /* The transaction has terminated! */
 
389
                                goto wait_done;
 
390
                }
 
391

 
392
                /* The non-spinning version:
                 *
                 * I believe I can avoid missing the wakeup signal
                 * by locking before we check if the transaction
                 * is still running.
                 *
                 * Even though db->db_xn_wait_on_cond is "dirty read".
                 *
                 * The reason is, before the signal is sent the
                 * lock is also acquired. This is not possible until
                 * this thread is safely sleeping.
                 */
 
404
                THR_ARRAY_READ_LOCK(&xt_thr_array_resize_lock, thread->t_id);
 
405
                my_wt = xt_thr_array[thread->t_id].td_waiting;
 
406
                THR_ARRAY_UNLOCK(&xt_thr_array_resize_lock, thread->t_id);
 
407

 
408
                xt_lock_mutex_ns(&my_wt->wt_lock);
 
409

 
410
                for (;;) {
 
411
                        if (wait_xact_ptr) {
 
412
                                /* This is a dirty read, but it should work! */
 
413
                                flags = wait_xact_ptr->xd_flags;
 
414
                                start = wait_xact_ptr->xd_start_xn_id;
 
415
                        }
 
416
                        else {
 
417
                                if (!xn_get_xact_details(db, xw->xw_xn_id, thread, &flags, &start, NULL, NULL))
 
418
                                        flags = XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
 
419
                        }
 
420

 
421
                        if ((flags & XT_XN_XAC_ENDED) || start != xw->xw_xn_id)
 
422
                                /* The transaction has terminated! */
 
423
                                break;
 
424

 
425
                        xt_timed_wait_cond_ns(&my_wt->wt_cond, &my_wt->wt_lock, WAIT_FOR_XACT_TIME);
 
426
                }
 
427

 
428
                xt_unlock_mutex_ns(&my_wt->wt_lock);
 
429

 
430
                /* Common exit of the transaction wait: drop our wait-for entry if we have one. */
                wait_done:
 
431
                if (on_wait_list)
 
432
                        xn_remove_from_wait_for(db, &wf, thread);
 
433
        }
 
434

 
435
        return OK;
 
436
}
 
437
 
 
438
#else // XT_USE_SPINLOCK_WAIT_FOR
 
439
/*
 
440
 * The given thread must wait for the specified transaction to terminate. This
 
441
 * function places the transaction of the thread on a list of waiting threads.
 
442
 *
 
443
 * Before waiting we make a check for deadlocks. A deadlock occurs
 
444
 * if waiting would introduce a cycle.
 
445
 */
 
446
xtPublic xtBool old_xt_xn_wait_for_xact(XTThreadPtr thread, xtXactID xn_id, xtBool will_retry, XTLockWaitFuncPtr pw_func, XTLockWaitPtr pw_data)
 
447
{
 
448
        XTDatabaseHPtr          db = thread->st_database;
 
449
        XNWaitForRec            wf;
 
450
        int                                     flags = 0;
 
451
        xtXactID                        start = 0;
 
452
 
 
453
        ASSERT_NS(thread->st_xact_data);
 
454
 
 
455
        thread->st_statistics.st_wait_for_xact++;
 
456
        wf.wf_waiting_xn_id = thread->st_xact_data->xd_start_xn_id;
 
457
        wf.wf_for_me_xn_id = xn_id;
 
458
        wf.wf_thread_id = thread->t_id;
 
459
 
 
460
        xt_lock_mutex_ns(&db->db_xn_wait_lock);
 
461
 
 
462
#ifdef TRACE_WAIT_FOR
 
463
        xt_ttracef(thread, "T%lu -wait-> T%lu\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
 
464
#endif
 
465
        for (;;) {
 
466
                if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL))
 
467
                        break;
 
468
 
 
469
                /* This is a dirty read, but it should work! */
 
470
                if ((flags & XT_XN_XAC_ENDED) || start != xn_id)
 
471
                        break;
 
472
 
 
473
                if (xn_detect_deadlock(db, wf.wf_waiting_xn_id, wf.wf_for_me_xn_id))
 
474
                        goto failed;
 
475
 
 
476
                /* We will wait for this transaction... */
 
477
                db->db_xn_wait_count++;
 
478
                if (thread->st_xact_writer)
 
479
                        db->db_xn_writer_wait_count++;
 
480
 
 
481
                if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id, &wf)) {
 
482
                        db->db_xn_wait_count--;
 
483
                        goto failed;
 
484
                }
 
485
 
 
486
                if (!xn_get_xact_details(db, xn_id, thread, &flags, &start, NULL, NULL)) {
 
487
                        xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
 
488
                        db->db_xn_wait_count--;
 
489
                        if (thread->st_xact_writer)
 
490
                                db->db_xn_writer_wait_count--;
 
491
                        break;
 
492
                }
 
493
 
 
494
                if ((flags & XT_XN_XAC_ENDED) || start != xn_id) {
 
495
                        xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
 
496
                        db->db_xn_wait_count--;
 
497
                        if (thread->st_xact_writer)
 
498
                                db->db_xn_writer_wait_count--;
 
499
                        break;
 
500
                }
 
501
 
 
502
                db->db_xn_post_wait[thread->t_id].pw_call_me = pw_func;
 
503
                db->db_xn_post_wait[thread->t_id].pw_thread = thread;
 
504
                db->db_xn_post_wait[thread->t_id].pw_data = pw_data;
 
505
 
 
506
                /* Timed wait because it is possible that transaction quits before
 
507
                 * we go to sleep.
 
508
                 */
 
509
                if (!xt_timed_wait_cond(NULL, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 2 * 1000)) {
 
510
                        xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
 
511
                        db->db_xn_wait_count--;
 
512
                        if (thread->st_xact_writer)
 
513
                                db->db_xn_writer_wait_count--;
 
514
                        goto failed;
 
515
                }
 
516
 
 
517
                db->db_xn_post_wait[thread->t_id].pw_call_me = NULL;
 
518
                xt_sl_delete(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
 
519
                db->db_xn_wait_count--;
 
520
                if (thread->st_xact_writer)
 
521
                        db->db_xn_writer_wait_count--;
 
522
                
 
523
                if (will_retry)
 
524
                        break;
 
525
        }
 
526
 
 
527
#ifdef TRACE_WAIT_FOR
 
528
        xt_ttracef(thread, "T%lu -wait-> T%lu DONE\n", (u_long) thread->st_xact_data->xd_start_xn_id, (u_long) xn_id);
 
529
#endif
 
530
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
 
531
        return OK;
 
532
 
 
533
        failed:
 
534
#ifdef TRACE_WAIT_FOR
 
535
        xt_ttracef(self, "T%lu -wait-> T%lu FAILED\n", (u_long) self->st_xact_data->xd_start_xn_id, (u_long) xn_id);
 
536
#endif
 
537
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
 
538
        return FAILED;
 
539
}
 
540
 
 
541
xtPublic void old_xt_xn_wakeup_transactions(XTDatabaseHPtr db, XTThreadPtr thread)
 
542
{
 
543
        u_int                   len;
 
544
        XNWaitForPtr    wf;
 
545
 
 
546
        xt_lock_mutex_ns(&db->db_xn_wait_lock);
 
547
        /* The idea here is to release the oldest transactions
 
548
         * first. Although this may not be completely fair
 
549
         * it has the advantage that older transactions are
 
550
         * encouraged to complete first.
 
551
         *
 
552
         * I have found the following problem with this test:
 
553
         * runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
 
554
         * with a bit of bad luck a transaction can be starved.
 
555
         * This results in the sweeper stalling because it is
 
556
         * waiting for an old transaction to quite so that
 
557
         * it continue.
 
558
         *
 
559
         * Because the sweeper is waiting, the number of
 
560
         * versions of the record to be updated
 
561
         * begins to increase. In the above test over
 
562
         * 1600 transaction remain uncleaned.
 
563
         *
 
564
         * This means that there are 1600 version of the
 
565
         * row which must be scanned to find the most
 
566
         * recent version.
 
567
         */
 
568
        if ((len = (u_int) xt_sl_get_size(db->db_xn_wait_for))) {
 
569
                for (u_int i=0; i<len; i++) {
 
570
                        wf = (XNWaitForPtr) xt_sl_item_at(db->db_xn_wait_for, i);
 
571
                        if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me) {
 
572
                                if (db->db_xn_post_wait[wf->wf_thread_id].pw_call_me(thread, &db->db_xn_post_wait[wf->wf_thread_id]))
 
573
                                        db->db_xn_post_wait[wf->wf_thread_id].pw_call_me = NULL;
 
574
                        }
 
575
                }
 
576
                if (!xt_broadcast_cond_ns(&db->db_xn_wait_cond))
 
577
                        xt_log_and_clear_exception_ns();
 
578
        }
 
579
        ASSERT_NS(db->db_xn_wait_count == len);
 
580
        xt_unlock_mutex_ns(&db->db_xn_wait_lock);
 
581
}
 
582
#endif  // XT_USE_SPINLOCK_WAIT_FOR
 
583
 
 
584
/* ----------------------------------------------------------------------
 
585
 * Utilities
 
586
 */
 
587
 
 
588
//#define HIGH_X
 
589
#ifdef HIGH_X
 
590
u_long tot_alloced;
 
591
u_long high_alloced;
 
592
u_long not_clean_max;
 
593
u_long in_ram_max;
 
594
#endif
 
595
 
 
596
static void xn_free_xact(XTDatabaseHPtr db, XTXactSegPtr seg, XTXactDataPtr xact)
 
597
{
 
598
#ifdef HIGH_X
 
599
        tot_alloced--;
 
600
#endif
 
601
        /* This indicates the structure is free: */
 
602
        xact->xd_start_xn_id = 0;
 
603
        if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end) {
 
604
                /* Put it in the free list: */
 
605
                xact->xd_next_xact = seg->xs_free_list;
 
606
                seg->xs_free_list = xact;
 
607
                return;
 
608
        }
 
609
        xt_free_ns(xact);
 
610
}
 
611
 
 
612
/*
 
613
 * GOTCHA: The value db->db_xn_curr_id may be a bit larger
 
614
 * than the actual transaction created because there is
 
615
 * a gap between the issue of the transaction ID
 
616
 * and the creation of a memory structure.
 
617
 * (indicated here: {GAP-INC-ADD-XACT})
 
618
 *
 
619
 * This function returns the actual current transaction ID.
 
620
 * This is the number of the last transaction actually
 
621
 * created in memory.
 
622
 *
 
623
 * This means that if you call xt_xn_get_xact() with any
 
624
 * number less than or equal to this value, not finding
 
625
 * the transaction means it has already ended!
 
626
 */
 
627
xtPublic xtXactID xt_xn_get_curr_id(XTDatabaseHPtr db)
 
628
{
 
629
        int                                             i;
 
630
        xtXactID                                curr_xn_id;
 
631
        register XTXactSegPtr   seg = db->db_xn_idx;
 
632
 
 
633
        /* Find the highest transaction ID actually created... */
 
634
        curr_xn_id = seg->xs_last_xn_id;
 
635
        seg++;
 
636
        for (i=1; i<XT_XN_NO_OF_SEGMENTS; i++, seg++) {
 
637
                if (xt_xn_is_before(curr_xn_id, seg->xs_last_xn_id))
 
638
                        curr_xn_id = seg->xs_last_xn_id;
 
639
        }
 
640
        return curr_xn_id;
 
641
}
 
642
 
 
643
xtPublic XTXactDataPtr xt_xn_add_old_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
 
644
{
 
645
        register XTXactDataPtr  xact;
 
646
        register XTXactSegPtr   seg;
 
647
        register XTXactDataPtr  *hash;
 
648
 
 
649
        (void) thread;
 
650
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
651
        XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
 
652
        hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
653
        xact = *hash;
 
654
        while (xact) {
 
655
                if (xact->xd_start_xn_id == xn_id)
 
656
                        goto done_ok;
 
657
                xact = xact->xd_next_xact;
 
658
        }
 
659
 
 
660
        if ((xact = seg->xs_free_list))
 
661
                seg->xs_free_list = xact->xd_next_xact;
 
662
        else {
 
663
                /* We have used up all the free transaction slots,
 
664
                 * the sweeper should work faster to free them
 
665
                 * up...
 
666
                 */
 
667
                db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
 
668
                if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
 
669
                        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
 
670
                        return NULL;
 
671
                }
 
672
        }
 
673
 
 
674
        xact->xd_next_xact = *hash;
 
675
        *hash = xact;
 
676
 
 
677
        xact->xd_start_xn_id = xn_id;
 
678
        xact->xd_end_xn_id = 0;
 
679
        xact->xd_end_time = 0;
 
680
        xact->xd_begin_log = 0;
 
681
        xact->xd_flags = 0;
 
682
 
 
683
        /* Get the largest transaction id. */
 
684
        if (xt_xn_is_before(seg->xs_last_xn_id, xn_id))
 
685
                seg->xs_last_xn_id = xn_id;
 
686
 
 
687
        done_ok:
 
688
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
 
689
#ifdef HIGH_X
 
690
        tot_alloced++;
 
691
        if (tot_alloced > high_alloced)
 
692
                high_alloced = tot_alloced;
 
693
#endif
 
694
        return xact;
 
695
}
 
696
 
 
697
static XTXactDataPtr xn_add_new_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
 
698
{
 
699
        register XTXactDataPtr  xact;
 
700
        register XTXactSegPtr   seg;
 
701
        register XTXactDataPtr  *hash;
 
702
 
 
703
        (void) thread;
 
704
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
705
        XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
 
706
        hash = &seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
707
 
 
708
        if ((xact = seg->xs_free_list))
 
709
                seg->xs_free_list = xact->xd_next_xact;
 
710
        else {
 
711
                /* We have used up all the free transaction slots,
 
712
                 * the sweeper should work faster to free them
 
713
                 * up...
 
714
                 */
 
715
                db->db_sw_faster |= XT_SW_NO_MORE_XACT_SLOTS;
 
716
                if (!(xact = (XTXactDataPtr) xt_malloc_ns(sizeof(XTXactDataRec)))) {
 
717
                        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
 
718
                        return NULL;
 
719
                }
 
720
        }
 
721
 
 
722
        xact->xd_next_xact = *hash;
 
723
        *hash = xact;
 
724
 
 
725
        xact->xd_thread_id = thread->t_id;
 
726
        xact->xd_start_xn_id = xn_id;
 
727
        xact->xd_end_xn_id = 0;
 
728
        xact->xd_end_time = 0;
 
729
        xact->xd_begin_log = 0;
 
730
        xact->xd_flags = 0;
 
731
 
 
732
        seg->xs_last_xn_id = xn_id;
 
733
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
 
734
#ifdef HIGH_X
 
735
        tot_alloced++;
 
736
        if (tot_alloced > high_alloced)
 
737
                high_alloced = tot_alloced;
 
738
#endif
 
739
        return xact;
 
740
}
 
741
 
 
742
static xtBool xn_get_xact_details(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), int *flags, xtXactID *start, xtWord4 *end, xtThreadID *thd_id)
 
743
{
 
744
        register XTXactSegPtr   seg;
 
745
        register XTXactDataPtr  xact;
 
746
        xtBool                                  found = FALSE;
 
747
 
 
748
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
749
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
 
750
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
751
        while (xact) {
 
752
                if (xact->xd_start_xn_id == xn_id) {
 
753
                        found = TRUE;
 
754
                        if (flags)
 
755
                                *flags = xact->xd_flags;
 
756
                        if (start)
 
757
                                *start = xact->xd_start_xn_id;
 
758
                        if (end)
 
759
                                *end = xact->xd_end_time;
 
760
                        if (thd_id)
 
761
                                *thd_id = xact->xd_thread_id;
 
762
                        break;
 
763
                }
 
764
                xact = xact->xd_next_xact;
 
765
        }
 
766
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
 
767
        return found;
 
768
}
 
769
 
 
770
static xtBool xn_get_xact_pointer(XTDatabaseHPtr db, xtXactID xn_id, XTXactDataPtr *xact_ptr)
 
771
{
 
772
        register XTXactSegPtr   seg;
 
773
        register XTXactDataPtr  xact;
 
774
        xtBool                                  found = FALSE;
 
775
 
 
776
        *xact_ptr = NULL;
 
777
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
778
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
 
779
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
780
        while (xact) {
 
781
                if (xact->xd_start_xn_id == xn_id) {
 
782
                        found = TRUE;
 
783
                        /* We only return pointers to transaction structures that are permanently
 
784
                         * allocated!
 
785
                         */
 
786
                        if ((xtWord1 *) xact >= db->db_xn_data && (xtWord1 *) xact < db->db_xn_data_end)
 
787
                                *xact_ptr = xact;
 
788
                        break;
 
789
                }
 
790
                xact = xact->xd_next_xact;
 
791
        }
 
792
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
 
793
        return found;
 
794
}
 
795
 
 
796
/*
 
797
 * Note, this function only returns TRUE if the transaction
 
798
 * still needs to be cleaned.
 
799
 */
 
800
static xtBool xn_get_xact_start(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread), xtLogID *log_id, xtLogOffset *log_offset)
 
801
{
 
802
        register XTXactSegPtr   seg;
 
803
        register XTXactDataPtr  xact;
 
804
        xtBool                                  found = FALSE;
 
805
 
 
806
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
807
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
 
808
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
809
        while (xact) {
 
810
                if (xact->xd_start_xn_id == xn_id) {
 
811
                        /* Consider only transactions that have not be cleaned! */
 
812
                        if (!(xact->xd_flags & XT_XN_XAC_CLEANED))
 
813
                                found = TRUE;
 
814
                        if (log_id) {
 
815
                                *log_id = xact->xd_begin_log;
 
816
                                *log_offset = xact->xd_begin_offset;
 
817
                        }
 
818
                        break;
 
819
                }
 
820
                xact = xact->xd_next_xact;
 
821
        }
 
822
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
 
823
        return found;
 
824
}
 
825
 
 
826
/* NOTE: this function may only be used by the sweeper or the recovery process. */
 
827
xtPublic XTXactDataPtr xt_xn_get_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr XT_UNUSED(thread))
 
828
{
 
829
        register XTXactSegPtr   seg;
 
830
        register XTXactDataPtr  xact;
 
831
 
 
832
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
833
        XT_XACT_READ_LOCK(&seg->xs_tab_lock, thread);
 
834
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
835
        while (xact) {
 
836
                if (xact->xd_start_xn_id == xn_id)
 
837
                        break;
 
838
                xact = xact->xd_next_xact;
 
839
        }
 
840
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, FALSE);
 
841
        return xact;
 
842
}
 
843
 
 
844
/*
 
845
 * Delete a transaction, return TRUE if the transaction
 
846
 * was found.
 
847
 */
 
848
xtPublic xtBool xt_xn_delete_xact(XTDatabaseHPtr db, xtXactID xn_id, XTThreadPtr thread)
 
849
{
 
850
        XTXactDataPtr   xact, pxact = NULL;
 
851
        XTXactSegPtr    seg;
 
852
 
 
853
        (void) thread;
 
854
        seg = &db->db_xn_idx[xn_id & XT_XN_SEGMENT_MASK];
 
855
        XT_XACT_WRITE_LOCK(&seg->xs_tab_lock, thread);
 
856
        xact = seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE];
 
857
        while (xact) {
 
858
                if (xact->xd_start_xn_id == xn_id) {
 
859
                        if (pxact)
 
860
                                pxact->xd_next_xact = xact->xd_next_xact;
 
861
                        else
 
862
                                 seg->xs_table[(xn_id >> XT_XN_SEGMENT_SHIFTS) % XT_XN_HASH_TABLE_SIZE] = xact->xd_next_xact;
 
863
                        xn_free_xact(db, seg, xact);
 
864
                        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
 
865
                        return TRUE;
 
866
                }
 
867
                pxact = xact;
 
868
                xact = xact->xd_next_xact;
 
869
        }
 
870
        XT_XACT_UNLOCK(&seg->xs_tab_lock, thread, TRUE);
 
871
        return FALSE;
 
872
}
 
873
 
 
874
//#define DEBUG_RAM_LIST
 
875
#ifdef DEBUG_RAM_LIST
 
876
 
 
877
#define DEBUG_RAM_LIST_SIZE                     80
 
878
 
 
879
int                                     check_ram_init_count = 0;
 
880
xt_rwlock_type          check_ram_lock;
 
881
xtXactID                        check_ram_trns[DEBUG_RAM_LIST_SIZE];
 
882
int                                     check_ram_dummy;
 
883
 
 
884
static void check_ram_init(void)
 
885
{
 
886
        if (check_ram_init_count == 0)
 
887
                xt_init_rwlock(NULL, &check_ram_lock);
 
888
        check_ram_init_count++;
 
889
}
 
890
 
 
891
static void check_ram_free(void)
 
892
{
 
893
        check_ram_init_count--;
 
894
        if (check_ram_init_count == 0)
 
895
                xt_free_rwlock(&check_ram_lock);
 
896
}
 
897
 
 
898
static void check_ram_min_id(XTDatabaseHPtr db)
 
899
{
 
900
        int i;
 
901
 
 
902
        xt_slock_rwlock_ns(&check_ram_lock);
 
903
        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
 
904
                if (check_ram_trns[i] && xt_xn_is_before(check_ram_trns[i], db->db_xn_min_ram_id)) {
 
905
                        /* This should never happen! */
 
906
                        XTXactDataPtr x_ptr;
 
907
 
 
908
                        check_ram_dummy = 0;
 
909
                        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
 
910
                                if (check_ram_trns[i]) {
 
911
                                        x_ptr = xt_xn_get_xact(db, check_ram_trns[i]);
 
912
                                        check_ram_dummy = 1;
 
913
                                }
 
914
                        }
 
915
                        break;
 
916
                }
 
917
        }
 
918
        xt_unlock_rwlock_ns(&check_ram_lock);
 
919
}
 
920
 
 
921
static void check_ram_add(xtXactID xn_id)
 
922
{
 
923
        int i;
 
924
        
 
925
        xt_xlock_rwlock_ns(&check_ram_lock);
 
926
        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
 
927
                if (!check_ram_trns[i]) {
 
928
                        check_ram_trns[i] = xn_id;
 
929
                        xt_unlock_rwlock_ns(&check_ram_lock);
 
930
                        return;
 
931
                }
 
932
        }
 
933
        xt_unlock_rwlock_ns(&check_ram_lock);
 
934
        printf("DEBUG --- List too small\n");
 
935
}
 
936
 
 
937
static void check_ram_del(xtXactID xn_id)
 
938
{
 
939
        int i;
 
940
        
 
941
        xt_xlock_rwlock_ns(&check_ram_lock);
 
942
        for (i=0; i<DEBUG_RAM_LIST_SIZE; i++) {
 
943
                if (check_ram_trns[i] == xn_id) {
 
944
                        check_ram_trns[i] = 0;
 
945
                        xt_unlock_rwlock_ns(&check_ram_lock);
 
946
                        return;
 
947
                }
 
948
        }
 
949
        xt_unlock_rwlock_ns(&check_ram_lock);
 
950
}
 
951
#endif
 
952
 
 
953
/* ----------------------------------------------------------------------
 
954
 * Init and Exit
 
955
 */
 
956
 
 
957
xtPublic void xt_xn_init_db(XTThreadPtr self, XTDatabaseHPtr db)
 
958
{
 
959
        XTXactDataPtr   xact;
 
960
        XTXactSegPtr    seg;
 
961
 
 
962
#ifdef DEBUG_RAM_LIST
 
963
        check_ram_init();
 
964
#endif
 
965
        xt_spinlock_init_with_autoname(self, &db->db_xn_id_lock);
 
966
        xt_spinlock_init_with_autoname(self, &db->db_xn_wait_spinlock);
 
967
        xt_init_mutex_with_autoname(self, &db->db_xn_xa_lock);
 
968
        //xt_init_mutex_with_autoname(self, &db->db_xn_wait_lock);
 
969
        //xt_init_cond(self, &db->db_xn_wait_cond);
 
970
        xt_init_mutex_with_autoname(self, &db->db_sw_lock);
 
971
        xt_init_cond(self, &db->db_sw_cond);
 
972
        xt_init_mutex_with_autoname(self, &db->db_wr_lock);
 
973
        xt_init_cond(self, &db->db_wr_cond);
 
974
 
 
975
        /* Pre-allocate transaction data structures: */
 
976
        db->db_xn_data = (xtWord1 *) xt_malloc(self, sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS);
 
977
        db->db_xn_data_end = db->db_xn_data + sizeof(XTXactDataRec) * XT_XN_DATA_ALLOC_COUNT * XT_XN_NO_OF_SEGMENTS;
 
978
        xact = (XTXactDataPtr) db->db_xn_data;
 
979
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
 
980
                seg = &db->db_xn_idx[i];
 
981
                XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
 
982
                for (u_int j=0;  j<XT_XN_DATA_ALLOC_COUNT; j++) {
 
983
                        xact->xd_next_xact = seg->xs_free_list;
 
984
                        seg->xs_free_list = xact;
 
985
                        xact++;
 
986
                }
 
987
        }
 
988
 
 
989
        /* Create a sorted list for XA transactions recovered: */
 
990
        db->db_xn_xa_list = xt_new_sortedlist(self, sizeof(XTXactXARec), 100, 50, xt_xn_xa_compare, db, NULL, FALSE, FALSE);
 
991
 
 
992
        /* Initialize the data logs: */
 
993
        db->db_datalogs.dlc_init(self, db); 
 
994
 
 
995
        /* Setup the transaction log: */
 
996
        db->db_xlog.xlog_setup(self, db, (off_t) xt_db_log_file_threshold, xt_db_transaction_buffer_size, xt_db_log_file_count);
 
997
 
 
998
        db->db_xn_end_time = 1;
 
999
 
 
1000
        /* Initializing the restart file, also does
 
1001
         * recovery. This returns the log position after recovery.
 
1002
         *
 
1003
         * This is the log position where the writer thread will
 
1004
         * begin. The writer thread writes changes to the database that
 
1005
         * have been flushed to the log.
 
1006
         */
 
1007
        xt_xres_init(self, db);
 
1008
 
 
1009
        /* Initialize the "last transaction in memory", by default
 
1010
         * this is the current transaction ID, which is the ID
 
1011
         * of the last transaction.
 
1012
         */
 
1013
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
 
1014
                seg = &db->db_xn_idx[i];
 
1015
                XT_XACT_INIT_LOCK(self, &seg->xs_tab_lock);
 
1016
                seg->xs_last_xn_id = db->db_xn_curr_id;
 
1017
        }
 
1018
 
 
1019
        /*
 
1020
         * The next transaction to clean is the lowest transaction
 
1021
         * in memory:
 
1022
         */
 
1023
        db->db_xn_to_clean_id = db->db_xn_min_ram_id;
 
1024
#ifdef XT_SWEEPER_SORT_XACTS
 
1025
        db->db_sw_to_add = db->db_xn_min_ram_id;
 
1026
#endif
 
1027
 
 
1028
        /*
 
1029
         * No transactions are running, so the minimum transaction
 
1030
         * ID is the next one to run:
 
1031
         */
 
1032
        db->db_xn_min_run_id = db->db_xn_curr_id + 1;
 
1033
 
 
1034
        db->db_xn_wait_for = xt_new_sortedlist(self, sizeof(XNWaitForRec), 100, 50, xn_compare_wait_for, db, xn_free_wait_for, FALSE, FALSE);
 
1035
}
 
1036
 
 
1037
xtPublic void xt_xn_exit_db(XTThreadPtr self, XTDatabaseHPtr db)
 
1038
{
 
1039
#ifdef HIGH_X
 
1040
        printf("=========> MOST TXs CURR ALLOC: %lu\n", tot_alloced);
 
1041
        printf("=========> MOST TXs HIGH ALLOC: %lu\n", high_alloced);
 
1042
        printf("=========> MAX TXs NOT CLEAN: %lu\n", not_clean_max);
 
1043
        printf("=========> MAX TXs IN RAM: %lu\n", in_ram_max);
 
1044
#endif
 
1045
        XTXactPreparePtr xap, xap_next;
 
1046
 
 
1047
        xt_stop_sweeper(self, db);      // Should be done already!
 
1048
        xt_stop_writer(self, db);       // Should be done already!
 
1049
 
 
1050
        xt_xres_exit(self, db);
 
1051
        db->db_xlog.xlog_exit(self);
 
1052
 
 
1053
        db->db_datalogs.dlc_exit(self); 
 
1054
 
 
1055
        for (u_int i=0; i<XT_XN_NO_OF_SEGMENTS; i++) {
 
1056
                XTXactSegPtr    seg;
 
1057
 
 
1058
                seg = &db->db_xn_idx[i];
 
1059
                for (u_int j=0; j<XT_XN_HASH_TABLE_SIZE; j++) {
 
1060
                        XTXactDataPtr   xact, nxact;
 
1061
                        
 
1062
                        xact = seg->xs_table[j];
 
1063
                        while (xact) {
 
1064
                                nxact = xact->xd_next_xact;
 
1065
                                xn_free_xact(db, seg, xact);
 
1066
                                xact = nxact;
 
1067
                        }
 
1068
                }
 
1069
                XT_XACT_FREE_LOCK(self, &seg->xs_tab_lock);
 
1070
        }
 
1071
        if (db->db_xn_wait_for) {
 
1072
                xt_free_sortedlist(self, db->db_xn_wait_for);
 
1073
                db->db_xn_wait_for = NULL;
 
1074
        }
 
1075
        if (db->db_xn_data) {
 
1076
                xt_free(self, db->db_xn_data);
 
1077
                db->db_xn_data = NULL;
 
1078
                db->db_xn_data_end = NULL;
 
1079
        }
 
1080
 
 
1081
        xt_free_cond(&db->db_wr_cond);
 
1082
        xt_free_mutex(&db->db_wr_lock);
 
1083
        xt_free_cond(&db->db_sw_cond);
 
1084
        xt_free_mutex(&db->db_sw_lock);
 
1085
        //xt_free_cond(&db->db_xn_wait_cond);
 
1086
        //xt_free_mutex(&db->db_xn_wait_lock);
 
1087
        xt_free_mutex(&db->db_xn_xa_lock);
 
1088
        for (u_int i=0; i<XT_XA_HASH_TAB_SIZE; i++) {
 
1089
                xap = db->db_xn_xa_table[i];
 
1090
                while (xap) {
 
1091
                        xap_next = xap->xp_next;
 
1092
                        xt_free(self, xap);
 
1093
                        xap = xap_next;
 
1094
                }
 
1095
        }
 
1096
        if (db->db_xn_xa_list) {
 
1097
                xt_free_sortedlist(self, db->db_xn_xa_list);
 
1098
                db->db_xn_xa_list = NULL;
 
1099
        }
 
1100
        xt_spinlock_free(self, &db->db_xn_wait_spinlock);
 
1101
        xt_spinlock_free(self, &db->db_xn_id_lock);
 
1102
#ifdef DEBUG_RAM_LIST
 
1103
        check_ram_free();
 
1104
#endif
 
1105
}
 
1106
 
 
1107
xtPublic void xt_xn_init_thread(XTThreadPtr self, int what_for)
 
1108
{
 
1109
        ASSERT(self->st_database);
 
1110
 
 
1111
        if (!xt_init_row_lock_list(&self->st_lock_list))
 
1112
                xt_throw(self);
 
1113
        switch (what_for) {
 
1114
                case XT_FOR_COMPACTOR:
 
1115
                        self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
 
1116
                        break;
 
1117
                case XT_FOR_WRITER:
 
1118
                        /* The writer does not need a transaction buffer. */
 
1119
                        self->st_dlog_buf.dlb_init(self->st_database, 0);
 
1120
                        break;
 
1121
                case XT_FOR_SWEEPER:
 
1122
                case XT_FOR_POOL:
 
1123
                        self->st_dlog_buf.dlb_init(self->st_database, 0);
 
1124
                        break;
 
1125
                case XT_FOR_USER:
 
1126
                        self->st_dlog_buf.dlb_init(self->st_database, xt_db_log_buffer_size);
 
1127
                        break;
 
1128
        }
 
1129
}
 
1130
 
 
1131
xtPublic void xt_xn_exit_thread(XTThreadPtr self)
 
1132
{
 
1133
        if (self->st_xact_data)
 
1134
                xt_xn_rollback(self);
 
1135
        self->st_dlog_buf.dlb_exit(self);
 
1136
        xt_exit_row_lock_list(&self->st_lock_list);
 
1137
}
 
1138
 
 
1139
/* ----------------------------------------------------------------------
 
1140
 * Begin and End Transactions
 
1141
 */
 
1142
 
 
1143
xtPublic xtBool xt_xn_begin(XTThreadPtr self)
 
1144
{
 
1145
        XTDatabaseHPtr  db = self->st_database;
 
1146
        xtXactID                xn_id;
 
1147
 
 
1148
        ASSERT(!self->st_xact_data);
 
1149
 
 
1150
        xt_spinlock_lock(&db->db_xn_id_lock);
 
1151
        xn_id = ++db->db_xn_curr_id;
 
1152
        xt_spinlock_unlock(&db->db_xn_id_lock);
 
1153
 
 
1154
#ifdef HIGH_X
 
1155
        if (xt_xn_is_before(not_clean_max, xn_id - db->db_xn_to_clean_id))
 
1156
                not_clean_max = xn_id - db->db_xn_to_clean_id;
 
1157
        if (xt_xn_is_before(in_ram_max, xn_id - db->db_xn_min_ram_id))
 
1158
                in_ram_max = xn_id - db->db_xn_min_ram_id;
 
1159
#endif
 
1160
        /* {GAP-INC-ADD-XACT} This is the gap between incrementing the ID,
 
1161
         * and creating the transaction in memory.
 
1162
         * See xt_xn_get_curr_id().
 
1163
         */
 
1164
 
 
1165
        if (!(self->st_xact_data = xn_add_new_xact(db, xn_id, self)))
 
1166
                return FAILED;
 
1167
        self->st_xact_writer = FALSE;
 
1168
        
 
1169
        /* All transactions that committed before or at this time
 
1170
         * are this one are visible: */
 
1171
        self->st_visible_time = db->db_xn_end_time;
 
1172
 
 
1173
#ifdef TRACE_TRANSACTION
 
1174
        xt_ttracef(self, "BEGIN T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
 
1175
#endif
 
1176
#ifdef XT_TRACK_CONNECTIONS
 
1177
        xt_track_conn_info[self->t_id].ci_curr_xact_id = self->st_xact_data->xd_start_xn_id;
 
1178
        xt_track_conn_info[self->t_id].ci_xact_start = xt_trace_clock();
 
1179
#endif
 
1180
        return OK;
 
1181
}
 
1182
 
 
1183
static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
 
1184
{
 
1185
        XTXactDataPtr   xact;
 
1186
        xtBool                  ok = TRUE;
 
1187
 
 
1188
        ASSERT_NS(thread->st_xact_data);
 
1189
        if ((xact = thread->st_xact_data)) {
 
1190
                XTDatabaseHPtr  db = thread->st_database;
 
1191
                xtXactID                xn_id = xact->xd_start_xn_id;
 
1192
                xtBool                  writer;
 
1193
                
 
1194
                if ((writer = thread->st_xact_writer)) {
 
1195
                        /* The transaction wrote something: */
 
1196
                        XTXactEndEntryDRec      entry;
 
1197
                        xtWord4                         sum;
 
1198
 
 
1199
                        sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
 
1200
                        entry.xe_status_1 = status;
 
1201
                        entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
 
1202
                        XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
 
1203
                        XT_SET_DISK_4(entry.xe_not_used_4, 0);
 
1204
 
 
1205
#ifdef XT_IMPLEMENT_NO_ACTION
 
1206
                        /* This will check any resticts that have been delayed to the end of the statement. */
 
1207
                        if (thread->st_restrict_list.bl_count) {
 
1208
                                if (!xt_tab_restrict_rows(&thread->st_restrict_list, thread)) {
 
1209
                                        ok = FALSE;
 
1210
                                        status = XT_LOG_ENT_ABORT;
 
1211
                                }
 
1212
                        }
 
1213
#endif
 
1214
 
 
1215
                        /* Flush the data log: */
 
1216
                        if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
 
1217
                                ok = FALSE;
 
1218
                                status = XT_LOG_ENT_ABORT;
 
1219
                        }
 
1220
 
 
1221
                        /* Write and flush the transaction log:
 
1222
                         * We only flush if this was not a temp table.
 
1223
                         */
 
1224
                        if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, thread->st_non_temp_opened ? XT_XLOG_NO_WRITE_NO_FLUSH : xt_db_flush_log_at_trx_commit)) {
 
1225
                                ok = FALSE;
 
1226
                                status = XT_LOG_ENT_ABORT;
 
1227
                                /* Make sure this is done, if we failed to log
 
1228
                                 * the transction end!
 
1229
                                 */
 
1230
                                if (thread->st_xact_writer) {
 
1231
                                        /* Adjust this in case of error, but don't forget
 
1232
                                         * to lock!
 
1233
                                         */
 
1234
                                        xt_spinlock_lock(&db->db_xlog.xl_buffer_lock);
 
1235
                                        db->db_xn_writer_count--;
 
1236
                                        thread->st_xact_writer = FALSE;
 
1237
                                        if (thread->st_xact_long_running) {
 
1238
                                                db->db_xn_long_running_count--;
 
1239
                                                thread->st_xact_long_running = FALSE;
 
1240
                                        }
 
1241
                                        xt_spinlock_unlock(&db->db_xlog.xl_buffer_lock);
 
1242
                                }
 
1243
                        }
 
1244
 
 
1245
                        /* Setting this flag completes the transaction,
 
1246
                         * Do this before we release the locks, because
 
1247
                         * the unlocked transactions expect the
 
1248
                         * transaction they are waiting for to be
 
1249
                         * gone!
 
1250
                         */
 
1251
                        xact->xd_end_time = ++db->db_xn_end_time;
 
1252
                        if (status == XT_LOG_ENT_COMMIT) {
 
1253
                                thread->st_statistics.st_commits++;
 
1254
                                xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
 
1255
                        }
 
1256
                        else {
 
1257
                                thread->st_statistics.st_rollbacks++;
 
1258
                                xact->xd_flags |= XT_XN_XAC_ENDED;
 
1259
                        }
 
1260
 
 
1261
                        /* {REMOVE-LOCKS} Drop locks is you have any: */
 
1262
                        thread->st_lock_list.xt_remove_all_locks(db, thread);
 
1263
 
 
1264
                        /* Do this afterwards to make sure the sweeper
 
1265
                         * does not cleanup transactions start cleaning up
 
1266
                         * before any transactions that were waiting for
 
1267
                         * this transaction have completed!
 
1268
                         */
 
1269
                        xact->xd_end_xn_id = db->db_xn_curr_id;
 
1270
 
 
1271
                        /* Now you can sweep! */
 
1272
                        ASSERT_NS(xact->xd_flags & XT_XN_XAC_LOGGED);
 
1273
                        xact->xd_flags |= XT_XN_XAC_SWEEP;
 
1274
                }
 
1275
                else {
 
1276
                        /* Read-only transaction can be removed, immediately */
 
1277
                        xact->xd_end_time = ++db->db_xn_end_time;
 
1278
                        xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
 
1279
 
 
1280
                        /* Drop locks is you have any: */
 
1281
                        thread->st_lock_list.xt_remove_all_locks(db, thread);
 
1282
 
 
1283
                        xact->xd_end_xn_id = db->db_xn_curr_id;
 
1284
 
 
1285
                        ASSERT_NS(!(xact->xd_flags & XT_XN_XAC_LOGGED));
 
1286
                        xact->xd_flags |= XT_XN_XAC_SWEEP;
 
1287
 
 
1288
                        if (xt_xn_delete_xact(db, xn_id, thread)) {
 
1289
                                if (db->db_xn_min_ram_id == xn_id)
 
1290
                                        db->db_xn_min_ram_id = xn_id+1;
 
1291
                        }
 
1292
                }
 
1293
 
 
1294
#ifdef TRACE_TRANSACTION
 
1295
                if (status == XT_LOG_ENT_COMMIT)
 
1296
                        xt_ttracef(thread, "COMMIT T%lu\n", (u_long) xn_id);
 
1297
                else
 
1298
                        xt_ttracef(thread, "ABORT T%lu\n", (u_long) xn_id);
 
1299
#endif
 
1300
 
 
1301
                if (db->db_xn_min_run_id == xn_id)
 
1302
                        db->db_xn_min_run_id = xn_id+1;
 
1303
 
 
1304
                thread->st_xact_data = NULL;
 
1305
 
 
1306
#ifdef XT_TRACK_CONNECTIONS
 
1307
                xt_track_conn_info[thread->t_id].ci_prev_xact_id = xt_track_conn_info[thread->t_id].ci_curr_xact_id;
 
1308
                xt_track_conn_info[thread->t_id].ci_prev_xact_time = xt_trace_clock() - xt_track_conn_info[thread->t_id].ci_xact_start;
 
1309
                xt_track_conn_info[thread->t_id].ci_curr_xact_id = 0;
 
1310
                xt_track_conn_info[thread->t_id].ci_xact_start = 0;
 
1311
#endif
 
1312
 
 
1313
                xt_wakeup_waiting_threads(thread);
 
1314
 
 
1315
                /* {WAKE-SW} Waking the sweeper
 
1316
                 * is no longer unconditional.
 
1317
                 * (see all comments to {WAKE-SW})
 
1318
                 *
 
1319
                 * We now wake the sweeper if it is
 
1320
                 * supposed to work faster.
 
1321
                 *
 
1322
                 * There are now 2 cases:
 
1323
                 * - We run out of transaction slots.
 
1324
                 * - We encounter old index entries.
 
1325
                 *
 
1326
                 * The following test:
 
1327
                 * runTest(INCREMENT_TEST, 16, INCREMENT_TEST_UPDATE_COUNT);
 
1328
                 * has extreme problems with sweeping every 1/10s
 
1329
                 * because a huge number of index entries accumulate
 
1330
                 * that need to be cleaned.
 
1331
                 *
 
1332
                 * New code detects this case.
 
1333
                 */
 
1334
                if (db->db_sw_faster)
 
1335
                        xt_wakeup_sweeper(db);
 
1336
 
 
1337
                /* Don't get too far ahead of the sweeper! */
 
1338
                if (writer) {
 
1339
#ifdef XT_WAIT_FOR_CLEANUP
 
1340
                        xtXactID        wait_xn_id;
 
1341
                        
 
1342
                        /* This is the transaction that was committed 3 transactions ago: */
 
1343
                        wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
 
1344
                        thread->st_prev_xact[thread->st_last_xact] = xn_id;
 
1345
                        /* This works because XT_MAX_XACT_BEHIND == 2! */
 
1346
                        ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
 
1347
                        thread->st_last_xact ^= 1;
 
1348
                        while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
 
1349
#ifdef XT_SWEEPER_SORT_XACTS
 
1350
                                if (!xn_get_xact_start(db, wait_xn_id, thread, NULL, NULL))
 
1351
                                        break;
 
1352
#endif
 
1353
                                xt_critical_wait();
 
1354
                        }
 
1355
#else
 
1356
                        if ((db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) != 0) {
 
1357
                                xtWord8 then = xt_trace_clock() + (xtWord8) 20000000;
 
1358
 
 
1359
                                for (;;) {
 
1360
                                        xt_critical_wait();
 
1361
                                        if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)
 
1362
                                                break;
 
1363
                                        if (xt_trace_clock() >= then)
 
1364
                                                break;
 
1365
                                }
 
1366
                        }
 
1367
#endif
 
1368
                }
 
1369
        }
 
1370
        return ok;
 
1371
}
 
1372
 
 
1373
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
 
1374
{
 
1375
        return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
 
1376
}
 
1377
 
 
1378
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
 
1379
{
 
1380
        return xn_end_xact(thread, XT_LOG_ENT_ABORT);
 
1381
}
 
1382
 
 
1383
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
 
1384
{
 
1385
        XTXactNewTabEntryDRec   entry;
 
1386
 
 
1387
        entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
 
1388
        entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
 
1389
        XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
 
1390
        return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_WRITE_AND_FLUSH);
 
1391
}
 
1392
 
 
1393
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID XT_UNUSED(rec_id))
 
1394
{
 
1395
        register XTThreadPtr    self = ot->ot_thread;
 
1396
        int                                             flags;
 
1397
        xtWord4                                 end;
 
1398
 
 
1399
#ifdef DRIZZLED
 
1400
        /* Conditional waste of time!
 
1401
         * Drizzle has strict warnings.
 
1402
         * I know this is not necessary!
 
1403
         */
 
1404
        flags = 0;
 
1405
        end = 0;
 
1406
#endif
 
1407
        if (xn_id == self->st_xact_data->xd_start_xn_id)
 
1408
                return XT_XN_MY_UPDATE;
 
1409
        if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id) ||
 
1410
                !xn_get_xact_details(self->st_database, xn_id, ot->ot_thread, &flags, NULL, &end, NULL)) {
 
1411
                /* Not in RAM, rollback done: */
 
1412
//*DBG*/xt_dump_xlogs(self->st_database, 0);
 
1413
//*DBG*/xt_check_table(self, ot);
 
1414
//*DBG*/xt_dump_trace();
 
1415
                /* {XACT-NOT-IN-RAM}
 
1416
                 * This should never happen (CHANGED see below)!
 
1417
                 *
 
1418
                 * Because if the transaction is no longer in RAM, then it has been
 
1419
                 * cleaned up. So the record should be marked as clean, or not
 
1420
                 * exist.
 
1421
                 *
 
1422
                 * After sweeping, we wait for all transactions to quit that were
 
1423
                 * running at the time of cleanup before removing the transaction record.
 
1424
                 * (see {XACT-NOT-IN-RAM})
 
1425
                 *
 
1426
                 * If this was not the case, then we could be here because:
 
1427
                 * - The user transaction (T2) reads record x and notes that the record
 
1428
                 * has not been cleaned (CLEAN bit not set).
 
1429
                 *
 
1430
                 * - The sweeper is busy sweeping the transaction (T1) that created
 
1431
                 * record x.
 
1432
                 * The SW sets the CLEAN bit on record x, and the schedules T1 for
 
1433
                 * deletion.
 
1434
                 *
 
1435
                 * Now T1 should not be deleted before T2 quits. If it does happen
 
1436
                 * then we land up here.
 
1437
                 *
 
1438
                 * THIS CAN NOW HAPPEN!
 
1439
                 *
 
1440
                 * First of all, a MYSTERY:
 
1441
                 * This did happen, dispite the description above! The reason why
 
1442
                 * is left as an exercise to the reader (really, I don't now why!)
 
1443
                 *
 
1444
                 * This has force me to add code to handle the situation. This
 
1445
                 * is done by re-reading the record that is being checked by this
 
1446
                 * function. After re-reading, the record should either be
 
1447
                 * invalid (free) or clean (CLEAN bit set).
 
1448
                 *
 
1449
                 * If this is the case, then we will not run land up here
 
1450
                 * again.
 
1451
                 *
 
1452
                 * Because we are only here because the record was valid but not
 
1453
                 * clean (you can confirm this by looking at the code that
 
1454
                 * calls this function).
 
1455
                 */
 
1456
                return XT_XN_REREAD;
 
1457
        }
 
1458
        if (!(flags & XT_XN_XAC_ENDED))
 
1459
                /* Transaction not ended, may be visible. */
 
1460
                return XT_XN_OTHER_UPDATE;
 
1461
        /* Visible if the transaction was committed: */
 
1462
        if (flags & XT_XN_XAC_COMMITTED) {
 
1463
                if (!xt_xn_is_before(self->st_visible_time, end))  // was self->st_visible_time >= xact->xd_end_time
 
1464
                        return XT_XN_VISIBLE;
 
1465
                return XT_XN_NOT_VISIBLE;
 
1466
        }
 
1467
        return XT_XN_ABORTED;
 
1468
}
 
1469
 
 
1470
/* ----------------------------------------------------------------------
 
1471
 * XA Functionality
 
1472
 */
 
1473
 
 
1474
xtPublic int xt_xn_xa_compare(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
 
1475
{
 
1476
        xtXactID        *x = (xtXactID *) a;
 
1477
        XTXactXAPtr     y = (XTXactXAPtr) b;
 
1478
 
 
1479
        if (*x == y->xx_xact_id)
 
1480
                return 0;
 
1481
        if (xt_xn_is_before(*x, y->xx_xact_id))
 
1482
                return -1;
 
1483
        return 1;
 
1484
}
 
1485
 
 
1486
xtPublic xtBool xt_xn_prepare(int len, xtWord1 *xa_data, XTThreadPtr thread)
 
1487
{
 
1488
        XTXactDataPtr xact;
 
1489
 
 
1490
        ASSERT_NS(thread->st_xact_data);
 
1491
        if ((xact = thread->st_xact_data)) {
 
1492
                xtXactID xn_id = xact->xd_start_xn_id;
 
1493
 
 
1494
                /* Only makes sense if the transaction has already been logged: */
 
1495
                if ((thread->st_xact_data->xd_flags & XT_XN_XAC_LOGGED)) {
 
1496
                        if (!xt_xlog_modify_table(0, XT_LOG_ENT_PREPARE, xn_id, 0, 0, 0, len, xa_data, thread))
 
1497
                                return FAILED;
 
1498
                }
 
1499
        }
 
1500
        return OK;
 
1501
}
 
1502
 
 
1503
xtPublic xtBool xt_xn_store_xa_data(XTDatabaseHPtr db, xtXactID xact_id, int len, xtWord1 *xa_data, XTThreadPtr XT_UNUSED(thread))
 
1504
{
 
1505
        XTXactPreparePtr        xap;
 
1506
        u_int                           idx;
 
1507
        XTXactXARec                     xx;
 
1508
 
 
1509
        if (!(xap = (XTXactPreparePtr) xt_malloc_ns(offsetof(XTXactPrepareRec, xp_xa_data) + len)))
 
1510
                return FAILED;
 
1511
        xap->xp_xact_id = xact_id;
 
1512
        xap->xp_hash = xt_get_checksum4(xa_data, len);
 
1513
        xap->xp_data_len = len;
 
1514
        memcpy(xap->xp_xa_data, xa_data, len);
 
1515
        xx.xx_xact_id = xact_id;
 
1516
        xx.xx_xa_ptr = xap;
 
1517
 
 
1518
        idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
 
1519
        xt_lock_mutex_ns(&db->db_xn_xa_lock);
 
1520
        if (!xt_sl_insert(NULL, db->db_xn_xa_list, &xact_id, &xx)) {
 
1521
                xt_unlock_mutex_ns(&db->db_xn_xa_lock);
 
1522
                xt_free_ns(xap);
 
1523
        }
 
1524
        xap->xp_next = db->db_xn_xa_table[idx];
 
1525
        db->db_xn_xa_table[idx] = xap;
 
1526
        xt_unlock_mutex_ns(&db->db_xn_xa_lock);
 
1527
        return OK;
 
1528
}
 
1529
 
 
1530
xtPublic void xt_xn_delete_xa_data_by_xact(XTDatabaseHPtr db, xtXactID xact_id, XTThreadPtr thread)
 
1531
{
 
1532
        XTXactXAPtr xx;
 
1533
 
 
1534
        xt_lock_mutex_ns(&db->db_xn_xa_lock);
 
1535
        if (!(xx = (XTXactXAPtr) xt_sl_find(NULL, db->db_xn_xa_list, &xact_id)))
 
1536
                return;
 
1537
        xt_xn_delete_xa_data(db, xx->xx_xa_ptr, TRUE, thread);
 
1538
}
 
1539
 
 
1540
xtPublic void xt_xn_delete_xa_data(XTDatabaseHPtr db, XTXactPreparePtr xap, xtBool unlock, XTThreadPtr XT_UNUSED(thread))
 
1541
{
 
1542
        u_int                           idx;
 
1543
        XTXactPreparePtr        xap_ptr, xap_pptr = NULL;
 
1544
 
 
1545
        xt_sl_delete(NULL, db->db_xn_xa_list, &xap->xp_xact_id);
 
1546
        idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
 
1547
        xap_ptr = db->db_xn_xa_table[idx];
 
1548
        while (xap_ptr) {
 
1549
                if (xap_ptr == xap)
 
1550
                        break;
 
1551
                xap_pptr = xap_ptr;
 
1552
                xap_ptr = xap_ptr->xp_next;
 
1553
        }
 
1554
        if (xap_ptr) {
 
1555
                if (xap_pptr)
 
1556
                        xap_pptr->xp_next = xap_ptr->xp_next;
 
1557
                else
 
1558
                        db->db_xn_xa_table[idx] = xap_ptr->xp_next;
 
1559
                xt_free_ns(xap);
 
1560
        }
 
1561
        if (unlock)
 
1562
                xt_unlock_mutex_ns(&db->db_xn_xa_lock);
 
1563
}
 
1564
 
 
1565
xtPublic XTXactPreparePtr xt_xn_find_xa_data(XTDatabaseHPtr db, int len, xtWord1 *xa_data, xtBool lock, XTThreadPtr XT_UNUSED(thread))
 
1566
{
 
1567
        xtWord4                         hash;
 
1568
        XTXactPreparePtr        xap;
 
1569
        u_int                           idx;
 
1570
 
 
1571
        if (lock)
 
1572
                xt_lock_mutex_ns(&db->db_xn_xa_lock);
 
1573
        hash = xt_get_checksum4(xa_data, len);
 
1574
        idx = hash % XT_XA_HASH_TAB_SIZE;
 
1575
        xap = db->db_xn_xa_table[idx];
 
1576
        while (xap) {
 
1577
                if (xap->xp_hash == hash &&
 
1578
                        xap->xp_data_len == len &&
 
1579
                        memcmp(xap->xp_xa_data, xa_data, len) == 0) {
 
1580
                        break;
 
1581
                }
 
1582
                xap = xap->xp_next;
 
1583
        }
 
1584
        
 
1585
        return xap;
 
1586
}
 
1587
 
 
1588
xtPublic XTXactPreparePtr xt_xn_enum_xa_data(XTDatabaseHPtr db, XTXactEnumXAPtr exa)
 
1589
{
 
1590
        XTXactXAPtr xx;
 
1591
 
 
1592
        if (!exa->exa_locked) {
 
1593
                xt_lock_mutex_ns(&db->db_xn_xa_lock);
 
1594
                exa->exa_locked = TRUE;
 
1595
        }
 
1596
 
 
1597
        if ((xx = (XTXactXAPtr) xt_sl_item_at(db->db_xn_xa_list, exa->exa_index))) {
 
1598
                exa->exa_index++;
 
1599
                return xx->xx_xa_ptr;
 
1600
        }
 
1601
 
 
1602
        if (exa->exa_locked) {
 
1603
                exa->exa_locked = FALSE;
 
1604
                xt_unlock_mutex_ns(&db->db_xn_xa_lock);
 
1605
        }
 
1606
        return NULL;
 
1607
}
 
1608
 
 
1609
/* ----------------------------------------------------------------------
 
1610
 * S W E E P E R    F U N C T I O N S
 
1611
 */
 
1612
 
 
1613
xtPublic xtWord8 xt_xn_bytes_to_sweep(XTDatabaseHPtr db, XTThreadPtr thread)
 
1614
{
 
1615
        xtXactID                                xn_id;
 
1616
        xtXactID                                curr_xn_id;
 
1617
        xtLogID                                 xn_log_id = 0;
 
1618
        xtLogOffset                             xn_log_offset = 0;
 
1619
        xtLogID                                 x_log_id = 0;
 
1620
        xtLogOffset                             x_log_offset = 0;
 
1621
        xtLogID                                 log_id;
 
1622
        xtLogOffset                             log_offset;
 
1623
        xtWord8                                 byte_count = 0;
 
1624
 
 
1625
        xn_id = db->db_xn_to_clean_id;
 
1626
        curr_xn_id = xt_xn_get_curr_id(db);
 
1627
        // Limit the number of transactions checked!
 
1628
        for (int i=0; i<1000; i++) {
 
1629
                if (xt_xn_is_before(curr_xn_id, xn_id))
 
1630
                        break;
 
1631
                if (xn_get_xact_start(db, xn_id, thread, &x_log_id, &x_log_offset)) {
 
1632
                        if (xn_log_id) {
 
1633
                                if (xt_comp_log_pos(x_log_id, x_log_offset, xn_log_id, xn_log_offset) < 0) {
 
1634
                                        xn_log_id = x_log_id;
 
1635
                                        xn_log_offset = x_log_offset;
 
1636
                                }
 
1637
                        }
 
1638
                        else {
 
1639
                                xn_log_id = x_log_id;
 
1640
                                x_log_offset = x_log_offset;
 
1641
                        }
 
1642
                }
 
1643
                xn_id++;
 
1644
        }
 
1645
        if (!xn_log_id)
 
1646
                return 0;
 
1647
 
 
1648
        /* Assume the logs have the threshold: */
 
1649
        log_id = db->db_xlog.xl_write_log_id;
 
1650
        log_offset = db->db_xlog.xl_write_log_offset;
 
1651
        if (xn_log_id < log_id) {
 
1652
                if (xn_log_offset < xt_db_log_file_threshold)
 
1653
                        byte_count = (size_t) (xt_db_log_file_threshold - xn_log_offset);
 
1654
                xn_log_offset = 0;
 
1655
                xn_log_id++;
 
1656
        }
 
1657
        while (xn_log_id < log_id) {
 
1658
                byte_count += (size_t) xt_db_log_file_threshold;
 
1659
                xn_log_id++;
 
1660
        }
 
1661
        if (xn_log_offset < log_offset)
 
1662
                byte_count += (size_t) (log_offset - xn_log_offset);
 
1663
 
 
1664
        return byte_count;
 
1665
}
 
1666
 
 
1667
/* ----------------------------------------------------------------------
 
1668
 * S W E E P E R    P R O C E S S
 
1669
 */
 
1670
 
 
1671
typedef struct XNSweeperState {
 
1672
        XTDatabaseHPtr                  ss_db;
 
1673
        XTXactSeqReadRec                ss_seqread;
 
1674
        XTDataBufferRec                 ss_databuf;
 
1675
        u_int                                   ss_call_cnt;
 
1676
        XTBasicQueueRec                 ss_to_free;
 
1677
        xtBool                                  ss_flush_pending;
 
1678
        xtTableID                               ss_not_found;                           /* Cache the last table not found, this saves time. */
 
1679
        xtTableID                               ss_not_recovered;                       /* Cache the last table not recovered. */
 
1680
        XTOpenTablePtr                  ss_ot;
 
1681
} XNSweeperStateRec, *XNSweeperStatePtr;
 
1682
 
 
1683
/*
 
1684
 * This function returns NULL if the table cannot be opened.
 
1685
 * In this case skip_cleanup will be set to TRUE
 
1686
 * if the cleanup should be skipped.
 
1687
 *
 
1688
 */
 
1689
static XTOpenTablePtr xn_sw_get_open_table(XTThreadPtr self, XNSweeperStatePtr ss, xtTableID tab_id, xtBool *skip_cleanup)
 
1690
{
 
1691
        if (ss->ss_ot) {
 
1692
                if (ss->ss_ot->ot_table->tab_id == tab_id)
 
1693
                        return ss->ss_ot;
 
1694
 
 
1695
                xt_db_return_table_to_pool(self, ss->ss_ot);
 
1696
                ss->ss_ot = NULL;
 
1697
        }
 
1698
 
 
1699
        if (ss->ss_not_found == tab_id || ss->ss_not_recovered == tab_id) {
 
1700
                *skip_cleanup = TRUE;
 
1701
                return NULL;
 
1702
        }
 
1703
 
 
1704
        if (!ss->ss_ot) {
 
1705
                int r;
 
1706
 
 
1707
                if (!(ss->ss_ot = xt_db_open_pool_table(self, ss->ss_db, tab_id, &r, TRUE))) {
 
1708
                        switch (r) {
 
1709
                                case XT_TAB_NOT_FOUND:
 
1710
                                        /* Remember the table if it was not found: */
 
1711
                                        ss->ss_not_found = tab_id;
 
1712
                                        *skip_cleanup = TRUE;
 
1713
                                        break;
 
1714
                                case XT_TAB_NO_DICTIONARY:
 
1715
                                case XT_TAB_POOL_CLOSED:
 
1716
                                        *skip_cleanup = FALSE;
 
1717
                                        break;
 
1718
                                default:
 
1719
                                        *skip_cleanup = TRUE;
 
1720
                                        break;
 
1721
                        }
 
1722
                        return NULL;
 
1723
                }
 
1724
                
 
1725
                /* Don't sweep transactions for table that have not been
 
1726
                 * recovered.
 
1727
                 */
 
1728
                if (ss->ss_ot->ot_table->tab_recovery_not_done) {
 
1729
                        xt_db_return_table_to_pool(self, ss->ss_ot);
 
1730
                        ss->ss_ot = NULL;
 
1731
                        ss->ss_not_recovered = tab_id;
 
1732
                        *skip_cleanup = TRUE;
 
1733
                        return NULL;
 
1734
                }
 
1735
        }
 
1736
 
 
1737
        return ss->ss_ot;
 
1738
}
 
1739
 
 
1740
static void xn_sw_close_open_table(XTThreadPtr self, XNSweeperStatePtr ss)
 
1741
{
 
1742
        if (ss->ss_ot) {
 
1743
                xt_db_return_table_to_pool(self, ss->ss_ot);
 
1744
                ss->ss_ot = NULL;
 
1745
        }
 
1746
}
 
1747
 
 
1748
/*
 
1749
 * A thread can set a bit in db_sw_faster to make
 
1750
 * the sweeper go faster.
 
1751
 */
 
1752
static void xn_sw_could_go_faster(XTThreadPtr self, XTDatabaseHPtr db)
 
1753
{
 
1754
        if (db->db_sw_faster) {
 
1755
                if (!db->db_sw_fast) {
 
1756
                        xt_set_priority(self, xt_db_sweeper_priority+1);
 
1757
                        db->db_sw_fast = TRUE;
 
1758
                }
 
1759
        }
 
1760
}
 
1761
 
 
1762
static void xn_sw_go_slower(XTThreadPtr self, XTDatabaseHPtr db)
 
1763
{
 
1764
        if (db->db_sw_fast) {
 
1765
                xt_set_priority(self, xt_db_sweeper_priority);
 
1766
                db->db_sw_fast = FALSE;
 
1767
        }
 
1768
        db->db_sw_faster = XT_SW_WORK_NORMAL;
 
1769
}
 
1770
 
 
1771
/* Add a record to the "to free" queue. We note the current
 
1772
 * transaction at the time this is done. The record will
 
1773
 * only be freed once this transaction terminated, together
 
1774
 * with all transactions that started before it! 
 
1775
 *
 
1776
 * The reason for this is that a sequential scan or some
 
1777
 * other operation may read a committed record which is no longer
 
1778
 * valid because it is no longer the latest variation (the first
 
1779
 * variation reachable from the row pointer).
 
1780
 *
 
1781
 * In this case, the sweeper will free the variation.
 
1782
 * If the variation is re-used and committed before
 
1783
 * the sequential scan or read completes, and by some
 
1784
 * fluke is used by the same record as previously,
 
1785
 * the system will think the record is valid
 
1786
 * again.
 
1787
 *
 
1788
 * Without re-reading the record the sequential
 
1789
 * scan or other read will find it on the variation list, and
 
1790
 * return the record data as if valid!
 
1791
 *
 
1792
 * ------------ 2008-01-03
 
1793
 *
 
1794
 * An example of this is:
 
1795
 *
 
1796
 * Assume we have 3 records.
 
1797
 * The 3rd record is deleted, and committed.
 
1798
 * Before cleanup can be performed
 
1799
 * a sequential scan takes a copy of the records.
 
1800
 *
 
1801
 * Now assume a new insert is done before
 
1802
 * the sequential scan gets to the 3rd record.
 
1803
 *
 
1804
 * The insert allocates the 3rd row and 3rd record
 
1805
 * again.
 
1806
 *
 
1807
 * Now, when the sequential scan gets to the old copy of the 3rd record,
 
1808
 * this is valid because the row points to this record again.
 
1809
 *
 
1810
 * HOWEVER! I have now changed the sequential scan so that it accesses
 
1811
 * the records from the cache, without making a copy.
 
1812
 *
 
1813
 * This means that this problem cannot occur because the sequential scan
 
1814
 * always reads the current data from the cache.
 
1815
 *
 
1816
 * There is also no race condition (although no lock is taken), because
 
1817
 * the record is written before the row (see here [(5)]).
 
1818
 *
 
1819
 * This means that the row does not point to the record before the
 
1820
 * record has been modified.
 
1821
 *
 
1822
 * Once the record has been modified then the sequential scan will see
 
1823
 * that the record belongs to a new transaction.
 
1824
 *
 
1825
 * If the row pointer was set before the record updated then a race
 
1826
 * condition would exist when the sequential scan reads the record
 
1827
 * after the insert has updated the row pointer but before it has
 
1828
 * changed the record.
 
1829
 *
 
1830
 * AS A RESULT:
 
1831
 *
 
1832
 * I believe I can remove the delayed free record!
 
1833
 *
 
1834
 * This means I can combine the REMOVE and FREE operations.
 
1835
 *
 
1836
 * This is good because this takes care of the problem
 
1837
 * that records are lost when:
 
1838
 *
 
1839
 * The server crashes when the delayed free list still has items on it.
 
1840
 * AND
 
1841
 * The transaction that freed the records has been cleaned, and this
 
1842
 * fact has been committed to the log.
 
1843
 *
 
1844
 * So I have removed the delay here: [(6)]
 
1845
 *
 
1846
 * ------------ 2008-12-03
 
1847
 *
 
1848
 * This code to delay removal of records was finally removed (see above)
 
1849
 */
 
1850
 
 
1851
/*
 
1852
 * As above, but instead a transaction is added to the "to free" queue.
 
1853
 *
 
1854
 * It is important that transactions remain in memory until all
 
1855
 * currently running transactions have ended. This is because
 
1856
 * sequential and index scans have copies of old data.
 
1857
 *
 
1858
 * In the old data a record may not be indicated as cleaned. Such
 
1859
 * a record is considered invalid if the transaction is not in RAM.
 
1860
 *
 
1861
 * GOTCHA:
 
1862
 *
 
1863
 * And this problem is demonstrated by the following example
 
1864
 * which was derived from flush_table.test.
 
1865
 *
 
1866
 * Each handler command below is a separate transaction.
 
1867
 * However the buffer is loaded by 'read first'.
 
1868
 * Depending on when cleanup occurs, records can disappear
 
1869
 * in some of the next commands.
 
1870
 *
 
1871
 * 2 solutions for the test. Use begin ... commit around
 
1872
 * handler open ... close. Or use analyze table t1 before
 
1873
 * open. analyze table waits for the sweeper to complete!
 
1874
 *
 
1875
 * create table dummy(table_id char(20) primary key);
 
1876
 * let $1=100;
 
1877
 * while ($1)
 
1878
 * {
 
1879
 *   drop table if exists t1;
 
1880
 *   create table t1(table_id char(20) primary key);
 
1881
 *   insert into t1 values ('Record-01');
 
1882
 *   insert into t1 values ('Record-02');
 
1883
 *   insert into t1 values ('Record-03');
 
1884
 *   insert into t1 values ('Record-04');
 
1885
 *   insert into t1 values ('Record-05');
 
1886
 *   handler t1 open;
 
1887
 *   handler t1 read first limit 1;
 
1888
 *   handler t1 read next limit 1;
 
1889
 *   handler t1 read next limit 1;
 
1890
 *   handler t1 read next limit 1;
 
1891
 *   handler t1 close;
 
1892
 *   commit;
 
1893
 *   dec $1;
 
1894
 * }
 
1895
 * 
 
1896
 */
 
1897
#ifdef MUST_DELAY_REMOVE
/*
 * Add transaction xn_id to the sweeper's "to free" queue. The current
 * transaction ID of the database is recorded in the item as the
 * transaction to wait for.
 */
static void xn_sw_add_xact_to_free(XTThreadPtr self, XNSweeperStatePtr ss, xtXactID xn_id)
{
	XNSWToFreeItemRec	item;
	xtBool				queue_full;

	queue_full = (ss->ss_to_free.bq_front - ss->ss_to_free.bq_back) >= XT_TN_MAX_TO_FREE;
	if (queue_full) {
		/* If the queue is full, try to free some items:
		 * We use the call count to avoid doing this every time,
		 * when the queue overflows!
		 */
		if ((ss->ss_call_cnt % XT_TN_MAX_TO_FREE_CHECK) == 0) {
			/* GOTCHA: This call was not locking the sweeper,
			 * this could cause failure, of course:
			 */
			xn_sw_service_to_free(self, ss, TRUE);
		}
		ss->ss_call_cnt++;
	}

	item.x.ri_xn_id = xn_id;
	item.ri_tab_id = 0;
	item.ri_wait_xn_id = ss->ss_db->db_xn_curr_id;

	xt_bq_add(self, &ss->ss_to_free, &item);
}
#endif
 
1922
 
 
1923
static void xt_sw_delete_variations(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtXactID xn_id)
 
1924
{
 
1925
        xtRecordID prev_var_rec_id;
 
1926
 
 
1927
        while (rec_id) {
 
1928
                switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, FALSE, row_id, xn_id)) {
 
1929
                        case XT_ERR:
 
1930
                                throw_();
 
1931
                                return;
 
1932
                        case TRUE:
 
1933
                                break;
 
1934
                }
 
1935
                rec_id = prev_var_rec_id;
 
1936
        }
 
1937
}
 
1938
 
 
1939
static void xt_sw_delete_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTOpenTablePtr ot, xtRecordID rec_id, xtBool clean_delete, xtRowID row_id, xtXactID xn_id)
 
1940
{
 
1941
        xtRecordID prev_var_rec_id;
 
1942
 
 
1943
        switch (xt_tab_remove_record(ot, rec_id, ss->ss_databuf.db_data, &prev_var_rec_id, clean_delete, row_id, xn_id)) {
 
1944
                case XT_ERR:
 
1945
                        throw_();
 
1946
                        return;
 
1947
                case TRUE:
 
1948
                        break;
 
1949
                case FALSE:
 
1950
                        break;
 
1951
        }
 
1952
}
 
1953
 
 
1954
/* Set rec_type to this value in order to force cleanup, without
 * a check.
 *
 * xn_sw_cleanup_done() treats this value specially: instead of comparing
 * the record header with the transaction, record type, statement and row
 * of the log entry, it only tests whether the record is already free.
 */
#define XN_FORCE_CLEANUP                XT_TAB_STATUS_FREED
 
1958
 
 
1959
/*
 
1960
 * Read the record to be cleaned. Return TRUE if the cleanup has already been done.
 
1961
 */
 
1962
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtXactID xn_id, u_int rec_type, u_int stat_id, xtRowID row_id, XTTabRecHeadDPtr rec_head)
 
1963
{
 
1964
        if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head))
 
1965
                throw_();
 
1966
 
 
1967
        if (rec_type == XN_FORCE_CLEANUP) {
 
1968
                if (XT_REC_IS_FREE(rec_head->tr_rec_type_1))
 
1969
                        return TRUE;
 
1970
        }
 
1971
        else {
 
1972
                /* Transaction must match: */
 
1973
                if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
 
1974
                        return TRUE;
 
1975
 
 
1976
                /* Record header must match expected value from
 
1977
                 * log or clean has been done, or is not required.
 
1978
                 *
 
1979
                 * For example, it is not required if a record
 
1980
                 * has been overwritten in a transaction.
 
1981
                 */
 
1982
                if (rec_head->tr_rec_type_1 != rec_type ||
 
1983
                        rec_head->tr_stat_id_1 != stat_id)
 
1984
                        return TRUE;
 
1985
 
 
1986
                /* Row must match: */
 
1987
                if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
 
1988
                        return TRUE;
 
1989
        }
 
1990
 
 
1991
        return FALSE;
 
1992
}
 
1993
 
 
1994
static void xn_sw_clean_indices(XTThreadPtr XT_NDEBUG_UNUSED(self), XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_data, xtWord1 *rec_buffer)
 
1995
{
 
1996
        XTTableHPtr     tab = ot->ot_table;
 
1997
        u_int           cols_req;
 
1998
        XTIndexPtr      *ind;
 
1999
 
 
2000
        if (!tab->tab_dic.dic_key_count)
 
2001
                return;
 
2002
 
 
2003
        cols_req = tab->tab_dic.dic_ind_cols_req;
 
2004
        if (XT_REC_IS_FIXED(rec_data[0]))
 
2005
                rec_buffer = rec_data + XT_REC_FIX_HEADER_SIZE;
 
2006
        else {
 
2007
                if (XT_REC_IS_VARIABLE(rec_data[0])) {
 
2008
                        if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buffer, cols_req))
 
2009
                                goto failed;
 
2010
                }
 
2011
                else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
 
2012
                        ASSERT(cols_req);
 
2013
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
2014
                                if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buffer, cols_req))
 
2015
                                        goto failed;
 
2016
                        }
 
2017
                        else {
 
2018
                                if (rec_data != ot->ot_row_rbuffer)
 
2019
                                        memcpy(ot->ot_row_rbuffer, rec_data, tab->tab_dic.dic_rec_size);
 
2020
                                if (!xt_tab_load_ext_data(ot, rec_id, rec_buffer, cols_req))
 
2021
                                        goto failed;
 
2022
                        }
 
2023
                }
 
2024
                else
 
2025
                        /* This is possible, the record has already been cleaned up. */
 
2026
                        return;
 
2027
        }
 
2028
 
 
2029
        ind = tab->tab_dic.dic_keys;
 
2030
        for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
 
2031
                if (!xt_idx_update_row_id(ot, *ind, rec_id, row_id, rec_buffer))
 
2032
                        xt_log_and_clear_exception_ns();
 
2033
        }
 
2034
        return;
 
2035
        
 
2036
        failed:
 
2037
        xt_log_and_clear_exception_ns();
 
2038
}
 
2039
 
 
2040
/*
 * Clean up one record variation written by transaction 'xact', as described
 * by a single log entry (status, rec_type, stat_id, row_id).
 *
 * If the transaction was committed, older variations of the row are removed
 * and the record is marked as cleaned (or, for a delete, removed together
 * with its row). If it was not committed, the variation is unlinked from
 * the row's variation list and deleted.
 *
 * Returns OK if the cleanup was done or was not required. If the table
 * cannot be opened, the skip_cleanup value set by xn_sw_get_open_table()
 * is returned (per the original note: FAILED if cleanup could not be done
 * because dictionary information is not available -- that helper is not
 * visible here). Errors are thrown.
 */
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtTableID tab_id, xtRecordID rec_id, u_int status, u_int rec_type, u_int stat_id, xtRowID row_id, xtWord1 *rec_buf)
{
	XTOpenTablePtr		ot;
	XTTableHPtr			tab;
	XTTabRecHeadDRec	rec_head;
	xtRecordID			after_rec_id;
	xtXactID			xn_id;
	xtBool				skip_cleanup;

	if (!(ot = xn_sw_get_open_table(self, ss, tab_id, &skip_cleanup)))
		/* The table no longer exists, consider cleanup done: */
		return skip_cleanup;

	tab = ot->ot_table;
	ASSERT_NS(ot->ot_thread == self);

	/* Make sure the buffer is large enough! */
	xt_db_set_size(self, &ss->ss_databuf, (size_t) tab->tab_dic.dic_mysql_buf_size);

	xn_id = xact->xd_start_xn_id;
	if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
		/* The transaction has been committed. Clean the record and
		 * remove variations no longer in use.
		 */
		switch (status) {
			case XT_LOG_ENT_REC_MODIFIED:
			case XT_LOG_ENT_UPDATE:
			case XT_LOG_ENT_UPDATE_FL:
			case XT_LOG_ENT_UPDATE_BG:
			case XT_LOG_ENT_UPDATE_FL_BG:
				if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
					goto done_ok;
				/* Remove all older variations, then mark this record as
				 * cleaned with no previous variation:
				 */
				after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
				xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
				rec_head.tr_rec_type_1 |= XT_TAB_STATUS_CLEANED_BIT;
				XT_SET_NULL_DISK_4(rec_head.tr_prev_rec_id_4);
				if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED, 0, rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head))
					throw_();
				xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
				break;
			case XT_LOG_ENT_INSERT:
			case XT_LOG_ENT_INSERT_FL:
			case XT_LOG_ENT_INSERT_BG:
			case XT_LOG_ENT_INSERT_FL_BG: {
				/* POTENTIAL BUG 1:
				 *
				 * DROP TABLE IF EXISTS t1;
				 * CREATE TABLE t1 ( id int, name varchar(300)) engine=pbxt;
				 *
				 * begin;
				 * insert t1(id, name) values(1, "aaa");
				 * update t1 set name=REPEAT('A', 300) where id = 1;
				 * commit;
				 * flush tables;
				 * select * from t1;
				 *
				 * Because the type of record changes, from VARIABLE to
				 * EXTENDED, the cleanup needs to take this into account.
				 *
				 * The input new status value which is written here
				 * depends on the first write to the record.
				 * However, the second write changes the record status.
				 *
				 * Previously we used an OR function to write the bit and
				 * return the byte value of the result.
				 *
				 * The write function now checks the record to be written
				 * to make sure it matches the record that needs to be
				 * cleaned. So OR'ing the bit is no longer required.
				 *
				 * POTENTIAL BUG 2:
				 *
				 * We have changed this to fix the following bug:
				 *
				 * T1 starts
				 * T2 starts
				 * T2 insert record 100 in row 50
				 * T2 commits
				 * T1 updates row 50 and adds record 101
				 *
				 * The sweeper does cleanup in order T1, T2, ...
				 *
				 * The sweeper cleans T1 by removing record 100 from the
				 * row 50 variation list.
				 * This means that record 100 is free.
				 *
				 * The sweeper cleans T2 by marking record 100 as clean.
				 * !BUG! record 100 has already been freed!
				 *
				 * To avoid this we have to check a record before
				 * cleaning (as we do above for update in xn_sw_cleanup_done()).
				 * We check that the record is, in fact, the exact
				 * record that was inserted.
				 *
				 * This is now done by xt_tc_write_cond().
				 */
				xtOpSeqNo op_seq;

				rec_head.tr_rec_type_1 = rec_type | XT_TAB_STATUS_CLEANED_BIT;
				if(!tab->tab_recs.xt_tc_write_cond(self, ot->ot_rec_file, rec_id, rec_head.tr_rec_type_1, &op_seq, xn_id, row_id, stat_id, rec_type))
					/* The conditional write did not update the record
					 * (see the note above), so there is nothing to clean:
					 */
					break;
				if (!xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_REC_CLEANED_1, op_seq, 0, 0, rec_id, 1, &rec_head.tr_rec_type_1, self))
					throw_();
				xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
				break;
			}
			case XT_LOG_ENT_DELETE:
			case XT_LOG_ENT_DELETE_FL:
			case XT_LOG_ENT_DELETE_BG:
			case XT_LOG_ENT_DELETE_FL_BG:
				if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
					goto done_ok;
				/* A committed delete: remove the older variations, the
				 * delete record itself, and finally the row (if known).
				 */
				after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
				xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
				xt_sw_delete_variation(self, ss, ot, rec_id, TRUE, row_id, xn_id);
				if (row_id) {
					if (!xt_tab_free_row(ot, tab, row_id))
						throw_();
				}
				break;
		}
	}
	else {
		/* The transaction has been aborted. Remove the variation from the
		 * variation list. If this means the list is empty, then remove
		 * the record as well.
		 */
		xtRecordID			first_rec_id, next_rec_id, prev_rec_id;
		XTTabRecHeadDRec	prev_rec_head;

		if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
			goto done_ok;

		/* If the log entry carried no row ID, take it from the record
		 * header. Without a row there is no variation list to unlink from.
		 */
		if (!row_id)
			row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
		after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
		if (!row_id)
			goto unlink_done;

		/* Now remove the record from the variation list,
		 * (if it is still on the list).
		 *
		 * From here until the matching unlock, every exit must release
		 * the row lock: 'goto failed' does so at the label, and the
		 * 'goto unlink_done' inside the loop unlocks before jumping.
		 */
		XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);

		/* Find the variation before the variation we wish to remove: */
		if (!(xt_tab_get_row(ot, row_id, &first_rec_id)))
			goto failed;
		prev_rec_id = 0;
		next_rec_id = first_rec_id;
		while (next_rec_id != rec_id) {
			if (!next_rec_id) {
				/* The record was not found in the list (we are done) */
				XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
				goto unlink_done;
			}
			if (!xt_tab_get_rec_data(ot, next_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head)) {
				/* A read error ends the search; the record is then
				 * treated as not found on the list.
				 */
				xt_log_and_clear_exception(self);
				break;
			}
			
			prev_rec_id = next_rec_id;
			next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
		}

		if (next_rec_id == rec_id) {
			/* The record was found on the list: */
			if (prev_rec_id) {
				/* Unlink the deleted variation:
				 * I have found the following sequence:
				 *
				 * 17933 in use  1906112
				 * 1906112 delete      xact=2901   row=17933 prev=2419240
				 * 2419240 delete      xact=2899   row=17933 prev=2153360
				 * 2153360 record-X C  xact=2599   row=17933 prev=0 Xlog=151 Xoff=16824 Xsiz=100
				 *
				 * Despite the following facts which should prevent chains from
				 * forming:
				 *
				 * --- Only one transaction can modify a row
				 * at any one time. So it is not possible for a new change
				 * to be linked onto an uncommitted change.
				 *
				 * --- Transactions that modify the same row
				 * twice do not allocate a new record for each change.
				 *
				 * --- A change that has been
				 * rolled back will not be linked onto. Instead
				 * the new transaction will link to the last
				 * committed record.
				 *
				 * So if the sweeper is slow in doing its job
				 * we can have the situation that a number of records
				 * can refer to the last committed record of the
				 * row.
				 *
				 * Only one will be referenced by the row pointer.
				 *
				 * The others will all have been rolled back.
				 * This occurs over here: [(4)]
				 */
				XT_SET_DISK_4(prev_rec_head.tr_prev_rec_id_4, after_rec_id);
				if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_UNLINKED, 0, prev_rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &prev_rec_head))
					goto failed;
			}
			else {
				/* Variation to be removed at the front of the list. */
				ASSERT(rec_id == first_rec_id);
				if (after_rec_id) {
					/* Unlink the deleted variation, from the front of the list: */
					if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_SET, row_id, after_rec_id))
						goto failed;
				}
				else {
					/* No more variations, remove the row: */
					if (!xt_tab_free_row(ot, tab, row_id))
						goto failed;
				}
			}
		}

		XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);

		/* Note: even when not found on the row list, the record must still
		 * be freed.
		 *
		 * There might be an exception to this, but there are very definite
		 * cases where this is required, for example when an unreferenced
		 * record is found and added to the clean up list xn_add_cu_record().
		 */

		unlink_done:
		/* Delete the extended record and index entries:
		 *
		 * NOTE! This must be done after we have released the row lock. Because
		 * a thread that does a duplicate check locks the index, and then
		 * checks whether a row is valid, and can deadlock with
		 * code that locks a row, then an index!
		 *
		 * However, this should all be OK, because the variation has been removed from the
		 * row variation list at this stage, and now just needs to be deleted.
		 */
		xt_sw_delete_variation(self, ss, ot, rec_id, FALSE, row_id, xn_id);
	}

	done_ok:
	return OK;

	/* Only reached from inside the row-locked section above: release the
	 * row lock before throwing.
	 */
	failed:
	XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self);
	throw_();
	return FAILED;
}
 
2297
 
 
2298
/* Go through all updated records of a transaction and cleanup.
 * This means, if the transaction was aborted, then all the variations written
 * by the transaction must be removed.
 * If the transaction was committed then we remove older variations.
 * If a delete was committed this can lead to the row being removed.
 *
 * After a transaction has been cleaned it can be removed from RAM.
 * If this was the last transaction in a log, and the log has reached
 * threshold, and the log is no longer in exclusive use, then the log
 * can be deleted.
 *
 * This function returns OK if the transaction was cleaned up, FAILED
 * if a retry is required. Otherwise an error is thrown.
 *
 * FAILED is returned when the thread has been asked to quit, when the end
 * of the log is reached for a transaction that is not flagged as recovered,
 * when xn_sw_cleanup_variation() reports failure, or when the cleanup log
 * entry cannot be written.
 */
static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact)
{
	XTDatabaseHPtr		db = ss->ss_db;
	XTXactLogBufferDPtr	record;
	xtTableID			tab_id;
	xtRecordID			rec_id;
	xtXactID			xn_id;
	xtRowID				row_id;

	/* Start reading the log at the position where the transaction began: */
	if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, xact->xd_begin_log, xact->xd_begin_offset, FALSE))
		xt_throw(self);

	for (;;) {
		if (self->t_quit)
			return FAILED;

		xn_sw_could_go_faster(self, db);

		if (!db->db_xlog.xlog_seq_next(&ss->ss_seqread, &record, FALSE, self))
			xt_throw(self);
		if (!record) {
			/* Recovered transactions are considered cleaned when we
			 * reach the end of the transaction log.
			 * This is required, because transactions that do
			 * not have a commit (or rollback) record, because they were
			 * running when the server last went down, will otherwise not
			 * have the cleanup completed!!
			 */
			ASSERT(xact->xd_flags & XT_XN_XAC_RECOVERED);
			if (!(xact->xd_flags & XT_XN_XAC_RECOVERED))
				return FAILED;
			goto cleanup_done;
		}
		switch (record->xh.xh_status_1) {
			case XT_LOG_ENT_NEW_LOG:
				/* Continue reading at the start of the log named by this entry: */
				if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, FALSE))
					xt_throw(self);
				break;
			case XT_LOG_ENT_COMMIT:
			case XT_LOG_ENT_ABORT:
				/* The end record of this transaction means all of its
				 * entries have been processed:
				 */
				xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
				if (xn_id == xact->xd_start_xn_id)
					goto cleanup_done;
				break;
			case XT_LOG_ENT_REC_MODIFIED:
			case XT_LOG_ENT_UPDATE:
			case XT_LOG_ENT_INSERT:
			case XT_LOG_ENT_DELETE:
			case XT_LOG_ENT_UPDATE_BG:
			case XT_LOG_ENT_INSERT_BG:
			case XT_LOG_ENT_DELETE_BG:
				/* These entries are read through the 'xu' layout.
				 * Entries of other transactions are skipped.
				 */
				xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
				if (xn_id != xact->xd_start_xn_id)
					break;
				tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
				rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
				row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
				if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xu.xu_status_1, record->xu.xu_rec_type_1, record->xu.xu_stat_id_1, row_id, &record->xu.xu_rec_type_1))
					return FAILED;
				break;
			case XT_LOG_ENT_UPDATE_FL:
			case XT_LOG_ENT_INSERT_FL:
			case XT_LOG_ENT_DELETE_FL:
			case XT_LOG_ENT_UPDATE_FL_BG:
			case XT_LOG_ENT_INSERT_FL_BG:
			case XT_LOG_ENT_DELETE_FL_BG:
				/* Same handling as above, but these entries are read
				 * through the 'xf' layout.
				 */
				xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
				if (xn_id != xact->xd_start_xn_id)
					break;
				tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
				rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
				row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
				if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xf.xf_status_1, record->xf.xf_rec_type_1, record->xf.xf_stat_id_1, row_id, &record->xf.xf_rec_type_1))
					return FAILED;
				break;
			default:
				break;
		}
	}

	cleanup_done:
	/* Write the log to indicate the transaction has been cleaned: */
	XTXactCleanupEntryDRec cu;

	cu.xc_status_1 = XT_LOG_ENT_CLEANUP;
	cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
	XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);

	if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, XT_XLOG_NO_WRITE_NO_FLUSH))
		return FAILED;

	/* The entry was logged with XT_XLOG_NO_WRITE_NO_FLUSH; record that
	 * a flush is still outstanding.
	 */
	ss->ss_flush_pending = TRUE;

	xact->xd_flags |= XT_XN_XAC_CLEANED;
#ifndef XT_SWEEPER_SORT_XACTS
	ASSERT(db->db_xn_to_clean_id == xact->xd_start_xn_id);
#endif
#ifdef MUST_DELAY_REMOVE
	xn_sw_add_xact_to_free(self, ss, xact->xd_start_xn_id);
#else
	xn_id = xact->xd_start_xn_id;
	if (xt_xn_delete_xact(db, xn_id, self)) {
		/* Recalculate the minimum memory transaction: */
		ASSERT(!xt_xn_is_before(xn_id, db->db_xn_min_ram_id));
		
		if (db->db_xn_min_ram_id == xn_id) {
			db->db_xn_min_ram_id = xn_id+1;
		}
		else {
			xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);

			/* Advance the minimum past every ID for which
			 * xn_get_xact_details() finds no transaction, stopping at
			 * the first one it does find or at the current ID.
			 */
			while (!xt_xn_is_before(xn_curr_xn_id, db->db_xn_min_ram_id)) { // was db->db_xn_min_ram_id <= xn_curr_xn_id
				/* db_xn_min_ram_id may be changed, by some other process! */
				xn_id = db->db_xn_min_ram_id;
				if (xn_get_xact_details(db, xn_id, self, NULL, NULL, NULL, NULL))
					break;
				db->db_xn_min_ram_id = xn_id+1;
			}
		}
	}
#endif

	return OK;
}
 
2436
 
 
2437
static void xn_free_sw_state(XTThreadPtr self, XNSweeperStatePtr ss)
 
2438
{
 
2439
        xn_sw_close_open_table(self, ss);
 
2440
        if (ss->ss_db)
 
2441
                ss->ss_db->db_xlog.xlog_seq_exit(&ss->ss_seqread);
 
2442
        xt_db_set_size(self, &ss->ss_databuf, 0);
 
2443
        xt_bq_set_size(self, &ss->ss_to_free, 0);
 
2444
}
 
2445
 
 
2446
#ifdef XT_SWEEPER_SORT_XACTS
/*
 * Comparator with the qsort() signature, for arrays of XTXactDataPtr
 * (presumably used to sort the sweeper's transaction list -- the call
 * site is not visible here).
 *
 * Orders by end transaction ID first, then by start transaction ID.
 * Returns -1 if a sorts before b, 1 if a sorts after b, and 0 if both
 * IDs are equal.
 *
 * The previous version never returned 0, so comparing an element with
 * itself gave 1 and cmp(a, b) == cmp(b, a) == 1 was possible. That breaks
 * the consistency a sort comparator is required to have.
 */
static int xn_compare_xact(const void *a, const void *b)
{
	XTXactDataPtr xact_a = *((XTXactDataPtr *) a);
	XTXactDataPtr xact_b = *((XTXactDataPtr *) b);

	if (xact_a->xd_end_xn_id != xact_b->xd_end_xn_id)
		return xact_a->xd_end_xn_id < xact_b->xd_end_xn_id ? -1 : 1;

	if (xact_a->xd_start_xn_id != xact_b->xd_start_xn_id)
		return xact_a->xd_start_xn_id < xact_b->xd_start_xn_id ? -1 : 1;

	return 0;
}
#endif
 
2462
 
 
2463
static void xn_sw_main(XTThreadPtr self)
 
2464
{
 
2465
        XTDatabaseHPtr          db = self->st_database;
 
2466
        XNSweeperStatePtr       ss;
 
2467
        XTXactDataPtr           xact;
 
2468
        time_t                          idle_start = 0;
 
2469
        xtXactID                        curr_id;
 
2470
#ifdef XT_SWEEPER_SORT_XACTS
 
2471
        u_int                           i;
 
2472
        xtXactID                        next_clean_id;
 
2473
#else
 
2474
        XTXactDataPtr           xact2;
 
2475
#endif
 
2476
 
 
2477
        xt_set_priority(self, xt_db_sweeper_priority);
 
2478
 
 
2479
        alloczr_(ss, xn_free_sw_state, sizeof(XNSweeperStateRec), XNSweeperStatePtr);
 
2480
        ss->ss_db = db;
 
2481
 
 
2482
        if (!db->db_xlog.xlog_seq_init(&ss->ss_seqread, xt_db_log_buffer_size, FALSE))
 
2483
                xt_throw(self);
 
2484
 
 
2485
        ss->ss_to_free.bq_item_size = sizeof(XNSWToFreeItemRec);
 
2486
        ss->ss_to_free.bq_max_waste = XT_TN_MAX_TO_FREE_WASTE;
 
2487
        ss->ss_to_free.bq_item_inc = XT_TN_MAX_TO_FREE_INC;
 
2488
        ss->ss_call_cnt = 0;
 
2489
        ss->ss_flush_pending = FALSE;
 
2490
 
 
2491
        while (!self->t_quit) {
 
2492
                while (!self->t_quit) {
 
2493
                        curr_id = xt_xn_get_curr_id(db);
 
2494
 
 
2495
#ifdef XT_SWEEPER_SORT_XACTS
 
2496
                        /* Add transactions to the list if required: */
 
2497
                        while (db->db_sw_list_size < XT_SW_XACT_SORT_LIST_SIZE &&
 
2498
                                !xt_xn_is_before(curr_id, db->db_sw_to_add)) {
 
2499
                                if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
 
2500
                                        /* Only add transactions that have completed: */
 
2501
                                        if (!(xact->xd_flags & XT_XN_XAC_SWEEP))
 
2502
                                                break;
 
2503
 
 
2504
                                        /* Add only transactions that did an update to the list: */
 
2505
                                        if ((xact->xd_flags & XT_XN_XAC_LOGGED)) {
 
2506
                                                db->db_sw_xact_list[db->db_sw_list_size] = xact;
 
2507
                                                db->db_sw_list_size++;
 
2508
                                        }
 
2509
                                        else {
 
2510
                                                /* Should not be required (done by the transction itself)! */
 
2511
                                                if (xt_xn_delete_xact(db, db->db_sw_to_add, self)) {
 
2512
                                                        if (db->db_xn_min_ram_id == db->db_sw_to_add)
 
2513
                                                                db->db_xn_min_ram_id = db->db_sw_to_add+1;
 
2514
                                                }
 
2515
                                        }
 
2516
                                }
 
2517
                                db->db_sw_to_add++;
 
2518
                                /* If there are no transactions to be cleaned, then the
 
2519
                                 * next to clean will be at least the next one to check.
 
2520
                                 */
 
2521
                                if (!db->db_sw_list_size)
 
2522
                                        db->db_xn_to_clean_id = db->db_sw_to_add;
 
2523
                        }
 
2524
 
 
2525
                        if (!db->db_sw_list_size) {
 
2526
                                /* Nothing to do: */
 
2527
                                db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
 
2528
                                goto sleep;
 
2529
                        }
 
2530
 
 
2531
                        if (!db->db_sw_list_size == XT_SW_XACT_SORT_LIST_SIZE)
 
2532
                                db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
 
2533
                        xn_sw_could_go_faster(self, db);
 
2534
                        idle_start = 0;
 
2535
 
 
2536
                        /* Sort the transactions, according to there end order: */
 
2537
                        qsort(db->db_sw_xact_list, db->db_sw_list_size, sizeof(XTXactDataPtr), xn_compare_xact);
 
2538
 
 
2539
                        for (i=0; i<db->db_sw_list_size; i++) {
 
2540
                                xact = db->db_sw_xact_list[i];
 
2541
                                if (!xt_xn_is_before(xact->xd_end_xn_id, db->db_sw_to_add))  // xact->xd_end_xn_id >= db->db_sw_to_add
 
2542
                                        break;
 
2543
 
 
2544
                                if (!xn_sw_cleanup_xact(self, ss, xact)) {
 
2545
                                        /* We failed to clean (try again later)... */
 
2546
#ifdef TRACE_SWEEPER_ACTIVITY
 
2547
                                        printf("SWEEPER: cleanup retry...\n", (int) xact->xd_start_xn_id);
 
2548
#endif
 
2549
                                        goto sleep;
 
2550
                                }
 
2551
                        }
 
2552
 
 
2553
                        if (i == db->db_sw_list_size) {
 
2554
                                /* All cleaned out: */
 
2555
                                db->db_xn_to_clean_id = db->db_sw_to_add;
 
2556
                                db->db_sw_list_size = 0;
 
2557
                        }
 
2558
                        else {
 
2559
                                u_int           j;
 
2560
 
 
2561
                                /* The next to clean will be the smallest still in the
 
2562
                                 * list.
 
2563
                                 *
 
2564
                                 * NOTE: db_xn_to_clean_id means that all transactions
 
2565
                                 * before this are clean.
 
2566
                                 *
 
2567
                                 * It may be that some after this point have also
 
2568
                                 * been cleaned!!
 
2569
                                 */
 
2570
                                next_clean_id = db->db_sw_xact_list[i]->xd_start_xn_id;
 
2571
                                for (j=i+1; j<db->db_sw_list_size; j++) {
 
2572
                                        if (xt_xn_is_before(db->db_sw_xact_list[j]->xd_start_xn_id, next_clean_id))
 
2573
                                                next_clean_id = db->db_sw_xact_list[j]->xd_start_xn_id;
 
2574
                                }
 
2575
                                db->db_xn_to_clean_id = next_clean_id;
 
2576
 
 
2577
                                if (i == 0) {
 
2578
                                        /* Something to do, but nothing done! */
 
2579
                                        if ((xact = xt_xn_get_xact(db, db->db_sw_to_add, self))) {
 
2580
                                                /* Before we go to sleep, lets just check again: */ 
 
2581
                                                if (!(xact->xd_flags & XT_XN_XAC_SWEEP)) {
 
2582
                                                        db->db_stat_sweep_waits++;
 
2583
                                                        goto sleep;
 
2584
                                                }
 
2585
                                        }
 
2586
                                }
 
2587
 
 
2588
                                memmove(db->db_sw_xact_list, &db->db_sw_xact_list[i], (db->db_sw_list_size - i) * sizeof(XTXactDataPtr));
 
2589
                                db->db_sw_list_size -= i;
 
2590
                        }
 
2591
#else
 
2592
                        /* We are just about to check the condition for sleeping,
 
2593
                         * so if the condition for sleeping holds, then we will
 
2594
                         * exit the loop and sleep.
 
2595
                         *
 
2596
                         * We will then sleep if nobody sets the flag before we
 
2597
                         * actually do sleep!
 
2598
                         */
 
2599
                        if (xt_xn_is_before(curr_id, db->db_xn_to_clean_id)) {
 
2600
                                db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
 
2601
                                break;
 
2602
                        }
 
2603
 
 
2604
                        /* {TUNING} How far to we allow the sweeper to get behind?
 
2605
                         * The higher this is, the higher burst performance can
 
2606
                         * be. But too high and the sweeper falls out of reading the
 
2607
                         * transaction log cache, and also starts to spread
 
2608
                         * changes around in index and data blocks that are no
 
2609
                         * longer hot.
 
2610
                         */
 
2611
                        if (curr_id - db->db_xn_to_clean_id > 250)
 
2612
                                db->db_sw_faster |= XT_SW_TOO_FAR_BEHIND;
 
2613
                        else
 
2614
                                db->db_sw_faster &= ~XT_SW_TOO_FAR_BEHIND;
 
2615
                        xn_sw_could_go_faster(self, db);
 
2616
                        idle_start = 0;
 
2617
 
 
2618
                        if ((xact = xt_xn_get_xact(db, db->db_xn_to_clean_id, self))) {
 
2619
                                xtXactID xn_id;
 
2620
 
 
2621
                                /* The sweep flag is set when the transaction is ready for sweeping.
 
2622
                                 * Prepared transactions may not be swept!
 
2623
                                 */
 
2624
                                if (!(xact->xd_flags & XT_XN_XAC_SWEEP) || (xact->xd_flags & XT_XN_XAC_PREPARED))
 
2625
                                        goto sleep;
 
2626
 
 
2627
                                /* Check if we can cleanup the transaction.
 
2628
                                 * We do this by checking to see if there is any running
 
2629
                                 * transaction which start before the end of this transaction.
 
2630
                                 */
 
2631
                                xn_id = xact->xd_start_xn_id;
 
2632
                                while (xt_xn_is_before(xn_id, xact->xd_end_xn_id)) {
 
2633
                                        xn_id++;
 
2634
                                        if ((xact2 = xt_xn_get_xact(db, xn_id, self))) {
 
2635
                                                if (!(xact2->xd_flags & XT_XN_XAC_ENDED)) {
 
2636
                                                        /* A transaction was started before the end of
 
2637
                                                         * the transaction we wish to sweep, and this
 
2638
                                                         * transaction has not committed, the we have to
 
2639
                                                         * wait.
 
2640
                                                         */
 
2641
                                                        db->db_stat_sweep_waits++;
 
2642
                                                        goto sleep;
 
2643
                                                }
 
2644
                                        }
 
2645
                                }
 
2646
                                
 
2647
                                /* Can cleanup the transaction, and move to the next. */
 
2648
                                if (xact->xd_flags & XT_XN_XAC_LOGGED) {
 
2649
#ifdef TRACE_SWEEPER_ACTIVITY
 
2650
                                        printf("SWEEPER: cleanup %d\n", (int) xact->xd_start_xn_id);
 
2651
#endif
 
2652
                                        if (!xn_sw_cleanup_xact(self, ss, xact)) {
 
2653
                                                /* We failed to clean (try again later)... */
 
2654
#ifdef TRACE_SWEEPER_ACTIVITY
 
2655
                                                printf("SWEEPER: cleanup retry...\n", (int) xact->xd_start_xn_id);
 
2656
#endif
 
2657
                                                goto sleep;
 
2658
                                        }
 
2659
#ifdef TRACE_SWEEPER_ACTIVITY
 
2660
                                        printf("SWEEPER: cleanup DONE\n", (int) xact->xd_start_xn_id);
 
2661
#endif
 
2662
                                }
 
2663
                                else {
 
2664
                                        /* This was a read-only transaction, it is safe to
 
2665
                                         * just remove the transaction structure from memory.
 
2666
                                         * (should not be necessary because RO transactions
 
2667
                                         * do this themselves):
 
2668
                                         */
 
2669
                                        if (xt_xn_delete_xact(db, db->db_xn_to_clean_id, self)) {
 
2670
                                                if (db->db_xn_min_ram_id == db->db_xn_to_clean_id)
 
2671
                                                        db->db_xn_min_ram_id = db->db_xn_to_clean_id+1;
 
2672
                                        }
 
2673
                                }
 
2674
                        }
 
2675
                        
 
2676
                        /* Move on to clean the next: */
 
2677
                        db->db_xn_to_clean_id++;
 
2678
#endif
 
2679
                }
 
2680
 
 
2681
                sleep:                  
 
2682
 
 
2683
                xn_sw_close_open_table(self, ss);
 
2684
 
 
2685
                xn_sw_go_slower(self, db);
 
2686
 
 
2687
                /* Shrink the free list, if it is empty, and larger then
 
2688
                 * the default:
 
2689
                 */
 
2690
                if (ss->ss_to_free.bq_size > XT_TN_MAX_TO_FREE) {
 
2691
                        if (ss->ss_to_free.bq_front == 0 && ss->ss_to_free.bq_back == 0)
 
2692
                                xt_bq_set_size(self, &ss->ss_to_free, XT_TN_MAX_TO_FREE);
 
2693
                }
 
2694
 
 
2695
                /* Windows: close the log file that we have open for reading, if we
 
2696
                 * read past the end of the log on the last transaction.
 
2697
                 * This makes sure that the log is closed when the checkpointer
 
2698
                 * tries to remove or rename it!!
 
2699
                 */
 
2700
                if (ss->ss_seqread.xseq_log_file) {
 
2701
                        if (ss->ss_seqread.xseq_rec_log_id != ss->ss_seqread.xseq_log_id)
 
2702
                                db->db_xlog.xlog_seq_close(&ss->ss_seqread);
 
2703
                }
 
2704
 
 
2705
                if (ss->ss_flush_pending) {
 
2706
                        /* Flush pending means we have written something to the log.
 
2707
                         *
 
2708
                         * if so we flush the log so that the writer will also do
 
2709
                         * its work!
 
2710
                         *
 
2711
                         * This will lead to the freeer continuing if it is waiting.
 
2712
                         */
 
2713
 
 
2714
                        time_t now = time(NULL);
 
2715
                        if (idle_start) {
 
2716
                                /* By default, we wait for 2 seconds idle time, then
 
2717
                                 * we flush the log.
 
2718
                                 */
 
2719
                                if (now >= idle_start + 2) {
 
2720
                                        /* Don't do this if flusher is active! */
 
2721
                                        if (!db->db_fl_thread &&
 
2722
                                                !xt_xlog_flush_log(db, self))
 
2723
                                                xt_throw(self);
 
2724
                                        ss->ss_flush_pending = FALSE;
 
2725
                                }
 
2726
                        }
 
2727
                        else
 
2728
                                idle_start = now;
 
2729
                }
 
2730
 
 
2731
                /* {WAKE-SW} Waking up the sweeper is very expensive!
 
2732
                 * Cost is 3% of execution time on the test:
 
2733
                 * runTest(SMALL_SELECT_TEST, 2, 100000)
 
2734
                 *
 
2735
                 * On the other hand, polling every 1/10 second
 
2736
                 * is cheap, because the check for transactions
 
2737
                 * ready for cleanup is very quick.
 
2738
                 *
 
2739
                 * So this is the prefered method.
 
2740
                 */
 
2741
                xn_sw_wait_for_xact(self, db, 10);
 
2742
        }
 
2743
 
 
2744
        if (ss->ss_flush_pending) {
 
2745
                xt_xlog_flush_log(db, self);
 
2746
                ss->ss_flush_pending = FALSE;
 
2747
        }
 
2748
 
 
2749
        freer_(); // xn_free_sw_state(ss)
 
2750
}
 
2751
 
 
2752
static void *xn_sw_run_thread(XTThreadPtr self)
 
2753
{
 
2754
        XTDatabaseHPtr  db = (XTDatabaseHPtr) self->t_data;
 
2755
        int                             count;
 
2756
        void                    *mysql_thread;
 
2757
 
 
2758
        if (!(mysql_thread = myxt_create_thread()))
 
2759
                xt_throw(self);
 
2760
 
 
2761
        while (!self->t_quit) {
 
2762
                try_(a) {
 
2763
                        /*
 
2764
                         * The garbage collector requires that the database
 
2765
                         * is in use because.
 
2766
                         */
 
2767
                        xt_use_database(self, db, XT_FOR_SWEEPER);
 
2768
 
 
2769
                        /* {BACKGROUND-RELEASE-DB}
 
2770
                         * This action is both safe and required:
 
2771
                         *
 
2772
                         * safe: releasing the database is safe because as
 
2773
                         * long as this thread is running the database
 
2774
                         * reference is valid, and this reference cannot
 
2775
                         * be the only one to the database because
 
2776
                         * otherwize this thread would not be running.
 
2777
                         *
 
2778
                         * required: releasing the database is necessary
 
2779
                         * otherwise we cannot close the database
 
2780
                         * correctly because we only shutdown this
 
2781
                         * thread when the database is closed and we
 
2782
                         * only close the database when all references
 
2783
                         * are removed.
 
2784
                         */
 
2785
                        xt_heap_release(self, self->st_database);
 
2786
 
 
2787
                        xn_sw_main(self);
 
2788
                }
 
2789
                catch_(a) {
 
2790
                        /* This error is "normal"! */
 
2791
                        if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
 
2792
                                !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
 
2793
                                self->t_exception.e_sys_err == SIGTERM))
 
2794
                                xt_log_and_clear_exception(self);
 
2795
                }
 
2796
                cont_(a);
 
2797
 
 
2798
                /* Avoid releasing the database (done above) */
 
2799
                self->st_database = NULL;
 
2800
                xt_unuse_database(self, self);
 
2801
 
 
2802
                /* After an exception, pause before trying again... */
 
2803
                /* Number of seconds */
 
2804
#ifdef DEBUG
 
2805
                count = 10;
 
2806
#else
 
2807
                count = 2*60;
 
2808
#endif
 
2809
                db->db_sw_idle = XT_THREAD_INERR;
 
2810
                while (!self->t_quit && count > 0) {
 
2811
                        sleep(1);
 
2812
                        count--;
 
2813
                }
 
2814
                db->db_sw_idle = XT_THREAD_BUSY;
 
2815
        }
 
2816
 
 
2817
   /*
 
2818
        * {MYSQL-THREAD-KILL}
 
2819
        myxt_destroy_thread(mysql_thread, TRUE);
 
2820
        */
 
2821
        return NULL;
 
2822
}
 
2823
 
 
2824
static void xn_sw_free_thread(XTThreadPtr self, void *data)
 
2825
{
 
2826
        XTDatabaseHPtr db = (XTDatabaseHPtr) data;
 
2827
 
 
2828
        if (db->db_sw_thread) {
 
2829
                xt_lock_mutex(self, &db->db_sw_lock);
 
2830
                pushr_(xt_unlock_mutex, &db->db_sw_lock);
 
2831
                db->db_sw_thread = NULL;
 
2832
                freer_(); // xt_unlock_mutex(&db->db_sw_lock)
 
2833
        }
 
2834
}
 
2835
 
 
2836
/* Wait for a transaction to quit: */
 
2837
static void xn_sw_wait_for_xact(XTThreadPtr self, XTDatabaseHPtr db, u_int hsecs)
 
2838
{
 
2839
        xt_lock_mutex(self, &db->db_sw_lock);
 
2840
        pushr_(xt_unlock_mutex, &db->db_sw_lock);
 
2841
        db->db_sw_idle = XT_THREAD_IDLE;
 
2842
        if (!self->t_quit && !db->db_sw_faster)
 
2843
                xt_timed_wait_cond(self, &db->db_sw_cond, &db->db_sw_lock, hsecs * 10);
 
2844
        db->db_sw_idle = XT_THREAD_BUSY;
 
2845
        db->db_sw_check_count++;
 
2846
        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
 
2847
}
 
2848
 
 
2849
xtPublic void xt_start_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
 
2850
{
 
2851
        char name[PATH_MAX];
 
2852
 
 
2853
        sprintf(name, "SW-%s", xt_last_directory_of_path(db->db_main_path));
 
2854
        xt_remove_dir_char(name);
 
2855
        db->db_sw_thread = xt_create_daemon(self, name);
 
2856
        xt_set_thread_data(db->db_sw_thread, db, xn_sw_free_thread);
 
2857
        xt_run_thread(self, db->db_sw_thread, xn_sw_run_thread);
 
2858
}
 
2859
 
 
2860
xtPublic void xt_init_sweeper_wait(XTThreadPtr self, XTDatabaseHPtr db)
 
2861
{
 
2862
        xtXactID        current_id;
 
2863
 
 
2864
        current_id = db->db_xn_curr_id;
 
2865
        if (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) {
 
2866
                double          init_diff, curr_done = 0;
 
2867
                time_t          start_time;
 
2868
                xtBool          print_progress = FALSE;
 
2869
                int                     perc_to_print = 1;
 
2870
 
 
2871
                init_diff = (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
 
2872
                start_time = time(NULL);
 
2873
 
 
2874
                xt_logf(XT_NT_INFO, "PBXT: Initial sweep, transactions to scan: %llu\n", (u_llong) init_diff);
 
2875
                
 
2876
                while (!xt_xn_is_before(current_id, db->db_xn_to_clean_id)) { // means: db->db_xn_to_clean_id <= current_id
 
2877
                        xt_lock_mutex(self, &db->db_sw_lock);
 
2878
                        pushr_(xt_unlock_mutex, &db->db_sw_lock);
 
2879
                        xt_wakeup_sweeper(db);
 
2880
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
 
2881
 
 
2882
                        xt_sleep_milli_second(10);
 
2883
 
 
2884
                        if (!print_progress) {
 
2885
                                if (time(NULL) - start_time > 2)
 
2886
                                        print_progress = TRUE;
 
2887
                        }
 
2888
 
 
2889
                        if (print_progress) {
 
2890
                                curr_done = init_diff - (double) xt_xn_diff(current_id, db->db_xn_to_clean_id);
 
2891
                                while (perc_to_print <= (int) (curr_done / init_diff * 100)) {
 
2892
                                        if (((perc_to_print - 1) % 25) == 0)
 
2893
                                                xt_logf(XT_NT_INFO, "PBXT: ");
 
2894
                                        if ((perc_to_print % 25) == 0)
 
2895
                                                xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
 
2896
                                        else
 
2897
                                                xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
 
2898
                                        xt_log_flush(self);
 
2899
                                        perc_to_print++;
 
2900
                                }
 
2901
                        }
 
2902
                }
 
2903
 
 
2904
                if (print_progress) {
 
2905
                        while (perc_to_print <= 100) {
 
2906
                                if (((perc_to_print - 1) % 25) == 0)
 
2907
                                        xt_logf(XT_NT_INFO, "PBXT: ");
 
2908
                                if ((perc_to_print % 25) == 0)
 
2909
                                        xt_logf(XT_NT_INFO, "%2d\n", perc_to_print);
 
2910
                                else
 
2911
                                        xt_logf(XT_NT_INFO, "%2d ", perc_to_print);
 
2912
                                xt_log_flush(self);
 
2913
                                perc_to_print++;
 
2914
                        }
 
2915
                }
 
2916
                xt_logf(XT_NT_INFO, "PBXT: Initial sweep complete, transactions scanned: %llu\n", (u_llong) init_diff);
 
2917
        }
 
2918
}
 
2919
 
 
2920
xtPublic void xt_wait_for_sweeper(XTThreadPtr self, XTDatabaseHPtr db, int abort_time)
 
2921
{
 
2922
        time_t  then, now;
 
2923
        xtBool  message = FALSE;
 
2924
 
 
2925
        if (db->db_sw_thread) {
 
2926
                then = time(NULL);
 
2927
                /* Changed xt_xn_get_curr_id(db) to db->db_xn_curr_id,
 
2928
                 * This should work because we are not concerned about the difference
 
2929
                 * between xt_xn_get_curr_id(db) and db->db_xn_curr_id,
 
2930
                 * Which is just a matter of when transactions we can expect ot find
 
2931
                 * in memory (see {GAP-INC-ADD-XACT})
 
2932
                 */
 
2933
                while (!xt_xn_is_before(db->db_xn_curr_id, db->db_xn_to_clean_id)) { // was db->db_xn_to_clean_id <= xt_xn_get_curr_id(db)
 
2934
                        xt_lock_mutex(self, &db->db_sw_lock);
 
2935
                        pushr_(xt_unlock_mutex, &db->db_sw_lock);
 
2936
                        xt_wakeup_sweeper(db);
 
2937
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
 
2938
                        xt_sleep_milli_second(10);
 
2939
                        now = time(NULL);
 
2940
                        if (abort_time && now >= then + abort_time) {
 
2941
                                xt_logf(XT_NT_INFO, "Aborting wait for '%s' sweeper\n", db->db_name);
 
2942
                                message = FALSE;
 
2943
                                break;
 
2944
                        }
 
2945
                        if (now >= then + 2) {
 
2946
                                if (!message) {
 
2947
                                        message = TRUE;
 
2948
                                        xt_logf(XT_NT_INFO, "Waiting for '%s' sweeper...\n", db->db_name);
 
2949
                                }
 
2950
                        }
 
2951
                }
 
2952
 
 
2953
                if (message)
 
2954
                        xt_logf(XT_NT_INFO, "Sweeper '%s' done.\n", db->db_name);
 
2955
        }
 
2956
}
 
2957
 
 
2958
xtPublic void xt_stop_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
 
2959
{
 
2960
        XTThreadPtr thr_sw;
 
2961
 
 
2962
        if (db->db_sw_thread) {
 
2963
                xt_lock_mutex(self, &db->db_sw_lock);
 
2964
                pushr_(xt_unlock_mutex, &db->db_sw_lock);
 
2965
 
 
2966
                /* This pointer is safe as long as you have the transaction lock. */
 
2967
                if ((thr_sw = db->db_sw_thread)) {
 
2968
                        xtThreadID tid = thr_sw->t_id;
 
2969
 
 
2970
                        /* Make sure the thread quits when woken up. */
 
2971
                        xt_terminate_thread(self, thr_sw);
 
2972
 
 
2973
                        xt_wakeup_sweeper(db);
 
2974
        
 
2975
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
 
2976
 
 
2977
                        /*
 
2978
                         * GOTCHA: This is a wierd thing but the SIGTERM directed
 
2979
                         * at a particular thread (in this case the sweeper) was
 
2980
                         * being caught by a different thread and killing the server
 
2981
                         * sometimes. Disconcerting.
 
2982
                         * (this may only be a problem on Mac OS X)
 
2983
                        xt_kill_thread(thread);
 
2984
                         */
 
2985
                        xt_wait_for_thread_to_exit(tid, FALSE);
 
2986
        
 
2987
                        /* PMC - This should not be necessary to set the signal here, but in the
 
2988
                         * debugger the handler is not called!!?
 
2989
                        thr_sw->t_delayed_signal = SIGTERM;
 
2990
                        xt_kill_thread(thread);
 
2991
                         */
 
2992
                        db->db_sw_thread = NULL;
 
2993
                }
 
2994
                else
 
2995
                        freer_(); // xt_unlock_mutex(&db->db_sw_lock)
 
2996
        }
 
2997
}
 
2998
 
 
2999
xtPublic void xt_wakeup_sweeper(XTDatabaseHPtr db)
 
3000
{
 
3001
        /* This flag makes the gap for the race condition
 
3002
         * very small.
 
3003
         *
 
3004
         * However, this posibility still remains because
 
3005
         * we do not lock the mutex db_sw_lock here.
 
3006
         *
 
3007
         * The reason is that it is too expensive.
 
3008
         *
 
3009
         * In the event that the wakeup is missed the sleeper
 
3010
         * wait will timeout eventually.
 
3011
         */
 
3012
        if (db->db_sw_idle) {
 
3013
                if (!xt_broadcast_cond_ns(&db->db_sw_cond))
 
3014
                        xt_log_and_clear_exception_ns();
 
3015
        }
 
3016
}