209
196
ASSERT(self->st_xact_data);
211
wf.wf_waiting = self->st_xact_data->xd_start_id;
212
wf.wf_for_me = xn_id;
198
wf.wf_waiting_xn_id = self->st_xact_data->xd_start_xn_id;
199
wf.wf_for_me_xn_id = xn_id;
214
xt_mutex_lock(&db->db_xn_wait_lock);
201
xt_lock_mutex_ns(&db->db_xn_wait_lock);
217
if (!(xact = xn_get_xact(db, xn_id)))
204
if (!(xact = xt_xn_get_xact(db, xn_id)))
220
207
/* This is a dirty read, but it should work! */
221
if (xact->xd_end_id || xact->xd_start_id != xn_id)
208
if ((xact->xd_flags & XT_XN_XAC_ENDED) || xact->xd_start_xn_id != xn_id)
224
if (xn_detect_deadlock(db, wf.wf_waiting, wf.wf_for_me))
227
if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting, &wf))
230
if (xact->xd_end_id || xact->xd_start_id != xn_id)
211
if (xn_detect_deadlock(db, wf.wf_waiting_xn_id, wf.wf_for_me_xn_id))
214
if (!xt_sl_insert(NULL, db->db_xn_wait_for, &wf.wf_waiting_xn_id, &wf))
217
if ((xact->xd_flags & XT_XN_XAC_ENDED) || xact->xd_start_xn_id != xn_id)
233
220
/* Timed wait because it is possible that transaction quits before
234
221
* we go to sleep.
236
223
if (!xt_timed_wait_cond(NULL, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 2 * 1000)) {
237
xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting);
224
xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
241
xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting);
228
xt_sl_delete(self, db->db_xn_wait_for, &wf.wf_waiting_xn_id);
244
xt_mutex_unlock(&db->db_xn_wait_lock);
234
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
248
xt_mutex_unlock(&db->db_xn_wait_lock);
238
xt_unlock_mutex_ns(&db->db_xn_wait_lock);
252
242
xtPublic void xt_xn_wakeup_transactions(XTDatabaseHPtr db, xtBool wait_for_sweeper)
244
/* This flag makes the gap for the race condition
247
* However, this posibility still remains because
248
* we do not lock the mutex db_xn_wait_lock here.
250
* The reason is that it is too expensive.
252
* In the event that the wakeup is missed the sleeper
253
* wait will timeout eventually.
255
db->db_sw_wakeup = TRUE;
255
256
if (!xt_broadcast_cond(NULL, &db->db_xn_wait_cond))
256
257
xt_log_and_clear_exception_ns();
258
if (wait_for_sweeper) {
261
xt_mutex_lock(&db->db_xn_wait_lock);
262
idle = db->db_sw_idle;
263
xt_mutex_unlock(&db->db_xn_wait_lock);
270
260
/* ----------------------------------------------------------------------
274
static void xn_logname(size_t size, char *path, XTDatabaseHPtr db, xtWord4 log_no)
279
sprintf(name, "xtlog-%lu.xt", (u_long) log_no);
280
xt_strcpy(PATH_MAX, path, db->db_path);
281
xt_add_dir_char(PATH_MAX, path);
282
xt_strcat(PATH_MAX, path, name);
285
static xtBool xn_shared_read(XTOpenFilePtr of, off_t offset, size_t *io_size, void *data)
287
size_t size = *io_size;
289
return xt_pread_file(of, offset, size, 0, data, io_size);
292
static xtBool xn_shared_write(XTOpenFilePtr of, off_t offset, size_t size, void *data)
294
return xt_pwrite_file(of, offset, size, data);
297
/* This function ensures that the record at the given offset
298
* is completely in the buffer.
300
static xtBool xn_read_log(XTXactLogPtr xlog, XTOpenFilePtr of, off_t offset, size_t *io_size, void *data)
302
size_t size = *io_size;
307
if (!xlog->xl_offset || offset + size <= xlog->xl_file_size)
308
return xn_shared_read(of, offset, io_size, data);
310
xt_rwlock_rdlock(&xlog->xl_rwlock);
312
/* Check required again, after lock! */
313
if (!xlog->xl_offset || (offset + size <= xlog->xl_file_size)) {
314
xt_rwlock_unlock(&xlog->xl_rwlock);
316
/* Expensive (unlock and lock) but it should not happen often: */
317
ok = xn_shared_read(of, offset, io_size, data);
321
if (offset < xlog->xl_file_size) {
322
/* Part in file plus first then part of buffer: */
323
tfer = (size_t) (xlog->xl_file_size - offset);
325
if (boff > xlog->xl_offset)
326
boff = xlog->xl_offset;
327
memcpy(((char *) data) + tfer, xlog->xl_buffer, boff); // Copy the rest
328
xt_rwlock_unlock(&xlog->xl_rwlock);
330
ok = xn_shared_read(of, offset, &tfer, data);
331
*io_size = tfer + boff;
335
/* Read completely in buffer: */
336
boff = (u_int) (offset - xlog->xl_file_size);
338
if (tfer > xlog->xl_offset - boff)
339
tfer = xlog->xl_offset - boff;
340
memcpy(data, xlog->xl_buffer + boff, tfer);
342
xt_rwlock_unlock(&xlog->xl_rwlock);
347
static xtBool xn_write_log(XTXactLogPtr xlog, XTOpenFilePtr of, off_t offset, size_t size, void *data)
353
if (!xlog->xl_offset || offset + size <= xlog->xl_file_size) {
354
ok = xn_shared_write(of, offset, size, data);
358
xt_rwlock_rdlock(&xlog->xl_rwlock);
360
/* Check required again, after lock! */
361
if (!xlog->xl_offset || (offset + size <= xlog->xl_file_size)) {
362
xt_rwlock_unlock(&xlog->xl_rwlock);
364
/* Expensive (unlock and lock) but it should not happen often: */
365
ok = xn_shared_write(of, offset, size, data);
369
if (offset < xlog->xl_file_size) {
370
/* Part in file plus first then part of buffer: */
371
tfer = (size_t) (xlog->xl_file_size - offset);
373
if (boff > xlog->xl_offset)
375
memcpy(xlog->xl_buffer, ((char *) data) + tfer, boff); // Copy the rest
376
xt_rwlock_unlock(&xlog->xl_rwlock);
378
ok = xn_shared_write(of, offset, tfer, data);
382
/* Read completely in buffer: */
383
boff = (u_int) (offset - xlog->xl_file_size);
385
if (tfer > xlog->xl_offset - boff)
387
memcpy(xlog->xl_buffer + boff, data, tfer);
389
xt_rwlock_unlock(&xlog->xl_rwlock);
393
/* Tried to write past the EOF! */
394
xt_rwlock_unlock(&xlog->xl_rwlock);
395
xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(xlog->xl_exfile));
399
static xtBool xn_append_log(XTThreadPtr self, size_t size, void *data, off_t *address)
401
register XTXactLogPtr xlog;
403
if (!(xlog = self->st_xact_log)) {
404
if (!(xlog = xn_get_log_for_writing(self->st_database, XT_FOR_USER)))
406
self->st_xact_log = xlog;
409
ASSERT_NS(xlog->xl_exfile);
411
if (xlog->xl_offset + size > XT_XACT_LOG_BUFFER_SIZE) {
412
if (!xt_xn_flush_log(xlog))
416
ASSERT_NS(size <= XT_DATA_LOG_BUFFER_SIZE);
418
// The record fits in the buffer: */
420
*address = xlog->xl_file_size + (off_t) xlog->xl_offset;
421
memcpy(xlog->xl_buffer + xlog->xl_offset, data, size);
422
xlog->xl_offset += size;
429
266
u_long tot_alloced;
701
static xtBool xn_seq_init(XTThreadPtr self, XTXactSeqReadPtr sr, XTXactLogPtr xlog, XTOpenFilePtr of)
707
sr->sr_buffer = (xtWord1 *) xt_malloc(self, XT_XACT_LOG_READ_BUFFER_SIZE);
708
return sr->sr_buffer != NULL;
711
static void xn_seq_exit(XTThreadPtr self, XTXactSeqReadPtr sr)
714
xt_free(self, sr->sr_buffer);
715
sr->sr_buffer = NULL;
723
static xtBool xn_seq_read(XTXactSeqReadPtr sr, off_t offset, XTXactLogBufferDPtr *entry, size_t *space)
725
if (offset >= sr->sr_offset && offset + sizeof(XTXactLogBufferDRec) <= sr->sr_offset + sr->sr_length) {
726
/* Completely in the buffer: */
729
boff = (size_t) (offset - sr->sr_offset);
730
*entry = (XTXactLogBufferDPtr) (sr->sr_buffer + boff);
731
*space = sr->sr_length - boff;
735
/* Although the record may be partially in the buffer we don't bother
736
* to preserver bytes arlready read because the records are so
739
sr->sr_length = XT_XACT_LOG_READ_BUFFER_SIZE;
740
sr->sr_offset = offset;
741
if (!xn_read_log(sr->sr_xlog, sr->sr_of, offset, &sr->sr_length, sr->sr_buffer))
743
*entry = (XTXactLogBufferDPtr) sr->sr_buffer;
744
*space = sr->sr_length;
748
#define XN_MODE_RECOVER 0
749
#define XN_MODE_EXCLUSIVE 1
750
#define XN_MODE_SWEEPER 2
752
static xtBool xn_recover_log(XTXactLogPtr xlog, XTOpenFilePtr of)
754
XTDatabaseHPtr db = xlog->xl_db;
757
XTXactLogBufferDPtr entry;
760
xtWord8 curr_xn_id = 0;
763
u_int active_count = 0;
766
xlog->xl_file_size = xt_seek_eof_file(NULL, of);
767
if (xlog->xl_file_size == -1)
770
if (!xn_seq_init(NULL, &sr, xlog, of))
776
if (!xn_seq_read(&sr, curr_offset, &entry, &space))
780
switch (entry->xb.xe_status_1) {
781
case XT_XN_STATUS_HEADER: {
782
if (space < offsetof(XTXactLogHeaderDRec, xh_size_4) + 4)
784
size = XT_GET_DISK_4(entry->xh.xh_size_4);
785
if (space < size || size < 18)
787
xn_id = XT_GET_DISK_6(entry->xh.xh_curr_id_6);
788
if (xn_id > db->db_xn_curr_id)
789
db->db_xn_curr_id = xn_id;
790
tab_id = XT_GET_DISK_4(entry->xh.xh_tab_id_4);
791
if (tab_id > db->db_curr_tab_id)
792
db->db_curr_tab_id = tab_id;
796
case XT_XN_STATUS_BEGIN:
797
case XT_XN_STATUS_COMMITTED:
798
case XT_XN_STATUS_ABORTED:
799
if (space < sizeof(XTXactBeginEntryDRec))
801
curr_xn_id = XT_GET_DISK_6(entry->xb.xe_xact_id_6);
802
if (curr_xn_id > db->db_xn_curr_id)
803
db->db_xn_curr_id = curr_xn_id;
805
if (entry->xb.xe_status_1 == XT_XN_STATUS_BEGIN) {
806
if (!(xact = xn_add_old_xact(db, curr_xn_id)))
809
xact->xd_begin_log = xlog->xl_number;
810
xact->xd_begin_offset = curr_offset;
811
xact->xd_committed = FALSE;
812
xact->xd_end_id = curr_xn_id;
815
/* This may affect the "minimum RAM transaction": */
816
if (!db->db_xn_min_ram_id || curr_xn_id < db->db_xn_min_ram_id)
817
db->db_xn_min_ram_id = curr_xn_id;
822
curr_offset += sizeof(XTXactBeginEntryDRec);
824
case XT_XN_STATUS_COMMIT:
825
case XT_XN_STATUS_ABORT:
826
if (space < sizeof(XTXactEndEntryDRec))
828
xn_id = XT_GET_DISK_6(entry->xe.xe_xend_id_6);
829
if (curr_xn_id && (xact = xn_get_xact(db, curr_xn_id))) {
830
xact->xd_committed = entry->xb.xe_status_1 == XT_XN_STATUS_COMMIT;
831
xact->xd_end_id = xn_id;
834
curr_offset += sizeof(XTXactEndEntryDRec);
836
case XT_XN_STATUS_UPDATE:
837
case XT_XN_STATUS_INSERT:
838
case XT_XN_STATUS_DELETE:
839
if (space < sizeof(XTactUpdateEntryDRec))
841
curr_offset += sizeof(XTactUpdateEntryDRec);
843
case XT_XN_STATUS_CURR_IDS:
844
if (space < sizeof(XTXactCurrEntryDRec))
847
xn_id = XT_GET_DISK_6(entry->xc.xe_curr_id_6);
848
if (xn_id > db->db_xn_curr_id) {
849
db->db_xn_curr_id = xn_id;
851
tab_id = XT_GET_DISK_4(entry->xc.xe_tab_id_4);
852
if (tab_id > db->db_curr_tab_id)
853
db->db_curr_tab_id = tab_id;
855
curr_offset += sizeof(XTXactCurrEntryDRec);
863
xn_seq_exit(NULL, &sr);
864
if (xlog->xl_file_size != curr_offset) {
865
xt_registerf(XT_REG_CONTEXT, XT_ERR_XLOG_WAS_CORRUPTED, 0, "Corrupted transaction log, '%s' has been truncated at %lu", xt_file_path(of), (u_long) curr_offset);
866
xt_log_and_clear_exception_ns();
867
xlog->xl_file_size = curr_offset;
868
if (!xt_set_eof_file(NULL, of, xlog->xl_file_size))
870
xlog->xl_file_size = curr_offset;
873
if (!active_count && xlog->xl_file_size >= XT_XACT_LOG_ROLLOVER_SIZE)
874
xlog->xl_file_size = 0;
876
if (db->db_xn_curr_id > db->db_xn_max_disk_id)
877
db->db_xn_max_disk_id = db->db_xn_curr_id;
879
xlog->xl_recovered = TRUE;
883
xn_seq_exit(NULL, &sr);
887
static void xn_free_log(XTXactLogPtr xlog)
889
xlog->xl_ref_count--;
890
if (!xlog->xl_ref_count) {
891
if (xlog->xl_sw_file)
892
xt_close_file_ns(xlog->xl_sw_file);
895
xt_close_file_ns(xlog->xl_exfile);
897
xt_free_rwlock(&xlog->xl_rwlock);
903
static void xn_recalc_high_log(XTDatabaseHPtr db)
907
old_high = db->db_xn_high_log;
908
while (db->db_xn_high_log > 0) {
909
if (!db->db_xn_log_list[db->db_xn_high_log-1] ||
910
db->db_xn_log_list[db->db_xn_high_log-1]->xl_file_size > 0)
912
db->db_xn_high_log--;
915
for (u_int i=db->db_xn_high_log; i<old_high; i++) {
916
if (db->db_xn_log_list[i]) {
917
ASSERT_NS(!db->db_xn_log_list[i]->xl_file_size);
918
if (!db->db_xn_log_list[i]->xl_file_size) {
919
xn_free_log(db->db_xn_log_list[i]);
920
db->db_xn_log_list[i] = NULL;
926
static xtBool xn_create_new_header(XTXactLogPtr xlog, XTOpenFilePtr of, XTDatabaseHPtr db)
928
XTXactLogHeaderDRec header;
930
memset(&header, 0, sizeof(header));
931
XT_SET_DISK_4(header.xh_magic_4, XT_XN_XACT_LOG_MAGIC);
932
XT_SET_DISK_4(header.xh_size_4, sizeof(XTXactLogHeaderDRec));
933
XT_SET_DISK_6(header.xh_curr_id_6, db->db_xn_curr_id);
934
XT_SET_DISK_4(header.xh_tab_id_4, db->db_curr_tab_id);
935
if (!xt_pwrite_file(of, 0, sizeof(XTXactLogHeaderDRec), &header))
937
xlog->xl_file_size = sizeof(XTXactLogHeaderDRec);
938
if (!xt_set_eof_file(NULL, of, xlog->xl_file_size))
941
if (db->db_xn_curr_id > db->db_xn_max_disk_id)
942
db->db_xn_max_disk_id = db->db_xn_curr_id;
947
static xtBool xn_use_xlog(XTDatabaseHPtr db, XTXactLogPtr *out_xlog, xtWord4 log_no, int mode)
952
xt_mutex_lock(&db->db_xn_log_lock);
954
/* We use offset log_no - 1: */
955
if (log_no > db->db_xn_log_count) {
956
if (!xt_sys_realloc((void **) &db->db_xn_log_list, log_no * sizeof(XTXactLogPtr)))
958
memset(&db->db_xn_log_list[db->db_xn_log_count], 0, (log_no - db->db_xn_log_count) * sizeof(XTXactLogPtr));
959
db->db_xn_log_count = log_no;
962
if (!(xlog = db->db_xn_log_list[log_no-1])) {
964
if (!(xlog = (XTXactLogPtr) xt_sys_malloc(sizeof(XTXactLogRec))))
966
memset(xlog, 0, offsetof(XTXactLogRec, xl_buffer));
968
xlog->xl_number = log_no;
969
xlog->xl_ref_count = 1;
971
if (!xt_init_rwlock(NULL, &xlog->xl_rwlock)) {
976
db->db_xn_log_list[log_no-1] = xlog; // This reference was counted above
979
xn_logname(PATH_MAX, path, db, log_no);
981
if (mode == XN_MODE_RECOVER) {
982
if (!xlog->xl_recovered) {
985
if (!(of = xt_open_file_ns(path, XT_FS_DEFAULT)))
988
if (!xn_recover_log(xlog, of)) {
989
xt_close_file_ns(of);
993
if (!xlog->xl_file_size && !xn_create_new_header(xlog, of, db)) {
994
xt_close_file_ns(of);
998
xt_close_file_ns(of);
1003
else if (mode == XN_MODE_SWEEPER) {
1004
if (!xlog->xl_sw_file) {
1005
if (!(xlog->xl_sw_file = xt_open_file_ns(path, XT_FS_DEFAULT)))
1008
if (!xlog->xl_recovered) {
1009
if (!xn_recover_log(xlog, xlog->xl_sw_file)) {
1010
xt_close_file_ns(xlog->xl_sw_file);
1011
xlog->xl_sw_file = NULL;
1017
/* Already open for exclusive use. */
1018
if (xlog->xl_exfile) {
1023
ASSERT_NS(!xlog->xl_offset);
1024
if (xlog->xl_recovered && xlog->xl_file_size >= XT_XACT_LOG_ROLLOVER_SIZE) {
1029
if (!(xlog->xl_exfile = xt_open_file_ns(path, XT_FS_CREATE)))
1032
if (!xlog->xl_recovered) {
1033
if (!xn_recover_log(xlog, xlog->xl_exfile))
1037
if (xlog->xl_file_size == 0) {
1038
if (!xn_create_new_header(xlog, xlog->xl_exfile, db))
1041
else if (xlog->xl_file_size >= XT_XACT_LOG_ROLLOVER_SIZE) {
1042
/* The log does not have enough space: */
1043
xt_close_file_ns(xlog->xl_exfile);
1044
xlog->xl_exfile = NULL;
1050
if (db->db_xn_log_list[log_no-1] && db->db_xn_log_list[log_no-1]->xl_file_size) {
1051
if (log_no > db->db_xn_high_log)
1052
db->db_xn_high_log = log_no;
1057
xlog->xl_ref_count++;
1061
xt_mutex_unlock(&db->db_xn_log_lock);
1065
xt_close_file_ns(xlog->xl_exfile);
1066
xlog->xl_exfile = NULL;
1069
xt_mutex_unlock(&db->db_xn_log_lock);
1073
/* Get a transaction log for writting. We fill the logs
1074
* at the end first in the hopes of reducing the
1077
static XTXactLogPtr xn_get_log_for_writing(XTDatabaseHPtr db, int what_for)
1080
XTXactLogPtr xlog = NULL;
1083
high = db->db_xn_high_log;
1085
if (what_for == XT_FOR_SWEEPER)
1088
log_no = db->db_xn_next_log;
1093
for (xtWord4 i=0; i<high; i++) {
1096
if (!xn_use_xlog(db, &xlog, log_no, XN_MODE_EXCLUSIVE))
1099
ASSERT_NS(xlog->xl_exfile);
1109
if (!xn_use_xlog(db, &xlog, log_no, XN_MODE_EXCLUSIVE))
1112
ASSERT_NS(xlog->xl_exfile);
1115
if (what_for != XT_FOR_SWEEPER)
1116
db->db_xn_next_log = log_no-1;
1120
xtPublic xtBool xt_xn_flush_log(XTXactLogPtr xlog)
1124
if (xlog->xl_offset) {
1125
xt_rwlock_wrlock(&xlog->xl_rwlock);
1126
if ((ok = xt_pwrite_file(xlog->xl_exfile, xlog->xl_file_size, xlog->xl_offset, xlog->xl_buffer))) {
1127
/* Locking is to make sure the reader has a consistant view of the length of the file. */
1128
xlog->xl_file_size += xlog->xl_offset;
1129
xlog->xl_offset = 0;
1131
xt_rwlock_unlock(&xlog->xl_rwlock);
1136
static void xn_unlock_xlog(XTXactLogPtr xlog)
1138
if (xlog->xl_exfile) {
1139
if (!xt_xn_flush_log(xlog))
1140
xt_log_and_clear_exception_ns();
1141
xt_close_file_ns(xlog->xl_exfile);
1142
xlog->xl_exfile = NULL;
1146
static void xn_release_log(XTXactLogPtr xlog)
1148
XTDatabaseHPtr db = xlog->xl_db;
1150
xt_mutex_lock(&db->db_xn_log_lock);
1152
xt_mutex_unlock(&db->db_xn_log_lock);
1155
static void xn_release_log_w_self(XTThreadPtr self, XTXactLogPtr xlog)
1157
xn_release_log(xlog);
1160
540
/* ----------------------------------------------------------------------
1448
822
xtPublic xtBool xt_xn_commit(XTThreadPtr self)
1450
XT_DEBUG_TRACE(("COMMIT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_id));
1451
return xn_end_xact(self, XT_XN_STATUS_COMMIT);
824
#ifdef PBXT_TRACE_STAT
825
xt_tracef_query(self, "COMMIT T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
827
#ifdef TRACE_VARIATIONS
828
xt_trace("%s commit: T%d\n", self->t_name, (int) self->st_xact_data->xd_start_xn_id);
830
XT_DEBUG_TRACE(("COMMIT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_xn_id));
831
return xn_end_xact(self, XT_LOG_ENT_COMMIT);
1454
834
xtPublic xtBool xt_xn_rollback(XTThreadPtr self)
1456
XT_DEBUG_TRACE(("ABORT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_id));
1457
return xn_end_xact(self, XT_XN_STATUS_ABORT);
1460
/* Return TRUE if records written by the given transaction are
1461
* visible to the thread (i.e. the transaction of the thread).
1463
xtPublic xtBool xt_xn_visible(XTOpenTablePtr ot, xtWord8 xn_id, off_t address, xtBool *mine)
1465
register XTThreadPtr self = ot->ot_thread;
1466
register XTXactDataPtr xact;
1468
/* NOTE: If a transaction is not in RAM, then it is considered aborted.
1469
* This means that we can only remove a transaction from memory when
1470
* all transactions that were running when we started cleanup have
1472
* Only these transactions may have read something that has been
1473
* changed by the sweeper in the meantime.
1474
* For example a transaction may fill its buffer when doing a
1475
* sequential read. A record may be cleaned by the sweeper that
1476
* is in this buffer, but the reader already has a old copy of
1477
* the data. Then the sweeper removes the transaction
1478
* from RAM. The reader will then consider this record
1481
if (xn_id < self->st_database->db_xn_min_ram_id) {
1482
/* This record is not clean, and the transaction is not in
1483
* RAM. This means it has be missed, so clean it up.
1485
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1488
if (xn_id == self->st_xact_data->xd_start_id) {
1493
if (xn_id > self->st_xact_data->xd_start_id)
1494
/* This record is written after the this transaction
1495
* started (is not visible).
1498
if (!(xact = xn_get_xact(self->st_database, xn_id))) {
1499
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1502
if (!xact->xd_end_id || xact->xd_end_id >= self->st_xact_data->xd_start_id)
1503
/* Either this transaction has not yet ended, or this
1504
* record was written by a transaction that ended
1505
* after the reading transaction started!
1506
* So this record is not visible!
1509
/* Visible if the transaction was committed: */
1510
return xact->xd_committed;
1514
* Return TRUE if the record has been commited.
1516
xtPublic xtBool xt_xn_committed(XTOpenTablePtr ot, xtWord8 xn_id, off_t address, xtBool *mine)
1518
register XTThreadPtr self = ot->ot_thread;
1519
register XTXactDataPtr xact;
1521
if (xn_id < self->st_database->db_xn_min_ram_id) {
1522
/* This record is not clean, and the transaction is not in
1523
* RAM. This means it has be missed, so clean it up.
1525
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1528
if (xn_id == self->st_xact_data->xd_start_id) {
1533
if (!(xact = xn_get_xact(self->st_database, xn_id))) {
1534
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1537
if (!xact->xd_end_id)
1538
/* Either this transaction has not yet ended: */
1540
/* TRUE if the record was committed: */
1541
return xact->xd_committed;
1545
* Return TRUE of the transaction is committed, or may be
1546
* committed in the future.
1548
* if used, 'wait' must be initialized to FALSE!
1550
* It will be set to TRUE if the transaction has not yet ended.
1551
* Return FALSE of the transaction was aborted.
1553
xtPublic xtBool xt_xn_may_commit(XTOpenTablePtr ot, xtWord8 xn_id, off_t address, xtBool *mine, xtBool *wait)
1555
register XTThreadPtr self = ot->ot_thread;
1556
register XTXactDataPtr xact;
1558
if (xn_id < self->st_database->db_xn_min_ram_id) {
1559
/* Not in RAM, rollback done: */
1560
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1563
if (xn_id == self->st_xact_data->xd_start_id) {
1568
if (!(xact = xn_get_xact(self->st_database, xn_id))) {
1569
/* Not in RAM, rollback done: */
1570
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, address);
1573
if (!xact->xd_end_id) {
836
#ifdef PBXT_TRACE_STAT
837
xt_tracef_query(self, "ABORT T%lu\n", (u_long) self->st_xact_data->xd_start_xn_id);
839
#ifdef TRACE_VARIATIONS
840
xt_trace("%s abort: T%d\n", self->t_name, (int) self->st_xact_data->xd_start_xn_id);
842
XT_DEBUG_TRACE(("ABORT %p tx=%d\n", self, (int) self->st_xact_data->xd_start_xn_id));
843
return xn_end_xact(self, XT_LOG_ENT_ABORT);
846
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
848
XTXactNewTabEntryDRec entry;
850
entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
851
entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
852
XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
853
return self->st_xact_buf.xbuf_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE);
857
* XT_XN_ABORTED - Transaction was aborted.
858
* XT_XN_MY_UPDATE - The record was update by me.
859
* XT_XN_OTHER_UPDATE - The record was updated by someone else.
860
* XT_XN_COMMITTED - The transaction was committed.
862
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id)
864
register XTThreadPtr self = ot->ot_thread;
865
register XTXactDataPtr xact;
867
if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
868
/* Not in RAM, rollback done: */
869
//*DBG*/xt_dump_trace();
870
//*DBG*/xt_dump_xlogs(self->st_database);
871
//*DBG*/xt_check_table(self, ot);
872
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
873
return XT_XN_ABORTED;
875
if (xn_id == self->st_xact_data->xd_start_xn_id)
876
return XT_XN_MY_UPDATE;
877
if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
878
/* Not in RAM, rollback done: */
879
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
880
return XT_XN_ABORTED;
882
if (!(xact->xd_flags & XT_XN_XAC_ENDED))
1574
883
/* Transaction not ended, may be visible. */
884
return XT_XN_OTHER_UPDATE;
1579
885
/* Visible if the transaction was committed: */
1580
return xact->xd_committed;
1583
/* ----------------------------------------------------------------------
1587
static xtBool xn_log_begin(XTThreadPtr self, register XTXactDataPtr xact)
1589
XTXactBeginEntryDRec begin;
1591
begin.xe_status_1 = XT_XN_STATUS_BEGIN;
1592
XT_SET_DISK_6(begin.xe_xact_id_6, xact->xd_start_id);
1593
return xn_append_log(self, sizeof(XTXactBeginEntryDRec), &begin, &xact->xd_begin_offset);
1597
* This function makes sure that this transaction is less than the
1598
* next transaction number that will be used on restart.
1600
* The next number to be used on restart is the highest
1601
* number on disk plus a certain constant:
1602
* XT_TN_NUMBER_INCREMENT (>= 1).
1604
* The function must be called before a transaction ID is written
1605
* to a record on disk.
1607
* It ensures that the record can be correctly identified
1608
* as invalid (i.e. rolled back). A record is rolled back
1609
* if it has a transaction ID that was not committed. Any
1610
* record that is not clean, but has an ID smaller than
1611
* all transactions in RAM is considered invalid.
1613
* If this is not the case, we write this transaction number
1616
xtPublic xtBool xt_xn_log_begin(XTOpenTablePtr ot)
1618
register XTXactDataPtr xact;
1620
if (!(xact = ot->ot_thread->st_xact_data)) {
1621
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
1624
if (!xact->xd_begin_log) {
1625
register XTDatabaseHPtr db;
1627
db = ot->ot_table->tab_db;
1628
if (xact->xd_start_id > db->db_xn_max_disk_id + XT_TN_NUMBER_INCREMENT) {
1629
if (!xn_log_begin(ot->ot_thread, xact))
1631
ASSERT_NS(ot->ot_thread->st_xact_log);
1633
xact->xd_begin_log = ot->ot_thread->st_xact_log->xl_number;
1634
if (!xt_xn_flush_log(ot->ot_thread->st_xact_log))
1636
if (xact->xd_start_id > db->db_xn_max_disk_id)
1637
db->db_xn_max_disk_id = xact->xd_start_id;
1643
xtPublic xtBool xt_xn_log_update(XTOpenTablePtr ot, off_t record, u_int status, u_int rec_type)
1645
XTactUpdateEntryDRec entry;
1646
register XTXactDataPtr xact;
1648
xact = ot->ot_thread->st_xact_data;
1649
if (!xact->xd_begin_log) {
1650
if (!xn_log_begin(ot->ot_thread, xact))
1652
ASSERT_NS(ot->ot_thread->st_xact_log);
1653
xact->xd_begin_log = ot->ot_thread->st_xact_log->xl_number;
1656
entry.xe_status_1 = (xtWord1) status;
1657
entry.xe_rec_type_1 = (xtWord1) rec_type;
1658
XT_SET_DISK_4(entry.xe_tab_id_4, ot->ot_table->tab_id);
1659
XT_SET_DISK_6(entry.xe_record_6, record);
1660
return xn_append_log(ot->ot_thread, sizeof(XTactUpdateEntryDRec), &entry, NULL);
1663
xtPublic xtBool xt_xn_log_ids(XTThreadPtr self, XTDatabaseHPtr db)
1665
XTXactCurrEntryDRec entry;
1667
/* Before we delete a log, we have to preserve the current IDs: */
1668
entry.xe_status_1 = XT_XN_STATUS_CURR_IDS;
1669
XT_SET_DISK_6(entry.xe_curr_id_6, db->db_xn_curr_id);
1670
XT_SET_DISK_4(entry.xe_tab_id_4, db->db_curr_tab_id);
1671
if (!xn_append_log(self, sizeof(XTXactCurrEntryDRec), &entry, NULL))
1674
if (!xn_flush_log_and_rollover(self, XT_FOR_SWEEPER))
1677
if (db->db_xn_curr_id > db->db_xn_max_disk_id)
1678
db->db_xn_max_disk_id = db->db_xn_curr_id;
1682
/* ----------------------------------------------------------------------
1686
static void xn_sw_free_tables(XTThreadPtr self, XTDatabaseHPtr db)
1688
XTSWTablePtr st, tmp_st;
1690
if ((st = db->db_sw_tables)) {
1692
tmp_st = st->st_less_ru;
1694
xt_flush_table(st->st_table);
1695
xt_close_table(st->st_table);
1699
} while (st != db->db_sw_tables);
1701
db->db_sw_tab_count = 0;
1702
db->db_sw_tables = NULL;
1705
static void xn_sw_flush_tables(XTThreadPtr self, XTDatabaseHPtr db)
1709
if ((st = db->db_sw_tables)) {
1711
if (st->st_table && st->st_dirty) {
1712
if (!xt_flush_table(st->st_table))
1714
st->st_dirty = FALSE;
1716
st = st->st_less_ru;
1717
} while (st != db->db_sw_tables);
1721
xtPublic void xt_sw_lock_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
1723
/* A deadlock may occur if the calling thread
1724
* has a transaction!
1726
* I have this situation in which this occurs
1727
* without a deadlock:
1729
* create table t2 select * from t1;
1731
* This query creates a transaction to write the data
1732
* to the table. It then closes the table as usual
1735
* Unfortunately I can't remember how the deadlock
1738
//ASSERT(!self->st_xact_data);
1740
xt_lock_mutex(self, &db->db_sw_tab_lock);
1741
pushr_(xt_unlock_mutex, &db->db_sw_tab_lock);
1742
xn_sw_free_tables(self, db);
1743
popr_(); // xt_unlock_mutex(db->db_sw_tab_lock)
1746
xtPublic void xt_sw_unlock_sweeper(XTThreadPtr self, XTDatabaseHPtr db)
1748
xt_unlock_mutex(self, &db->db_sw_tab_lock);
1752
* If we return NULL, set *have_dic to TRUE if the table cannot
1755
static int xn_sw_open_table(XTThreadPtr self, XTOpenTablePtr *ot, XTDatabaseHPtr db, xtWord4 tab_id)
1761
if ((st = db->db_sw_tables)) {
1763
if (st->st_tab_id == tab_id) {
1764
/* Make this table the most recently used: */
1765
if (st != db->db_sw_tables) {
1767
st->st_less_ru->st_more_ru = st->st_more_ru;
1768
st->st_more_ru->st_less_ru = st->st_less_ru;
1770
/* Add to the front: */
1771
st->st_less_ru = db->db_sw_tables;
1772
st->st_more_ru = db->db_sw_tables->st_more_ru;
1773
st->st_less_ru->st_more_ru = st;
1774
st->st_more_ru->st_less_ru = st;
1775
db->db_sw_tables = st;
1777
st->st_dirty = TRUE;
1781
st = st->st_less_ru;
1782
} while (st != db->db_sw_tables);
1785
if ((r = xt_use_table_by_id(self, &tab, db, tab_id)))
1788
pushr_(xt_heap_release, tab);
1790
* Only if a table has been opened by MySQL will we
1791
* have key types and charset information we need
1792
* to compare index keys!
1795
/* Add it to the list: */
1796
st = (XTSWTablePtr) xt_calloc(self, sizeof(XTSWTableRec));
1797
if (!(*ot = xt_open_table(tab))) {
1801
db->db_sw_tab_count++;
1802
st->st_tab_id = tab_id;
1804
st->st_dirty = TRUE;
1805
if (db->db_sw_tables) {
1806
st->st_less_ru = db->db_sw_tables;
1807
st->st_more_ru = db->db_sw_tables->st_more_ru;
1808
st->st_less_ru->st_more_ru = st;
1809
st->st_more_ru->st_less_ru = st;
1810
db->db_sw_tables = st;
1812
/* Too many on the open list: */
1813
if (db->db_sw_tab_count > XT_SW_MAX_OPEN_TABLES) {
1814
/* Remove the least recently used: */
1815
st = db->db_sw_tables->st_more_ru;
1817
st->st_less_ru->st_more_ru = st->st_more_ru;
1818
st->st_more_ru->st_less_ru = st->st_less_ru;
1820
db->db_sw_tab_count--;
1821
xt_flush_table(st->st_table);
1822
xt_close_table(st->st_table);
1827
db->db_sw_tables = st;
1828
st->st_less_ru = st;
1829
st->st_more_ru = st;
1831
freer_(); // xt_heap_release(tab)
886
if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
887
if (!xt_xn_is_before(self->st_visible_time, xact->xd_end_time)) // was self->st_visible_time >= xact->xd_end_time
888
return XT_XN_VISIBLE;
889
return XT_XN_NOT_VISIBLE;
891
return XT_XN_ABORTED;
894
/* ----------------------------------------------------------------------
895
* S W E E P E R P R O C E S S
898
typedef struct XNSweeperState {
899
XTDatabaseHPtr ss_db;
900
XTXactSeqReadRec ss_seqread;
901
XTDataBufferRec ss_databuf;
903
XTBasicQueueRec ss_to_free;
904
xtBool ss_flush_pending;
905
XTOpenTablePtr ss_ot;
906
} XNSweeperStateRec, *XNSweeperStatePtr;
908
static XTOpenTablePtr xn_sw_get_open_table(XTThreadPtr self, XNSweeperStatePtr ss, xtTableID tab_id, int *r)
911
if (ss->ss_ot->ot_table->tab_id == tab_id)
914
/* Flush the table indexes: */
915
/* This is now done by the checkpointer thread...
916
* see xres_cp_flush_indices()
917
if (!xt_flush_table_index(ss->ss_ot))
921
xt_db_return_table_to_pool(self, ss->ss_ot);
926
if (!(ss->ss_ot = xt_db_open_pool_table(self, ss->ss_db, tab_id, r, TRUE)))
933
static void xn_sw_close_open_table(XTThreadPtr self, XNSweeperStatePtr ss)
936
/* Flush the table indexes: */
937
/* This is now done by the checkpointer thread
938
* see xres_cp_flush_indices()
939
if (!xt_flush_table_index(ss->ss_ot))
943
xt_db_return_table_to_pool(self, ss->ss_ot);
1891
998
/* Before we can free this resource, we must wait
1892
999
* until the transaction is clean!
1894
if (free_item->ri_wait_id >= db->db_xn_min_run_id) {
1001
if (!xt_xn_is_before(free_item->ri_wait_xn_id, db->db_xn_min_run_id)) { // was >=
1895
1002
/* Now we have to check to see if we cannot move
1896
1003
* the minimum run ID forward.
1898
1005
XTXactDataPtr xact;
1899
xtWord8 tmp_id; // Protect against concurrent update!
1900
xtWord8 xn_curr_id = xn_get_curr_id(db);
1006
xtXactID tmp_xn_id; // Protect against concurrent update!
1007
xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);
1903
tmp_id = db->db_xn_min_run_id;
1904
if (tmp_id > xn_curr_id)
1010
tmp_xn_id = db->db_xn_min_run_id;
1011
if (xt_xn_is_before(xn_curr_xn_id, tmp_xn_id))
1906
if ((xact = xn_get_xact(db, tmp_id)) && !xact->xd_end_id)
1013
if ((xact = xt_xn_get_xact(db, tmp_xn_id)) && !(xact->xd_flags & XT_XN_XAC_ENDED))
1907
1014
/* The transaction is still running... */
1909
db->db_xn_min_run_id = tmp_id+1;
1016
db->db_xn_min_run_id = tmp_xn_id+1;
1911
if (free_item->ri_wait_id >= db->db_xn_min_run_id)
1018
if (!xt_xn_is_before(free_item->ri_wait_xn_id, db->db_xn_min_run_id))
1915
1022
/* The transaction is clean, I can free the resource... */
1916
1023
if ((tab_id = free_item->ri_tab_id)) {
1917
1024
/* Free the data record: */
1918
if (!ot || tab_id != ot->ot_table->tab_id) {
1919
switch (xn_sw_open_table(self, &ot, db, tab_id)) {
1920
case XT_TAB_NO_DICTIONARY:
1921
xt_throw_ulxterr(XT_CONTEXT, XT_ERR_NO_DICTIONARY, (u_long) tab_id);
1923
case XT_TAB_POOL_CLOSED:
1924
xt_throw_ulxterr(XT_CONTEXT, XT_ERR_TABLE_LOCKED, (u_long) tab_id);
1929
if (!xt_tab_free_record(ot, free_item->x.ri_address))
1025
if ((ot = xn_sw_get_open_table(self, ss, tab_id, NULL))) {
1026
ss->ss_flush_pending = TRUE;
1027
if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_FREED, free_item->x.ri_rec_id, FALSE))
1935
1032
/* Free the transaction record in memory: */
1936
xn_delete_xact(db, free_item->x.ri_xact_id);
1033
xt_xn_delete_xact(db, free_item->x.ri_xn_id);
1938
1035
/* Recalculate the minimum memory transaction: */
1939
ASSERT(free_item->x.ri_xact_id >= db->db_xn_min_ram_id);
1036
ASSERT(!xt_xn_is_before(free_item->x.ri_xn_id, db->db_xn_min_ram_id));
1941
if (db->db_xn_min_ram_id == free_item->x.ri_xact_id) {
1942
db->db_xn_min_ram_id = free_item->x.ri_xact_id+1;
1038
if (db->db_xn_min_ram_id == free_item->x.ri_xn_id) {
1039
db->db_xn_min_ram_id = free_item->x.ri_xn_id+1;
1946
xtWord8 xn_curr_id = xn_get_curr_id(db);
1043
xtXactID xn_curr_xn_id = xt_xn_get_curr_id(db);
1948
while (db->db_xn_min_ram_id <= xn_curr_id) {
1045
while (!xt_xn_is_before(xn_curr_xn_id, db->db_xn_min_ram_id)) { // was db->db_xn_min_ram_id <= xn_curr_xn_id
1949
1046
/* db_xn_min_ram_id may be changed, by some other process! */
1950
1047
xn_id = db->db_xn_min_ram_id;
1951
if (xn_get_xact(db, xn_id))
1048
if (xt_xn_get_xact(db, xn_id))
1953
1050
db->db_xn_min_ram_id = xn_id+1;
2129
1273
* Read the record to be cleaned. Return TRUE if the cleanup has already been done.
2131
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, XTXactDataPtr xact, off_t address, u_int rec_type, XTTabRecHeadDPtr rec_head)
1275
static xtBool xn_sw_cleanup_done(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtXactID xn_id, u_int rec_type, u_int stat_id, xtRowID row_id, XTTabRecHeadDPtr rec_head)
2133
if (!xt_tab_get_data(ot, address, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head, NULL))
1277
if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) rec_head))
2136
1280
if (rec_type == XN_FORCE_CLEANUP) {
2137
if (rec_head->tr_rec_type_1 == XT_TAB_STATUS_FREED)
1281
if (XT_REC_IS_FREE(rec_head->tr_rec_type_1))
1285
/* Transaction must match: */
1286
if (XT_GET_DISK_4(rec_head->tr_xact_id_4) != xn_id)
2141
1289
/* Record header must match expected value from
2142
* log in order to cleanup!
1290
* log or clean has been done, or is not required.
1292
* For example, it is not required if a record
1293
* has been overwritten in a transaction.
2144
if (rec_head->tr_rec_type_1 != rec_type)
1295
if (rec_head->tr_rec_type_1 != rec_type ||
1296
rec_head->tr_stat_id_1 != stat_id)
2147
/* Transaction must match: */
2148
if (XT_GET_DISK_6(rec_head->tr_xact_id_6) != xact->xd_start_id)
1299
/* Row must match: */
1300
if (XT_GET_DISK_4(rec_head->tr_row_id_4) != row_id)
1307
static void xn_sw_clean_indices(XTThreadPtr self, XTOpenTablePtr ot, xtRecordID rec_id, xtRowID row_id, xtWord1 *rec_data, xtWord1 *rec_buffer)
1309
XTTableHPtr tab = ot->ot_table;
1313
if (!tab->tab_dic.dic_key_count)
1316
cols_req = tab->tab_dic.dic_ind_cols_req;
1317
if (XT_REC_IS_FIXED(rec_data[0]))
1318
rec_buffer = rec_data + XT_REC_FIX_HEADER_SIZE;
1320
if (XT_REC_IS_VARIABLE(rec_data[0])) {
1321
if (!myxt_load_row(ot, rec_data + XT_REC_FIX_HEADER_SIZE, rec_buffer, cols_req))
1324
else if (XT_REC_IS_EXT_DLOG(rec_data[0])) {
1326
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
1327
if (!myxt_load_row(ot, rec_data + XT_REC_EXT_HEADER_SIZE, rec_buffer, cols_req))
1331
if (rec_data != ot->ot_row_rbuffer)
1332
memcpy(ot->ot_row_rbuffer, rec_data, tab->tab_dic.dic_rec_size);
1333
if (!xt_tab_load_ext_data(ot, rec_id, rec_buffer, cols_req))
1338
/* This is possible, the record has already been cleaned up. */
1342
ind = tab->tab_dic.dic_keys;
1343
for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
1344
if (!xt_idx_update_row_id(ot, *ind, rec_id, row_id, rec_buffer))
1345
xt_log_and_clear_exception_ns();
1350
xt_log_and_clear_exception_ns();
2156
1354
* Return TRUE if the cleanup was done. FAILED if cleanup could not be done
2157
1355
* because dictionary information is not available.
2159
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtWord4 tab_id, off_t address, u_int status, u_int rec_type)
1357
static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XTXactDataPtr xact, xtTableID tab_id, xtRecordID rec_id, u_int status, u_int rec_type, u_int stat_id, xtRowID row_id, xtWord1 *rec_buf)
2161
1359
XTOpenTablePtr ot;
2162
1360
XTTableHPtr tab;
2163
1361
XTTabRecHeadDRec rec_head;
1362
xtRecordID after_rec_id;
2167
switch (xn_sw_open_table(self, &ot, ss->ss_db, tab_id)) {
2168
case XT_TAB_NOT_FOUND:
2169
/* The table no longer exists, consider cleanup done: */
2171
case XT_TAB_NO_DICTIONARY:
2172
case XT_TAB_POOL_CLOSED:
1366
if (!(ot = xn_sw_get_open_table(self, ss, tab_id, &r))) {
1367
/* The table no longer exists, consider cleanup done: */
1369
case XT_TAB_NOT_FOUND:
1371
case XT_TAB_NO_DICTIONARY:
1372
case XT_TAB_POOL_CLOSED:
2176
1378
tab = ot->ot_table;
2178
1380
/* Make sure the buffer is large enough! */
2179
1381
xt_db_set_size(self, &ss->ss_databuf, (size_t) tab->tab_dic.dic_buf_size);
2181
if (xact->xd_committed) {
1383
xn_id = xact->xd_start_xn_id;
1384
if (xact->xd_flags & XT_XN_XAC_COMMITTED) {
2182
1385
/* The transaction has been committed. Clean the record and
2183
1386
* remove variations no longer in use.
2185
1388
switch (status) {
2186
case XT_XN_STATUS_UPDATE:
2187
if (xn_sw_cleanup_done(self, ot, xact, address, rec_type, &rec_head))
2189
after_rec = (off_t) XT_GET_DISK_6(rec_head.tr_prev_var_6);
2190
xt_sw_delete_variations(self, ss, ot, after_rec);
1389
case XT_LOG_ENT_REC_MODIFIED:
1390
case XT_LOG_ENT_UPDATE:
1391
case XT_LOG_ENT_UPDATE_FL:
1392
case XT_LOG_ENT_UPDATE_BG:
1393
case XT_LOG_ENT_UPDATE_FL_BG:
1394
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
1396
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1397
xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
2191
1398
rec_head.tr_rec_type_1 |= XT_TAB_STATUS_CLEANED_BIT;
2192
XT_SET_NULL_DISK_6(rec_head.tr_prev_var_6);
2193
if (!xt_tab_put_data(ot, address, offsetof(XTTabRecHeadDRec, tr_prev_var_6) + 6, (xtWord1 *) &rec_head))
1399
XT_SET_NULL_DISK_4(rec_head.tr_prev_rec_id_4);
1400
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED, 0, rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &rec_head))
1402
xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2196
case XT_XN_STATUS_INSERT:
1404
case XT_LOG_ENT_INSERT:
1405
case XT_LOG_ENT_INSERT_FL:
1406
case XT_LOG_ENT_INSERT_BG:
1407
case XT_LOG_ENT_INSERT_FL_BG:
2197
1408
/* In the case of insert, we avoid reading the record into cache! */
2198
1409
rec_head.tr_rec_type_1 = rec_type | XT_TAB_STATUS_CLEANED_BIT;
2199
if (!xt_tab_put_data(ot, address + offsetof(XTTabRecHeadDRec, tr_rec_type_1), 1, (xtWord1 *) &rec_head.tr_rec_type_1))
1410
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_CLEANED_1, 0, rec_id, 1, (xtWord1 *) &rec_head.tr_rec_type_1))
1412
xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
2202
case XT_XN_STATUS_DELETE:
2203
if (xn_sw_cleanup_done(self, ot, xact, address, rec_type, &rec_head))
2205
after_rec = (off_t) XT_GET_DISK_6(rec_head.tr_prev_var_6);
2206
xt_sw_delete_variations(self, ss, ot, after_rec);
2207
xt_sw_delete_variation(self, ss, ot, address, TRUE);
2208
if ((row_id = XT_GET_DISK_4(rec_head.tr_row_id_4))) {
1414
case XT_LOG_ENT_DELETE:
1415
case XT_LOG_ENT_DELETE_FL:
1416
case XT_LOG_ENT_DELETE_BG:
1417
case XT_LOG_ENT_DELETE_FL_BG:
1418
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
1420
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
1421
xt_sw_delete_variations(self, ss, ot, after_rec_id, row_id, xn_id);
1422
xt_sw_delete_variation(self, ss, ot, rec_id, TRUE, row_id, xn_id);
2209
1424
if (!xt_tab_free_row(ot, tab, row_id))
2217
1432
* variation list. If this means the list is empty, then remove
2218
1433
* the record as well.
2220
off_t first_rec, next_rec, prev_rec;
1435
xtRecordID first_rec_id, next_rec_id, prev_rec_id;
2221
1436
XTTabRecHeadDRec prev_rec_head;
2223
if (xn_sw_cleanup_done(self, ot, xact, address, rec_type, &rec_head))
2226
if (!(row_id = XT_GET_DISK_4(rec_head.tr_row_id_4)))
2228
after_rec = (off_t) XT_GET_DISK_6(rec_head.tr_prev_var_6);
2230
/* Delete the extended record and index entries: */
2231
xt_sw_delete_variation(self, ss, ot, address, FALSE);
1438
if (xn_sw_cleanup_done(self, ot, rec_id, xn_id, rec_type, stat_id, row_id, &rec_head))
1442
row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
1443
after_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2233
1445
/* Now remove the record from the variation list,
2234
1446
* (if it is still on the list).
2236
xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1448
xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2238
1450
/* Find the variation before the variation we wish to remove: */
2239
if (!(xt_tab_get_row(ot, row_id, &first_rec)))
1451
if (!(xt_tab_get_row(ot, row_id, &first_rec_id)))
2242
next_rec = first_rec;
2243
while (next_rec != address) {
1454
next_rec_id = first_rec_id;
1455
while (next_rec_id != rec_id) {
2245
1457
/* The record was not found in the list (we are done) */
1458
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2246
1459
goto unlink_done;
2247
if (!xt_tab_get_data(ot, next_rec, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head, NULL))
2249
prev_rec = next_rec;
2250
next_rec = XT_GET_DISK_6(prev_rec_head.tr_prev_var_6);
1461
if (!xt_tab_get_rec_data(ot, next_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &prev_rec_head)) {
1462
xt_log_and_clear_exception(self);
1465
prev_rec_id = next_rec_id;
1466
next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
2253
/* Delete everything except the record. */
2255
/* Unlink the deleted variation: */
2256
XT_SET_DISK_6(prev_rec_head.tr_prev_var_6, after_rec);
2257
if (!xt_tab_put_data(ot, prev_rec + offsetof(XTTabRecHeadDRec, tr_prev_var_6), 6, (xtWord1 *) &prev_rec_head.tr_prev_var_6))
2261
/* Variation to be removed is at the front of the list. */
2262
ASSERT(address == first_rec);
2264
/* Unlink the deleted variation, from the front of the list: */
2265
if (!xt_tab_set_row(ot, row_id, after_rec, TRUE))
1469
if (next_rec_id == rec_id) {
1470
/* The record was found on the list: */
1472
/* Unlink the deleted variation:
1473
* I have found the following sequence:
1475
* 17933 in use 1906112
1476
* 1906112 delete xact=2901 row=17933 prev=2419240
1477
* 2419240 delete xact=2899 row=17933 prev=2153360
1478
* 2153360 record-X C xact=2599 row=17933 prev=0 Xlog=151 Xoff=16824 Xsiz=100
1480
* Despite the following facts which should prevent chains from
1483
* --- Only one transaction can modify a row
1484
* at any one time. So it is not possible for a new change
1485
* to be linked onto an uncommitted change.
1487
* --- Transactions that modify the same row
1488
* twice do not allocate a new record for each change.
1490
* -- A change that has been
1491
* rolled back will not be linked onto. Instead
1492
* the new transaction will link to the last.
1495
* So if the sweeper is slow in doing its job
1496
* we can have the situation that a number of records
1497
* can refer to the last committed record of the
1500
* Only one will be reference by the row pointer.
1502
* The other, will all have been rolled back.
1503
* This occurs over here: [(4)]
1505
XT_SET_DISK_4(prev_rec_head.tr_prev_rec_id_4, after_rec_id);
1506
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_UNLINKED, 0, prev_rec_id, offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE, (xtWord1 *) &prev_rec_head))
2269
/* No more variations, remove the row: */
2270
if (!xt_tab_free_row(ot, tab, row_id))
1510
/* Variation to be removed at the front of the list. */
1511
ASSERT(rec_id == first_rec_id);
1513
/* Unlink the deleted variation, from the front of the list: */
1514
if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_SET, row_id, after_rec_id))
1518
/* No more variations, remove the row: */
1519
if (!xt_tab_free_row(ot, tab, row_id))
1525
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
1527
/* Note: even when not found on the row list, the record must still
1530
* There might be an exception to this, but there are very definite
1531
* cases where this is required, for example when an unreferenced
1532
* record is fouund and added to the clean up list xn_add_cu_record().
2276
xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1536
/* Delete the extended record and index entries:
1538
* NOTE! This must be done after we have release the row lock. Because
1539
* a thread that does a duplicate check locks the index, and then
1540
* check whether a row is valid, and can deadlock with
1541
* code that locks a row, then an index!
1543
* However, this should all be OK, because the variation has been removed from the
1544
* row variation list at this stage, and now just need to be deleted.
1546
xt_sw_delete_variation(self, ss, ot, rec_id, FALSE, row_id, xn_id);
2282
xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1553
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2332
1588
xn_sw_could_go_faster(self, db);
2334
if (!xn_seq_read(&ss->ss_seqread, curr_offset, &entry, &space))
1590
if (!db->db_xlog.xlog_seq_next(&ss->ss_seqread, &record, FALSE))
2338
switch (entry->xb.xe_status_1) {
2339
case XT_XN_STATUS_HEADER:
2340
if (space < sizeof(XTXactLogHeaderDRec))
2342
size = XT_GET_DISK_4(entry->xh.xh_size_4);
2345
curr_offset += size;
2347
case XT_XN_STATUS_BEGIN:
2348
if (space < sizeof(XTXactBeginEntryDRec))
2350
if (curr_offset != xact->xd_begin_offset)
2352
begin_offset = curr_offset;
2353
curr_offset += sizeof(XTXactBeginEntryDRec);
2355
case XT_XN_STATUS_COMMITTED:
2356
case XT_XN_STATUS_ABORTED:
2358
case XT_XN_STATUS_COMMIT:
2359
case XT_XN_STATUS_ABORT:
2360
if (space < sizeof(XTXactEndEntryDRec))
2362
curr_offset += sizeof(XTXactEndEntryDRec);
2364
case XT_XN_STATUS_UPDATE:
2365
case XT_XN_STATUS_INSERT:
2366
case XT_XN_STATUS_DELETE:
2367
if (space < sizeof(XTactUpdateEntryDRec))
2369
tab_id = XT_GET_DISK_4(entry->xu.xe_tab_id_4);
2370
rec_address = XT_GET_DISK_6(entry->xu.xe_record_6);
2371
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_address, entry->xu.xe_status_1, entry->xu.xe_rec_type_1))
2373
curr_offset += sizeof(XTactUpdateEntryDRec);
2375
case XT_XN_STATUS_CURR_IDS:
2376
curr_offset += sizeof(XTXactCurrEntryDRec);
1594
switch (record->xh.xh_status_1) {
1595
case XT_LOG_ENT_NEW_LOG:
1596
if (!db->db_xlog.xlog_seq_start(&ss->ss_seqread, XT_GET_DISK_4(record->xl.xl_log_id_4), 0, FALSE))
1599
case XT_LOG_ENT_COMMIT:
1600
case XT_LOG_ENT_ABORT:
1601
xn_id = XT_GET_DISK_4(record->xe.xe_xact_id_4);
1602
if (xn_id == xact->xd_start_xn_id)
1605
case XT_LOG_ENT_REC_MODIFIED:
1606
case XT_LOG_ENT_UPDATE:
1607
case XT_LOG_ENT_INSERT:
1608
case XT_LOG_ENT_DELETE:
1609
case XT_LOG_ENT_UPDATE_BG:
1610
case XT_LOG_ENT_INSERT_BG:
1611
case XT_LOG_ENT_DELETE_BG:
1612
xn_id = XT_GET_DISK_4(record->xu.xu_xact_id_4);
1613
if (xn_id != xact->xd_start_xn_id)
1615
tab_id = XT_GET_DISK_4(record->xu.xu_tab_id_4);
1616
rec_id = XT_GET_DISK_4(record->xu.xu_rec_id_4);
1617
row_id = XT_GET_DISK_4(record->xu.xu_row_id_4);
1618
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xu.xu_status_1, record->xu.xu_rec_type_1, record->xu.xu_stat_id_1, row_id, &record->xu.xu_rec_type_1))
1621
case XT_LOG_ENT_UPDATE_FL:
1622
case XT_LOG_ENT_INSERT_FL:
1623
case XT_LOG_ENT_DELETE_FL:
1624
case XT_LOG_ENT_UPDATE_FL_BG:
1625
case XT_LOG_ENT_INSERT_FL_BG:
1626
case XT_LOG_ENT_DELETE_FL_BG:
1627
xn_id = XT_GET_DISK_4(record->xf.xf_xact_id_4);
1628
if (xn_id != xact->xd_start_xn_id)
1630
tab_id = XT_GET_DISK_4(record->xf.xf_tab_id_4);
1631
rec_id = XT_GET_DISK_4(record->xf.xf_rec_id_4);
1632
row_id = XT_GET_DISK_4(record->xf.xf_row_id_4);
1633
if (!xn_sw_cleanup_variation(self, ss, xact, tab_id, rec_id, record->xf.xf_status_1, record->xf.xf_rec_type_1, record->xf.xf_stat_id_1, row_id, &record->xf.xf_rec_type_1))
2383
1641
cleanup_complete = TRUE;
2386
xn_sw_flush_tables(self, db);
2388
freer_(); // xt_unlock_mutex(db->db_sw_tab_lock)
2390
if (!cleanup_complete) {
2391
freer_(); // xn_release_log_w_self(xlog)
1644
if (!cleanup_complete)
2395
1647
/* Write the log to indicate the transaction has been cleaned: */
2397
xtWord1 status = xact->xd_committed ? XT_XN_STATUS_COMMITTED : XT_XN_STATUS_ABORTED;
2399
if (!xn_write_log(xlog, xlog->xl_sw_file, begin_offset + offsetof(XTXactBeginEntryDRec, xe_status_1), 1, &status))
2403
xn_sw_add_xact_to_free(self, ss, xact->xd_start_id);
2405
if (!self->t_quit && xlog->xl_file_size + xlog->xl_offset >= XT_XACT_LOG_ROLLOVER_SIZE && !xlog->xl_exfile) {
2406
ASSERT_NS(xlog->xl_offset == 0);
2407
/* Check if we have just cleaned up the last transaction in this log: */
2408
if (curr_offset >= xlog->xl_file_size + xlog->xl_offset) {
2409
char path[PATH_MAX];
2411
if (!xt_xn_log_ids(self, db))
2414
if (xlog->xl_sw_file) {
2415
xt_close_file_ns(xlog->xl_sw_file);
2416
xlog->xl_sw_file = NULL;
2419
xn_logname(PATH_MAX, path, xlog->xl_db, xlog->xl_number);
2420
if (!xt_fs_delete(NULL, path))
2421
xt_log_and_clear_exception_ns();
2423
xlog->xl_recovered = FALSE;
2424
xlog->xl_file_size = 0;
2425
xlog->xl_offset = 0;
2427
if (xlog->xl_number <= db->db_xn_high_log) {
2428
/* Recalculate the high log: */
2429
xt_mutex_lock(&db->db_xn_log_lock);
2430
xn_recalc_high_log(db);
2431
xt_mutex_unlock(&db->db_xn_log_lock);
2436
freer_(); // xn_release_log_w_self(xlog)
1648
XTXactCleanupEntryDRec cu;
1650
cu.xc_status_1 = XT_LOG_ENT_CLEANUP;
1651
cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
1652
XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);
1654
if (!self->st_xact_buf.xbuf_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, FALSE))
1657
ss->ss_flush_pending = TRUE;
1659
xn_sw_add_xact_to_free(self, ss, xact->xd_start_xn_id);
2017
/* Return TRUE if records written by the given transaction are
2018
* visible to the thread (i.e. the transaction of the thread).
2020
xtPublic xtBool xt_xn_visible(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *mine)
2022
register XTThreadPtr self = ot->ot_thread;
2023
register XTXactDataPtr xact;
2025
/* NOTE: If a transaction is not in RAM, then it is considered aborted.
2026
* This means that we can only remove a transaction from memory when
2027
* all transactions that were running when we started cleanup have
2029
* Only these transactions may have read something that has been
2030
* changed by the sweeper in the meantime.
2031
* For example a transaction may fill its buffer when doing a
2032
* sequential read. A record may be cleaned by the sweeper that
2033
* is in this buffer, but the reader already has a old copy of
2034
* the data. Then the sweeper removes the transaction
2035
* from RAM. The reader will then consider this record
2038
if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
2039
/* This record is not clean, and the transaction is not in
2040
* RAM. This means it has be missed, so clean it up.
2042
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
2045
if (xn_id == self->st_xact_data->xd_start_xn_id) {
2050
if (xt_xn_is_before(self->st_xact_data->xd_start_xn_id, xn_id))
2051
/* This record is written after the this transaction
2052
* started (is not visible).
2055
if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
2056
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
2059
if (!(xact->xd_flags & XT_XN_XAC_ENDED) || !xt_xn_is_before(xact->xd_end_xn_id, self->st_xact_data->xd_start_xn_id)) // was >=
2060
/* Either this transaction has not yet ended, or this
2061
* record was written by a transaction that ended
2062
* after the reading transaction started!
2063
* So this record is not visible!
2066
/* Visible if the transaction was committed: */
2067
return xact->xd_flags & XT_XN_XAC_COMMITTED;
2071
* Return TRUE if the record has been commited.
2073
xtPublic xtBool xt_xn_committed(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *mine)
2075
register XTThreadPtr self = ot->ot_thread;
2076
register XTXactDataPtr xact;
2078
if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
2079
/* This record is not clean, and the transaction is not in
2080
* RAM. This means it has be missed, so clean it up.
2082
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
2085
if (xn_id == self->st_xact_data->xd_start_xn_id) {
2090
if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
2091
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
2094
if (!(xact->xd_flags & XT_XN_XAC_ENDED))
2095
/* Either this transaction has not yet ended, or */
2097
/* TRUE if the record was committed: */
2098
return xact->xd_flags & XT_XN_XAC_COMMITTED;
2102
* Return TRUE of the transaction is committed, or may be
2103
* committed in the future.
2105
* if used, 'wait' must be initialized to FALSE!
2107
* It will be set to TRUE if the transaction has not yet ended.
2108
* Return FALSE of the transaction was aborted.
2110
* Return TRUE if the transaction was committed
2112
xtPublic xtBool xt_xn_maybe_committed(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID rec_id, xtBool *mine, xtBool *wait)
2114
register XTThreadPtr self = ot->ot_thread;
2115
register XTXactDataPtr xact;
2117
if (xt_xn_is_before(xn_id, self->st_database->db_xn_min_ram_id)) {
2118
/* Not in RAM, rollback done: */
2119
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
2122
if (xn_id == self->st_xact_data->xd_start_xn_id) {
2127
if (!(xact = xt_xn_get_xact(self->st_database, xn_id))) {
2128
/* Not in RAM, rollback done: */
2129
xn_add_cu_record(ot->ot_table->tab_db, ot->ot_table->tab_id, rec_id);
2132
if (!(xact->xd_flags & XT_XN_XAC_ENDED)) {
2133
/* Transaction not ended, may be visible. */
2138
/* Visible if the transaction was committed: */
2139
return xact->xd_flags & XT_XN_XAC_COMMITTED;