~vkolesnikov/pbxt/pbxt-preload-test-bug

« back to all changes in this revision

Viewing changes to pbxt/src/table_xt.cc

  • Committer: paul-mccullagh
  • Date: 2008-03-10 11:36:34 UTC
  • Revision ID: paul-mccullagh-417ebf175a9c8ee6e5b3777d9e2398e1fb197391
Implemented full durability

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2005 SNAP Innovation GmbH
 
1
/* Copyright (c) 2005 PrimeBase Technologies GmbH
2
2
 *
3
3
 * PrimeBase XT
4
4
 *
41
41
#include "cache_xt.h"
42
42
#include "trace_xt.h"
43
43
#include "streaming_xt.h"
 
44
#include "index_xt.h"
 
45
#include "restart_xt.h"
44
46
 
45
 
static void tab_close_open_tables(struct XTThread *self, XTTableHPtr tab);
 
47
#ifdef DEBUG
 
48
//#define TRACE_VARIATIONS
 
49
//#define TRACE_VARIATIONS_IN_DUP_CHECK
 
50
#endif
46
51
 
47
52
/*
48
53
 * -----------------------------------------------------------------------
94
99
        XTDatabaseHPtr  db = tab->tab_db;
95
100
        XTTableEntryPtr te_ptr;
96
101
 
97
 
        tab_close_open_tables(self, tab);
98
 
 
99
102
        /* Remove the reference from the ID list, when the table is
100
103
         * removed from the name list:
101
104
         */
111
114
{
112
115
        XTTableHPtr     tab = (XTTableHPtr) x;
113
116
 
114
 
        xt_dl_exit_tab(self, tab);
 
117
        xt_exit_row_locks(&tab->tab_locks);
 
118
 
 
119
        xt_xres_exit_tab(self, tab);
 
120
 
115
121
        if (tab->tab_ind_file) {
116
 
                xt_free(self, tab->tab_ind_file);
 
122
                xt_fs_release_file(self, tab->tab_ind_file);
117
123
                tab->tab_ind_file = NULL;
118
124
        }
119
 
        if (tab->tab_data_file) {
120
 
                xt_free(self, tab->tab_data_file);
121
 
                tab->tab_data_file = NULL;
 
125
        if (tab->tab_rec_file) {
 
126
                xt_fs_release_file(self, tab->tab_rec_file);
 
127
                tab->tab_rec_file = NULL;
122
128
        }
123
129
        if (tab->tab_row_file) {
124
 
                xt_free(self, tab->tab_row_file);
 
130
                xt_fs_release_file(self, tab->tab_row_file);
125
131
                tab->tab_row_file = NULL;
126
132
        }
127
 
        if (tab->tab_pointers) {
128
 
                xt_sys_free(tab->tab_pointers);
129
 
                tab->tab_pointers = NULL;
 
133
 
 
134
        if (tab->tab_index_head) {
 
135
                xt_free(self, tab->tab_index_head);
 
136
                tab->tab_index_head = NULL;
130
137
        }
131
138
        if (tab->tab_name) {
132
139
                xt_free(self, tab->tab_name);
133
140
                tab->tab_name = NULL;
134
141
        }
135
 
        if (tab->tab_data_buf) {
136
 
                xt_free(self, tab->tab_data_buf);
137
 
                tab->tab_data_buf = NULL;
138
 
        }
139
142
        myxt_free_dictionary(self, &tab->tab_dic);
140
143
        if (tab->tab_free_locks) {
 
144
                tab->tab_seq.xt_op_seq_exit(self);
141
145
                xt_free_mutex_tr(&tab->tab_ainc_lock);
142
146
                xt_free_mutex(&tab->dic_field_lock);
143
 
                xt_free_mutex(&tab->tab_open_lock);
144
 
                xt_free_cond(&tab->tab_open_cond);
145
147
                xt_free_mutex(&tab->tab_row_lock);
146
148
                xt_free_mutex(&tab->tab_ind_lock);
147
 
                xt_free_mutex(&tab->tab_log_lock);
148
 
                xt_free_mutex(&tab->tab_free_lock);
149
 
                xt_free_rwlock(&tab->tab_buf_rwlock);
150
 
                for (u_int i=0; i<XT_ROW_LOCK_TABLE_SIZE; i++)
151
 
                        xt_free_rwlock(&tab->tab_row_locks[i]);
 
149
                xt_free_mutex(&tab->tab_rec_lock);
 
150
                for (u_int i=0; i<XT_ROW_RWLOCKS; i++)
 
151
                        xt_free_rwlock(&tab->tab_row_rwlock[i]);
152
152
        }
153
153
}
154
154
 
157
157
        XTTableHPtr     tab = (XTTableHPtr) x;
158
158
 
159
159
        /* Signal threads waiting for exclusive use of the table: */
160
 
        xt_ht_signal(self, tab->tab_db->db_tables);
 
160
        if (tab->tab_db->db_tables)
 
161
                xt_ht_signal(self, tab->tab_db->db_tables);
161
162
}
162
163
 
163
164
/*
208
209
        return file_name + len;
209
210
}
210
211
 
211
 
static void tab_get_row_file_name(char *table_name, char *name, xtWord4 tab_id)
 
212
static void tab_get_row_file_name(char *table_name, char *name, xtTableID tab_id)
212
213
{
213
214
        sprintf(table_name, "%s-%lu.xtr", name, (u_long) tab_id);
214
215
}
215
216
 
216
 
static void tab_get_data_file_name(char *table_name, char *name, xtWord4 tab_id)
 
217
static void tab_get_data_file_name(char *table_name, char *name, xtTableID tab_id)
217
218
{
218
219
        sprintf(table_name, "%s.xtd", name);
219
220
}
220
221
 
221
 
static void tab_get_index_file_name(char *table_name, char *name, xtWord4 tab_id)
 
222
static void tab_get_index_file_name(char *table_name, char *name, xtTableID tab_id)
222
223
{
223
224
        sprintf(table_name, "%s.xti", name);
224
225
}
239
240
static int tab_comp_by_id(XTThreadPtr self, register const void *thunk, register const void *a, register const void *b)
240
241
{
241
242
#pragma unused(self, thunk)
242
 
        xtWord4                 te_id = *((xtWord4 *) a);
 
243
        xtTableID               te_id = *((xtTableID *) a);
243
244
        XTTableEntryPtr te_ptr = (XTTableEntryPtr) b;
244
245
 
245
246
        if (te_id < te_ptr->te_tab_id)
273
274
 
274
275
        tab_name = xt_dir_name(self, td->td_open_dir);
275
276
        td->td_file_name = tab_name;
276
 
        td->td_tab_id = xt_file_name_to_id(tab_name);
 
277
        td->td_tab_id = (xtTableID) xt_file_name_to_id(tab_name);
277
278
        xt_tab_file_to_name(XT_TABLE_NAME_SIZE, td->td_tab_name, tab_name);
278
279
        return_(TRUE);
279
280
}
310
311
        pushr_(xt_describe_tables_exit, &desc);
311
312
        while (xt_describe_tables_next(self, &desc)) {
312
313
                te_tab.te_tab_id = desc.td_tab_id;
 
314
 
 
315
                if (te_tab.te_tab_id > db->db_curr_tab_id)
 
316
                        db->db_curr_tab_id = te_tab.te_tab_id;
 
317
 
313
318
                te_tab.te_tab_name = xt_dup_string(self, desc.td_tab_name);
314
319
                te_tab.te_table = NULL;
315
320
                xt_sl_insert(self, db->db_table_by_id, &desc.td_tab_id, &te_tab);
355
360
                        freer_(); // xt_ht_unlock(db->db_tables)
356
361
                        if (!te_ptr)
357
362
                                break;
358
 
                        tab = xt_use_table(self, te_ptr->te_tab_name, FALSE);
 
363
                        tab = xt_use_table(self, te_ptr->te_tab_name, FALSE, FALSE);
359
364
                        tab_check_table(self, tab);
360
365
                        xt_heap_release(self, tab);
361
366
                        tab = NULL;
395
400
        return en_ptr;
396
401
}
397
402
 
398
 
xtPublic void xt_enum_files_of_tables_init(XTDatabaseHPtr db, char *tab_name, xtWord4 tab_id, XTFilesOfTablePtr ft)
 
403
xtPublic void xt_enum_files_of_tables_init(XTDatabaseHPtr db, char *tab_name, xtTableID tab_id, XTFilesOfTablePtr ft)
399
404
{
400
405
        ft->ft_state = 0;
401
406
        ft->ft_db = db;
419
424
                        tab_get_index_file_name(file_name, ft->ft_tab_name, ft->ft_tab_id);
420
425
                        break;
421
426
                default:
422
 
                        if (!xt_dl_get_log_file_name(XT_MAX_TABLE_FILE_NAME_SIZE, file_name, ft->ft_tab_name, ft->ft_state - 3))
423
 
                                return FAILED;
424
 
                        break;
 
427
                        return FAILED;
425
428
        }
426
429
 
427
430
        ft->ft_state++;
434
437
        return TRUE;
435
438
}
436
439
 
437
 
static xtBool tab_find_table(XTThreadPtr self, XTDatabaseHPtr db, char *name, char *table_name, xtWord4 *tab_id)
 
440
static xtBool tab_find_table(XTThreadPtr self, XTDatabaseHPtr db, char *name, char *table_name, xtTableID *tab_id)
438
441
{
439
442
        u_int                   edx;
440
443
        XTTableEntryPtr te_ptr;
451
454
        return FALSE;
452
455
}
453
456
 
454
 
static void tab_load_table_format(XTThreadPtr self, XTOpenFilePtr file, char *table_name, size_t *ret_head_size, XTDictionaryPtr dic)
 
457
static void tab_load_index_format(XTThreadPtr self, XTOpenFilePtr file, char *table_name, size_t *ret_head_size, XTDictionaryPtr dic)
455
458
{
456
 
        XTDiskValue4    size_buf;
457
 
        size_t                  head_size;
458
 
        XTTabFormatDRec tab_fmt;
459
 
        size_t                  fmt_size;
 
459
        XTDiskValue4            size_buf;
 
460
        size_t                          head_size;
 
461
        XTIndexFormatDRec       index_fmt;
460
462
 
461
463
        if (!xt_pread_file(file, 0, 4, 4, &size_buf, NULL))
462
464
                xt_throw(self);
464
466
        head_size = XT_GET_DISK_4(size_buf);
465
467
 
466
468
        /* Load the table format information: */
467
 
        if (!xt_pread_file(file, head_size, offsetof(XTTabFormatDRec, tf_reserved), offsetof(XTTabFormatDRec, tf_tab_version_2) + 2, &tab_fmt, NULL))
 
469
        if (!xt_pread_file(file, head_size, sizeof(XTIndexFormatDRec), offsetof(XTIndexFormatDRec, if_ind_version_2) + 2, (xtWord1 *) &index_fmt, NULL))
468
470
                xt_throw(self);
469
471
 
470
472
        /* If the table version is less than or equal to an incompatible (unsupported
471
473
         * version), or greater than the current version, then we cannot open this table
472
474
         */
473
 
        if (XT_GET_DISK_2(tab_fmt.tf_tab_version_2) <= XT_TAB_INCOMPATIBLE_VERSION ||
474
 
                XT_GET_DISK_2(tab_fmt.tf_tab_version_2) > XT_TAB_CURRENT_VERSION) {
475
 
                switch (XT_GET_DISK_2(tab_fmt.tf_tab_version_2)) {
 
475
        if (XT_GET_DISK_2(index_fmt.if_tab_version_2) <= XT_TAB_INCOMPATIBLE_VERSION ||
 
476
                XT_GET_DISK_2(index_fmt.if_tab_version_2) > XT_TAB_CURRENT_VERSION) {
 
477
                switch (XT_GET_DISK_2(index_fmt.if_tab_version_2)) {
 
478
                        case 4: 
 
479
                                xt_throw_i2xterr(XT_CONTEXT, XT_ERR_UPGRADE_TABLE, table_name, "0.9.91 Beta");
 
480
                                break;
476
481
                        case 3: 
477
482
                                xt_throw_i2xterr(XT_CONTEXT, XT_ERR_UPGRADE_TABLE, table_name, "0.9.85 Beta");
478
483
                                break;
483
488
                return;
484
489
        }
485
490
 
486
 
        if (XT_GET_DISK_2(tab_fmt.tf_ind_version_2) != XT_IND_CURRENT_VERSION) {
 
491
        if (XT_GET_DISK_2(index_fmt.if_ind_version_2) != XT_IND_CURRENT_VERSION) {
487
492
                /* Should be handled by allowing a sequential scan, but no index access. */
488
493
                xt_throw_ixterr(XT_CONTEXT, XT_ERR_BAD_INDEX_VERSION, table_name);
489
494
                dic->dic_bad_ind_ver = TRUE;
492
497
                dic->dic_bad_ind_ver = FALSE;
493
498
 
494
499
        *ret_head_size = head_size;
 
500
}
 
501
 
 
502
static void tab_load_index_header(XTThreadPtr self, XTTableHPtr tab, XTOpenFilePtr file)
 
503
{
 
504
        u_int           rec_size = tab->tab_index_head_size;
 
505
        XTIndexPtr      *ind;
 
506
        xtWord1         *data;
 
507
 
 
508
        /* Load the pointers: */
 
509
        if (tab->tab_index_head)
 
510
                xt_free_ns(tab->tab_index_head);
 
511
        tab->tab_index_head = (XTIndexHeadDPtr) xt_malloc(self, rec_size);
 
512
 
 
513
        if (!xt_pread_file(file, 0, rec_size, rec_size, tab->tab_index_head, NULL))
 
514
                xt_throw(self);
 
515
 
 
516
        tab->tab_ind_rec_log_id = XT_GET_DISK_4(tab->tab_index_head->tp_rec_log_id_4);
 
517
        tab->tab_ind_rec_log_offset = XT_GET_DISK_6(tab->tab_index_head->tp_rec_log_offs_6);
 
518
 
 
519
        tab->tab_ind_eof = XT_GET_DISK_6(tab->tab_index_head->tp_ind_eof_6);
 
520
        tab->tab_ind_free = XT_GET_DISK_6(tab->tab_index_head->tp_ind_free_6);
 
521
 
 
522
        data = tab->tab_index_head->tp_data;
 
523
        ind = tab->tab_dic.dic_keys;
 
524
        for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
 
525
                (*ind)->mi_root = XT_GET_NODE_REF(data);
 
526
                data += XT_NODE_REF_SIZE;
 
527
        }
 
528
}
 
529
 
 
530
static xtBool tab_store_index_header(XTTableHPtr tab, XTOpenFilePtr file)
 
531
{
 
532
        XTIndexPtr      *ind;
 
533
        u_int           i;
 
534
        xtWord1         *data;
 
535
 
 
536
        /* This flag is set without locking, so we need to
 
537
         * set it to FALSE before we actually write the data.
 
538
         */
 
539
        tab->tab_ind_head_dirty = FALSE;
 
540
 
 
541
        XT_SET_DISK_4(tab->tab_index_head->tp_rec_log_id_4, tab->tab_ind_rec_log_id);
 
542
        XT_SET_DISK_6(tab->tab_index_head->tp_rec_log_offs_6, tab->tab_ind_rec_log_offset);
 
543
 
 
544
        XT_SET_DISK_6(tab->tab_index_head->tp_ind_eof_6, tab->tab_ind_eof);
 
545
        XT_SET_DISK_6(tab->tab_index_head->tp_ind_free_6, tab->tab_ind_free);
 
546
 
 
547
        data = tab->tab_index_head->tp_data;
 
548
        for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) {
 
549
                XT_SET_NODE_REF(data, (*ind)->mi_root);
 
550
                data += XT_NODE_REF_SIZE;
 
551
        }
 
552
        if (!xt_pwrite_file(file, 0, tab->tab_index_head_size, tab->tab_index_head)) {
 
553
                tab->tab_ind_head_dirty = TRUE;
 
554
                return FAILED;
 
555
        }
 
556
        return OK;
 
557
}
 
558
 
 
559
static void tab_load_table_format(XTThreadPtr self, XTOpenFilePtr file, char *table_name, size_t *ret_head_size, XTDictionaryPtr dic)
 
560
{
 
561
        XTDiskValue4            size_buf;
 
562
        size_t                          head_size;
 
563
        XTTableFormatDRec       tab_fmt;
 
564
        size_t                          fmt_size;
 
565
 
 
566
        if (!xt_pread_file(file, 0, 4, 4, &size_buf, NULL))
 
567
                xt_throw(self);
 
568
 
 
569
        head_size = XT_GET_DISK_4(size_buf);
 
570
 
 
571
        /* Load the table format information: */
 
572
        if (!xt_pread_file(file, head_size, offsetof(XTTableFormatDRec, tf_definition), offsetof(XTTableFormatDRec, tf_tab_version_2) + 2, &tab_fmt, NULL))
 
573
                xt_throw(self);
 
574
 
 
575
        /* If the table version is less than or equal to an incompatible (unsupported
 
576
         * version), or greater than the current version, then we cannot open this table
 
577
         */
 
578
        if (XT_GET_DISK_2(tab_fmt.tf_tab_version_2) <= XT_TAB_INCOMPATIBLE_VERSION ||
 
579
                XT_GET_DISK_2(tab_fmt.tf_tab_version_2) > XT_TAB_CURRENT_VERSION) {
 
580
                switch (XT_GET_DISK_2(tab_fmt.tf_tab_version_2)) {
 
581
                        case 4: 
 
582
                                xt_throw_i2xterr(XT_CONTEXT, XT_ERR_UPGRADE_TABLE, table_name, "0.9.91 Beta");
 
583
                                break;
 
584
                        case 3: 
 
585
                                xt_throw_i2xterr(XT_CONTEXT, XT_ERR_UPGRADE_TABLE, table_name, "0.9.85 Beta");
 
586
                                break;
 
587
                        default:
 
588
                                xt_throw_ixterr(XT_CONTEXT, XT_ERR_BAD_TABLE_VERSION, table_name);
 
589
                                break;
 
590
                }
 
591
                return;
 
592
        }
 
593
 
495
594
        fmt_size = XT_GET_DISK_4(tab_fmt.tf_format_size_4);
 
595
        *ret_head_size = XT_GET_DISK_4(tab_fmt.tf_tab_head_size_4);
496
596
        dic->dic_rec_size = XT_GET_DISK_4(tab_fmt.tf_rec_size_4);
497
597
        dic->dic_rec_fixed = XT_GET_DISK_1(tab_fmt.tf_rec_fixed_1);
498
 
        if (fmt_size >= offsetof(XTTabFormatDRec, tf_min_auto_inc_8) + 8)
499
 
                dic->dic_min_auto_inc = XT_GET_DISK_8(tab_fmt.tf_min_auto_inc_8);
500
 
        else
501
 
                dic->dic_min_auto_inc = 0;
502
 
        if (fmt_size > offsetof(XTTabFormatDRec, tf_definition)) {
503
 
                size_t  def_size = fmt_size - offsetof(XTTabFormatDRec, tf_definition);
 
598
        dic->dic_min_auto_inc = XT_GET_DISK_8(tab_fmt.tf_min_auto_inc_8);
 
599
        if (fmt_size > offsetof(XTTableFormatDRec, tf_definition)) {
 
600
                size_t  def_size = fmt_size - offsetof(XTTableFormatDRec, tf_definition);
504
601
                char    *def_sql;
505
602
 
506
603
                pushsr_(def_sql, xt_free, (char *) xt_malloc(self, def_size));
507
 
                if (!xt_pread_file(file, head_size+offsetof(XTTabFormatDRec, tf_definition), def_size, def_size, def_sql, NULL)) {
 
604
                if (!xt_pread_file(file, head_size+offsetof(XTTableFormatDRec, tf_definition), def_size, def_size, def_sql, NULL))
508
605
                        xt_throw(self);
509
 
                }
510
606
                dic->dic_table = xt_ri_create_table(self, false, def_sql, myxt_create_table_from_table(self, dic->dic_my_table));
511
607
                freer_(); // xt_free(def_sql)
512
608
        }
514
610
                dic->dic_table = myxt_create_table_from_table(self, dic->dic_my_table);
515
611
}
516
612
 
517
 
static void tab_load_table_pointers(XTThreadPtr self, XTTableHPtr tab, XTOpenFilePtr file)
518
 
{
519
 
        u_int rec_size = tab->tab_head_size;
520
 
 
521
 
        /* Load the pointers: */
522
 
        if (tab->tab_pointers)
523
 
                xt_sys_free(tab->tab_pointers);
524
 
        tab->tab_pointers = (XTTabPointersDPtr) xt_malloc(self, rec_size);
525
 
 
526
 
        if (!xt_pread_file(file, 0, rec_size, rec_size, tab->tab_pointers, NULL))
527
 
                xt_throw(self);
528
 
 
529
 
        tab->tab_row_eof = XT_GET_DISK_4(tab->tab_pointers->tp_tab_eof_4) << XT_TAB_ROW_SHIFTS;
530
 
        tab->tab_row_free = XT_GET_DISK_4(tab->tab_pointers->tp_tab_free_4) << XT_TAB_ROW_SHIFTS;
531
 
        tab->tab_row_fnum = XT_GET_DISK_4(tab->tab_pointers->tp_tab_fnum_4);
532
 
 
533
 
        tab->tab_data_eof = XT_GET_DISK_6(tab->tab_pointers->tp_data_eof_6);
534
 
        tab->tab_data_free = XT_GET_DISK_6(tab->tab_pointers->tp_data_free_6);
535
 
//      ASSERT(tab->tab_data_free <= tab->tab_data_eof);
536
 
        tab->tab_data_fnum = XT_GET_DISK_4(tab->tab_pointers->tp_data_fnum_4);
537
 
 
538
 
        tab->tab_ind_eof = XT_GET_DISK_6(tab->tab_pointers->tp_ind_eof_6);
539
 
        tab->tab_ind_free = XT_GET_DISK_6(tab->tab_pointers->tp_ind_free_6);
540
 
 
541
 
        xt_tab_load_index_roots(tab);
542
 
}
543
 
 
544
 
xtPublic void xt_tab_load_index_roots(XTTableHPtr tab)
545
 
{
546
 
        XTIndexPtr      *ind;
547
 
        u_int           i;
548
 
        xtWord1         *data;
549
 
 
550
 
        data = tab->tab_pointers->tp_data;
551
 
        for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) {
552
 
                (*ind)->mi_root = XT_GET_NODE_REF(data);
553
 
                data += XT_NODE_REF_SIZE;
554
 
        }
555
 
}
556
 
 
557
 
static xtBool tab_store_table_header(XTTableHPtr tab, XTOpenFilePtr file)
558
 
{
559
 
        XTIndexPtr      *ind;
560
 
        u_int           i;
561
 
        xtWord1         *data;
562
 
 
563
 
        tab->tab_head_dirty = FALSE;
564
 
 
565
 
        XT_SET_DISK_4(tab->tab_pointers->tp_tab_eof_4, tab->tab_row_eof >> XT_TAB_ROW_SHIFTS);
566
 
        XT_SET_DISK_4(tab->tab_pointers->tp_tab_free_4, tab->tab_row_free >> XT_TAB_ROW_SHIFTS);
567
 
        XT_SET_DISK_4(tab->tab_pointers->tp_tab_fnum_4, tab->tab_row_fnum);
568
 
 
569
 
        XT_SET_DISK_6(tab->tab_pointers->tp_data_eof_6, tab->tab_data_eof);
570
 
        XT_SET_DISK_6(tab->tab_pointers->tp_data_free_6, tab->tab_data_free);
571
 
        XT_SET_DISK_4(tab->tab_pointers->tp_data_fnum_4, tab->tab_data_fnum);
572
 
 
573
 
        XT_SET_DISK_6(tab->tab_pointers->tp_ind_eof_6, tab->tab_ind_eof);
574
 
        XT_SET_DISK_6(tab->tab_pointers->tp_ind_free_6, tab->tab_ind_free);
575
 
 
576
 
        data = tab->tab_pointers->tp_data;
577
 
        for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) {
578
 
                XT_SET_NODE_REF(data, (*ind)->mi_root);
579
 
                data += XT_NODE_REF_SIZE;
580
 
        }
581
 
        return xt_pwrite_file(file, 0, tab->tab_head_size, tab->tab_pointers);
582
 
}
583
 
 
584
 
static void tab_alloc_data_buf(XTThreadPtr self, XTTableHPtr tab, size_t rec_size)
585
 
{
586
 
        size_t          recs_in_buf;
587
 
        size_t          buf_size;
588
 
 
589
 
        /* Make the buffer size a multiple of the record size: */
590
 
        recs_in_buf = ((XT_MIN_TAB_BUFFER_SIZE - 1) / rec_size) + 1;
591
 
        buf_size = recs_in_buf * rec_size;
592
 
        tab->tab_buf_size = buf_size;
593
 
        tab->tab_data_buf = (xtWord1 *) xt_malloc(self, buf_size);
 
613
static void tab_load_table_header(XTThreadPtr self, XTTableHPtr tab, XTOpenFilePtr file)
 
614
{
 
615
        XTTableHeadDRec rec_head;
 
616
 
 
617
        if (!xt_pread_file(file, 0, sizeof(XTTableHeadDRec), sizeof(XTTableHeadDRec), (xtWord1 *) &rec_head, NULL))
 
618
                xt_throw(self);
 
619
 
 
620
        tab->tab_head_op_seq = XT_GET_DISK_4(rec_head.th_op_seq_4);
 
621
        tab->tab_head_row_free_id = (xtRowID) XT_GET_DISK_6(rec_head.th_row_free_6);
 
622
        tab->tab_head_row_eof_id = XT_GET_DISK_6(rec_head.th_row_eof_6);
 
623
        tab->tab_head_row_fnum = XT_GET_DISK_6(rec_head.th_row_fnum_6);
 
624
        tab->tab_head_rec_free_id = XT_GET_DISK_6(rec_head.th_rec_free_6);
 
625
        tab->tab_head_rec_eof_id = XT_GET_DISK_6(rec_head.th_rec_eof_6);
 
626
        tab->tab_head_rec_fnum = XT_GET_DISK_6(rec_head.th_rec_fnum_6);
 
627
}
 
628
 
 
629
xtPublic void xt_tab_store_header(struct XTThread *self, XTOpenTablePtr ot)
 
630
{
 
631
        XTTableHPtr             tab = ot->ot_table;
 
632
        XTTableHeadDRec rec_head;
 
633
 
 
634
        XT_SET_DISK_4(rec_head.th_op_seq_4, tab->tab_head_op_seq);
 
635
        XT_SET_DISK_6(rec_head.th_row_free_6, tab->tab_head_row_free_id);
 
636
        XT_SET_DISK_6(rec_head.th_row_eof_6, tab->tab_head_row_eof_id);
 
637
        XT_SET_DISK_6(rec_head.th_row_fnum_6, tab->tab_head_row_fnum);
 
638
        XT_SET_DISK_6(rec_head.th_rec_free_6, tab->tab_head_rec_free_id);
 
639
        XT_SET_DISK_6(rec_head.th_rec_eof_6, tab->tab_head_rec_eof_id);
 
640
        XT_SET_DISK_6(rec_head.th_rec_fnum_6, tab->tab_head_rec_fnum);
 
641
        if (!xt_pwrite_file(ot->ot_rec_file, offsetof(XTTableHeadDRec, th_op_seq_4), 40, (xtWord1 *) &rec_head.th_op_seq_4))
 
642
                xt_throw(self);
594
643
}
595
644
 
596
645
/*
598
647
 * Return NULL if the table is missing, and it is OK for the table
599
648
 * to be missing.
600
649
 */
601
 
static int tab_new_handle(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr db, xtWord4 tab_id, char *table_name, xtBool missing_ok, XTDictionaryPtr dic)
 
650
static int tab_new_handle(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr db, xtTableID tab_id, char *table_name, xtBool missing_ok, XTDictionaryPtr dic)
602
651
{
603
 
        size_t                  head_size;
604
652
        char                    path[PATH_MAX];
605
653
        XTTableHPtr             tab;
606
654
        char                    file_name[XT_MAX_TABLE_FILE_NAME_SIZE];
607
 
        XTOpenFilePtr   of_ind;
 
655
        XTOpenFilePtr   of_rec, of_ind;
608
656
        XTTableEntryPtr te_ptr;
 
657
        size_t                  tab_head_size;
 
658
        size_t                  ind_head_size;
609
659
 
610
660
        enter_();
611
661
 
612
 
        xt_strcpy(PATH_MAX, path, db->db_path);
613
 
        xt_add_dir_char(PATH_MAX, path);
614
 
        tab_get_index_file_name(file_name, table_name, tab_id);
615
 
        xt_strcat(PATH_MAX, path, file_name);
616
 
 
617
662
        tab = (XTTableHPtr) xt_heap_new(self, sizeof(XTTableHRec), tab_finalize);
618
663
        pushr_(xt_heap_release, tab);
619
664
 
632
677
                }
633
678
        }
634
679
 
635
 
        tab->tab_ind_file = xt_dup_string(self, path);
636
 
 
637
 
        of_ind = xt_open_file(self, path, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT);
638
 
        if (!of_ind) {
639
 
                freer_(); // xt_heap_release(tab)
640
 
                return_(XT_TAB_NOT_FOUND);
641
 
        }
642
 
        pushr_(xt_close_file, of_ind);
643
 
 
644
 
        tab_load_table_format(self, of_ind, table_name, &head_size, &tab->tab_dic);
645
 
 
646
 
        tab->tab_dic.dic_table->dt_table = tab;
647
 
        tab->tab_head_size = head_size;
648
 
        tab_alloc_data_buf(self, tab, tab->tab_dic.dic_rec_size);
649
 
        xt_heap_set_release_callback(self, tab, tab_onrelease);
650
 
 
651
 
        xt_remove_last_name_of_path(path);
652
 
        tab_get_row_file_name(file_name, table_name, tab_id);
653
 
        xt_strcat(PATH_MAX, path, file_name);
654
 
        tab->tab_row_file = xt_dup_string(self, path);
655
 
 
 
680
        tab->tab_seq.xt_op_seq_init(self);
656
681
        xt_init_mutex_tr(self, &tab->tab_ainc_lock);
657
682
        xt_init_mutex(self, &tab->dic_field_lock);
658
 
        xt_init_mutex(self, &tab->tab_open_lock);
659
 
        xt_init_cond(self, &tab->tab_open_cond);
660
683
        xt_init_mutex(self, &tab->tab_row_lock);
661
684
        xt_init_mutex(self, &tab->tab_ind_lock);
662
 
        xt_init_mutex(self, &tab->tab_log_lock);
663
 
        xt_init_mutex(self, &tab->tab_free_lock);
664
 
        xt_init_rwlock(self, &tab->tab_buf_rwlock);
665
 
        for (u_int i=0; i<XT_ROW_LOCK_TABLE_SIZE; i++)
666
 
                xt_init_rwlock(self, &tab->tab_row_locks[i]);
 
685
        xt_init_mutex(self, &tab->tab_rec_lock);
 
686
        for (u_int i=0; i<XT_ROW_RWLOCKS; i++)
 
687
                xt_init_rwlock(self, &tab->tab_row_rwlock[i]);
667
688
        tab->tab_free_locks = TRUE;
668
689
 
 
690
        xt_strcpy(PATH_MAX, path, db->db_path);
 
691
        xt_add_dir_char(PATH_MAX, path);
 
692
        tab_get_row_file_name(file_name, table_name, tab_id);
 
693
        xt_strcat(PATH_MAX, path, file_name);
 
694
        tab->tab_row_file = xt_fs_get_file(self, path);
 
695
 
669
696
        xt_remove_last_name_of_path(path);
670
697
        tab_get_data_file_name(file_name, table_name, tab_id);
671
698
        xt_strcat(PATH_MAX, path, file_name);
672
 
        tab->tab_data_file = xt_dup_string(self, path);
673
 
 
674
 
        tab_load_table_pointers(self, tab, of_ind);
675
 
 
 
699
        tab->tab_rec_file = xt_fs_get_file(self, path);
 
700
 
 
701
        xt_remove_last_name_of_path(path);
 
702
        tab_get_index_file_name(file_name, table_name, tab_id);
 
703
        xt_strcat(PATH_MAX, path, file_name);
 
704
        tab->tab_ind_file = xt_fs_get_file(self, path);
 
705
 
 
706
        of_ind = xt_open_file(self, tab->tab_ind_file->fil_path, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT);
 
707
        if (!of_ind) {
 
708
                freer_(); // xt_heap_release(tab)
 
709
                return_(XT_TAB_NOT_FOUND);
 
710
        }
 
711
        pushr_(xt_close_file, of_ind);
 
712
        tab_load_index_format(self, of_ind, table_name, &ind_head_size, &tab->tab_dic);
 
713
        tab->tab_index_head_size = ind_head_size;
 
714
        tab_load_index_header(self, tab, of_ind);
676
715
        freer_(); // xt_close_file(of_ind)
677
716
 
678
 
        xt_dl_init_tab(self, tab);
 
717
        of_rec = xt_open_file(self, tab->tab_rec_file->fil_path, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT);
 
718
        if (!of_rec) {
 
719
                freer_(); // xt_heap_release(tab)
 
720
                return_(XT_TAB_NOT_FOUND);
 
721
        }
 
722
        pushr_(xt_close_file, of_rec);
 
723
        tab_load_table_format(self, of_rec, table_name, &tab_head_size, &tab->tab_dic);
 
724
        tab->tab_table_head_size = tab_head_size;
 
725
        tab->tab_dic.dic_table->dt_table = tab;
 
726
        tab_load_table_header(self, tab, of_rec);
 
727
        freer_(); // xt_close_file(of_rec)
 
728
 
 
729
        tab->tab_seq.xt_op_seq_set(self, tab->tab_head_op_seq+1);
 
730
        tab->tab_row_eof_id = tab->tab_head_row_eof_id;
 
731
        tab->tab_row_free_id = tab->tab_head_row_free_id;
 
732
        tab->tab_row_fnum = tab->tab_head_row_fnum;
 
733
        tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
 
734
        tab->tab_rec_free_id = tab->tab_head_rec_free_id;
 
735
        tab->tab_rec_fnum = tab->tab_head_rec_fnum;
 
736
 
 
737
        tab->tab_rows.xt_tc_setup(tab, sizeof(XTTabRowHeadDRec), sizeof(XTTabRowRefDRec));
 
738
        tab->tab_recs.xt_tc_setup(tab, tab_head_size, tab->tab_dic.dic_rec_size);
 
739
 
 
740
        xt_xres_init_tab(self, tab);
 
741
 
 
742
        if (!xt_init_row_locks(&tab->tab_locks))
 
743
                xt_throw(self);
679
744
 
680
745
        if (tab->tab_dic.dic_table)
681
 
                tab->tab_dic.dic_table->attachReferences(self);
 
746
                tab->tab_dic.dic_table->attachReferences(self, db);
 
747
 
 
748
        xt_heap_set_release_callback(self, tab, tab_onrelease);
682
749
 
683
750
        popr_(); // Discard xt_heap_release(tab)
684
751
 
694
761
        return_(XT_TAB_OK);
695
762
}
696
763
 
 
764
 
697
765
/*
698
766
 * Get a reference to a table in the current database. The table reference is valid,
699
767
 * as long as the thread is using the database!!!
707
775
 
708
776
        tab = (XTTableHPtr) xt_ht_get(self, db->db_tables, name);
709
777
        if (!tab && !no_load) {
710
 
                char    table_name[XT_TABLE_NAME_SIZE];
711
 
                xtWord4 tab_id;
 
778
                char            table_name[XT_TABLE_NAME_SIZE];
 
779
                xtTableID       tab_id = 0;
712
780
 
713
781
                if (!tab_find_table(self, db, name, table_name, &tab_id)) {
714
782
                        if (missing_ok)
728
796
 
729
797
static void tab_close_table(XTOpenTablePtr ot)
730
798
{
731
 
        if (ot->ot_data_file) {
732
 
                xt_close_file_ns(ot->ot_data_file);
733
 
                ot->ot_data_file = NULL;
 
799
        if (ot->ot_rec_file) {
 
800
                xt_close_file_ns(ot->ot_rec_file);
 
801
                ot->ot_rec_file = NULL;
734
802
                
735
803
        }
736
804
        if (ot->ot_ind_file) {
743
811
                ot->ot_row_file = NULL;
744
812
                
745
813
        }
746
 
        xt_dl_exit_open_tab(ot);
747
814
        if (ot->ot_table) {
748
815
                xt_heap_release(xt_get_self(), ot->ot_table);
749
816
                ot->ot_table = NULL;
750
817
        }
751
818
        if (ot->ot_row_rbuffer) {
752
 
                xt_sys_free(ot->ot_row_rbuffer);
 
819
                xt_free_ns(ot->ot_row_rbuffer);
753
820
                ot->ot_row_rbuf_size = 0;
754
821
                ot->ot_row_rbuffer = NULL;
755
822
        }
756
823
        if (ot->ot_row_wbuffer) {
757
 
                xt_sys_free(ot->ot_row_wbuffer);
 
824
                xt_free_ns(ot->ot_row_wbuffer);
758
825
                ot->ot_row_wbuf_size = 0;
759
826
                ot->ot_row_wbuffer = NULL;
760
827
        }
761
828
        xt_free(NULL, ot);
762
829
}
763
830
 
764
 
static void tab_close_open_tables(struct XTThread *self, XTTableHPtr tab)
765
 
{
766
 
        XTOpenTablePtr ot;
767
 
 
768
 
        xt_lock_mutex(self, &tab->tab_open_lock);
769
 
        pushr_(xt_unlock_mutex, &tab->tab_open_lock);
770
 
 
771
 
        while (tab->tab_open_pool) {
772
 
                ot = tab->tab_open_pool;
773
 
                tab->tab_open_pool = ot->ot_pool_next;
774
 
                ASSERT(tab->tab_open_count > 0);
775
 
                tab->tab_open_count--;
776
 
                if (!tab->tab_open_count)
777
 
                        xt_broadcast_cond(self, &ot->ot_table->tab_open_cond);
778
 
                tab_close_table(ot);
779
 
        }
780
 
 
781
 
        freer_(); // xt_unlock_mutex(&tab->tab_open_lock)
782
 
}
783
 
 
784
 
static void tab_wait_for_open_tables(struct XTThread *self, XTTableHPtr tab)
785
 
{
786
 
        XTOpenTablePtr ot;
787
 
 
788
 
        xt_lock_mutex(self, &tab->tab_open_lock);
789
 
        pushr_(xt_unlock_mutex, &tab->tab_open_lock);
790
 
 
791
 
        tab->tab_will_close++;
792
 
 
793
 
        /* Free all in the pool. */
794
 
        while (tab->tab_open_pool) {
795
 
                ot = tab->tab_open_pool;
796
 
                tab->tab_open_pool = ot->ot_pool_next;
797
 
                ASSERT(tab->tab_open_count > 0);
798
 
                tab->tab_open_count--;
799
 
                if (!tab->tab_open_count)
800
 
                        xt_broadcast_cond(self, &ot->ot_table->tab_open_cond);
801
 
                tab_close_table(ot);
802
 
        }
803
 
 
804
 
        /* Wait for the pool to be empty: */
805
 
        while (tab->tab_open_count > 0) {
806
 
                if (!self->t_quit)
807
 
                        xt_timed_wait_cond(self, &tab->tab_open_cond, &tab->tab_open_lock, 500);
808
 
        }
809
 
 
810
 
        freer_(); // xt_unlock_mutex(&tab->tab_open_lock)
811
 
}
812
 
 
813
831
/*
814
832
 * This function locks a particular table by locking the table directory
815
833
 * and waiting for all open table handles to close.
817
835
 * Things are a bit complicated because the sweeper must be turned off before
818
836
 * the table directory is locked.
819
837
 */
820
 
static XTTableHPtr tab_lock_table(XTThreadPtr self, char *name, xtBool no_load)
 
838
static XTOpenTablePoolPtr tab_lock_table(XTThreadPtr self, char *name, xtBool no_load, xtBool flush_table, XTTableHPtr *tab)
821
839
{
822
 
        XTDatabaseHPtr  db = self->st_database;
823
 
        XTTableHPtr             tab;
 
840
        XTOpenTablePoolPtr      table_pool;
 
841
        XTDatabaseHPtr          db = self->st_database;
824
842
 
825
843
        enter_();
826
 
        /* Lock order: TABLE, SWEEPER, COMPACTOR! */
827
 
        /* Force the sweeper to close all tables: */
828
 
        xt_sw_lock_sweeper(self, db);
829
 
        pushr_(xt_sw_unlock_sweeper, db);
830
 
        /* Force the compactor to close all files */
831
 
        xt_dl_lock_compactor(self, db);
832
 
        pushr_(xt_dl_unlock_compactor, db);
833
 
 
834
 
        xt_ht_lock(self, db->db_tables);
835
 
        pushr_(xt_ht_unlock, db->db_tables);
 
844
        /* Lock the table, and close all references: */
 
845
        pushsr_(table_pool, xt_db_unlock_table_pool, xt_db_lock_table_pool_by_name(self, db, name, no_load, flush_table, FALSE, FALSE, tab));
836
846
 
837
847
        /* Tell MyBS to close all open tables of this sort: */
838
848
        char url[XT_DATABASE_NAME_SIZE + XT_TABLE_NAME_SIZE];
842
852
        xt_strcat(XT_DATABASE_NAME_SIZE + XT_TABLE_NAME_SIZE, url, name);
843
853
        xt_mybs_close_all_tables(url);
844
854
 
845
 
        if ((tab = xt_use_table_no_lock(self, db, name, no_load, FALSE, NULL)))
846
 
                /* Wait for all open tables to close: */
847
 
                tab_wait_for_open_tables(self, tab);
848
 
 
849
 
        popr_(); // Discard xt_ht_unlock(db->db_tables)
850
 
 
851
 
        freer_(); // xt_dl_unlock_compactor(db)
852
 
        freer_(); // xt_sw_unlock_sweeper(db)
853
 
        return_(tab);
 
855
        /* Wait for all open tables to close: */
 
856
        xt_db_wait_for_open_tables(self, table_pool);
 
857
 
 
858
        popr_(); // Discard xt_db_unlock_table_pool(table_pool)
 
859
        return_(table_pool);
854
860
}
855
861
 
856
862
/*
857
863
 * Return the ID of the table. 0 if table not found.
858
864
 */
859
 
static xtWord4 tab_lock_table_entry(XTThreadPtr self, char *name, xtBool missing_ok)
 
865
static XTOpenTablePoolPtr tab_lock_table_entry(XTThreadPtr self, char *name, xtBool missing_ok, xtTableID *tab_id)
860
866
{
861
 
        XTDatabaseHPtr  db = self->st_database;
862
 
        xtWord4                 tab_id;
 
867
        XTDatabaseHPtr          db = self->st_database;
 
868
        XTOpenTablePoolPtr      table_pool;
 
869
        XTTableHPtr                     tab;
863
870
 
864
871
        enter_();
865
 
        /* Lock order: TABLE, SWEEPER, COMPACTOR! */
866
 
        /* Force the sweeper to close all tables: */
867
 
        xt_sw_lock_sweeper(self, db);
868
 
        pushr_(xt_sw_unlock_sweeper, db);
869
 
        /* Force the compactor to close all files */
870
 
        xt_dl_lock_compactor(self, db);
871
 
        pushr_(xt_dl_unlock_compactor, db);
872
 
 
873
 
        xt_ht_lock(self, db->db_tables);
874
 
        pushr_(xt_ht_unlock, db->db_tables);
875
 
 
876
 
        if (!tab_find_table(self, db, name, NULL, &tab_id)) {
877
 
                if (!missing_ok)
878
 
                        xt_throw_ixterr(XT_CONTEXT, XT_ERR_TABLE_NOT_FOUND, name);                      
879
 
                tab_id = 0;
880
 
        }
881
 
 
882
 
        popr_(); // Discard xt_ht_unlock(db->db_tables)
883
 
 
884
 
        freer_(); // xt_dl_unlock_compactor(db)
885
 
        freer_(); // xt_sw_unlock_sweeper(db)
886
 
        return_(tab_id);
887
 
}
888
 
 
889
 
static void tab_unlock_table(XTThreadPtr self, XTTableHPtr tab)
890
 
{
891
 
        XTDatabaseHPtr db = self->st_database;
892
 
 
893
 
        if (tab) {
894
 
                xt_lock_mutex(self, &tab->tab_open_lock);
895
 
                tab->tab_will_close--;
896
 
                xt_unlock_mutex(self, &tab->tab_open_lock);
897
 
                xt_heap_release(self, tab);
898
 
        }
899
 
        xt_ht_unlock(self, db->db_tables);
900
 
}
901
 
 
902
 
static void tab_delete_table_files(XTThreadPtr self, XTDatabaseHPtr db, char *tab_name, xtWord4 tab_id)
 
872
        /* Lock the table, and close all references: */
 
873
        pushsr_(table_pool, xt_db_unlock_table_pool, xt_db_lock_table_pool_by_name(self, db, name, FALSE, TRUE, missing_ok, FALSE, &tab));
 
874
        if (!table_pool) {
 
875
                freer_(); // xt_db_unlock_table_pool(table_pool)
 
876
                return_(NULL);
 
877
        }
 
878
 
 
879
        *tab_id = tab->tab_id;
 
880
        xt_heap_release(self, tab);
 
881
 
 
882
        popr_(); // Discard xt_db_unlock_table_pool(table_pool)
 
883
        return_(table_pool);
 
884
}
 
885
 
 
886
static void tab_delete_table_files(XTThreadPtr self, XTDatabaseHPtr db, char *tab_name, xtTableID tab_id)
903
887
{
904
888
        XTFilesOfTableRec       ft;
905
889
 
911
895
}
912
896
 
913
897
/* This function assumes the file is locked, if delete_if_exists = TRUE! */
914
 
static xtWord4 tab_get_new_id(XTThreadPtr self, XTDatabaseHPtr db, char *tab_name, xtBool delete_if_exists)
 
898
static xtTableID tab_get_new_id(XTThreadPtr self, XTDatabaseHPtr db, char *tab_name, xtBool delete_if_exists)
915
899
{
916
900
        u_int                   edx;
917
901
        XTTableEntryPtr te_ptr;
918
 
        xtWord4                 old_tab_id = 0;
919
 
        xtWord4                 tab_id;
 
902
        xtTableID               old_tab_id = 0;
 
903
        xtTableID               tab_id;
920
904
        u_int                   cnt;
921
905
 
922
906
        tab_id = 0;
934
918
                if (delete_if_exists) {
935
919
                        XTTableHPtr tab;
936
920
 
937
 
                        tab_delete_table_files(self, db, tab_name, old_tab_id);
938
921
                        if ((tab = xt_use_table_no_lock(self, db, tab_name, TRUE, TRUE, NULL))) {
939
 
                                xt_dl_logs_deleted(self, tab, FALSE);
 
922
                                xt_dl_delete_ext_data(self, tab, FALSE);
940
923
                                xt_heap_release(self, tab);
941
924
                        }
 
925
                        tab_delete_table_files(self, db, tab_name, old_tab_id);
942
926
                }
943
927
                else
944
928
                        xt_throw_ixterr(XT_CONTEXT, XT_ERR_TABLE_EXISTS, tab_name);
960
944
        char                            table_name[XT_MAX_TABLE_FILE_NAME_SIZE];
961
945
        char                            path[PATH_MAX];
962
946
        XTDatabaseHPtr          db = self->st_database;
 
947
        XTOpenTablePoolPtr      table_pool;
963
948
        XTTableHPtr                     tab;
964
 
        xtWord4                         old_tab_id = 0;
965
 
        xtWord4                         tab_id = 0;
966
 
        size_t                          pnt_size;
 
949
        xtTableID                       old_tab_id = 0;
 
950
        xtTableID                       tab_id = 0;
 
951
        size_t                          index_head_size;
967
952
        XTTabRowHeadDRec        row_head;
968
 
        XTTabDataHeadDRec       data_head;
969
 
        XTTabFormatDRec         head_fmt;
 
953
        XTTableHeadDRec         rec_head;
 
954
        XTTableFormatDRec       table_fmt;
 
955
        XTIndexFormatDRec       index_fmt;
970
956
        XTStringBufferRec       tab_def = { 0, 0, 0 };
971
957
        XTTableEntryRec         te_tab;
972
958
        XTSortedListInfoRec     li_undo;
978
964
                xt_throw_xterr(XT_CONTEXT, XT_ERR_NO_DATABASE_IN_USE);
979
965
 
980
966
        /* Lock to prevent table list change during creation. */
981
 
        old_tab_id = tab_lock_table_entry(self, name, TRUE);
982
 
        pushr_(tab_unlock_table, NULL);
 
967
        table_pool = tab_lock_table_entry(self, name, TRUE, &old_tab_id);
 
968
        pushr_(xt_db_unlock_table_pool, table_pool);
 
969
        xt_ht_lock(self, db->db_tables);
 
970
        pushr_(xt_ht_unlock, db->db_tables);
 
971
 
 
972
        /* This must be done before we remove the old table
 
973
         * from the directory, or we will not be able
 
974
         * to find the table, which could be required
 
975
         * for TRUNCATE!
 
976
         */
 
977
        tab_id = tab_get_new_id(self, db, name, delete_if_exists);
983
978
 
984
979
        /* Remove the table from the directory. It will get a new
985
980
         * ID so the handle in the directory will no longer be valid.
986
981
         */
987
 
        if (old_tab_id)
 
982
        if (old_tab_id) {
988
983
                xt_ht_del(self, db->db_tables, name);
989
 
 
990
 
        tab_id = tab_get_new_id(self, db, name, delete_if_exists);
 
984
        }
991
985
 
992
986
        /* Add the table to the directory, we'll remove it on error! */
993
987
        li_undo.li_sl = db->db_table_by_id;
1000
994
 
1001
995
        *path = 0;
1002
996
        try_(a) {
1003
 
                XTOpenFilePtr   of, of_ind;
 
997
                XTOpenFilePtr   of_row, of_rec, of_ind;
1004
998
                off_t                   eof;
 
999
                size_t                  def_len = 0;
1005
1000
 
1006
1001
                tab = (XTTableHPtr) xt_heap_new(self, sizeof(XTTableHRec), tab_finalize);
1007
1002
                pushr_(xt_heap_release, tab);
1008
1003
 
1009
 
                pnt_size = offsetof(XTTabPointersDRec, tp_data) + dic->dic_key_count * XT_NODE_REF_SIZE;
1010
 
                tab->tab_head_size = pnt_size;
1011
 
                if (!(tab->tab_pointers = (XTTabPointersDPtr) xt_sys_calloc(tab->tab_head_size)))
 
1004
                /* This is the size of the index header: */
 
1005
                index_head_size = offsetof(XTIndexHeadDRec, tp_data) + dic->dic_key_count * XT_NODE_REF_SIZE;
 
1006
                tab->tab_index_head_size = index_head_size;
 
1007
                if (!(tab->tab_index_head = (XTIndexHeadDPtr) xt_calloc_ns(tab->tab_index_head_size)))
1012
1008
                        xt_throw(self);
1013
1009
 
1014
 
                /* ROW FILE: */
 
1010
                /* The length of the foreign key definition: */
 
1011
                if (dic->dic_table) {
 
1012
                        dic->dic_table->loadString(self, &tab_def);
 
1013
                        def_len = tab_def.sb_len + 1;
 
1014
                }
 
1015
 
 
1016
#ifdef DEBUG
 
1017
                tab->tab_head_op_seq = 0xFFFFFFFF - 12;
 
1018
#else
 
1019
                tab->tab_head_op_seq = 0;
 
1020
#endif
 
1021
 
 
1022
                /* ------- ROW FILE: */
1015
1023
                xt_strcpy(PATH_MAX, path, db->db_path);
1016
1024
                xt_add_dir_char(PATH_MAX, path);
1017
1025
                tab_get_row_file_name(table_name, name, tab_id);
1018
1026
                xt_strcat(PATH_MAX, path, table_name);
1019
1027
 
1020
 
                of = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
1021
 
                pushr_(xt_close_file, of);
 
1028
                of_row = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
 
1029
                pushr_(xt_close_file, of_row);
1022
1030
                XT_SET_DISK_4(row_head.rh_magic_4, XT_TAB_ROW_MAGIC);
1023
 
                XT_SET_DISK_4(row_head.rh_reserved_4, 0);
1024
 
                if (!xt_pwrite_file(of, 0, sizeof(row_head), &row_head))
 
1031
                if (!xt_pwrite_file(of_row, 0, sizeof(row_head), &row_head))
1025
1032
                        xt_throw(self);
1026
 
                freer_(); // xt_close_file(of)
 
1033
                freer_(); // xt_close_file(of_row)
1027
1034
 
1028
1035
                (void) ASSERT(sizeof(XTTabRowHeadDRec) == sizeof(XTTabRowRefDRec));
1029
1036
                (void) ASSERT(sizeof(XTTabRowRefDRec) == 1 << XT_TAB_ROW_SHIFTS);
1030
 
                tab->tab_row_eof = sizeof(row_head);
1031
 
                tab->tab_row_free = 0;
 
1037
 
 
1038
                tab->tab_row_eof_id = 1;
 
1039
                tab->tab_row_free_id = 0;
1032
1040
                tab->tab_row_fnum = 0;
1033
1041
 
1034
 
                /* INDEX FILE: */
 
1042
                tab->tab_head_row_eof_id = 1;
 
1043
                tab->tab_head_row_free_id = 0;
 
1044
                tab->tab_head_row_fnum  = 0;
 
1045
 
 
1046
                /* ------------ DATA FILE: */
 
1047
                xt_remove_last_name_of_path(path);
 
1048
                tab_get_data_file_name(table_name, name, tab_id);
 
1049
                xt_strcat(PATH_MAX, path, table_name);
 
1050
                of_rec = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
 
1051
                pushr_(xt_close_file, of_rec);
 
1052
 
 
1053
                /* Calculate the offset of the first record in the data handle file. */
 
1054
                eof = sizeof(XTTableHeadDRec) + offsetof(XTTableFormatDRec, tf_definition) + def_len + XT_FORMAT_DEF_SPACE;
 
1055
                eof = (eof + 1024 - 1) / 1024 * 1024;           // Round to a value divisible by 1024
 
1056
 
 
1057
                tab->tab_table_head_size = eof;
 
1058
 
 
1059
                tab->tab_rec_eof_id = 1;                                                // This is the first record ID!
 
1060
                tab->tab_rec_free_id = 0;
 
1061
                tab->tab_rec_fnum = 0;
 
1062
                
 
1063
                tab->tab_head_rec_eof_id = 1;                                   // The first record ID
 
1064
                tab->tab_head_rec_free_id = 0;
 
1065
                tab->tab_head_rec_fnum = 0;
 
1066
 
 
1067
                tab->tab_dic.dic_rec_size = dic->dic_rec_size;
 
1068
                tab->tab_dic.dic_rec_fixed = dic->dic_rec_fixed;
 
1069
                tab->tab_dic.dic_min_auto_inc = dic->dic_min_auto_inc;
 
1070
 
 
1071
                XT_SET_DISK_4(rec_head.th_head_size_4, sizeof(XTTableHeadDRec));
 
1072
                XT_SET_DISK_4(rec_head.th_op_seq_4, tab->tab_head_op_seq);
 
1073
                XT_SET_DISK_6(rec_head.th_row_free_6, tab->tab_head_row_free_id);
 
1074
                XT_SET_DISK_6(rec_head.th_row_eof_6, tab->tab_head_row_eof_id);
 
1075
                XT_SET_DISK_6(rec_head.th_row_fnum_6, tab->tab_head_row_fnum);
 
1076
                XT_SET_DISK_6(rec_head.th_rec_free_6, tab->tab_head_rec_free_id);
 
1077
                XT_SET_DISK_6(rec_head.th_rec_eof_6, tab->tab_head_rec_eof_id);
 
1078
                XT_SET_DISK_6(rec_head.th_rec_fnum_6, tab->tab_head_rec_fnum);
 
1079
 
 
1080
                if (!xt_pwrite_file(of_rec, 0, sizeof(XTTableHeadDRec), &rec_head))
 
1081
                        xt_throw(self);
 
1082
 
 
1083
                /* Store the table format: */
 
1084
                memset(&table_fmt, 0, offsetof(XTTableFormatDRec, tf_definition));
 
1085
                XT_SET_DISK_4(table_fmt.tf_format_size_4, offsetof(XTTableFormatDRec, tf_definition) + def_len);
 
1086
                XT_SET_DISK_4(table_fmt.tf_tab_head_size_4, eof);
 
1087
                XT_SET_DISK_2(table_fmt.tf_tab_version_2, XT_TAB_CURRENT_VERSION);
 
1088
                XT_SET_DISK_4(table_fmt.tf_rec_size_4, tab->tab_dic.dic_rec_size);
 
1089
                XT_SET_DISK_1(table_fmt.tf_rec_fixed_1, tab->tab_dic.dic_rec_fixed);
 
1090
                XT_SET_DISK_8(table_fmt.tf_min_auto_inc_8, tab->tab_dic.dic_min_auto_inc);
 
1091
 
 
1092
                if (!xt_pwrite_file(of_rec, sizeof(XTTableHeadDRec), offsetof(XTTableFormatDRec, tf_definition), &table_fmt))
 
1093
                        xt_throw(self);
 
1094
                if (def_len) {
 
1095
                        if (!xt_pwrite_file(of_rec, sizeof(XTTableHeadDRec) + offsetof(XTTableFormatDRec, tf_definition), def_len, tab_def.sb_cstring))
 
1096
                                xt_throw(self);
 
1097
                }
 
1098
 
 
1099
                freer_(); // xt_close_file(of_rec)
 
1100
 
 
1101
                /* ----------- INDEX FILE: */
1035
1102
                xt_remove_last_name_of_path(path);
1036
1103
                tab_get_index_file_name(table_name, name, tab_id);
1037
1104
                xt_strcat(PATH_MAX, path, table_name);
1043
1110
                 * blocks. This is important to ensure that the writing of the cache
1044
1111
                 * blocks does not conflict with the writing of the header.
1045
1112
                 */
1046
 
                size_t def_len = 0;
1047
 
 
1048
 
                if (dic->dic_table) {
1049
 
                        dic->dic_table->loadString(self, &tab_def);
1050
 
                        def_len = tab_def.sb_len + 1;
1051
 
                }
1052
 
 
1053
 
                eof = tab->tab_head_size + offsetof(XTTabFormatDRec, tf_definition) + def_len;
1054
 
                eof = (eof + XT_DC_BLOCK_SIZE - 1) / XT_DC_BLOCK_SIZE * XT_DC_BLOCK_SIZE;
1055
 
                eof = (eof + XT_IDX_PAGE_SIZE - 1) / XT_IDX_PAGE_SIZE * XT_IDX_PAGE_SIZE;
1056
 
 
 
1113
 
 
1114
                eof = tab->tab_index_head_size + sizeof(XTIndexFormatDRec);
 
1115
                eof = (eof + XT_INDEX_PAGE_SIZE - 1) / XT_INDEX_PAGE_SIZE * XT_INDEX_PAGE_SIZE;
 
1116
                eof = (eof + XT_INDEX_PAGE_SIZE - 1) / XT_INDEX_PAGE_SIZE * XT_INDEX_PAGE_SIZE;
 
1117
 
 
1118
                xt_lock_mutex_ns(&db->db_wr_lock);
 
1119
                tab->tab_ind_rec_log_id = db->db_xlog.xl_flush_log_id;
 
1120
                tab->tab_ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
 
1121
                xt_unlock_mutex_ns(&db->db_wr_lock);
1057
1122
                tab->tab_ind_eof = eof;
1058
1123
                tab->tab_ind_free = 0;
1059
1124
 
1060
 
                /* DATA FILE: */
1061
 
                xt_remove_last_name_of_path(path);
1062
 
                tab_get_data_file_name(table_name, name, tab_id);
1063
 
                xt_strcat(PATH_MAX, path, table_name);
1064
 
                of = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
1065
 
                pushr_(xt_close_file, of);
1066
 
                XT_SET_DISK_4(data_head.dh_magic_4, XT_TAB_DATA_MAGIC);
1067
 
                XT_SET_DISK_4(data_head.dh_reserved_4, 0);
1068
 
                if (!xt_pwrite_file(of, 0, sizeof(row_head), &row_head))
1069
 
                        xt_throw(self);
1070
 
                freer_(); // xt_close_file(of)
1071
 
 
1072
 
                tab->tab_data_eof = sizeof(XTTabDataHeadDRec);
1073
 
                tab->tab_data_free = 0;
1074
 
                tab->tab_data_fnum = 0;
1075
 
 
1076
 
                tab->tab_dic.dic_rec_size = dic->dic_rec_size;
1077
 
                tab->tab_dic.dic_rec_fixed = dic->dic_rec_fixed;
1078
 
                tab->tab_dic.dic_min_auto_inc = dic->dic_min_auto_inc;
1079
 
 
1080
 
                XT_SET_DISK_4(tab->tab_pointers->tp_head_size_4, tab->tab_head_size);
 
1125
                XT_SET_DISK_4(tab->tab_index_head->tp_head_size_4, tab->tab_index_head_size);
1081
1126
 
1082
1127
                /* Save the header: */
1083
 
                if (!tab_store_table_header(tab, of_ind))
1084
 
                        xt_throw(self);
1085
 
 
1086
 
                /* Store the table format: */
1087
 
                memset(&head_fmt, 0, offsetof(XTTabFormatDRec, tf_definition));
1088
 
                XT_SET_DISK_4(head_fmt.tf_format_size_4, offsetof(XTTabFormatDRec, tf_definition) + def_len);
1089
 
                XT_SET_DISK_2(head_fmt.tf_tab_version_2, XT_TAB_CURRENT_VERSION);
1090
 
                XT_SET_DISK_2(head_fmt.tf_ind_version_2, XT_IND_CURRENT_VERSION);
1091
 
                XT_SET_DISK_1(head_fmt.tf_node_ref_size_1, XT_NODE_REF_SIZE);
1092
 
                XT_SET_DISK_1(head_fmt.tf_rec_ref_size_1, XT_RECORD_REF_SIZE);
1093
 
                XT_SET_DISK_4(head_fmt.tf_rec_size_4, tab->tab_dic.dic_rec_size);
1094
 
                XT_SET_DISK_1(head_fmt.tf_rec_fixed_1, tab->tab_dic.dic_rec_fixed);
1095
 
                XT_SET_DISK_8(head_fmt.tf_min_auto_inc_8, tab->tab_dic.dic_min_auto_inc);
1096
 
                XT_SET_DISK_2(head_fmt.tf_no_of_data_logs_2, XT_NO_OF_DATA_LOGS);
1097
 
 
1098
 
                if (!xt_pwrite_file(of_ind, pnt_size, offsetof(XTTabFormatDRec, tf_definition), &head_fmt))
1099
 
                        xt_throw(self);
1100
 
 
1101
 
                if (def_len) {
1102
 
                        if (!xt_pwrite_file(of_ind, pnt_size + offsetof(XTTabFormatDRec, tf_definition), def_len, tab_def.sb_cstring))
1103
 
                                xt_throw(self);
1104
 
                }
1105
 
 
 
1128
                if (!tab_store_index_header(tab, of_ind))
 
1129
                        xt_throw(self);
 
1130
 
 
1131
                /* Store the index format: */
 
1132
                memset(&index_fmt, 0, sizeof(XTIndexFormatDRec));
 
1133
                XT_SET_DISK_4(index_fmt.if_format_size_4, sizeof(XTIndexFormatDRec));
 
1134
                XT_SET_DISK_2(index_fmt.if_tab_version_2, XT_TAB_CURRENT_VERSION);
 
1135
                XT_SET_DISK_2(index_fmt.if_ind_version_2, XT_IND_CURRENT_VERSION);
 
1136
                XT_SET_DISK_1(index_fmt.if_node_ref_size_1, XT_NODE_REF_SIZE);
 
1137
                XT_SET_DISK_1(index_fmt.if_rec_ref_size_1, XT_RECORD_REF_SIZE);
 
1138
 
 
1139
                if (!xt_pwrite_file(of_ind, tab->tab_index_head_size, sizeof(XTIndexFormatDRec), &index_fmt))
 
1140
                        xt_throw(self);
 
1141
 
 
1142
                freer_(); // xt_close_file(of_ind)
 
1143
 
 
1144
                /* ------------ */
1106
1145
                /* Log the new table ID! */
1107
1146
                db->db_curr_tab_id = tab_id;
1108
 
                if (!xt_xn_log_ids(self, db)) {
 
1147
                if (!xt_xn_log_tab_id(self, tab_id)) {
1109
1148
                        db->db_curr_tab_id = tab_id - 1;
1110
1149
                        xt_throw(self);
1111
1150
                }
1112
1151
 
1113
 
                freer_(); // xt_close_file(of_ind)
1114
1152
                freer_(); // xt_heap_release(tab)
1115
1153
        }
1116
1154
        catch_(a) {
1128
1166
                xt_sl_delete(self, db->db_table_by_id, &old_tab_id);
1129
1167
        popr_(); // Discard xt_sl_delete_from_info(&li_undo)
1130
1168
 
1131
 
        freer_(); // tab_unlock_table(NULL)
 
1169
        freer_(); // xt_ht_unlock(db->db_tables)
 
1170
        freer_(); // xt_db_unlock_table_pool(table_pool)
1132
1171
 
1133
1172
        /* I open the table here, because I cannot rely on MySQL to do
1134
1173
         * it after a create. This is normally OK, but with foreign keys
1135
 
         * table can be referenced and then they are not opened
 
1174
         * tables can be referenced and then they are not opened
1136
1175
         * before use. In this example, the INSERT opens t2, but t1 is
1137
1176
         * not opened of the create. As a result the foreign key
1138
1177
         * reference is not resolved.
1163
1202
 
1164
1203
xtPublic void xt_drop_table(XTThreadPtr self, char *tab_name)
1165
1204
{
1166
 
        XTDatabaseHPtr  db = self->st_database;
1167
 
        xtWord4                 tab_id;
 
1205
        XTDatabaseHPtr          db = self->st_database;
 
1206
        XTOpenTablePoolPtr      table_pool;
 
1207
        xtTableID                       tab_id = 0;
1168
1208
 
1169
1209
        enter_();
1170
1210
 
1171
 
        tab_id = tab_lock_table_entry(self, tab_name, TRUE);
1172
 
        pushr_(tab_unlock_table, NULL);
1173
 
 
1174
 
        xt_ht_del(self, db->db_tables, tab_name);
1175
 
 
1176
 
        tab_delete_table_files(self, db, tab_name, tab_id);
 
1211
        table_pool = tab_lock_table_entry(self, tab_name, TRUE, &tab_id);
 
1212
        pushr_(xt_db_unlock_table_pool, table_pool);
 
1213
        xt_ht_lock(self, db->db_tables);
 
1214
        pushr_(xt_ht_unlock, db->db_tables);
1177
1215
 
1178
1216
        if (tab_id) {
1179
1217
                XTTableHPtr tab;
1180
1218
                
 
1219
                if ((tab = xt_use_table_no_lock(self, db, tab_name, TRUE, TRUE, NULL))) {
 
1220
                        xt_dl_delete_ext_data(self, tab, TRUE);
 
1221
                        xt_heap_release(self, tab);
 
1222
                }
 
1223
                tab_delete_table_files(self, db, tab_name, tab_id);
1181
1224
                xt_sl_delete(self, db->db_table_by_id, &tab_id);
1182
 
                if ((tab = xt_use_table_no_lock(self, db, tab_name, TRUE, TRUE, NULL))) {
1183
 
                        xt_dl_logs_deleted(self, tab, FALSE);
1184
 
                        xt_heap_release(self, tab);
1185
 
                }
1186
1225
        }
1187
1226
 
1188
 
        /* Release the lock on the table directory: */
1189
 
        freer_(); // tab_unlock_table(NULL)
 
1227
        xt_ht_del(self, db->db_tables, tab_name);
 
1228
 
 
1229
        freer_(); // xt_ht_unlock(db->db_tables)
 
1230
        freer_(); // xt_db_unlock_table_pool(table_pool)
1190
1231
        exit_();
1191
1232
}
1192
1233
 
 
1234
xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot)
 
1235
{
 
1236
        XTTableHPtr                             tab = ot->ot_table;
 
1237
        xtRecordID                              prec_id;
 
1238
        XTTabRecExtDRec                 rec_buf;
 
1239
        xtWord4                                 free_count = 0, free_count2 = 0;
 
1240
        XTactExtRecEntryDRec    ext_rec;
 
1241
        size_t                                  log_size;
 
1242
        xtLogID                                 log_id;
 
1243
        xtLogOffset                             log_offset;
 
1244
        xtRecordID                              rec_id;
 
1245
        xtRecordID                              prev_rec_id;
 
1246
        xtXactID                                xn_id;
 
1247
        xtRowID                                 row_id;
 
1248
 
 
1249
//*DBG*/xt_dump_xlogs(tab->tab_db);
 
1250
        printf("\nCHECK TABLE: %s\n", tab->tab_name);
 
1251
 
 
1252
        xt_lock_mutex(self, &tab->tab_db->db_co_ext_lock);
 
1253
        pushr_(xt_unlock_mutex, &tab->tab_db->db_co_ext_lock);
 
1254
 
 
1255
        xt_lock_mutex(self, &tab->tab_rec_lock);
 
1256
        pushr_(xt_unlock_mutex, &tab->tab_rec_lock);
 
1257
 
 
1258
        printf("Records:-\n");
 
1259
        printf("Free list: %llu (%llu)\n", (u_llong) tab->tab_rec_free_id, (u_llong) tab->tab_rec_fnum);
 
1260
        printf("EOF:       %llu\n", (u_llong) tab->tab_rec_eof_id);
 
1261
 
 
1262
        rec_id = 1;
 
1263
        while (rec_id < tab->tab_rec_eof_id) {
 
1264
                if (!xt_tab_get_rec_data(ot, rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &rec_buf))
 
1265
                        xt_throw(self);
 
1266
 
 
1267
                printf("%-4llu ", (u_llong) rec_id);
 
1268
                switch (rec_buf.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
 
1269
                        case XT_TAB_STATUS_FREED:
 
1270
                                printf("======== ");
 
1271
                                free_count++;
 
1272
                                break;
 
1273
                        case XT_TAB_STATUS_DELETE:
 
1274
                                printf("delete   ");
 
1275
                                break;
 
1276
                        case XT_TAB_STATUS_FIXED:
 
1277
                                printf("record-F ");
 
1278
                                break;
 
1279
                        case XT_TAB_STATUS_VARIABLE:
 
1280
                                printf("record-V ");
 
1281
                                break;
 
1282
                        case XT_TAB_STATUS_EXT_DLOG:
 
1283
                                printf("record-X ");
 
1284
                                break;
 
1285
                }
 
1286
                if (rec_buf.tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
 
1287
                        printf("C");
 
1288
                else
 
1289
                        printf(" ");
 
1290
                prev_rec_id = XT_GET_DISK_4(rec_buf.tr_prev_rec_id_4);
 
1291
                xn_id = XT_GET_DISK_4(rec_buf.tr_xact_id_4);
 
1292
                row_id = XT_GET_DISK_4(rec_buf.tr_row_id_4);
 
1293
                switch (rec_buf.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
 
1294
                        case XT_TAB_STATUS_FREED:
 
1295
                                printf(" prev=%-3llu (xact=%-3llu row=%lu)\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id);
 
1296
                                break;
 
1297
                        case XT_TAB_STATUS_EXT_DLOG:
 
1298
                                printf(" prev=%-3llu  xact=%-3llu row=%lu  Xlog=%lu Xoff=%llu Xsiz=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id, (u_long) XT_GET_DISK_2(rec_buf.re_log_id_2), (u_llong) XT_GET_DISK_6(rec_buf.re_log_offs_6), (u_long) XT_GET_DISK_4(rec_buf.re_log_dat_siz_4));
 
1299
 
 
1300
                                log_size = XT_GET_DISK_4(rec_buf.re_log_dat_siz_4);
 
1301
                                XT_GET_LOG_REF(log_id, log_offset, &rec_buf);
 
1302
                                if (!self->st_dlog_buf.dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data), (xtWord1 *) &ext_rec))
 
1303
                                        xt_log_and_clear_exception(self);
 
1304
                                else {
 
1305
                                        size_t          log_size2;
 
1306
                                        xtTableID       curr_tab_id;
 
1307
                                        xtRecordID      curr_rec_id;
 
1308
 
 
1309
                                        log_size2 = XT_GET_DISK_4(ext_rec.er_data_size_4);
 
1310
                                        curr_tab_id = XT_GET_DISK_4(ext_rec.er_tab_id_4);
 
1311
                                        curr_rec_id = XT_GET_DISK_4(ext_rec.er_rec_id_4);
 
1312
                                        if (log_size2 != log_size || curr_tab_id != tab->tab_id || curr_rec_id != rec_id) {
 
1313
                                                xt_logf(XT_INFO, "Table %s: record %llu, extended record %lu:%llu not valid\n", tab->tab_name, (u_llong) rec_id, (u_long) log_id, (u_llong) log_offset);
 
1314
                                        }
 
1315
                                }
 
1316
                                break;
 
1317
                        default:
 
1318
                                printf(" prev=%-3llu  xact=%-3llu row=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id);
 
1319
                                break;
 
1320
                }
 
1321
                rec_id++;
 
1322
        }
 
1323
        
 
1324
        if (tab->tab_rec_fnum != free_count)
 
1325
                xt_logf(XT_INFO, "Table %s: incorrect number of free blocks, %llu, should be: %llu\n", tab->tab_name, (u_llong) free_count, (u_llong) tab->tab_rec_fnum);
 
1326
 
 
1327
        /* Checking the free list: */
 
1328
        prec_id = 0;
 
1329
        rec_id = tab->tab_rec_free_id;
 
1330
        while (rec_id) {
 
1331
                if (rec_id >= tab->tab_rec_eof_id) {
 
1332
                        xt_logf(XT_INFO, "Table %s: invalid reference on free list: %llu, ", tab->tab_name, (u_llong) rec_id);
 
1333
                        if (prec_id)
 
1334
                                xt_logf(XT_INFO, "reference by: %llu\n", (u_llong) prec_id);
 
1335
                        else
 
1336
                                xt_logf(XT_INFO, "reference by list head pointer\n");
 
1337
                        break;
 
1338
                }
 
1339
                if (!xt_tab_get_rec_data(ot, rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &rec_buf)) {
 
1340
                        xt_log_and_clear_exception(self);
 
1341
                        break;
 
1342
                }
 
1343
                if ((rec_buf.tr_rec_type_1 & XT_TAB_STATUS_MASK) != XT_TAB_STATUS_FREED)
 
1344
                        xt_logf(XT_INFO, "Table %s: record, %llu, on free list is not free\n", tab->tab_name, (u_llong) rec_id);
 
1345
                free_count2++;
 
1346
                prec_id = rec_id;
 
1347
                rec_id = XT_GET_DISK_4(rec_buf.tr_prev_rec_id_4);
 
1348
        }
 
1349
        if (free_count2 < free_count)
 
1350
                xt_logf(XT_INFO, "Table %s: not all free blocks (%llu) on free list: %llu\n", tab->tab_name, (u_llong) free_count, (u_llong) free_count2);
 
1351
 
 
1352
        freer_(); // xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
1353
 
 
1354
        XTTabRowRefDRec row_buf;
 
1355
        xtRefID                 ref_id;
 
1356
 
 
1357
        xt_lock_mutex(self, &tab->tab_row_lock);
 
1358
        pushr_(xt_unlock_mutex, &tab->tab_row_lock);
 
1359
 
 
1360
        printf("Rows:-\n");
 
1361
        printf("Free list: %llu (%llu)\n", (u_llong) tab->tab_row_free_id, (u_llong) tab->tab_row_fnum);
 
1362
        printf("EOF:       %llu\n", (u_llong) tab->tab_row_eof_id);
 
1363
 
 
1364
        rec_id = 1;
 
1365
        while (rec_id < tab->tab_row_eof_id) {
 
1366
                if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, rec_id, (xtWord4 *) &row_buf))
 
1367
                        xt_throw(self);
 
1368
                printf("%-3llu ", (u_llong) rec_id);
 
1369
                ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
 
1370
                if (ref_id == 0)
 
1371
                        printf("====== 0\n");
 
1372
                else
 
1373
                        printf("in use %llu\n", (u_llong) ref_id);
 
1374
                rec_id++;
 
1375
        }
 
1376
 
 
1377
        freer_(); // xt_unlock_mutex(&tab->tab_row_lock);
 
1378
 
 
1379
        freer_(); // xt_unlock_mutex(&tab->tab_db->db_co_ext_lock);
 
1380
}
 
1381
 
1193
1382
xtPublic void xt_rename_table(XTThreadPtr self, char *old_name, char *new_name)
1194
1383
{
1195
1384
        XTDatabaseHPtr          db = self->st_database;
 
1385
        XTOpenTablePoolPtr      table_pool;
1196
1386
        XTTableHPtr                     tab;
1197
1387
        char                            to_path[PATH_MAX];
1198
1388
        char                            table_name[XT_MAX_TABLE_FILE_NAME_SIZE];
1199
1389
        char                            *postfix;
1200
1390
        XTFilesOfTableRec       ft;
1201
1391
        XTDictionaryRec         dic = { 0, 0, 0, 0, 0, 0, 0, 0 };
1202
 
        xtWord4                         tab_id;
 
1392
        xtTableID                       tab_id;
1203
1393
        XTTableEntryPtr         te_ptr;
1204
1394
        char                            *te_new_name;
1205
1395
 
1226
1416
         * So instead we just make sure that the sweeper is not
1227
1417
         * using the table.
1228
1418
         */
1229
 
        tab = tab_lock_table(self, old_name, FALSE);
1230
 
        pushr_(tab_unlock_table, NULL);
 
1419
        table_pool = tab_lock_table(self, old_name, FALSE, TRUE, &tab);
 
1420
        pushr_(xt_db_unlock_table_pool, table_pool);
 
1421
        xt_ht_lock(self, db->db_tables);
 
1422
        pushr_(xt_ht_unlock, db->db_tables);
1231
1423
        tab_id = tab->tab_id;
1232
1424
        myxt_move_dictionary(&dic, &tab->tab_dic);
1233
1425
        pushr_(myxt_free_dictionary, &dic);
1269
1461
        xt_heap_release(self, tab);
1270
1462
 
1271
1463
        freer_(); // myxt_free_dictionary(&dic)
1272
 
        freer_(); // tab_unlock_table(NULL)
 
1464
        freer_(); // xt_ht_unlock(db->db_tables)
 
1465
        freer_(); // xt_db_unlock_table_pool(table_pool)
1273
1466
}
1274
1467
 
1275
1468
xtPublic void xt_move_table(XTThreadPtr self, char *old_name, XTDatabaseHPtr new_db, char *new_name)
1276
1469
{
1277
1470
        XTDatabaseHPtr          db = self->st_database;
 
1471
        XTOpenTablePoolPtr      table_pool;
1278
1472
        XTTableHPtr                     tab;
1279
1473
        char                            to_path[PATH_MAX];
1280
1474
        char                            table_name[XT_MAX_TABLE_FILE_NAME_SIZE];
1281
1475
        char                            *postfix;
1282
1476
        XTFilesOfTableRec       ft;
1283
 
        xtWord4                         old_tab_id;
1284
 
        xtWord4                         new_tab_id = 0;
 
1477
        xtTableID                       old_tab_id;
 
1478
        xtTableID                       new_tab_id = 0;
1285
1479
        XTDictionaryRec         dic = { 0, 0, 0, 0, 0, 0, 0, 0 };
1286
1480
        XTTableEntryRec         te_tab;
1287
1481
        XTSortedListInfoRec     li_undo;
1289
1483
        if (strlen(new_name) > XT_TABLE_NAME_SIZE-1)
1290
1484
                xt_throw_ixterr(XT_CONTEXT, XT_ERR_NAME_TOO_LONG, new_name);
1291
1485
 
1292
 
        tab = tab_lock_table(self, old_name, TRUE);
1293
 
        pushr_(tab_unlock_table, NULL);
 
1486
        table_pool = tab_lock_table(self, old_name, TRUE, TRUE, &tab);
 
1487
        pushr_(xt_db_unlock_table_pool, table_pool);
 
1488
        xt_ht_lock(self, db->db_tables);
 
1489
        pushr_(xt_ht_unlock, db->db_tables);
1294
1490
        old_tab_id = tab->tab_id;
1295
1491
        myxt_move_dictionary(&dic, &tab->tab_dic);
1296
1492
        pushr_(myxt_free_dictionary, &dic);
1345
1541
        xt_heap_release(self, tab);
1346
1542
 
1347
1543
        freer_(); // myxt_free_dictionary(&dic)
1348
 
        freer_(); // tab_unlock_table(NULL)
 
1544
        freer_(); // xt_ht_unlock(db->db_tables)
 
1545
        freer_(); // xt_db_unlock_table_pool(table_pool)
1349
1546
}
1350
1547
 
1351
 
xtPublic XTTableHPtr xt_use_table(XTThreadPtr self, char *name, xtBool missing_ok)
 
1548
xtPublic XTTableHPtr xt_use_table(XTThreadPtr self, char *name, xtBool no_load, xtBool missing_ok)
1352
1549
{
1353
1550
        XTTableHPtr             tab;
1354
1551
        XTDatabaseHPtr  db = self->st_database;
1355
1552
 
1356
1553
        xt_ht_lock(self, db->db_tables);
1357
1554
        pushr_(xt_ht_unlock, db->db_tables);
1358
 
        tab = xt_use_table_no_lock(self, db, name, FALSE, missing_ok, NULL);
 
1555
        tab = xt_use_table_no_lock(self, db, name, no_load, missing_ok, NULL);
1359
1556
        freer_();
1360
1557
        return tab;
1361
1558
}
1362
1559
 
 
1560
xtPublic void xt_flush_table(XTThreadPtr self, XTOpenTablePtr ot)
 
1561
{
 
1562
        XTTableHPtr             tab = ot->ot_table;
 
1563
        XTDatabaseHPtr  db = tab->tab_db;
 
1564
 
 
1565
        /* Wakeup the sweeper:
 
1566
         * We want the sweeper to check if there is anything to do,
 
1567
         * so we must wake it up.
 
1568
         * Once it has done all it can, it will go back to sleep.
 
1569
         * This should be good enough.
 
1570
         */
 
1571
        if (db->db_sw_idle) {
 
1572
                u_int check_count = db->db_sw_check_count;
 
1573
 
 
1574
                for (;;) {
 
1575
                        xt_broadcast_cond(NULL, &db->db_xn_wait_cond);
 
1576
                        if (!db->db_sw_thread || db->db_sw_idle || check_count != db->db_sw_check_count)
 
1577
                                break;
 
1578
                        xt_sleep_100th_second(1);
 
1579
                }
 
1580
        }
 
1581
 
 
1582
        /* Wait for the sweeper become idle: */
 
1583
        xt_lock_mutex(self, &db->db_xn_wait_lock);
 
1584
        pushr_(xt_unlock_mutex, &db->db_xn_wait_lock);
 
1585
        while (db->db_sw_thread && !db->db_sw_idle) {
 
1586
                xt_timed_wait_cond(self, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 10);
 
1587
        }
 
1588
        freer_(); // xt_unlock_mutex(&db->db_xn_wait_lock)
 
1589
 
 
1590
        /* Wait for the writer to write out all operations on the table: */
 
1591
        while (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq+1, tab->tab_seq.ts_next_seq)) {
 
1592
                /* Flush the log, in case this is holding up the
 
1593
                 * writer!
 
1594
                 */
 
1595
                if (!db->db_xlog.xlog_flush(self))
 
1596
                        xt_throw(self);
 
1597
 
 
1598
                xt_lock_mutex(self, &db->db_wr_lock);
 
1599
                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
1600
                db->db_wr_thread_waiting++;
 
1601
                /*
 
1602
                 * Wake the writer if it is sleeping. In order to
 
1603
                 * flush a table we must wait for the writer to complete
 
1604
                 * committing all the changes in the table to the database.
 
1605
                 */
 
1606
                if (db->db_wr_idle) {
 
1607
                        if (!xt_broadcast_cond(NULL, &db->db_wr_cond))
 
1608
                                xt_log_and_clear_exception_ns();
 
1609
                }
 
1610
 
 
1611
                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
1612
                xt_sleep_100th_second(1);
 
1613
 
 
1614
                xt_lock_mutex(self, &db->db_wr_lock);
 
1615
                pushr_(xt_unlock_mutex, &db->db_wr_lock);
 
1616
                db->db_wr_thread_waiting--;
 
1617
                freer_(); // xt_unlock_mutex(&db->db_wr_lock)
 
1618
        }
 
1619
 
 
1620
        /* Write the table header: */
 
1621
        if (tab->tab_flush_pending) {
 
1622
                tab->tab_flush_pending = FALSE;
 
1623
                xt_tab_store_header(self, ot);
 
1624
 
 
1625
                /* Flush the table data: */
 
1626
                if (!xt_flush_file(ot->ot_rec_file) ||
 
1627
                        !xt_flush_file(ot->ot_row_file)) {
 
1628
                        tab->tab_flush_pending = TRUE;
 
1629
                        xt_throw(self);
 
1630
                }
 
1631
        }
 
1632
 
 
1633
        if (!xt_flush_table_index(ot))
 
1634
                xt_throw(self);
 
1635
}
 
1636
 
1363
1637
xtPublic XTOpenTablePtr tab_open_table(XTTableHPtr tab)
1364
1638
{
1365
1639
        volatile XTOpenTablePtr ot;
1366
1640
        XTThreadPtr                             self;
1367
1641
 
1368
 
        if (!(ot = (XTOpenTablePtr) xt_sys_malloc(offsetof(XTOpenTableRec, ot_read_buf) + tab->tab_buf_size)))
 
1642
        if (!(ot = (XTOpenTablePtr) xt_malloc_ns(sizeof(XTOpenTableRec))))
1369
1643
                return NULL;
1370
1644
        memset(ot, 0, offsetof(XTOpenTableRec, ot_ind_rbuf));
1371
1645
 
1373
1647
        try_(a) {
1374
1648
                xt_heap_reference(self, tab);
1375
1649
                ot->ot_table = tab;
1376
 
                ot->ot_row_file = xt_open_file(self, ot->ot_table->tab_row_file, XT_FS_DEFAULT);
1377
 
                ot->ot_data_file = xt_open_file(self, ot->ot_table->tab_data_file, XT_FS_DEFAULT);
1378
 
                ot->ot_ind_file = xt_open_file(self, ot->ot_table->tab_ind_file, XT_FS_DEFAULT);
 
1650
                ot->ot_row_file = xt_open_file(self, ot->ot_table->tab_row_file->fil_path, XT_FS_DEFAULT);
 
1651
                ot->ot_rec_file = xt_open_file(self, ot->ot_table->tab_rec_file->fil_path, XT_FS_DEFAULT);
 
1652
                ot->ot_ind_file = xt_open_file(self, ot->ot_table->tab_ind_file->fil_path, XT_FS_DEFAULT);
1379
1653
        }
1380
1654
        catch_(a) {
1381
1655
                ;
1382
1656
        }
1383
1657
        cont_(a);
1384
1658
 
1385
 
        if (!ot->ot_table || !ot->ot_row_file || !ot->ot_data_file || !ot->ot_ind_file)
 
1659
        if (!ot->ot_table || !ot->ot_row_file || !ot->ot_rec_file || !ot->ot_ind_file)
1386
1660
                goto failed;
1387
1661
 
1388
 
        ot->ot_row_file->of_flush = xt_dc_flush;
1389
 
        ot->ot_ind_file->of_flush = xt_dc_flush;
1390
 
 
1391
 
        if (!(ot->ot_row_rbuffer = (xtWord1 *) xt_sys_malloc(ot->ot_table->tab_dic.dic_rec_size)))
 
1662
        if (!(ot->ot_row_rbuffer = (xtWord1 *) xt_malloc_ns(ot->ot_table->tab_dic.dic_rec_size)))
1392
1663
                goto failed;
1393
1664
        ot->ot_row_rbuf_size = ot->ot_table->tab_dic.dic_rec_size;
1394
 
        if (!(ot->ot_row_wbuffer = (xtWord1 *) xt_sys_malloc(ot->ot_table->tab_dic.dic_rec_size)))
 
1665
        if (!(ot->ot_row_wbuffer = (xtWord1 *) xt_malloc_ns(ot->ot_table->tab_dic.dic_rec_size)))
1395
1666
                goto failed;
1396
1667
        ot->ot_row_wbuf_size = ot->ot_table->tab_dic.dic_rec_size;
1397
1668
 
1399
1670
        ot->ot_rec_fixed = ot->ot_table->tab_dic.dic_rec_fixed;
1400
1671
        ot->ot_rec_size = ot->ot_table->tab_dic.dic_rec_size;
1401
1672
 
1402
 
        if (!xt_dl_init_open_tab(ot))
1403
 
                goto failed;
1404
 
 
1405
1673
        return ot;
1406
1674
 
1407
1675
        failed:
1411
1679
 
1412
1680
xtPublic XTOpenTablePtr xt_open_table(XTTableHPtr tab)
1413
1681
{
1414
 
        XTOpenTablePtr ot;
1415
 
 
1416
 
        xt_mutex_lock(&tab->tab_open_lock);
1417
 
        if (tab->tab_will_close) {
1418
 
                xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_TABLE_LOCKED, tab->tab_name);
1419
 
                ot = NULL;
1420
 
        }
1421
 
        else if ((ot = tab_open_table(tab))) {
1422
 
                tab->tab_open_count++;
1423
 
        }
1424
 
        xt_mutex_unlock(&tab->tab_open_lock);
1425
 
        return ot;
1426
 
}
1427
 
 
1428
 
xtPublic XTOpenTablePtr xt_open_table_by_name(XTThreadPtr self, char *name)
1429
 
{
1430
 
        XTTableHPtr             tab;
1431
 
        XTOpenTablePtr  ot;
1432
 
 
1433
 
        tab = xt_use_table(self, name, FALSE);
1434
 
        ot = xt_open_table(tab);
1435
 
        xt_heap_release(self, tab);
1436
 
        if (!ot)
1437
 
                throw_();
1438
 
 
1439
 
        return ot;
 
1682
        return tab_open_table(tab);
1440
1683
}
1441
1684
 
1442
1685
xtPublic void xt_close_table(XTOpenTablePtr ot)
1443
1686
{
1444
 
        XTTableHPtr tab = ot->ot_table;
1445
 
 
1446
 
        xt_mutex_lock(&tab->tab_open_lock);
1447
 
        ASSERT_NS(tab->tab_open_count > 0);
1448
 
        tab->tab_open_count--;
1449
 
        if (!tab->tab_open_count)
1450
 
                xt_cond_wakeall(&tab->tab_open_cond);
1451
 
        xt_mutex_unlock(&tab->tab_open_lock);
1452
1687
        tab_close_table(ot);
1453
1688
}
1454
1689
 
1455
 
xtPublic xtBool xt_flush_table(XTOpenTablePtr ot)
 
1690
xtPublic xtBool xt_flush_table_index(XTOpenTablePtr ot)
1456
1691
{
1457
 
        /* Fixed length record files have no log. */
1458
 
        if (ot->ot_log_buf_len > 0 && !xt_dl_flush_log(ot))
1459
 
                return FAILED;
1460
 
        if (!xt_dc_flush(ot->ot_row_file))
1461
 
                return FAILED;
1462
 
        if (!xt_dc_flush(ot->ot_ind_file))
1463
 
                return FAILED;
1464
 
        return xt_tab_flush_data(ot);
 
1692
        XTTableHPtr             tab = ot->ot_table;
 
1693
        XTDatabaseHPtr  db = tab->tab_db;
 
1694
        xtBool                  ok = TRUE;
 
1695
 
 
1696
        xt_lock_mutex_ns(&db->db_wr_lock);
 
1697
        tab->tab_ind_rec_log_id = db->db_xlog.xl_flush_log_id;
 
1698
        tab->tab_ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
 
1699
        xt_unlock_mutex_ns(&db->db_wr_lock);
 
1700
 
 
1701
        /* Flush the index cache: */
 
1702
        if (!xt_ind_flush(ot->ot_ind_file, &tab->tab_ind_flush_pending))
 
1703
                return FAILED;
 
1704
 
 
1705
        /* And write the index file header if required: */
 
1706
        if (tab->tab_ind_head_dirty) {
 
1707
                xt_lock_mutex_ns(&tab->tab_ind_lock);
 
1708
                ok = tab_store_index_header(tab, ot->ot_ind_file);
 
1709
                xt_unlock_mutex_ns(&tab->tab_ind_lock);
 
1710
        }
 
1711
        return ok;
1465
1712
}
1466
1713
 
1467
 
xtPublic int xt_use_table_by_id(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr db, xtWord4 tab_id)
 
1714
xtPublic int xt_use_table_by_id(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr db, xtTableID tab_id)
1468
1715
{
1469
1716
        XTTableEntryPtr te_ptr;
1470
1717
        XTTableHPtr             tab = NULL;
1492
1739
        return r;
1493
1740
}
1494
1741
 
1495
 
xtPublic xtBool xt_tab_flush_data(XTOpenTablePtr ot)
1496
 
{
1497
 
        register XTTableHPtr    tab = ot->ot_table;
1498
 
        xtBool                                  ok = TRUE;
1499
 
 
1500
 
        ASSERT_NS(((tab->tab_data_eof + tab->tab_buf_offset) - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size == 0);
1501
 
        if (tab->tab_buf_offset) {
1502
 
                xt_rwlock_wrlock(&tab->tab_buf_rwlock);
1503
 
                if (tab->tab_buf_offset) {
1504
 
                        if ((ok = xt_pwrite_file(ot->ot_data_file, tab->tab_data_eof, tab->tab_buf_offset, tab->tab_data_buf))) {
1505
 
                                tab->tab_data_eof += tab->tab_buf_offset;
1506
 
                                tab->tab_buf_offset = 0;
1507
 
                                ok = tab_store_table_header(tab, ot->ot_ind_file);
1508
 
                        }
1509
 
                }
1510
 
                else if (tab->tab_head_dirty)
1511
 
                        ok = tab_store_table_header(tab, ot->ot_ind_file);
1512
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1513
 
        }
1514
 
        else if (tab->tab_head_dirty) {
1515
 
                xt_rwlock_wrlock(&tab->tab_buf_rwlock);
1516
 
                if (tab->tab_head_dirty)
1517
 
                        ok = tab_store_table_header(tab, ot->ot_ind_file);
1518
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1519
 
        }
1520
 
        return ok;
1521
 
}
1522
 
 
1523
1742
/* The fixed part of the record is already in the row buffer.
1524
1743
 * This function loads the extended part, expanding the row
1525
1744
 * buffer if necessary.
1526
1745
 */
1527
 
static xtBool tab_load_ext_data(XTOpenTablePtr ot, off_t load_rec, xtWord1 *buffer)
 
1746
xtPublic xtBool xt_tab_load_ext_data(XTOpenTablePtr ot, xtRecordID load_rec_id, xtWord1 *buffer, u_int cols_req)
1528
1747
{
1529
 
        size_t                          log_size;
1530
 
        xtWord4                         log_id;
1531
 
        off_t                           log_offset;
1532
 
        xtWord1                         save_buffer[XT_LOG_REC_HEADER_SIZE];
1533
 
        u_int                           retry_count = 10;
1534
 
        XTDataLogBufferDPtr     ext_data_ptr;
1535
 
        size_t                          log_size2;
1536
 
        off_t                           curr_rec;
 
1748
        size_t                                  log_size;
 
1749
        xtLogID                                 log_id;
 
1750
        xtLogOffset                             log_offset;
 
1751
        xtWord1                                 save_buffer[offsetof(XTactExtRecEntryDRec, er_data)];
 
1752
        xtBool                                  retried = FALSE;
 
1753
        XTactExtRecEntryDPtr    ext_data_ptr;
 
1754
        size_t                                  log_size2;
 
1755
        xtTableID                               curr_tab_id;
 
1756
        xtRecordID                              curr_rec_id;
1537
1757
 
1538
1758
        log_size = XT_GET_DISK_4(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_dat_siz_4);
1539
 
        XT_GET_LOG_REF_6(log_id, log_offset, ((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_rec_6);
 
1759
        XT_GET_LOG_REF(log_id, log_offset, (XTTabRecExtDPtr) ot->ot_row_rbuffer);
1540
1760
 
1541
1761
        if (ot->ot_rec_size + log_size > ot->ot_row_rbuf_size) {
1542
 
                if (!xt_sys_realloc((void **) &ot->ot_row_rbuffer, ot->ot_rec_size + log_size))
 
1762
                if (!xt_realloc_ns((void **) &ot->ot_row_rbuffer, ot->ot_rec_size + log_size))
1543
1763
                        return FAILED;
1544
1764
                ot->ot_row_rbuf_size = ot->ot_rec_size + log_size;
1545
1765
        }
1546
1766
 
1547
1767
        /* Read the extended part first: */
1548
 
        ext_data_ptr = (XTDataLogBufferDPtr) (ot->ot_row_rbuffer + ot->ot_rec_size - XT_LOG_REC_HEADER_SIZE);
 
1768
        ext_data_ptr = (XTactExtRecEntryDPtr) (ot->ot_row_rbuffer + ot->ot_rec_size - offsetof(XTactExtRecEntryDRec, er_data));
1549
1769
 
1550
1770
        /* Save the data which the header will overwrite: */
1551
 
        memcpy(save_buffer, ext_data_ptr, XT_LOG_REC_HEADER_SIZE);
 
1771
        memcpy(save_buffer, ext_data_ptr, offsetof(XTactExtRecEntryDRec, er_data));
1552
1772
        
1553
1773
        reread:
1554
 
        if (!xt_dl_read_log(ot, log_id, log_offset, log_size + XT_LOG_REC_HEADER_SIZE, (xtWord1 *) ext_data_ptr))
 
1774
        if (!ot->ot_thread->st_dlog_buf.dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + log_size, (xtWord1 *) ext_data_ptr))
1555
1775
                goto retry_read;
1556
1776
 
1557
 
        log_size2 = XT_GET_DISK_4(ext_data_ptr->lb_data_size_4);
1558
 
        curr_rec = XT_GET_DISK_6(ext_data_ptr->lb_record_6);
 
1777
        log_size2 = XT_GET_DISK_4(ext_data_ptr->er_data_size_4);
 
1778
        curr_tab_id = XT_GET_DISK_4(ext_data_ptr->er_tab_id_4);
 
1779
        curr_rec_id = XT_GET_DISK_4(ext_data_ptr->er_rec_id_4);
1559
1780
 
1560
 
        if (log_size2 != log_size || curr_rec != load_rec) {
 
1781
        if (log_size2 != log_size || curr_tab_id != ot->ot_table->tab_id || curr_rec_id != load_rec_id) {
 
1782
                /* [(3)] This can happen in the following circumstances:
 
1783
                 * - A new record is created, but the data log is not
 
1784
                 * flushed.
 
1785
                 * - The server quits.
 
1786
                 * - On restart the transaction is rolled back, but the data record
 
1787
                 *   was not written, so later a new record could be written at this
 
1788
                 *   location.
 
1789
                 * - Later the sweeper tries to cleanup this record, and finds
 
1790
                 *   that a different record has been written at this position.
 
1791
                 *
 
1792
                 * NOTE: Index entries can only be written to disk for records
 
1793
                 *       that have been committed to the disk, because uncommitted
 
1794
                 *       records may not exist in order to remove the index entry
 
1795
                 *       on cleanup.
 
1796
                 */
1561
1797
                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_EXT_RECORD);
1562
1798
                goto retry_read;
1563
1799
        }
1564
1800
 
1565
1801
        /* Restore the saved area: */
1566
 
        memcpy(ext_data_ptr, save_buffer, XT_LOG_REC_HEADER_SIZE);
1567
 
 
1568
 
        return myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer);
1569
 
 
1570
 
        retry_read: {
 
1802
        memcpy(ext_data_ptr, save_buffer, offsetof(XTactExtRecEntryDRec, er_data));
 
1803
 
 
1804
        if (retried)
 
1805
                xt_unlock_mutex_ns(&ot->ot_table->tab_db->db_co_ext_lock);
 
1806
        return myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer, cols_req);
 
1807
 
 
1808
        retry_read:
 
1809
        if (!retried) {
1571
1810
                /* (1) It may be that reading the log fails because the garbage collector
1572
1811
                 * has moved the record since we determined the location.
1573
1812
                 * We handle this here, by re-reading the data the garbage collector
1588
1827
                 * part of an uncommitted record (belonging to some other thread/
1589
1828
                 * transaction).
1590
1829
                 */
1591
 
                xtWord4 log_id2;
1592
 
                off_t   log_offset2;
1593
 
 
1594
 
                if (!xt_tab_get_data(ot, load_rec + offsetof(XTTabRecExtDRec, re_log_rec_6), 6,
1595
 
                        (xtWord1 *) &((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_rec_6, NULL))
1596
 
                        return FAILED;
1597
 
 
1598
 
                XT_GET_LOG_REF_6(log_id2, log_offset2, ((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_rec_6);
1599
 
                if (log_id != log_id2 || log_offset != log_offset2) {
1600
 
                        log_id = log_id2;
1601
 
                        log_offset = log_offset2;
1602
 
                        goto reread;
1603
 
                }
1604
 
                else if (retry_count) {
1605
 
                        retry_count--;
1606
 
                        xt_busy_wait();
1607
 
                        goto reread;
1608
 
                }
1609
 
                        
 
1830
                XTTabRecExtDRec rec_buf;
 
1831
 
 
1832
                xt_lock_mutex_ns(&ot->ot_table->tab_db->db_co_ext_lock);
 
1833
                retried = TRUE;
 
1834
 
 
1835
                if (!xt_tab_get_rec_data(ot, load_rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &rec_buf))
 
1836
                        goto failed;
 
1837
 
 
1838
                XT_GET_LOG_REF(log_id, log_offset, &rec_buf);
 
1839
                goto reread;
1610
1840
        }
 
1841
 
 
1842
        failed:
 
1843
        if (retried)
 
1844
                xt_unlock_mutex_ns(&ot->ot_table->tab_db->db_co_ext_lock);
1611
1845
        return FAILED;
1612
1846
}
1613
1847
 
1614
 
/*
 * Delete the data log record referenced by the extended record at 'address'.
 *
 * xt_dl_delete_log() is called with the current log reference
 * (curr_log_id, curr_log_offset, curr_log_size). Each time it fails, the
 * 6-byte log reference is re-read from the record at 'address' (into
 * ot->ot_row_wbuffer) and compared with the reference that was just tried:
 * - if the reference is unchanged, the failure is treated as real and
 *   FAILED is returned;
 * - if the reference has changed, the delete is retried with the new
 *   log ID and offset (curr_log_size is not updated).
 *
 * Returns OK once xt_dl_delete_log() succeeds, FAILED if re-reading the
 * reference fails or the reference did not change after a failed delete.
 */
static xtBool tab_delete_log_record(XTOpenTablePtr ot, off_t address, xtWord4 curr_log_id, off_t curr_log_offset, size_t curr_log_size)
1615
 
{
1616
 
        xtWord4 log_id;
1617
 
        off_t   log_offset;
1618
 
 
1619
 
        while (!xt_dl_delete_log(ot, curr_log_id, curr_log_offset, curr_log_size, address)) {
1620
 
                /* It may be that the garbage collector has moved the record, try again: */
1621
 
                if (!xt_tab_get_data(ot, address + offsetof(XTTabRecExtDRec, re_log_rec_6), 6,
1622
 
                        (xtWord1 *) &((XTTabRecExtDPtr) ot->ot_row_wbuffer)->re_log_rec_6, NULL))
1623
 
                        return FAILED;
1624
 
 
1625
 
                /* Decode the log ID and offset that were just re-read from the record: */
                XT_GET_LOG_REF_6(log_id, log_offset, ((XTTabRecExtDPtr) ot->ot_row_wbuffer)->re_log_rec_6);
1626
 
 
1627
 
                if (log_id == curr_log_id && log_offset == curr_log_offset)
1628
 
                        /* Nothing has changed, this must be a real failure: */
1629
 
                        return FAILED;
1630
 
 
1631
 
                /* Try with the new log position: */
1632
 
                curr_log_id = log_id;
1633
 
                curr_log_offset = log_offset;
1634
 
        }
1635
 
        return OK;
1636
 
}
1637
 
 
1638
 
/*
 * Write 'size' bytes from 'buffer' to the table data at 'address'.
 *
 * If the write buffer is empty (tab_buf_offset == 0), or the range ends at
 * or before tab_data_eof, the write is passed straight to xt_rc_write().
 * Otherwise the range touches the in-memory write buffer (tab_data_buf),
 * which holds the data following tab_data_eof; this is handled under a
 * write lock on tab_buf_rwlock.
 *
 * Returns the result of xt_rc_write() for the parts written through it,
 * OK when the data was copied entirely into the write buffer, and the
 * result of xt_register_ferrno(..., ESPIPE, ...) when the range extends
 * beyond tab_data_eof + tab_buf_offset.
 */
xtPublic xtBool xt_tab_put_data(XTOpenTablePtr ot, off_t address, u_int size, xtWord1 *buffer)
1639
 
{
1640
 
        register XTTableHPtr    tab = ot->ot_table;
1641
 
 
1642
 
        ASSERT_NS((tab->tab_data_eof - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size == 0);
1643
 
        write_data:
1644
 
        /* Unlocked check: the write lies completely before the write buffer. */
        if (!tab->tab_buf_offset || address + (off_t) size <= tab->tab_data_eof)
1645
 
                return xt_rc_write(ot, address, size, buffer);
1646
 
 
1647
 
        /* The data being written includes part of the write buffer: */
1648
 
        xt_rwlock_wrlock(&tab->tab_buf_rwlock);
1649
 
        /* Get the write buffer part first: */
1650
 
        if (!tab->tab_buf_offset || address + (off_t) size <= tab->tab_data_eof) {
1651
 
                /* May have changed between check and lock... */
1652
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1653
 
                goto write_data;
1654
 
        }
1655
 
 
1656
 
        /* You cannot put past the current EOF: */
1657
 
        if (address + size > tab->tab_data_eof + tab->tab_buf_offset) {
1658
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1659
 
                return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1660
 
        }
1661
 
 
1662
 
        if (address < tab->tab_data_eof) {
1663
 
                /* Part of the data is in the write buffer (this should not happen): */
1664
 
                size_t tfer, boff;
1665
 
 
1666
 
                ASSERT_NS(FALSE);
1667
 
                /* tfer = bytes that fall at or after tab_data_eof (go to the start of the write buffer): */
                tfer = (size_t) (address + (off_t) size - tab->tab_data_eof);
1668
 
                /* boff = bytes that fall before tab_data_eof (written through xt_rc_write): */
                boff = size - tfer;
1669
 
                memcpy(tab->tab_data_buf, buffer + boff, tfer);
1670
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1671
 
                return xt_rc_write(ot, address, boff, buffer);
1672
 
        }
1673
 
        else {
1674
 
                /* Complete copy into the write buffer: */
1675
 
                memcpy(tab->tab_data_buf + (address - tab->tab_data_eof), buffer, size);
1676
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1677
 
        }
1678
 
        return OK;
1679
 
}
1680
 
 
1681
 
/*
 * Read 'size' bytes of the record at 'address' into 'buffer'.
 *
 * The assertions require that 'address' lies on a record boundary and that
 * 'size' is no larger than one record (tab_dic.dic_rec_size).
 *
 * If 'address' is below tab_data_eof the read is passed to
 * xt_rc_read_record(). Otherwise the record is taken from the in-memory
 * write buffer (tab_data_buf) under a read lock on tab_buf_rwlock; if the
 * requested range extends beyond the used part of the write buffer
 * (tab_buf_offset), 'buffer' is zero-filled instead.
 *
 * Returns the result of xt_rc_read_record() on the first path, and OK
 * when the data was served from (or zero-filled in place of) the write
 * buffer.
 */
xtPublic xtBool xt_tab_get_record(register XTOpenTablePtr ot, off_t address, u_int size, xtWord1 *buffer)
1682
 
{
1683
 
        register XTTableHPtr    tab = ot->ot_table;
1684
 
        size_t                                  boff;
1685
 
 
1686
 
        /* We assume we are reading a whole record, from
1687
 
         * a record boundary: */
1688
 
        ASSERT_NS(size <= tab->tab_dic.dic_rec_size);
1689
 
        ASSERT_NS(tab->tab_buf_size % tab->tab_dic.dic_rec_size == 0);
1690
 
        ASSERT_NS((address - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size == 0);
1691
 
        ASSERT_NS(ot->ot_rec_size == tab->tab_dic.dic_rec_size);
1692
 
        /* NOTE(review): this assertion compares a value with itself, so it is
         * always true - confirm what comparison was intended.
         */
        (void) ASSERT_NS(tab->tab_dic.dic_rec_size == tab->tab_dic.dic_rec_size);
1693
 
        
1694
 
        if (address < tab->tab_data_eof) {
1695
 
                /* Fast track to read (nothing in the write buffer): */
1696
 
                ASSERT_NS(address + (off_t) tab->tab_dic.dic_rec_size <= tab->tab_data_eof);
1697
 
                return xt_rc_read_record(ot, address, size, buffer);
1698
 
        }
1699
 
 
1700
 
        /* Loading the buffer will include part of the write buffer: */
1701
 
        xt_rwlock_rdlock(&tab->tab_buf_rwlock);
1702
 
        /* Get the write buffer part first: */
1703
 
        if (address < tab->tab_data_eof) {
1704
 
                /* May have changed between check and lock... */
1705
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1706
 
                return xt_rc_read_record(ot, address, size, buffer);
1707
 
        }
1708
 
        ASSERT_NS(address + tab->tab_dic.dic_rec_size <= tab->tab_data_eof + tab->tab_buf_offset);
1709
 
 
1710
 
        /* Offset of the record within the write buffer: */
        boff = (size_t) (address - tab->tab_data_eof);
1711
 
        if (boff + size > tab->tab_buf_offset)
1712
 
                /* Return an empty buffer if no record at this offset! */
1713
 
                memset(buffer, 0, size);
1714
 
        else
1715
 
                memcpy(buffer, tab->tab_data_buf + boff, size);
1716
 
        xt_rwlock_unlock(&tab->tab_buf_rwlock);
1717
 
        return OK;
1718
 
}
1719
 
 
1720
 
xtPublic xtBool xt_tab_get_data(register XTOpenTablePtr ot, off_t address, u_int size, xtWord1 *buffer, u_int *red_size)
1721
 
{
1722
 
        register XTTableHPtr    tab = ot->ot_table;
1723
 
        size_t                                  tfer, boff;
1724
 
        off_t                                   diff;
1725
 
 
1726
 
        load_buffer:
1727
 
        if (!tab->tab_buf_offset) {
1728
 
                /* Fastest track to read (nothing in the write buffer): */
1729
 
                diff = tab->tab_data_eof - address;
1730
 
                if (diff > size)
1731
 
                        tfer = size;
1732
 
                else {
1733
 
                        if (!(tfer = (size_t) diff)) {
1734
 
                                if (red_size)
1735
 
                                        *red_size = 0;
1736
 
                                else if (size)
1737
 
                                        return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1738
 
                                return TRUE;
1739
 
                        }
1740
 
                }
1741
 
                if (!xt_rc_read(ot, address, tfer, buffer))
1742
 
                        return FAILED;
1743
 
                if (red_size)
1744
 
                        *red_size = tfer;
1745
 
                else if (size != tfer)
1746
 
                        return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1747
 
                return TRUE;
1748
 
        }
1749
 
 
1750
 
        if (address + (off_t) size <= tab->tab_data_eof) {
1751
 
                /* Read is completely before the write buffer: */
1752
 
                /* This code reads from the file system cache:
1753
 
                if (!xt_pread_file(ot->ot_data_file, address, size, size, buffer, NULL))
1754
 
                        return FAILED;
 
1848
/*
 * Write 'size' bytes from 'buffer' to record 'rec_id', starting at offset 0
 * within the record, via tab_recs.xt_tc_write() on the open record file.
 *
 * 'rec_id' must be non-zero (asserted). 'op_seq' is passed through to
 * xt_tc_write() unchanged.
 *
 * Returns the result of xt_tc_write().
 */
xtPublic xtBool xt_tab_put_rec_data(XTOpenTablePtr ot, xtRecordID rec_id, size_t size, xtWord1 *buffer, xtOpSeqNo *op_seq)
 
1849
{
 
1850
        register XTTableHPtr    tab = ot->ot_table;
 
1851
 
 
1852
        ASSERT_NS(rec_id);
 
1853
 
 
1854
        return tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, size, buffer, op_seq);
 
1855
}
 
1856
 
 
1857
/*
 * Write record data and then log the change in the thread's transaction
 * buffer.
 *
 * The data is written with tab_recs.xt_tc_write(). When 'status' is
 * XT_LOG_ENT_REC_MOVED the write starts at the offset of the re_log_id_2
 * field of XTTabRecExtDRec within the record; for any other status it
 * starts at offset 0. The operation sequence number filled in by the write
 * is kept in a local and handed to xbuf_modify_table() together with
 * 'status', 'free_rec_id', 'rec_id', 'size' and 'buffer'.
 *
 * 'rec_id' must be non-zero (asserted).
 *
 * Returns FAILED if the write fails, otherwise the result of
 * xbuf_modify_table().
 */
xtPublic xtBool xt_tab_put_log_op_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer)
 
1858
{
 
1859
        register XTTableHPtr    tab = ot->ot_table;
 
1860
        xtOpSeqNo                               op_seq;
 
1861
 
 
1862
        ASSERT_NS(rec_id);
 
1863
 
 
1864
        if (status == XT_LOG_ENT_REC_MOVED) {
 
1865
                if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), size, buffer, &op_seq))
 
1866
                        return FAILED;
 
1867
        }
 
1868
        else {
 
1869
                if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, size, buffer, &op_seq))
 
1870
                        return FAILED;
 
1871
        }
 
1872
 
 
1873
        return ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, free_rec_id, rec_id, size, buffer);
 
1874
}
 
1875
 
 
1876
/*
 * Write record data and then log the change in the thread's transaction
 * buffer, using a caller-supplied operation sequence number slot.
 *
 * The data is written with tab_recs.xt_tc_write(), which receives 'op_seq'.
 * When 'status' is XT_LOG_ENT_REC_MOVED the write starts at the offset of
 * the re_log_id_2 field of XTTabRecExtDRec within the record; for any other
 * status it starts at offset 0. After a successful write, '*op_seq' is
 * passed to xbuf_modify_table() together with 'status', 'free_rec_id',
 * 'rec_id', 'size' and 'buffer'.
 *
 * 'rec_id' must be non-zero (asserted). 'op_seq' is dereferenced
 * unconditionally after the write, so it must not be NULL.
 *
 * Returns FAILED if the write fails, otherwise the result of
 * xbuf_modify_table().
 */
xtPublic xtBool xt_tab_put_log_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer, xtOpSeqNo *op_seq)
 
1877
{
 
1878
        register XTTableHPtr    tab = ot->ot_table;
 
1879
 
 
1880
        ASSERT_NS(rec_id);
 
1881
 
 
1882
        if (status == XT_LOG_ENT_REC_MOVED) {
 
1883
                if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), size, buffer, op_seq))
 
1884
                        return FAILED;
 
1885
        }
 
1886
        else {
 
1887
                if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, size, buffer, op_seq))
 
1888
                        return FAILED;
 
1889
        }
 
1890
 
 
1891
        return ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, *op_seq, free_rec_id, rec_id, size, buffer);
 
1892
}
 
1893
 
 
1894
/*
 * Read 'size' bytes of record 'rec_id' into 'buffer' via
 * tab_recs.xt_tc_read() on the open record file.
 *
 * 'rec_id' must be non-zero (asserted).
 *
 * Returns the result of xt_tc_read().
 */
xtPublic xtBool xt_tab_get_rec_data(XTOpenTablePtr ot, xtRecordID rec_id, size_t size, xtWord1 *buffer)
 
1895
{
 
1896
        register XTTableHPtr    tab = ot->ot_table;
 
1897
 
 
1898
        ASSERT_NS(rec_id);
 
1899
 
 
1900
        return tab->tab_recs.xt_tc_read(ot->ot_rec_file, rec_id, (size_t) size, buffer);
 
1901
}
 
1902
 
 
1903
static xtBool tab_wait_for_update(XTThreadPtr thread, register XTOpenTablePtr ot, xtRowID row_id, xtXactID xn_id)
 
1904
{
 
1905
        register XTTableHPtr    tab = ot->ot_table;
 
1906
        int                                             lock_type;
 
1907
 
 
1908
        lock_type = tab->tab_locks.xt_release_locks(ot, row_id, &thread->st_lock_list);
 
1909
        do {
 
1910
                /* The variation may be visible, we have to wait for the
 
1911
                * transaction that wrote it to commit!
1755
1912
                */
1756
 
                /* This code reads from row cache: */
1757
 
                if (!xt_rc_read(ot, address, size, buffer))
1758
 
                        return FAILED;
1759
 
                if (red_size)
1760
 
                        *red_size = size;
1761
 
                return TRUE;
1762
 
        }
1763
 
 
1764
 
        /* Loading the buffer will include part of the write buffer: */
1765
 
        xt_rwlock_rdlock(&tab->tab_buf_rwlock);
1766
 
        /* Get the write buffer part first: */
1767
 
        if (!tab->tab_buf_offset || address + (off_t) size <= tab->tab_data_eof) {
1768
 
                /* May have changed between check and lock... */
1769
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1770
 
                goto load_buffer;
1771
 
        }
1772
 
 
1773
 
        if (address < tab->tab_data_eof) {
1774
 
                /* Part of the read is in the write buffer (copy this first): */
1775
 
                /* This can happen during a sequential scan, which uses
1776
 
                 * a buffer larger than one row in size.
1777
 
                 */
1778
 
                tfer = (size_t) (address + (off_t) size - tab->tab_data_eof);
1779
 
                if (tfer > tab->tab_buf_offset)
1780
 
                        tfer = tab->tab_buf_offset;
1781
 
                boff = (size_t) (tab->tab_data_eof - address);
1782
 
                memcpy(buffer + boff, tab->tab_data_buf, tfer);
1783
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1784
 
                if (!xt_rc_read(ot, address, boff, buffer))
1785
 
                        return FAILED;
1786
 
                if (red_size)
1787
 
                        *red_size = boff + tfer;
1788
 
                else if (size != boff + tfer)
1789
 
                        return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1790
 
        }
1791
 
        else {
1792
 
                /* Complete load from the write buffer: */
1793
 
                tfer = (size_t) (tab->tab_data_eof + (off_t) tab->tab_buf_offset - address);
1794
 
                if (!tfer) {
1795
 
                        if (red_size)
1796
 
                                *red_size = 0;
1797
 
                        xt_rwlock_unlock(&tab->tab_buf_rwlock);
1798
 
                        return OK;
1799
 
                }
1800
 
                if (tfer > size)
1801
 
                        tfer = size;
1802
 
                boff = (size_t) (address - tab->tab_data_eof);
1803
 
                memcpy(buffer, tab->tab_data_buf + boff, tfer);
1804
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
1805
 
                if (red_size)
1806
 
                        *red_size = tfer;
1807
 
                else if (size != tfer)
1808
 
                        return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1809
 
        }
 
1913
                if (!xt_xn_wait_for_xact(thread, xn_id, lock_type == XT_TEMP_LOCK))
 
1914
                        return FAILED;
 
1915
                if (!lock_type)
 
1916
                        break;
 
1917
                if (!tab->tab_locks.xt_regain_locks(ot, &lock_type, &xn_id, &thread->st_lock_list))
 
1918
                        return FAILED;
 
1919
        } while (lock_type);
1810
1920
        return OK;
1811
1921
}
1812
1922
 
1813
1923
/*
1814
1924
 * Is a record visible?
1815
 
 * Return TRUE, FALSE or XT_ERR
 
1925
 * Returns TRUE, FALSE, XT_ERR, XT_DEL.
 
1926
 *
 
1927
 * TRUE - The record is visible.
 
1928
 * FALSE - The record is not visible.
 
1929
 * XT_ERR - An exception (error) occurred.
 
1930
 * XT_DEL - The record is not valid (a free or delete record). If an
 
1931
 * index is pointing to this record, then it should be deleted.
 
1932
 * XT_NEW - The most recent variation of this row has been returned
 
1933
 * and is to be used instead of the input!
 
1934
 *
 
1935
 * Basically, a record is visible if it was committed on or before
 
1936
 * the transaction's "visible time" (st_visible_time), and there
 
1937
 * are no other visible records before this record in the
 
1938
 * variation chain for the record.
 
1939
 *
 
1940
 * This holds in general, but you don't always get to see the
 
1941
 * visible record (as defined in this sense).
 
1942
 *
 
1943
 * On any kind of update (SELECT FOR UPDATE, UPDATE or DELETE), you
 
1944
 * get to see the most recent variation of the row!
 
1945
 *
 
1946
 * So on update, this function will wait if necessary for a recent
 
1947
 * update to be committed.
 
1948
 *
 
1949
 * So an update is a kind of "committed read" with a wait for
 
1950
 * uncommitted records.
 
1951
 *
 
1952
 * The result:
 
1953
 * - INSERTS may not be seen by the update read, depending on when
 
1954
 *   they occur.
 
1955
 * - Records may be returned in non-index order.
 
1956
 * - New records returned must be checked again by an index scan
 
1957
 *   to make sure they conform to the condition!
 
1958
 * 
 
1959
 * CREATE TABLE test_tab (ID int primary key, Value int, Name varchar(20), 
 
1960
 * index(Value, Name)) ENGINE=pbxt;
 
1961
 * INSERT test_tab values(4, 2, 'D');
 
1962
 * INSERT test_tab values(5, 2, 'E');
 
1963
 * INSERT test_tab values(6, 2, 'F');
 
1964
 * INSERT test_tab values(7, 2, 'G');
 
1965
 * 
 
1966
 * -- C1
 
1967
 * begin;
 
1968
 * select * from test_tab where id = 6 for update;
 
1969
 * -- C2
 
1970
 * begin;
 
1971
 * select * from test_tab where value = 2 order by value, name for update;
 
1972
 * -- C1
 
1973
 * update test_tab set Name = 'A' where id = 7;
 
1974
 * commit;
 
1975
 * -- C2
 
1976
 * Result order D, E, F, A.
 
1977
 *
 
1978
 * But Jim does it like this, so it should be OK.
1816
1979
 */
1817
 
static int tab_visible(register XTOpenTablePtr ot, XTTabRecHeadDPtr rec_head)
 
1980
static int tab_visible(register XTOpenTablePtr ot, XTTabRecHeadDPtr rec_head, xtRecordID *new_rec_id)
1818
1981
{
1819
 
        XTThreadPtr                             self = ot->ot_thread;
 
1982
        XTThreadPtr                             thread = ot->ot_thread;
 
1983
        xtXactID                                xn_id;
1820
1984
        XTTabRecHeadDRec                var_head;
1821
 
        xtWord8                                 rec_tn_id = 0;
1822
 
#ifdef XT_REPEATABLE_READ_BLOCKS
1823
 
        xtWord8                                 wait_tn_id;
 
1985
        xtRowID                                 row_id;
 
1986
        xtRecordID                              var_rec_id;
 
1987
        register XTTableHPtr    tab;
1824
1988
        xtBool                                  wait = FALSE;
1825
 
#endif
1826
 
        xtWord4                                 row_id;
1827
 
        off_t                                   variation;
1828
 
        xtWord8                                 tn_id;
1829
 
        //register XTTableHPtr  tab;
 
1989
        xtXactID                                wait_xn_id = 0;
1830
1990
#ifdef TRACE_VARIATIONS
1831
1991
        char                                    t_buf[500];
1832
1992
        int                                             len;
1833
1993
#endif
1834
 
        
 
1994
        int                                             result = TRUE;
 
1995
 
 
1996
        retry:
 
1997
        if (XT_REC_NOT_VALID(rec_head->tr_rec_type_1))
 
1998
                return XT_DEL;
 
1999
 
 
2000
        row_id = XT_GET_DISK_4(rec_head->tr_row_id_4);
 
2001
#ifdef DEBUG
 
2002
        ASSERT_NS(!ot->ot_curr_row_id || row_id == ot->ot_curr_row_id);
 
2003
#else
 
2004
        if (ot->ot_curr_row_id && row_id != ot->ot_curr_row_id)
 
2005
                return XT_DEL;
 
2006
#endif
 
2007
 
 
2008
#ifdef TRACE_VARIATIONS
 
2009
        len = sprintf(t_buf, "%s visible: row=%d rec=%d ", thread->t_name, (int) row_id, (int) ot->ot_curr_rec_id);
 
2010
#endif
1835
2011
        if (!XT_REC_IS_CLEAN(rec_head->tr_rec_type_1)) {
1836
2012
                /* The record is not clean, which means it has not been swept.
1837
2013
                 * So we have to check if it is visible.
1838
2014
                 */
1839
 
                xtBool mine = FALSE;
1840
 
 
1841
 
                rec_tn_id = XT_GET_DISK_6(rec_head->tr_xact_id_6);
1842
 
                if (self->st_xact_mode >= XT_XACT_REPEATABLE_READ) {
1843
 
                        if (!xt_xn_visible(ot, rec_tn_id, ot->ot_curr_rec, &mine))
1844
 
                                return FALSE;
1845
 
                }
1846
 
                else {
1847
 
#ifdef XT_REPEATABLE_READ_BLOCKS
1848
 
                        if (!xt_xn_may_commit(ot, rec_tn_id, ot->ot_curr_rec, &mine, &wait))
1849
 
                                return FALSE;
1850
 
                        if (wait)
1851
 
                                wait_tn_id = rec_tn_id;
1852
 
#else
1853
 
                        if (!xt_xn_committed(ot, rec_tn_id, ot->ot_curr_rec, &mine))
1854
 
                                return FALSE;
1855
 
#endif
1856
 
                }
1857
 
 
1858
 
                /* This is a record written by this transaction. */
1859
 
                /* Check that it was not written by the current update statement: */
1860
 
                if (mine && self->st_is_update) {
1861
 
                        if (XT_STAT_ID_MASK(self->st_update_id) == rec_head->tr_stat_id_1)
 
2015
 
 
2016
                xn_id = XT_GET_DISK_4(rec_head->tr_xact_id_4);
 
2017
                switch (xt_xn_status(ot, xn_id, ot->ot_curr_rec_id)) {
 
2018
                        case XT_XN_VISIBLE:
 
2019
                                break;
 
2020
                        case XT_XN_NOT_VISIBLE:
 
2021
                                if (ot->ot_for_update) {
 
2022
                                        /* It is visible, only if it is an insert,
 
2023
                                         * which means if has no previous variation.
 
2024
                                         * Note, if an insert is updated, the record
 
2025
                                         * should be overwritten (TODO - check this).
 
2026
                                         */
 
2027
                                        var_rec_id = XT_GET_DISK_4(rec_head->tr_prev_rec_id_4);
 
2028
                                        if (!var_rec_id)
 
2029
                                                break;
 
2030
#ifdef TRACE_VARIATIONS
 
2031
                                        if (len <= 450)
 
2032
                                                len += sprintf(t_buf+len, "OTHER COMMIT (OVERWRITTEN) T%d\n", (int) xn_id);
 
2033
                                        xt_trace("%s", t_buf);
 
2034
#endif
 
2035
                                }
 
2036
#ifdef TRACE_VARIATIONS
 
2037
                                else {
 
2038
                                        if (len <= 450)
 
2039
                                                len += sprintf(t_buf+len, "OTHER COMMIT T%d\n", (int) xn_id);
 
2040
                                        xt_trace("%s", t_buf);
 
2041
                                }
 
2042
#endif
 
2043
                                return FALSE;
 
2044
                        case XT_XN_ABORTED:
 
2045
#ifdef TRACE_VARIATIONS
 
2046
                                if (len <= 450)
 
2047
                                        len += sprintf(t_buf+len, "ABORTED T%d\n", (int) xn_id);
 
2048
                                xt_trace("%s", t_buf);
 
2049
#endif
 
2050
                                return FALSE;
 
2051
                        case XT_XN_MY_UPDATE:
 
2052
                                /* This is a record written by this transaction. */
 
2053
                                if (thread->st_is_update) {
 
2054
                                        /* Check that it was not written by the current update statement: */
 
2055
                                        if (XT_STAT_ID_MASK(thread->st_update_id) == rec_head->tr_stat_id_1) {
 
2056
#ifdef TRACE_VARIATIONS
 
2057
                                                if (len <= 450)
 
2058
                                                        len += sprintf(t_buf+len, "MY UPDATE IN THIS STATEMENT T%d\n", (int) xn_id);
 
2059
                                                xt_trace("%s", t_buf);
 
2060
#endif
 
2061
                                                return FALSE;
 
2062
                                        }
 
2063
                                }
 
2064
                                ot->ot_curr_row_id = row_id;
 
2065
                                ot->ot_curr_updated = TRUE;
 
2066
                                if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
 
2067
                                        return XT_ERR;
 
2068
                                /* It is visible if it is at the front of the list.
 
2069
                                 * An update can end up not being at the front of the list
 
2070
                                 * if it is deleted afterwards!
 
2071
                                 */
 
2072
#ifdef TRACE_VARIATIONS
 
2073
                                if (len <= 450) {
 
2074
                                        if (var_rec_id == ot->ot_curr_rec_id)
 
2075
                                                len += sprintf(t_buf+len, "MY UPDATE T%d\n", (int) xn_id);
 
2076
                                        else
 
2077
                                                len += sprintf(t_buf+len, "MY UPDATE (OVERWRITTEN) T%d\n", (int) xn_id);
 
2078
                                }
 
2079
                                xt_trace("%s", t_buf);
 
2080
#endif
 
2081
                                return var_rec_id == ot->ot_curr_rec_id;
 
2082
                        case XT_XN_OTHER_UPDATE:
 
2083
                                if (ot->ot_for_update) {
 
2084
                                        /* If this is an insert, we are interested!
 
2085
                                         * Updated values are handled below. This is because
 
2086
                                         * the changed (new) records returned below are always
 
2087
                                         * followed (in the version chain) by the record
 
2088
                                         * we would have returned (if nothing had changed).
 
2089
                                         *
 
2090
                                         * As a result, we only return records here which have
 
2091
                                         * no "history". 
 
2092
                                         */
 
2093
                                        var_rec_id = XT_GET_DISK_4(rec_head->tr_prev_rec_id_4);
 
2094
                                        if (!var_rec_id) {
 
2095
#ifdef TRACE_VARIATIONS
 
2096
                                                if (len <= 450)
 
2097
                                                        len += sprintf(t_buf+len, "OTHER INSERT (WAIT FOR) T%d\n", (int) xn_id);
 
2098
                                                xt_trace("%s", t_buf);
 
2099
#endif
 
2100
                                                if (!tab_wait_for_update(thread, ot, row_id, xn_id))
 
2101
                                                        return XT_ERR;
 
2102
                                                if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
 
2103
                                                        return XT_ERR;
 
2104
                                                rec_head = &var_head;
 
2105
                                                goto retry;
 
2106
                                        }
 
2107
                                }
 
2108
#ifdef TRACE_VARIATIONS
 
2109
                                if (len <= 450)
 
2110
                                        len += sprintf(t_buf+len, "OTHER UPDATE T%d\n", (int) xn_id);
 
2111
                                xt_trace("%s", t_buf);
 
2112
#endif
1862
2113
                                return FALSE;
1863
2114
                }
1864
2115
        }
1868
2119
         * it is not visible at all. If it in not found on the
1869
2120
         * variation chain, it is also not visible.
1870
2121
         */
1871
 
        //tab = ot->ot_table;
1872
 
        row_id = XT_GET_DISK_4(rec_head->tr_row_id_4);
1873
 
        
1874
 
        /* Not required because records that may have been read by this
1875
 
         * transactions are not free until this transaction
1876
 
         * completes!
1877
 
         */
1878
 
        //xt_rwlock_rdlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1879
 
 
1880
 
#ifdef XT_REPEATABLE_READ_BLOCKS
1881
 
        retry:
1882
 
#endif
1883
 
        if (!(xt_tab_get_row(ot, row_id, &variation)))
 
2122
        tab = ot->ot_table;
 
2123
 
 
2124
        retry_2:
 
2125
        if (ot->ot_for_update)
 
2126
                xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
2127
        else
 
2128
                xt_rwlock_rdlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
2129
 
 
2130
        if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
1884
2131
                goto failed;
1885
2132
#ifdef TRACE_VARIATIONS
1886
 
        len = sprintf(t_buf, "vis row=%d", (int) row_id);
 
2133
        len += sprintf(t_buf+len, "vis row=%d", (int) row_id);
1887
2134
#endif
1888
 
        while (variation != ot->ot_curr_rec) {
1889
 
                if (!variation)
 
2135
        while (var_rec_id != ot->ot_curr_rec_id) {
 
2136
                if (!var_rec_id)
1890
2137
                        goto not_found;
 
2138
                if (!xt_tab_get_rec_data(ot, var_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
 
2139
                        goto failed;
1891
2140
#ifdef TRACE_VARIATIONS
1892
2141
                if (len <= 450)
1893
 
                        len += sprintf(t_buf+len, " -> %d", (int) variation);
 
2142
                        len += sprintf(t_buf+len, " -> %d(%d)", (int) var_rec_id, (int) var_head.tr_rec_type_1);
1894
2143
#endif
1895
 
                if (!xt_tab_get_record(ot, variation, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
1896
 
                        goto failed;
1897
2144
                /* All clean records are visible, by all transactions: */
1898
2145
                if (XT_REC_IS_CLEAN(var_head.tr_rec_type_1))
1899
2146
                        goto not_found;
1900
 
                if (!XT_TAB_IS_DELETED(var_head.tr_rec_type_1)) {
1901
 
                        tn_id = XT_GET_DISK_6(var_head.tr_xact_id_6);
1902
 
                        if (self->st_xact_mode >= XT_XACT_REPEATABLE_READ) {
1903
 
                                /* XT_XACT_REPEATABLE_READ & XT_XACT_SERIALIZABLE */
1904
 
                                if (xt_xn_visible(ot, tn_id, variation, NULL)) {
1905
 
                                        /* This variation is visible, i.e committed before this
1906
 
                                         * transaction started, or updated by this transaction.
1907
 
                                         *
1908
 
                                         * We now know that this is the valid variation for
1909
 
                                         * this record (for this table) for this transaction!
1910
 
                                         * This will not change, unless the transaction
1911
 
                                         * updates the record (again).
1912
 
                                         *
1913
 
                                         * So we can store this information as a hint, if
1914
 
                                         * we see other variations belonging to this record,
1915
 
                                         * then we can ignore them immediately!
1916
 
                                         */
1917
 
                                        goto not_found;
1918
 
                                }
1919
 
                        }
1920
 
                        else {
1921
 
#ifdef XT_REPEATABLE_READ_BLOCKS
1922
 
                                /* XT_XACT_UNCOMMITTED_READ & XT_XACT_COMMITTED_READ
1923
 
                                 * If this record is not committed, then we will have to wait for
1924
 
                                 * it because, in COMMITTED READ mode, we see committed
1925
 
                                 * records immediately.
1926
 
                                 */
1927
 
                                xtBool var_wait = FALSE;  /* Assume we won't need to wait. */
1928
 
                                if (xt_xn_may_commit(ot, tn_id, variation, NULL, &var_wait)) {
1929
 
                                        /* This variation is not aborted, i.e. committed or
1930
 
                                         * not yet committed (uncommitted).
1931
 
                                         */
1932
 
                                        if (!var_wait)
1933
 
                                                /* The record was committed, or updated by this transaction. */
1934
 
                                                goto not_found;
1935
 
                                        /* If the variation is not yet committed and was
1936
 
                                         * updated by some other transaction.
1937
 
                                         * So we must wait for it (the variation we are reading would
1938
 
                                         * be valid if this uncommitted variation was not
1939
 
                                         * committed)!
1940
 
                                         *
1941
 
                                         * If it would not be valid anyway (because there
1942
 
                                         * is something in the chain before it), then
1943
 
                                         * we need not wait for this variation to commit or
1944
 
                                         * abort. So we continue in this loop.
1945
 
                                         *
1946
 
                                         * NOTE: There should only be one uncommitted variation in
1947
 
                                         * the chain.
1948
 
                                         */
1949
 
                                        if (!wait) {
1950
 
                                                wait_tn_id = tn_id;
1951
 
                                                wait = TRUE;
 
2147
                if (XT_REC_IS_FREE(var_head.tr_rec_type_1))
 
2148
                        /* Should not happen! */
 
2149
                        goto not_found;
 
2150
                xn_id = XT_GET_DISK_4(var_head.tr_xact_id_4);
 
2151
                /* This variation is visible if committed before this
 
2152
                 * transaction started, or updated by this transaction.
 
2153
                 *
 
2154
                 * We now know that this is the valid variation for
 
2155
                 * this record (for this table) for this transaction!
 
2156
                 * This will not change, unless the transaction
 
2157
                 * updates the record (again).
 
2158
                 *
 
2159
                 * So we can store this information as a hint, if
 
2160
                 * we see other variations belonging to this record,
 
2161
                 * then we can ignore them immediately!
 
2162
                 */
 
2163
                switch (xt_xn_status(ot, xn_id, var_rec_id)) {
 
2164
                        case XT_XN_VISIBLE:
 
2165
                                goto not_found;
 
2166
                        case XT_XN_NOT_VISIBLE:
 
2167
                                if (ot->ot_for_update) {
 
2168
                                        /* Substitute this record for the one we
 
2169
                                         * are reading!!
 
2170
                                         */
 
2171
                                        if (result == TRUE) {
 
2172
                                                if (XT_REC_IS_DELETE(var_head.tr_rec_type_1))
 
2173
                                                        result = FALSE;
 
2174
                                                else {
 
2175
                                                        *new_rec_id = var_rec_id;
 
2176
                                                        result = XT_NEW;
 
2177
                                                }
1952
2178
                                        }
1953
2179
                                }
1954
 
#else
1955
 
                                if (xt_xn_committed(ot, tn_id, variation, NULL)) {
1956
 
                                        goto not_found;
 
2180
                                break;
 
2181
                        case XT_XN_ABORTED:
 
2182
                                /* Ignore the record, it will be removed. */
 
2183
                                break;
 
2184
                        case XT_XN_MY_UPDATE:
 
2185
                                goto not_found;
 
2186
                        case XT_XN_OTHER_UPDATE:
 
2187
                                /* Wait for this update to commit or abort: */
 
2188
                                if (!wait) {
 
2189
                                        wait = TRUE;
 
2190
                                        wait_xn_id = xn_id;
1957
2191
                                }
 
2192
#ifdef TRACE_VARIATIONS
 
2193
                                if (len <= 450)
 
2194
                                        len += sprintf(t_buf+len, "-T%d", (int) wait_xn_id);
1958
2195
#endif
1959
 
                        }
 
2196
                                break;
1960
2197
                }
1961
 
                variation = XT_GET_DISK_6(var_head.tr_prev_var_6);
 
2198
                var_rec_id = XT_GET_DISK_4(var_head.tr_prev_rec_id_4);
1962
2199
        }
1963
2200
#ifdef TRACE_VARIATIONS
1964
2201
        if (len <= 450)
1965
 
                sprintf(t_buf+len, " -> %d\n", (int) variation);
 
2202
                sprintf(t_buf+len, " -> %d(%d)\n", (int) var_rec_id, (int) rec_head->tr_rec_type_1);
1966
2203
        else
1967
 
                sprintf(t_buf+len, " ...\n", (int) variation);
1968
 
#endif
1969
 
 
1970
 
        //xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1971
 
#ifdef XT_REPEATABLE_READ_BLOCKS
1972
 
        if (wait) {
1973
 
                /* The variation may be visible, we have to wait for the
1974
 
                 * transaction that wrote it to commit!
1975
 
                 */
1976
 
                if (!xt_xn_wait_for_xact(self, wait_tn_id))
1977
 
                        goto failed;
1978
 
                wait_tn_id = XT_GET_DISK_6(rec_head->tr_xact_id_6);
1979
 
                wait = FALSE;
1980
 
                goto retry;                     
1981
 
        }
1982
 
#endif
1983
 
#ifdef TRACE_VARIATIONS
 
2204
                sprintf(t_buf+len, " ...\n");
1984
2205
        xt_trace("%s", t_buf);
1985
2206
#endif
 
2207
 
 
2208
        if (ot->ot_for_update) {
 
2209
                int     lock_type;
 
2210
 
 
2211
                if (wait) {
 
2212
#ifdef TRACE_VARIATIONS
 
2213
                        xt_trace("%s %d WAIT FOR %d\n", thread->t_name, (int) thread->st_xact_data->xd_start_xn_id, (int) wait_xn_id);
 
2214
#endif
 
2215
                        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
2216
                        if (!tab_wait_for_update(thread, ot, row_id, wait_xn_id))
 
2217
                                return XT_ERR;
 
2218
                        wait = FALSE;
 
2219
                        wait_xn_id = 0;
 
2220
                        return XT_RETRY;
 
2221
                }
 
2222
                lock_type = tab->tab_locks.xt_set_temp_lock(ot, row_id, &xn_id, &thread->st_lock_list);
 
2223
                if (lock_type) {
 
2224
#ifdef TRACE_VARIATIONS
 
2225
                        xt_trace("%s T%d WAIT FOR LOCK(%D) T%d\n", thread->t_name, (int) thread->st_xact_data->xd_start_xn_id, (int) lock_type, (int) xn_id);
 
2226
#endif
 
2227
                        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
2228
                        do {
 
2229
                                if (!xt_xn_wait_for_xact(thread, xn_id, lock_type == XT_TEMP_LOCK))
 
2230
                                        return XT_ERR;
 
2231
                                lock_type = tab->tab_locks.xt_is_locked(ot, row_id, &xn_id);
 
2232
                        }
 
2233
                        while (lock_type);
 
2234
#ifdef TRACE_VARIATIONS
 
2235
                        len = sprintf(t_buf, "%s visible (retry): row=%d rec=%d ", thread->t_name, (int) row_id, (int) ot->ot_curr_rec_id);
 
2236
#endif
 
2237
                        /* GOTCHA!
 
2238
                         * Reset the result before we go down the list again, to make sure we
 
2239
                         * get the latest record!!
 
2240
                         */
 
2241
                        result = TRUE;
 
2242
                        goto retry_2;
 
2243
                }
 
2244
        }
 
2245
#ifdef TRACE_VARIATIONS
 
2246
        if (result == XT_NEW)
 
2247
                xt_trace("%s RETURN NEW %d\n", thread->t_name, (int) *new_rec_id);
 
2248
        else if (!result)
 
2249
                xt_trace("%s RETURN NOT VISIBLE (NEW)\n", thread->t_name);
 
2250
#endif
 
2251
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
2252
 
1986
2253
        ot->ot_curr_row_id = row_id;
1987
 
        ot->ot_curr_updated = (rec_tn_id == self->st_xact_data->xd_start_id);
1988
 
        return TRUE;
 
2254
        ot->ot_curr_updated = FALSE;
 
2255
        return result;
1989
2256
 
1990
2257
        not_found:
1991
 
        //xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
2258
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
1992
2259
        return FALSE;
1993
2260
 
1994
2261
        failed:
1995
 
        //xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
2262
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
1996
2263
        return XT_ERR;
1997
2264
}
1998
2265
 
2000
2267
 * Return TRUE if the record has been read, and is visible.
2001
2268
 * Return FALSE if the record is not visible.
2002
2269
 * Return XT_ERR if an error occurs.
2003
 
 * REturn XT_DEL if the record has been deleted.
 
2270
 * Return XT_DEL if the record is not valid (freed or is a delete record).
2004
2271
 */
2005
2272
xtPublic int xt_tab_visible(XTOpenTablePtr ot)
2006
2273
{
2007
 
        XTTabRecHeadDRec rec_head;
2008
 
 
2009
 
        if (!xt_tab_get_record(ot, ot->ot_curr_rec, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
 
2274
        xtRowID                         row_id;
 
2275
        XTTabRecHeadDRec        rec_head;
 
2276
        xtRecordID                      new_rec_id;
 
2277
        int                                     r;
 
2278
 
 
2279
        if ((row_id = ot->ot_curr_row_id)) {
 
2280
                /* Fast track, do a quick check.
 
2281
                 * Row ID is only set if this record has been committed.
 
2282
                 * Check if it is the first on the list!
 
2283
                 */
 
2284
                xtRecordID var_rec_id;
 
2285
 
 
2286
                retry:
 
2287
                if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
 
2288
                        return XT_ERR;
 
2289
                if (ot->ot_curr_rec_id == var_rec_id) {
 
2290
                        /* Looks good.. */
 
2291
                        if (ot->ot_for_update) {
 
2292
                                XTThreadPtr     thread = ot->ot_thread;
 
2293
                                int                     lock_type;
 
2294
                                xtXactID        xn_id;
 
2295
                                XTTableHPtr     tab = ot->ot_table;
 
2296
 
 
2297
                                lock_type = tab->tab_locks.xt_set_temp_lock(ot, row_id, &xn_id, &thread->st_lock_list);
 
2298
                                if (lock_type) {
 
2299
                                        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
2300
                                        do {
 
2301
                                                if (!xt_xn_wait_for_xact(thread, xn_id, lock_type == XT_TEMP_LOCK))
 
2302
                                                        return XT_ERR;
 
2303
                                                lock_type = tab->tab_locks.xt_is_locked(ot, row_id, &xn_id);
 
2304
                                        }
 
2305
                                        while (lock_type);
 
2306
                                        goto retry;
 
2307
                                }
 
2308
                        }
 
2309
                        return TRUE;
 
2310
                }
 
2311
        }
 
2312
 
 
2313
        if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2010
2314
                return XT_ERR;
2011
 
        if (XT_REC_NOT_VISIBLE(rec_head.tr_rec_type_1))
2012
 
                return XT_DEL;
2013
 
        return tab_visible(ot, &rec_head);
 
2315
 
 
2316
        if ((r = tab_visible(ot, &rec_head, &new_rec_id)) == XT_NEW)
 
2317
                ot->ot_curr_rec_id = new_rec_id;
 
2318
        return r;
2014
2319
}
2015
2320
 
2016
2321
/*
2017
 
 * Return TRUE if the record has been read, and is visible.
2018
 
 * Return FALSE if the record is not visible.
2019
 
 * Return XT_ERR if an error occurs.
 
2322
 * Read a record, and return one of the following:
 
2323
 * TRUE - the record has been read, and is visible.
 
2324
 * FALSE - the record is not visible.
 
2325
 * XT_ERR - an error occurs.
 
2326
 * XT_DEL - The record is invalid, if an index references this
 
2327
 * record, it should be deleted.
 
2328
 * XT_NEW - Means the expected record has been changed.
 
2329
 * When doing an index scan, the conditions must be checked again!
2020
2330
 */
2021
2331
xtPublic int xt_tab_read_record(register XTOpenTablePtr ot, xtWord1 *buffer)
2022
2332
{
2023
2333
        register XTTableHPtr    tab = ot->ot_table;
2024
2334
        size_t                                  rec_size = tab->tab_dic.dic_rec_size;
 
2335
        xtRecordID                              new_rec_id;
 
2336
        int                                             result;
2025
2337
 
2026
2338
        if (!(ot->ot_thread->st_xact_data)) {
2027
2339
                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
2028
2340
                return XT_ERR;
2029
2341
        }
2030
 
        if (!xt_tab_get_record(ot, ot->ot_curr_rec, rec_size, ot->ot_row_rbuffer))
 
2342
 
 
2343
        if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, rec_size, ot->ot_row_rbuffer))
2031
2344
                return XT_ERR;
2032
 
        if (XT_REC_NOT_VISIBLE(ot->ot_row_rbuffer[0]))
2033
 
                return XT_DEL;
2034
2345
 
2035
 
        switch (tab_visible(ot, (XTTabRecHeadDPtr) ot->ot_row_rbuffer)) {
 
2346
        ASSERT_NS(!XT_REC_NOT_VALID(*ot->ot_row_rbuffer));
 
2347
        switch (tab_visible(ot, (XTTabRecHeadDPtr) ot->ot_row_rbuffer, &new_rec_id)) {
2036
2348
                case FALSE:
2037
2349
                        return FALSE;
2038
2350
                case XT_ERR:
2039
2351
                        return XT_ERR;
2040
 
        }
2041
 
 
2042
 
        if (ot->ot_rec_fixed)
2043
 
                memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
2044
 
        else if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
2045
 
                if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer))
2046
 
                        return XT_ERR;
2047
 
        }
2048
 
        else {
2049
 
                if (!tab_load_ext_data(ot, ot->ot_curr_rec, buffer))
2050
 
                        return XT_ERR;
 
2352
                case XT_DEL:
 
2353
                        return XT_DEL;
 
2354
                case XT_NEW:
 
2355
                        if (!xt_tab_get_rec_data(ot, new_rec_id, rec_size, ot->ot_row_rbuffer))
 
2356
                                return XT_ERR;
 
2357
                        ot->ot_curr_rec_id = new_rec_id;
 
2358
                        result = XT_NEW;
 
2359
                        break;
 
2360
                case XT_RETRY:
 
2361
                        return XT_RETRY;
 
2362
                default:
 
2363
                        result = OK;
 
2364
                        break;
 
2365
        }
 
2366
 
 
2367
        if (ot->ot_rec_fixed)
 
2368
                memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
 
2369
        else if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
 
2370
                if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer, ot->ot_cols_req))
 
2371
                        return XT_ERR;
 
2372
        }
 
2373
        else {
 
2374
                u_int cols_req = ot->ot_cols_req;
 
2375
 
 
2376
                ASSERT_NS(cols_req);
 
2377
                if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
2378
                        if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer, cols_req))
 
2379
                                return XT_ERR;
 
2380
                }
 
2381
                else {
 
2382
                        if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, buffer, cols_req))
 
2383
                                return XT_ERR;
 
2384
                }
 
2385
        }
 
2386
 
 
2387
        return result;
 
2388
}
 
2389
 
 
2390
/*
 
2391
 * Returns:
 
2392
 *
 
2393
 * TRUE/OK - record was read.
 
2394
 * FALSE/FAILED - An error occurred.
 
2395
 * XT_DEL - Record deleted.
 
2396
 */
 
2397
xtPublic int xt_tab_dirty_read_record(register XTOpenTablePtr ot, xtWord1 *buffer)
 
2398
{
 
2399
        register XTTableHPtr    tab = ot->ot_table;
 
2400
        size_t                                  rec_size = tab->tab_dic.dic_rec_size;
 
2401
 
 
2402
        if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, rec_size, ot->ot_row_rbuffer))
 
2403
                return FAILED;
 
2404
 
 
2405
        if (XT_REC_NOT_VALID(ot->ot_row_rbuffer[0]))
 
2406
                /* Should not happen! */
 
2407
                return XT_DEL;
 
2408
 
 
2409
        ot->ot_curr_row_id = XT_GET_DISK_4(((XTTabRecHeadDPtr) ot->ot_row_rbuffer)->tr_row_id_4);
 
2410
        ot->ot_curr_updated =
 
2411
                (XT_GET_DISK_4(((XTTabRecHeadDPtr) ot->ot_row_rbuffer)->tr_xact_id_4) == ot->ot_thread->st_xact_data->xd_start_xn_id);
 
2412
 
 
2413
        if (ot->ot_rec_fixed)
 
2414
                memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
 
2415
        else if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
 
2416
                if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer, ot->ot_cols_req))
 
2417
                        return FAILED;
 
2418
        }
 
2419
        else {
 
2420
                u_int cols_req = ot->ot_cols_req;
 
2421
 
 
2422
                ASSERT_NS(cols_req);
 
2423
                if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
2424
                        if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer, cols_req))
 
2425
                                return FAILED;
 
2426
                }
 
2427
                else {
 
2428
                        if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, buffer, cols_req))
 
2429
                                return FAILED;
 
2430
                }
2051
2431
        }
2052
2432
 
2053
2433
        return OK;
2054
2434
}
2055
2435
 
2056
 
xtPublic xtBool xt_tab_load_record(register XTOpenTablePtr ot, off_t address, XTInfoBufferPtr rec_buf)
 
2436
xtPublic void xt_tab_load_row_pointers(XTThreadPtr self, XTOpenTablePtr ot)
 
2437
{
 
2438
        XTTableHPtr     tab = ot->ot_table;
 
2439
        off_t           eof = xt_seek_eof_file(self, ot->ot_row_file);
 
2440
        xtInt8          usage;
 
2441
 
 
2442
        /* Check if there is enough cache: */
 
2443
        usage = xt_tc_get_usage();
 
2444
        if (xt_tc_get_high() > usage)
 
2445
                usage = xt_tc_get_high();
 
2446
        if (usage + eof < xt_tc_get_size()) {
 
2447
                xtRecordID                      rec_id;
 
2448
                xtRecordID                      eof_rec_id;
 
2449
                size_t                          offset;
 
2450
                XTTabCachePagePtr       page;
 
2451
                
 
2452
                eof_rec_id = xt_row_offset_row_id(tab, eof);
 
2453
                rec_id = 1;
 
2454
                while (rec_id < eof_rec_id) {
 
2455
                        if (!(page = tab->tab_rows.xt_tc_lock_page(ot->ot_row_file, rec_id, &offset)))
 
2456
                                xt_throw(self);
 
2457
                        tab->tab_rows.xt_tc_unlock_page(page);
 
2458
                        rec_id += tab->tab_rows.tci_rows_per_page;
 
2459
                }
 
2460
        }
 
2461
}
 
2462
 
 
2463
xtPublic xtBool xt_tab_load_record(register XTOpenTablePtr ot, xtRecordID rec_id, XTInfoBufferPtr rec_buf)
2057
2464
{
2058
2465
        register XTTableHPtr    tab = ot->ot_table;
2059
2466
        size_t                                  rec_size = tab->tab_dic.dic_rec_size;
2060
2467
 
2061
 
        if (!xt_tab_get_record(ot, address, rec_size, ot->ot_row_rbuffer))
 
2468
        if (!xt_tab_get_rec_data(ot, rec_id, rec_size, ot->ot_row_rbuffer))
2062
2469
                return FAILED;
2063
2470
 
2064
 
        if (XT_REC_NOT_VISIBLE(ot->ot_row_rbuffer[0])) {
 
2471
        if (XT_REC_NOT_VALID(ot->ot_row_rbuffer[0])) {
2065
2472
                /* Should not happen! */
2066
2473
                XTThreadPtr self = ot->ot_thread;
2067
2474
 
2069
2476
                return OK;
2070
2477
        }
2071
2478
 
 
2479
        ot->ot_curr_row_id = XT_GET_DISK_4(((XTTabRecHeadDPtr) ot->ot_row_rbuffer)->tr_row_id_4);
 
2480
        ot->ot_curr_updated =
 
2481
                (XT_GET_DISK_4(((XTTabRecHeadDPtr) ot->ot_row_rbuffer)->tr_xact_id_4) == ot->ot_thread->st_xact_data->xd_start_xn_id);
 
2482
 
2072
2483
        if (ot->ot_rec_fixed) {
2073
2484
                size_t size = rec_size - XT_REC_FIX_HEADER_SIZE;
2074
2485
                if (!xt_ib_alloc(NULL, rec_buf, size))
2079
2490
                if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_buf_size))
2080
2491
                        return FAILED;
2081
2492
                if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
2082
 
                        if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data))
 
2493
                        if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, ot->ot_cols_req))
2083
2494
                                return FAILED;
2084
2495
                }
2085
2496
                else {
2086
 
                        if (!tab_load_ext_data(ot, ot->ot_curr_rec, rec_buf->ib_db.db_data))
2087
 
                                return FAILED;
 
2497
                        u_int cols_req = ot->ot_cols_req;
 
2498
 
 
2499
                        ASSERT_NS(cols_req);
 
2500
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
2501
                                if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
 
2502
                                        return FAILED;
 
2503
                        }
 
2504
                        else {
 
2505
                                if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, rec_buf->ib_db.db_data, cols_req))
 
2506
                                        return FAILED;
 
2507
                        }
2088
2508
                }
2089
2509
        }
2090
2510
 
2091
2511
        return OK;
2092
2512
}
2093
2513
 
2094
 
xtPublic xtBool xt_tab_free_row(XTOpenTablePtr ot, XTTableHPtr tab, xtWord4 row_id)
 
2514
xtPublic xtBool xt_tab_free_row(XTOpenTablePtr ot, XTTableHPtr tab, xtRowID row_id)
2095
2515
{
2096
2516
        XTTabRowRefDRec free_row;
2097
 
        off_t                   offset = (off_t) row_id << XT_TAB_ROW_SHIFTS;
 
2517
        xtRowID                 prev_row;
 
2518
        xtOpSeqNo               op_seq;
2098
2519
 
2099
2520
        ASSERT_NS(row_id); // Cannot free the header!
2100
 
        free_row.rr_rec_type_1 = XT_TAB_ROW_FREE;
2101
 
        free_row.rr_unused_1 = 0;
2102
2521
 
2103
 
        xt_mutex_lock(&tab->tab_row_lock);
2104
 
        XT_SET_DISK_6(free_row.rr_variation_6, tab->tab_row_free);
2105
 
        if (!xt_dc_write(ot->ot_row_file, offset, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row)) {
2106
 
                xt_mutex_unlock(&tab->tab_row_lock);
 
2522
        xt_lock_mutex_ns(&tab->tab_row_lock);
 
2523
        prev_row = tab->tab_row_free_id;
 
2524
        XT_SET_DISK_4(free_row.rr_ref_id_4, prev_row);
 
2525
        if (!tab->tab_rows.xt_tc_write(ot->ot_row_file, row_id, 0, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row, &op_seq)) {
 
2526
                xt_unlock_mutex_ns(&tab->tab_row_lock);
2107
2527
                return FAILED;
2108
2528
        }
2109
 
        tab->tab_row_free = offset;
 
2529
        tab->tab_row_free_id = row_id;
2110
2530
        tab->tab_row_fnum++;
2111
 
        tab->tab_head_dirty = TRUE;
2112
 
        xt_mutex_unlock(&tab->tab_row_lock);
 
2531
        xt_unlock_mutex_ns(&tab->tab_row_lock);
 
2532
 
 
2533
        if (!ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, XT_LOG_ENT_ROW_FREED, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row))
 
2534
                return FAILED;
 
2535
 
2113
2536
        return OK;
2114
2537
}
2115
2538
 
2116
 
static void tab_free_extended_record(XTOpenTablePtr ot, off_t address, XTTabRecExtDPtr ext_rec)
 
2539
static void tab_free_ext_record_on_fail(XTOpenTablePtr ot, xtRecordID rec_id, XTTabRecExtDPtr ext_rec, xtBool log_err)
2117
2540
{
2118
 
        xtWord4 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
2119
 
        xtWord4 log_id;
2120
 
        off_t   log_offset;
2121
 
 
2122
 
        XT_GET_LOG_REF_6(log_id, log_offset, ext_rec->re_log_rec_6);
2123
 
        if (!tab_delete_log_record(ot, address, log_id, log_offset, log_over_size))
2124
 
                xt_log_and_clear_exception_ns();
 
2541
        xtWord4         log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
 
2542
        xtLogID         log_id;
 
2543
        xtLogOffset     log_offset;
 
2544
 
 
2545
        XT_GET_LOG_REF(log_id, log_offset, ext_rec);
 
2546
 
 
2547
        if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(log_id, log_offset, log_over_size, ot->ot_table->tab_id, rec_id)) {
 
2548
                if (log_err)
 
2549
                        xt_log_and_clear_exception_ns();
 
2550
        }
2125
2551
}
2126
2552
 
2127
2553
static void tab_save_exception(XTExceptionPtr e)
2140
2566
 
2141
2567
/*
2142
2568
 * This function assumes that a record may be partially written.
2143
 
 * It removes whatever all associated data and references to the record.
 
2569
 * It removes all associated data and references to the record.
2144
2570
 *
2145
2571
 * This function returns XT_ERR if an error occurs.
2146
2572
 * TRUE if the record has been removed, and may be freed.
2147
2573
 * FALSE if the record has already been freed. 
2148
2574
 *
2149
2575
 */
2150
 
xtPublic int xt_tab_remove_record(XTOpenTablePtr ot, off_t address, xtWord1 *rec_data, off_t *prev_var, xtBool clean_delete)
 
2576
xtPublic int xt_tab_remove_record(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1 *rec_data, xtRecordID *prev_var_id, xtBool clean_delete, xtRowID row_id, xtXactID xn_id)
2151
2577
{
2152
2578
        register XTTableHPtr    tab = ot->ot_table;
2153
2579
 
2154
 
        *prev_var = 0;
 
2580
        *prev_var_id = 0;
 
2581
 
 
2582
        if (!rec_id)
 
2583
                return FALSE;
 
2584
 
2155
2585
        /*
2156
2586
         * NOTE: This function uses the read buffer. This should be OK because
2157
2587
         * the function is only called by the sweeper. The read buffer
2158
 
         * is REQUIRED because of the call to tab_load_ext_data()!!!
 
2588
         * is REQUIRED because of the call to xt_tab_load_ext_data()!!!
2159
2589
         */
2160
 
        if (!xt_tab_get_record(ot, address, tab->tab_dic.dic_rec_size, ot->ot_row_rbuffer))
 
2590
        if (!xt_tab_get_rec_data(ot, rec_id, tab->tab_dic.dic_rec_size, ot->ot_row_rbuffer))
2161
2591
                return XT_ERR;
2162
2592
 
2163
 
        if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_FREED)
2164
 
                /* The record has already been freed. */
2165
 
                return FALSE;
2166
 
 
2167
 
        *prev_var = (off_t) XT_GET_DISK_6(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->tr_prev_var_6);
2168
 
 
 
2593
        /* Check that the record has not already been freed: */
 
2594
        if (XT_REC_IS_FREE(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->tr_rec_type_1))
 
2595
                return FALSE;
 
2596
 
 
2597
        /* This record must belong to the given row: */
 
2598
        if (XT_GET_DISK_4(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->tr_row_id_4) != row_id)
 
2599
                return FALSE;
 
2600
 
 
2601
        /* The transaction ID of the record must be BEFORE or equal to the given
 
2602
         * transaction ID.
 
2603
         *
 
2604
         * No, this does not always hold. Because we wait for updates now,
 
2605
         * a "younger" transaction can update before an older
 
2606
         * transaction.
 
2607
         * Commit order determines the actual order in which the transactions
 
2608
         * should be replicated. This is determined by the log number of
 
2609
         * the commit record!
 
2610
        if (db->db_xn_curr_id(xn_id, XT_GET_DISK_4(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->tr_xact_id_4)))
 
2611
                return FALSE;
 
2612
         */
 
2613
 
 
2614
        *prev_var_id = XT_GET_DISK_4(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->tr_prev_rec_id_4);
 
2615
 
 
2616
        /* Bit no longer used.
2169
2617
        if (XT_REC_IS_REMOVED(ot->ot_row_rbuffer[0]))
2170
 
                /* The record has already been removed. */
2171
2618
                return TRUE;
 
2619
        */
2172
2620
 
2173
2621
        if (tab->tab_dic.dic_key_count) {
2174
2622
                XTIndexPtr      *ind;
2175
2623
 
2176
 
                ind = tab->tab_dic.dic_keys;
2177
 
 
2178
2624
                switch (ot->ot_row_rbuffer[0]) {
2179
2625
                        case XT_TAB_STATUS_DELETE:
 
2626
                                /* Note, the effect of clean_delete is now ignored, but this
 
2627
                                 * is OK because the record is freed immediately.
 
2628
                                 */
2180
2629
                                if (clean_delete)
2181
2630
                                        ot->ot_row_rbuffer[0] |= XT_TAB_STATUS_CLEANED_BIT;
2182
2631
                        case XT_TAB_STATUS_DEL_CLEAN:
2187
2636
                                break;
2188
2637
                        case XT_TAB_STATUS_VARIABLE:
2189
2638
                        case XT_TAB_STATUS_VAR_CLEAN:
2190
 
                                if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_data)) {
 
2639
                                if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_data, tab->tab_dic.dic_ind_cols_req)) {
2191
2640
                                        xt_log_and_clear_exception_ns();
2192
 
                                        return TRUE;
 
2641
                                        goto set_removed;
2193
2642
                                }
2194
2643
                                break;
2195
 
                        case XT_TAB_STATUS_EXTENDED:
 
2644
                        case XT_TAB_STATUS_EXT_DLOG:
2196
2645
                        case XT_TAB_STATUS_EXT_CLEAN: {
2197
 
                                if (!tab_load_ext_data(ot, address, rec_data)) {
2198
 
                                        xt_log_and_clear_exception_ns();
2199
 
                                        return TRUE;
 
2646
                                u_int cols_req = tab->tab_dic.dic_ind_cols_req;
 
2647
 
 
2648
                                ASSERT_NS(cols_req);
 
2649
                                if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
2650
                                        if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, rec_data, cols_req)) {
 
2651
                                                xt_log_and_clear_exception_ns();
 
2652
                                                goto set_removed;
 
2653
                                        }
 
2654
                                }
 
2655
                                else {
 
2656
                                        if (!xt_tab_load_ext_data(ot, rec_id, rec_data, cols_req)) {
 
2657
                                                /* This is actually quite possible after recovery, see [(3)] */
 
2658
                                                if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
 
2659
                                                        ot->ot_thread->t_exception.e_xt_err != XT_ERR_DATA_LOG_NOT_FOUND)
 
2660
                                                        xt_log_and_clear_exception_ns();
 
2661
                                                goto set_removed;
 
2662
                                        }
2200
2663
                                }
2201
2664
                                break;
2202
2665
                        }
2204
2667
                                break;
2205
2668
                }
2206
2669
 
2207
 
                if (tab->tab_dic.dic_blob_count)
2208
 
                        myxt_release_blobs(ot, rec_data, address);
2209
 
 
 
2670
                /* TODO: This change may only be flushed after the
 
2671
                 * operation below has been flushed to the log.
 
2672
                 */
 
2673
                ind = tab->tab_dic.dic_keys;
2210
2674
                for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
2211
 
                        if (!xt_idx_delete(ot, *ind, address, rec_data))
 
2675
                        if (!xt_idx_delete(ot, *ind, rec_id, rec_data))
2212
2676
                                xt_log_and_clear_exception_ns();
2213
2677
                }
2214
2678
        }
2218
2682
                                if (clean_delete)
2219
2683
                                        ot->ot_row_rbuffer[0] |= XT_TAB_STATUS_CLEANED_BIT;
2220
2684
                        case XT_TAB_STATUS_DEL_CLEAN:
2221
 
                                goto set_removed;
 
2685
                                break;
2222
2686
                }
2223
2687
        }
2224
2688
 
2225
 
        if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_EXTENDED || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_EXT_CLEAN)
2226
 
                tab_free_extended_record(ot, address, (XTTabRecExtDPtr) ot->ot_row_rbuffer);
2227
 
 
2228
2689
        set_removed:
2229
 
        ot->ot_row_rbuffer[0] |= XT_TAB_STATUS_REMOVED_BIT;
2230
 
        if (!xt_tab_put_data(ot, address + offsetof(XTTabRecHeadDRec, tr_rec_type_1), 1, ot->ot_row_rbuffer))
2231
 
                return XT_ERR;
2232
 
 
 
2690
        if (XT_REC_IS_EXT_DLOG(ot->ot_row_rbuffer[0])) {
 
2691
                /* [(1)] Lock, and read again to make sure that the
 
2692
                 * compactor does not change this record, while
 
2693
                 * we are removing it! */
 
2694
                xt_lock_mutex_ns(&tab->tab_db->db_co_ext_lock);
 
2695
 
 
2696
                if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_REMOVED_EXT, rec_id, clean_delete)) {
 
2697
                        xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
 
2698
                        return XT_ERR;
 
2699
                }
 
2700
 
 
2701
                xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
 
2702
                return TRUE;
 
2703
        }
 
2704
 
 
2705
        if (XT_REC_IS_DELETE(ot->ot_row_rbuffer[0])) {
 
2706
                /* No attached resources, does not need to be removed! */
 
2707
                if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_FREED, rec_id, clean_delete))
 
2708
                        return XT_ERR;
 
2709
        }
 
2710
        else {
 
2711
                if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_REMOVED, rec_id, clean_delete))
 
2712
                        return XT_ERR;
 
2713
        }
2233
2714
        return TRUE;
2234
2715
}
2235
2716
 
2236
 
static xtWord4 tab_new_row(XTOpenTablePtr ot, XTTableHPtr tab)
 
2717
static xtRowID tab_new_row(XTOpenTablePtr ot, XTTableHPtr tab)
2237
2718
{
2238
 
        off_t                   row_id;
 
2719
        xtRowID                 row_id;
2239
2720
        XTTabRowRefDRec row_buf;
2240
 
        //D xtWord4                     op_seq;
2241
 
 
2242
 
        xt_mutex_lock(&tab->tab_row_lock);
2243
 
        //D op_seq = tab->tab_seq.ts_get_op_seq();
2244
 
        if ((row_id = tab->tab_row_free)) {
2245
 
 
2246
 
                if (!xt_dc_read(ot->ot_row_file, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf)) {
2247
 
                        xt_mutex_unlock(&tab->tab_row_lock);
 
2721
        xtOpSeqNo               op_seq;
 
2722
        xtRowID                 next_row_id = 0;
 
2723
        u_int                   status;
 
2724
 
 
2725
        xt_lock_mutex_ns(&tab->tab_row_lock);
 
2726
        if ((row_id = tab->tab_row_free_id)) {
 
2727
                status = XT_LOG_ENT_ROW_NEW_FL;
 
2728
 
 
2729
                if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, row_id, (xtWord4 *) &row_buf)) {
 
2730
                        xt_unlock_mutex_ns(&tab->tab_row_lock);
2248
2731
                        return 0;
2249
2732
                }
2250
 
                tab->tab_row_free = XT_GET_DISK_6(row_buf.rr_variation_6);
 
2733
                next_row_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
 
2734
                tab->tab_row_free_id = next_row_id;
2251
2735
                tab->tab_row_fnum--;
2252
2736
        }
2253
2737
        else {
2254
 
                row_id = tab->tab_row_eof;
2255
 
                tab->tab_row_eof += sizeof(XTTabRowRefDRec);
 
2738
                status = XT_LOG_ENT_ROW_NEW;
 
2739
                row_id = tab->tab_row_eof_id;
 
2740
                if (row_id == 0xFFFFFFFF) {
 
2741
                        xt_unlock_mutex_ns(&tab->tab_row_lock);
 
2742
                        xt_register_xterr(XT_REG_CONTEXT, XT_ERR_MAX_ROW_COUNT);
 
2743
                        return 0;
 
2744
                }
 
2745
                tab->tab_row_eof_id++;
2256
2746
        }
2257
 
        tab->tab_head_dirty = TRUE;
2258
 
        xt_mutex_unlock(&tab->tab_row_lock);
2259
 
 
2260
 
        //D if () {
2261
 
        //D     if (!ot->ot_thread.alloc_rec_from_free(&tab->tab_db->db_xlog, op_seq, xtWord8 rec, xtWord8 next_rec))
2262
 
        //D             return 0;
2263
 
        //D }
2264
 
        //D else {
2265
 
        //D xtBool                                      alloc_rec_from_eof(XTDatabaseLogPtr log, xtWord4 op_seq, xtWord8 rec);
2266
 
        //D }
2267
 
 
2268
 
        XT_DISABLED_TRACE(("new row tx=%d row=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_id, (int) (row_id >> XT_TAB_ROW_SHIFTS)));
2269
 
        ASSERT_NS(row_id >> XT_TAB_ROW_SHIFTS);
2270
 
        return (xtWord4) (row_id >> XT_TAB_ROW_SHIFTS);
 
2747
        op_seq = tab->tab_seq.ts_get_op_seq();
 
2748
        xt_unlock_mutex_ns(&tab->tab_row_lock);
 
2749
 
 
2750
        if (!ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, next_row_id, row_id, 0, NULL))
 
2751
                return 0;
 
2752
 
 
2753
        XT_DISABLED_TRACE(("new row tx=%d row=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) row_id));
 
2754
        ASSERT_NS(row_id);
 
2755
        return row_id;
2271
2756
}
2272
2757
 
2273
 
xtPublic xtBool xt_tab_get_row(register XTOpenTablePtr ot, xtWord4 row_id, off_t *variation)
 
2758
xtPublic xtBool xt_tab_get_row(register XTOpenTablePtr ot, xtRowID row_id, xtRecordID *var_rec_id)
2274
2759
{
2275
 
        union {
2276
 
                XTTabRowRefDRec row_buf;
2277
 
                xtWord8                 buff_8;
2278
 
        } x;
2279
 
 
2280
 
        (void) ASSERT_NS(sizeof(XTTabRowRefDRec) == 8);
2281
 
        if (!xt_dc_read_8(ot->ot_row_file, (off_t) row_id << XT_TAB_ROW_SHIFTS, &x.buff_8))
 
2760
        register XTTableHPtr    tab = ot->ot_table;
 
2761
        XTTabRowRefDRec                 row_buf;
 
2762
 
 
2763
        (void) ASSERT_NS(sizeof(XTTabRowRefDRec) == 4);
 
2764
 
 
2765
        if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, row_id, (xtWord4 *) &row_buf))
2282
2766
                return FAILED;
2283
 
        if (x.row_buf.rr_rec_type_1 == XT_TAB_ROW_FREE)
2284
 
                *variation = 0;
2285
 
        else
2286
 
                *variation = XT_GET_DISK_6(x.row_buf.rr_variation_6);
 
2767
        *var_rec_id = (xtRecordID) XT_GET_DISK_4(row_buf.rr_ref_id_4);
2287
2768
        return OK;
2288
2769
}
2289
2770
 
2290
 
xtPublic xtBool xt_tab_set_row(XTOpenTablePtr ot, xtWord4 row_id, off_t variation, xtBool write_thru)
 
2771
xtPublic xtBool xt_tab_set_row(XTOpenTablePtr ot, u_int status, xtRowID row_id, xtRecordID var_rec_id)
2291
2772
{
2292
 
        XTTabRowRefDRec row_buf;
2293
 
 
2294
 
        row_buf.rr_rec_type_1 = XT_TAB_ROW_IN_USE;
2295
 
        XT_SET_DISK_6(row_buf.rr_variation_6, variation);
2296
 
        row_buf.rr_unused_1 = 0;
2297
 
        if (write_thru)
2298
 
                return xt_dc_write_thru(ot->ot_row_file, (off_t) row_id << XT_TAB_ROW_SHIFTS, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
2299
 
        return xt_dc_write(ot->ot_row_file, (off_t) row_id << XT_TAB_ROW_SHIFTS, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
 
2773
        register XTTableHPtr    tab = ot->ot_table;
 
2774
        XTTabRowRefDRec                 row_buf;
 
2775
        xtOpSeqNo                               op_seq;
 
2776
 
 
2777
        ASSERT_NS(var_rec_id < tab->tab_rec_eof_id);
 
2778
        XT_SET_DISK_4(row_buf.rr_ref_id_4, var_rec_id);
 
2779
 
 
2780
        if (!tab->tab_rows.xt_tc_write(ot->ot_row_file, row_id, 0, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &op_seq))
 
2781
                return FAILED;
 
2782
 
 
2783
        return ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
2300
2784
}
2301
2785
 
2302
 
xtPublic xtBool xt_tab_free_record(XTOpenTablePtr ot, off_t address)
 
2786
xtPublic xtBool xt_tab_free_record(XTOpenTablePtr ot, u_int status, xtRecordID rec_id, xtBool clean_delete)
2303
2787
{
2304
2788
        register XTTableHPtr    tab = ot->ot_table;
2305
 
        XTTabRecFreeDRec                rec_buf;
 
2789
        XTTabRecHeadDRec                rec_head;
 
2790
        XTactFreeRecEntryDRec   free_rec;
 
2791
        xtRecordID                              prev_rec_id;
2306
2792
 
2307
2793
        /* Don't free the record if it is already free! */
2308
 
        if (!xt_tab_get_record(ot, address, 1, (xtWord1 *) &rec_buf))
 
2794
        if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2309
2795
                return FAILED;
2310
2796
 
2311
 
        if (rec_buf.tr_rec_type_1 != XT_TAB_STATUS_FREED) {
2312
 
                rec_buf.tr_rec_type_1 = XT_TAB_STATUS_FREED;
2313
 
                rec_buf.tr_stat_id_1 = 0;
2314
 
 
2315
 
                xt_mutex_lock(&tab->tab_free_lock);
2316
 
                XT_SET_DISK_6(rec_buf.rf_next_block_6, tab->tab_data_free);
2317
 
                if (!xt_tab_put_data(ot, address, sizeof(XTTabRecFreeDRec), (xtWord1 *) &rec_buf)) {
2318
 
                        xt_mutex_unlock(&tab->tab_free_lock);
 
2797
        if (!XT_REC_IS_FREE(rec_head.tr_rec_type_1)) {
 
2798
                xtOpSeqNo op_seq;
 
2799
 
 
2800
                /* This information will be used to determine if the resources of the record
 
2801
                 * should be removed.
 
2802
                 */
 
2803
                free_rec.fr_stat_id_1 = rec_head.tr_stat_id_1;
 
2804
                XT_COPY_DISK_4(free_rec.fr_xact_id_4, rec_head.tr_xact_id_4);
 
2805
 
 
2806
                /* A record is "clean" deleted if the record was
 
2807
                 * XT_TAB_STATUS_DELETE which was committed.
 
2808
                 * This makes sure that the record will still invalidate
 
2809
                 * following records in a row.
 
2810
                 *
 
2811
                 * Example:
 
2812
                 *
 
2813
                 * 1. INSERT A ROW, then DELETE it, assume the sweeper is delayed.
 
2814
                 *
 
2815
                 * We now have the sequence row X --> del rec A --> valid rec B.
 
2816
                 *
 
2817
                 * 2. A SELECT can still find B. Assume it now goes to check
 
2818
                 *    if the record is valid, it reads row X, and gets A.
 
2819
                 *
 
2820
                 * 3. Now the sweeper gets control and removes X, A and B.
 
2821
                 *    It frees A with the clean bit.
 
2822
                 *
 
2823
                 * 4. Now the SELECT gets control and reads A. Normally a freed record
 
2824
                 *    would be ignored, and it would go onto B, which would then
 
2825
                 *    be considered valid (note, even after the free, the next
 
2826
                 *    pointer is not affected).
 
2827
                 *
 
2828
                 * However, because the clean bit has been set, it will stop at A
 
2829
                 * and consider B invalid (which is the desired result).
 
2830
                 *
 
2831
                 * NOTE: We assume it is not possible for A to be allocated and refer
 
2832
                 * to B, because B is freed before A. This means that B may refer to
 
2833
                 * A after the next allocation.
 
2834
                 */
 
2835
 
 
2836
                ASSERT_NS(sizeof(XTTabRecFreeDRec) == sizeof(XTactFreeRecEntryDRec) - offsetof(XTactFreeRecEntryDRec, fr_rec_type_1));
 
2837
                free_rec.fr_rec_type_1 = XT_TAB_STATUS_FREED | (clean_delete ? XT_TAB_STATUS_CLEANED_BIT : 0);
 
2838
                free_rec.fr_not_used_1 = 0;
 
2839
 
 
2840
                xt_lock_mutex_ns(&tab->tab_rec_lock);
 
2841
                prev_rec_id = tab->tab_rec_free_id;
 
2842
                XT_SET_DISK_4(free_rec.fr_next_rec_id_4, prev_rec_id);
 
2843
                if (!xt_tab_put_rec_data(ot, rec_id, sizeof(XTTabRecFreeDRec), &free_rec.fr_rec_type_1, &op_seq)) {
 
2844
                        xt_unlock_mutex_ns(&tab->tab_rec_lock);
2319
2845
                        return FAILED;
2320
2846
                }
2321
 
                tab->tab_data_free = address;
2322
 
                ASSERT_NS(tab->tab_data_free < tab->tab_data_eof + tab->tab_buf_offset);
2323
 
                tab->tab_data_fnum++;
2324
 
                tab->tab_head_dirty = TRUE;
2325
 
                xt_mutex_unlock(&tab->tab_free_lock);
 
2847
                tab->tab_rec_free_id = rec_id;
 
2848
                ASSERT_NS(tab->tab_rec_free_id < tab->tab_rec_eof_id);
 
2849
                tab->tab_rec_fnum++;
 
2850
                xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
2851
 
 
2852
                if (!ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, rec_id, rec_id, sizeof(XTactFreeRecEntryDRec) - offsetof(XTactFreeRecEntryDRec, fr_stat_id_1), &free_rec.fr_stat_id_1))
 
2853
                        return FAILED;
2326
2854
        }
2327
2855
        return OK;
2328
2856
}
2329
2857
 
2330
 
/* Functions that cleanup on failure, preserve the exception. */
2331
 
static void tab_set_row_on_fail(XTOpenTablePtr ot, xtWord4 row_id, off_t variation, xtBool write_thru)
2332
 
{
2333
 
        XTExceptionRec e;
2334
 
 
2335
 
        tab_save_exception(&e);
2336
 
        xt_tab_set_row(ot, row_id, variation, write_thru);
2337
 
        tab_restore_exception(&e);
2338
 
}
2339
 
 
2340
 
static void tab_free_row_on_fail(XTOpenTablePtr ot, XTTableHPtr tab, xtWord4 row_id)
 
2858
static void tab_free_row_on_fail(XTOpenTablePtr ot, XTTableHPtr tab, xtRowID row_id)
2341
2859
{
2342
2860
        XTExceptionRec e;
2343
2861
 
2346
2864
        tab_restore_exception(&e);
2347
2865
}
2348
2866
 
2349
 
static void tab_remove_record_on_fail(XTOpenTablePtr ot, off_t address, XTTabRecHeadDPtr row_ptr, xtWord1 *rec_data, u_int key_count)
2350
 
{
2351
 
        XTExceptionRec e;
 
2867
static xtBool tab_add_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, u_int status)
 
2868
{
 
2869
        register XTTableHPtr    tab = ot->ot_table;
 
2870
        XTThreadPtr                             thread = ot->ot_thread;
 
2871
        xtRecordID                              rec_id;
 
2872
        xtLogID                                 log_id;
 
2873
        xtLogOffset                             log_offset;
 
2874
        xtOpSeqNo                               op_seq;
 
2875
        xtRecordID                              next_rec_id = 0;
 
2876
 
 
2877
        if (rec_info->ri_ext_rec) {
 
2878
                /* Determine where the overflow will go... */
 
2879
                if (!thread->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, rec_info->ri_log_data_size + offsetof(XTactExtRecEntryDRec, er_data)))
 
2880
                        return FAILED;
 
2881
                XT_SET_LOG_REF(rec_info->ri_ext_rec, log_id, log_offset);
 
2882
        }
 
2883
 
 
2884
        /* Write the record to disk: */
 
2885
        xt_lock_mutex_ns(&tab->tab_rec_lock);
 
2886
        if ((rec_id = tab->tab_rec_free_id)) {
 
2887
                XTTabRecFreeDRec free_block;
 
2888
 
 
2889
                ASSERT_NS(rec_id < tab->tab_rec_eof_id);
 
2890
                if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_block)) {
 
2891
                        xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
2892
                        return FAILED;
 
2893
                }
 
2894
                next_rec_id = XT_GET_DISK_4(free_block.rf_next_rec_id_4);
 
2895
                tab->tab_rec_free_id = next_rec_id;
 
2896
                        
 
2897
                tab->tab_rec_fnum--;
 
2898
                
 
2899
                /* XT_LOG_ENT_UPDATE --> XT_LOG_ENT_UPDATE_FL */
 
2900
                /* XT_LOG_ENT_INSERT --> XT_LOG_ENT_INSERT_FL */
 
2901
                /* XT_LOG_ENT_DELETE --> XT_LOG_ENT_DELETE_FL */
 
2902
                status += 2;
 
2903
        }
 
2904
        else {
 
2905
                rec_id = tab->tab_rec_eof_id;
 
2906
                tab->tab_rec_eof_id++;
 
2907
        }
 
2908
        if (!xt_tab_put_rec_data(ot, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, &op_seq)) {
 
2909
                xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
2910
                return FAILED;
 
2911
        }
 
2912
        xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
2913
 
 
2914
        if (!thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, next_rec_id, rec_id,  rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
 
2915
                return FAILED;
 
2916
 
 
2917
        if (rec_info->ri_ext_rec) {
 
2918
                /* Write the log buffer overflow: */            
 
2919
                rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK;
 
2920
                XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size);
 
2921
                XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab->tab_id);
 
2922
                XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id);
 
2923
                if (!thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf)) {
 
2924
                        /* Failed to write the overflow, free the record allocated above: */
 
2925
                        return FAILED;
 
2926
                }
 
2927
        }
 
2928
 
 
2929
        XT_DISABLED_TRACE(("new rec tx=%d val=%d\n", (int) thread->st_xact_data->xd_start_xn_id, (int) rec_id));
 
2930
        rec_info->ri_rec_id = rec_id;
 
2931
        return OK;
 
2932
}
 
2933
 
 
2934
static void tab_delete_record_on_fail(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, XTTabRecHeadDPtr row_ptr, xtWord1 *rec_data, u_int key_count)
 
2935
{
 
2936
        XTExceptionRec  e;
 
2937
        xtBool                  log_err = TRUE;
 
2938
        XTTabRecInfoRec rec_info;
2352
2939
 
2353
2940
        tab_save_exception(&e);
 
2941
        
 
2942
        if (e.e_xt_err == XT_ERR_DUPLICATE_KEY || 
 
2943
                e.e_xt_err == XT_ERR_DUPLICATE_FKEY) {
 
2944
                /* If the error does not cause rollback, then we will ignore the
 
2945
                 * error if an error occurs in the UNDO!
 
2946
                 */
 
2947
                log_err = FALSE;
 
2948
                tab_restore_exception(&e);
 
2949
        }
2354
2950
        if (key_count) {
2355
2951
                XTIndexPtr      *ind;
2356
2952
 
2357
2953
                ind = ot->ot_table->tab_dic.dic_keys;
2358
2954
                for (u_int i=0; i<key_count; i++, ind++) {
2359
 
                        if (!xt_idx_delete(ot, *ind, address, rec_data))
2360
 
                                xt_log_and_clear_exception_ns();
2361
 
                }
2362
 
        }
2363
 
 
2364
 
        if (row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXTENDED || row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_CLEAN)
2365
 
                tab_free_extended_record(ot, address, (XTTabRecExtDPtr) row_ptr);
2366
 
 
2367
 
        if (!xt_tab_free_record(ot, address))
2368
 
                xt_log_and_clear_exception_ns();
2369
 
 
2370
 
        tab_restore_exception(&e);
2371
 
}
2372
 
 
2373
 
static void tab_free_record_on_fail(XTOpenTablePtr ot, off_t address)
2374
 
{
2375
 
        XTExceptionRec e;
2376
 
 
2377
 
        tab_save_exception(&e);
2378
 
        if (!xt_tab_free_record(ot, address))
2379
 
                xt_log_and_clear_exception_ns();
2380
 
        tab_restore_exception(&e);
2381
 
}
2382
 
 
2383
 
static xtBool tab_add_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info)
2384
 
{
2385
 
        register XTTableHPtr    tab = ot->ot_table;
2386
 
        off_t                                   rec_address;
2387
 
        xtWord4                                 log_id;
2388
 
        off_t                                   log_offset;
2389
 
 
2390
 
        if (rec_info->ri_ext_rec) {
2391
 
                /* Determine where the overflow will go... */
2392
 
                if (!xt_dl_get_log_offset(ot, &log_id, &log_offset, rec_info->ri_log_data_size + XT_LOG_REC_HEADER_SIZE))
2393
 
                        return FAILED;
2394
 
                XT_SET_LOG_REF_6(rec_info->ri_ext_rec->re_log_rec_6, log_id, log_offset);
2395
 
        }
2396
 
 
2397
 
        /* Write the record to disk: */
2398
 
        xt_mutex_lock(&tab->tab_free_lock);
2399
 
        if ((rec_address = tab->tab_data_free)) {
2400
 
                XTTabRecFreeDRec free_block;
2401
 
 
2402
 
                if (!xt_tab_get_record(ot, rec_address, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_block)) {
2403
 
                        xt_mutex_unlock(&tab->tab_free_lock);
2404
 
                        return FAILED;
2405
 
                }
2406
 
                tab->tab_data_free = XT_GET_DISK_6(free_block.rf_next_block_6);
2407
 
                /* Can happen if, we allocate a record, update the record,
2408
 
                 * then crash before we can update the free list pointer.
2409
 
                 * TODO: fix this!
2410
 
                 */
2411
 
                if (tab->tab_data_free >= tab->tab_data_eof + tab->tab_buf_offset ||
2412
 
                        ((tab->tab_data_free - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size) != 0) {
2413
 
                        /* We have to drop the free list: */
2414
 
                        tab->tab_data_free = 0;
2415
 
                        tab->tab_data_fnum = 0;
2416
 
                        tab->tab_head_dirty = TRUE;
2417
 
                        goto write_eof;
2418
 
                }
2419
 
                        
2420
 
                ASSERT_NS(tab->tab_data_free < tab->tab_data_eof + tab->tab_buf_offset);
2421
 
                tab->tab_data_fnum--;
2422
 
                tab->tab_head_dirty = TRUE;
2423
 
                xt_mutex_unlock(&tab->tab_free_lock);
2424
 
 
2425
 
                /* Threads can do this together: */
2426
 
                if (!xt_tab_put_data(ot, rec_address, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
2427
 
                        return FAILED;
2428
 
 
2429
 
                goto write_overflow;
2430
 
        }
2431
 
        write_eof:
2432
 
        xt_mutex_unlock(&tab->tab_free_lock);
2433
 
 
2434
 
        /* Fixed length records must always fit into the buffer: */
2435
 
        ASSERT_NS((tab->tab_buf_size % tab->tab_dic.dic_rec_size) == 0);
2436
 
        ASSERT_NS(ot->ot_rec_size == tab->tab_dic.dic_rec_size);
2437
 
        
2438
 
        xt_rwlock_wrlock(&tab->tab_buf_rwlock);
2439
 
 
2440
 
        rec_address = tab->tab_data_eof + tab->tab_buf_offset;
2441
 
        ASSERT_NS(((rec_address - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size) == 0);
2442
 
 
2443
 
        if (tab->tab_buf_offset + ot->ot_rec_size > tab->tab_buf_size) {
2444
 
                /* Because the buffer size is a multiple of the record size, this must be the case: */
2445
 
                ASSERT_NS(tab->tab_buf_size == tab->tab_buf_offset);
2446
 
 
2447
 
                /* TODO: Double buffering will ease this bottleneck: */
2448
 
                if (!xt_pwrite_file(ot->ot_data_file, tab->tab_data_eof, tab->tab_buf_size, tab->tab_data_buf)) {
2449
 
                        xt_rwlock_unlock(&tab->tab_buf_rwlock);
2450
 
                        return FAILED;
2451
 
                }
2452
 
                tab->tab_data_eof += tab->tab_buf_size;
2453
 
                memcpy(tab->tab_data_buf, ((xtWord1 *) rec_info->ri_fix_rec_buf), rec_info->ri_rec_buf_size);
2454
 
                tab->tab_buf_offset = ot->ot_rec_size;
2455
 
        }
2456
 
        else {
2457
 
                memcpy(tab->tab_data_buf + tab->tab_buf_offset, rec_info->ri_fix_rec_buf, rec_info->ri_rec_buf_size);
2458
 
                tab->tab_buf_offset += ot->ot_rec_size;
2459
 
        }
2460
 
 
2461
 
        xt_rwlock_unlock(&tab->tab_buf_rwlock);
2462
 
 
2463
 
        write_overflow:
2464
 
        if (rec_info->ri_ext_rec) {
2465
 
                /* Write the log buffer overflow: */            
2466
 
                rec_info->ri_log_buf->lb_status_1 = XT_DL_STATUS_RECORD;
2467
 
                XT_SET_DISK_4(rec_info->ri_log_buf->lb_data_size_4, rec_info->ri_log_data_size);
2468
 
                XT_SET_DISK_6(rec_info->ri_log_buf->lb_record_6, rec_address);
2469
 
                if (!xt_dl_append_log(ot, log_id, log_offset, rec_info->ri_log_data_size + XT_LOG_REC_HEADER_SIZE, rec_info->ri_log_buf)) {
2470
 
                        /* Failed to write the overflow, free the record allocated above: */
2471
 
                        tab_free_record_on_fail(ot, rec_address);
2472
 
                        return FAILED;
2473
 
                }
2474
 
        }
2475
 
 
2476
 
        XT_DISABLED_TRACE(("new rec tx=%d val=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_id, (int) rec_address));
2477
 
        rec_info->ri_rec_address = rec_address;
2478
 
        return OK;
 
2955
                        if (!xt_idx_delete(ot, *ind, rec_id, rec_data)) {
 
2956
                                if (log_err)
 
2957
                                        xt_log_and_clear_exception_ns();
 
2958
                        }
 
2959
                }
 
2960
        }
 
2961
 
 
2962
        if (row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_DLOG || row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_CLEAN)
 
2963
                tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) row_ptr, log_err);
 
2964
 
 
2965
        rec_info.ri_fix_rec_buf = (XTTabRecFixDPtr) ot->ot_row_wbuffer;
 
2966
        rec_info.ri_rec_buf_size = offsetof(XTTabRecFixDRec, rf_data);
 
2967
        rec_info.ri_ext_rec = NULL;
 
2968
        rec_info.ri_fix_rec_buf->tr_rec_type_1 = XT_TAB_STATUS_DELETE;
 
2969
        rec_info.ri_fix_rec_buf->tr_stat_id_1 = 0;
 
2970
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
 
2971
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, rec_id);
 
2972
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, ot->ot_thread->st_xact_data->xd_start_xn_id);
 
2973
 
 
2974
        if (!tab_add_record(ot, &rec_info, XT_LOG_ENT_DELETE))
 
2975
                goto failed;
 
2976
 
 
2977
        if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_ADD_REC, row_id, rec_info.ri_rec_id))
 
2978
                goto failed;
 
2979
 
 
2980
        if (log_err)
 
2981
                tab_restore_exception(&e);
 
2982
        return;
 
2983
 
 
2984
        failed:
 
2985
        if (log_err)
 
2986
                xt_log_and_clear_exception_ns();
 
2987
        else
 
2988
                tab_restore_exception(&e);
2479
2989
}
2480
2990
 
2481
2991
/*
2483
2993
 * the given record have been rolled-back.
2484
2994
 * If any is committed, register a locked error, and return FAILED.
2485
2995
 */
2486
 
static xtBool tab_wait_for_rollback(XTOpenTablePtr ot, xtWord4 row_id, off_t commit_record)
 
2996
static xtBool tab_wait_for_rollback(XTOpenTablePtr ot, xtRowID row_id, xtRecordID commit_rec_id)
2487
2997
{
2488
2998
        register XTTableHPtr    tab = ot->ot_table;
2489
 
        off_t                                   variation;
 
2999
        xtRecordID                              var_rec_id;
2490
3000
        XTTabRecHeadDRec                var_head;
2491
 
        xtWord8                                 tn_id;
2492
 
        xtBool                                  wait;
 
3001
        xtXactID                                xn_id;
2493
3002
 
2494
3003
        retry:
2495
 
        if (!xt_tab_get_row(ot, row_id, &variation))
 
3004
        if (!xt_tab_get_row(ot, row_id, &var_rec_id))
2496
3005
                return FAILED;
2497
3006
 
2498
 
        while (variation != commit_record) {
2499
 
                if (!variation)
 
3007
        while (var_rec_id != commit_rec_id) {
 
3008
                if (!var_rec_id)
2500
3009
                        goto locked;
2501
 
                if (!xt_tab_get_record(ot, variation, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
 
3010
                if (!xt_tab_get_rec_data(ot, var_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
2502
3011
                        return FAILED;
2503
3012
                if (XT_REC_IS_CLEAN(var_head.tr_rec_type_1))
2504
3013
                        goto locked;
2505
 
                if (!XT_TAB_IS_DELETED(var_head.tr_rec_type_1)) {
2506
 
                        tn_id = XT_GET_DISK_6(var_head.tr_xact_id_6);
2507
 
                        wait = FALSE;
2508
 
                        if (xt_xn_may_commit(ot, tn_id, variation, NULL, &wait)) {
2509
 
                                if (!wait)
2510
 
                                        goto locked;
 
3014
                if (XT_REC_IS_FREE(var_head.tr_rec_type_1))
 
3015
                        /* Should not happen: */
 
3016
                        goto locked;
 
3017
                xn_id = XT_GET_DISK_4(var_head.tr_xact_id_4);
 
3018
                switch (xt_xn_status(ot, xn_id, var_rec_id)) {
 
3019
                        case XT_XN_VISIBLE:
 
3020
                        case XT_XN_NOT_VISIBLE:
 
3021
                                goto locked;
 
3022
                        case XT_XN_ABORTED:
 
3023
                                /* Ignore the record, it will be removed. */
 
3024
                                break;
 
3025
                        case XT_XN_MY_UPDATE:
 
3026
                                /* Should not happen: */
 
3027
                                goto locked;
 
3028
                        case XT_XN_OTHER_UPDATE:
2511
3029
                                /* Wait for the transaction to commit or rollback: */
2512
 
                                xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2513
 
                                if (!xt_xn_wait_for_xact(ot->ot_thread, tn_id)) {
2514
 
                                        xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3030
                                xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
3031
                                if (!xt_xn_wait_for_xact(ot->ot_thread, xn_id, FALSE)) {
 
3032
                                        xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2515
3033
                                        return FAILED;
2516
3034
                                }
2517
 
                                xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3035
                                xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2518
3036
                                goto retry;
2519
 
                        }
2520
3037
                }
2521
 
                variation = XT_GET_DISK_6(var_head.tr_prev_var_6);
 
3038
                var_rec_id = XT_GET_DISK_4(var_head.tr_prev_rec_id_4);
2522
3039
        }
2523
3040
        return OK;
2524
3041
 
2527
3044
        return FAILED;
2528
3045
}
2529
3046
 
2530
 
/*
2531
 
 xt_tab_no_duplicate
2532
 
 XT_ERR --> XT_ERR
2533
 
 TRUE --> FALSE
2534
 
 FALSE --> XT_MAYBE
2535
 
 XT_DUP --> TRUE
2536
 
*/
2537
 
 
2538
 
/* Check if a record mat be visible:
 
3047
/* Check if a record may be visible:
2539
3048
 * Return TRUE if the record may be visible now.
2540
 
 * Return XT_MAYBE if the record may be visible in the future (set out_tn_id).
2541
 
 * Return FALSE of the record is not visible.
 
3049
 * Return XT_MAYBE if the record may be visible in the future (set out_xn_id).
 
3050
 * Return FALSE if the record is not valid (freed or is a delete record).
2542
3051
 * Return XT_ERR if an error occurred.
2543
3052
 */
2544
 
xtPublic int xt_tab_maybe_committed(XTOpenTablePtr ot, off_t record, xtWord8 *out_tn_id, xtWord4 *out_rowid, xtBool *out_updated)
 
3053
xtPublic int xt_tab_maybe_committed(XTOpenTablePtr ot, xtRecordID rec_id, xtXactID *out_xn_id, xtRowID *out_rowid, xtBool *out_updated)
2545
3054
{
2546
3055
        XTTabRecHeadDRec                rec_head;
2547
 
        xtWord8                                 rec_tn_id = 0;
2548
 
        xtWord8                                 wait_tn_id;
2549
 
        xtBool                                  wait;
2550
 
        xtWord4                                 row_id;
2551
 
        off_t                                   variation;
2552
 
        xtWord8                                 tn_id;
2553
 
#ifdef TRACE_VARIATIONS
 
3056
        xtXactID                                rec_xn_id = 0;
 
3057
        xtBool                                  wait = FALSE;
 
3058
        xtXactID                                wait_xn_id = 0;
 
3059
        xtRowID                                 row_id;
 
3060
        xtRecordID                              var_rec_id;
 
3061
        xtXactID                                xn_id;
 
3062
        register XTTableHPtr    tab;
 
3063
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
2554
3064
        char                                    t_buf[500];
2555
3065
        int                                             len;
 
3066
        char                                    *t_type = "C";
2556
3067
#endif
2557
3068
 
2558
 
        if (!xt_tab_get_record(ot, record, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
 
3069
        if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2559
3070
                return XT_ERR;
2560
3071
 
2561
 
        if (XT_REC_NOT_VISIBLE(rec_head.tr_rec_type_1))
 
3072
        if (XT_REC_NOT_VALID(rec_head.tr_rec_type_1))
2562
3073
                return FALSE;
2563
3074
 
2564
 
        wait = FALSE;
2565
3075
        if (!XT_REC_IS_CLEAN(rec_head.tr_rec_type_1)) {
2566
 
                rec_tn_id = XT_GET_DISK_6(rec_head.tr_xact_id_6);
2567
 
                if (!xt_xn_may_commit(ot, rec_tn_id, record, NULL, &wait))
2568
 
                        return FALSE;
2569
 
                if (wait)
2570
 
                        wait_tn_id = rec_tn_id;
 
3076
                rec_xn_id = XT_GET_DISK_4(rec_head.tr_xact_id_4);
 
3077
                switch (xt_xn_status(ot, rec_xn_id, rec_id)) {
 
3078
                        case XT_XN_VISIBLE:
 
3079
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3080
                                t_type="V";
 
3081
#endif
 
3082
                                break;
 
3083
                        case XT_XN_NOT_VISIBLE:
 
3084
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3085
                                t_type="NV";
 
3086
#endif
 
3087
                                break;
 
3088
                        case XT_XN_ABORTED:
 
3089
                                return FALSE;
 
3090
                        case XT_XN_MY_UPDATE:
 
3091
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3092
                                t_type="My-Upd";
 
3093
#endif
 
3094
                                break;
 
3095
                        case XT_XN_OTHER_UPDATE:
 
3096
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3097
                                t_type="Wait";
 
3098
#endif
 
3099
                                wait = TRUE;
 
3100
                                wait_xn_id = rec_xn_id;
 
3101
                                break;
 
3102
                }
2571
3103
        }
2572
3104
 
2573
3105
        /* Follow the variation chain until we come to this record.
2576
3108
         * variation chain, it is also not visible.
2577
3109
         */
2578
3110
        row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
2579
 
        //xt_rwlock_rdlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2580
 
 
2581
 
        if (!(xt_tab_get_row(ot, row_id, &variation)))
 
3111
 
 
3112
        tab = ot->ot_table;
 
3113
        xt_rwlock_rdlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
 
3114
 
 
3115
        if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
2582
3116
                goto failed;
2583
 
#ifdef TRACE_VARIATIONS
2584
 
        //len = sprintf(t_buf, "dup row=%d", (int) row_id);
 
3117
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3118
        len = sprintf(t_buf, "dup row=%d", (int) row_id);
2585
3119
#endif
2586
 
        while (variation != record) {
2587
 
                if (!variation)
 
3120
        while (var_rec_id != rec_id) {
 
3121
                if (!var_rec_id)
2588
3122
                        goto not_found;
2589
 
#ifdef TRACE_VARIATIONS
2590
 
                //if (len <= 450)
2591
 
                //      len += sprintf(t_buf+len, " -> %d", (int) variation);
 
3123
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3124
                if (len <= 450)
 
3125
                        len += sprintf(t_buf+len, " -> %d", (int) var_rec_id);
2592
3126
#endif
2593
 
                if (!xt_tab_get_record(ot, variation, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
 
3127
                if (!xt_tab_get_rec_data(ot, var_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2594
3128
                        goto failed;
2595
3129
                /* All clean records are visible, by all transactions: */
2596
3130
                if (XT_REC_IS_CLEAN(rec_head.tr_rec_type_1))
2597
3131
                        goto not_found;
2598
 
                if (!XT_TAB_IS_DELETED(rec_head.tr_rec_type_1)) {
2599
 
                        xtBool var_wait = FALSE;
2600
 
 
2601
 
                        tn_id = XT_GET_DISK_6(rec_head.tr_xact_id_6);
2602
 
                        if (xt_xn_may_commit(ot, tn_id, variation, NULL, &var_wait)) {
2603
 
                                if (!var_wait)
2604
 
                                        goto not_found;
2605
 
                                /* See comment above.
2606
 
                                 */
 
3132
                if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
 
3133
                        /* Should not happen: */
 
3134
                        goto not_found;
 
3135
                xn_id = XT_GET_DISK_4(rec_head.tr_xact_id_4);
 
3136
                switch (xt_xn_status(ot, xn_id, var_rec_id)) {
 
3137
                        case XT_XN_VISIBLE:
 
3138
                        case XT_XN_NOT_VISIBLE:
 
3139
                                goto not_found;
 
3140
                        case XT_XN_ABORTED:
 
3141
                                /* Ignore the record, it will be removed. */
 
3142
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3143
                                if (len <= 450)
 
3144
                                        len += sprintf(t_buf+len, "(T%d-A)", (int) xn_id);
 
3145
#endif
 
3146
                                break;
 
3147
                        case XT_XN_MY_UPDATE:
 
3148
                                goto not_found;
 
3149
                        case XT_XN_OTHER_UPDATE:
 
3150
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3151
                                if (len <= 450)
 
3152
                                        len += sprintf(t_buf+len, "(T%d-wait)", (int) xn_id);
 
3153
#endif
 
3154
                                /* Wait for this update to commit or abort: */
2607
3155
                                if (!wait) {
2608
 
                                        wait_tn_id = tn_id;
2609
3156
                                        wait = TRUE;
 
3157
                                        wait_xn_id = xn_id;
2610
3158
                                }
2611
 
                        }
 
3159
                                break;
2612
3160
                }
2613
 
                variation = XT_GET_DISK_6(rec_head.tr_prev_var_6);
 
3161
                var_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2614
3162
        }
2615
 
#ifdef TRACE_VARIATIONS
2616
 
        //if (len <= 450)
2617
 
        //      sprintf(t_buf+len, " -> %d\n", (int) variation);
2618
 
        //else
2619
 
        //      sprintf(t_buf+len, " ...\n", (int) variation);
 
3163
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3164
        if (len <= 450)
 
3165
                sprintf(t_buf+len, " -> %d(T%d-%s)\n", (int) var_rec_id, (int) rec_xn_id, t_type);
 
3166
        else
 
3167
                sprintf(t_buf+len, " ...(T%d-%s)\n", (int) rec_xn_id, t_type);
2620
3168
#endif
2621
3169
 
2622
 
        //xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3170
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2623
3171
        if (wait) {
2624
 
                *out_tn_id = wait_tn_id;
 
3172
                *out_xn_id = wait_xn_id;
2625
3173
                return XT_MAYBE;
2626
3174
        }
2627
 
#ifdef TRACE_VARIATIONS
2628
 
        //xt_trace("%s", t_buf);
 
3175
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
 
3176
        xt_trace("%s", t_buf);
2629
3177
#endif
2630
3178
        if (out_rowid) {
2631
3179
                *out_rowid = row_id;
2632
 
                *out_updated = (rec_tn_id == ot->ot_thread->st_xact_data->xd_start_id);
 
3180
                *out_updated = (rec_xn_id == ot->ot_thread->st_xact_data->xd_start_xn_id);
2633
3181
        }
2634
3182
        return TRUE;
2635
3183
 
2636
3184
        not_found:
2637
 
        //xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3185
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2638
3186
        return FALSE;
2639
3187
 
2640
3188
        failed:
2641
 
        //xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3189
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2642
3190
        return XT_ERR;
2643
3191
}
2644
3192
 
2647
3195
        register XTTableHPtr    tab = ot->ot_table;
2648
3196
        register XTThreadPtr    self = ot->ot_thread;
2649
3197
        XTTabRecInfoRec                 rec_info;
2650
 
        xtWord4                                 row_id;
 
3198
        xtRowID                                 row_id;
2651
3199
        u_int                                   idx_cnt = 0;
2652
3200
        XTIndexPtr                              *ind;
2653
3201
        void                                    *mybs_table;
2654
3202
 
2655
 
        if (!xt_xn_log_begin(ot))
2656
 
                return FAILED;
2657
 
 
2658
3203
        /* MyBS: Reference BLOBs!? */
2659
3204
        if (tab->tab_dic.dic_blob_count) {
2660
3205
                if (!myxt_use_blobs(ot, &mybs_table, rec_buf))
2670
3215
 
2671
3216
        rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id;
2672
3217
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2673
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_prev_var_6, 0);
2674
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_xact_id_6, self->st_xact_data->xd_start_id);
2675
 
 
2676
 
        if (!tab_add_record(ot, &rec_info))
2677
 
                goto failed_1;
2678
 
 
2679
 
        if (!xt_tab_set_row(ot, row_id, rec_info.ri_rec_address, FALSE))
2680
 
                goto failed_2;
2681
 
        XT_DISABLED_TRACE(("set new tx=%d row=%d rec=%d\n", (int) self->st_xact_data->xd_start_id, (int) row_id, (int) rec_info.ri_rec_address));
 
3218
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, 0);
 
3219
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id);
 
3220
 
 
3221
        /* Note, it is important that this record is written BEFORE the row
 
3222
         * due to the problem described here [(5)]
 
3223
         */
 
3224
        if (!tab_add_record(ot, &rec_info, XT_LOG_ENT_INSERT))
 
3225
                goto failed_1;
 
3226
 
 
3227
#ifdef TRACE_VARIATIONS
 
3228
        xt_trace("%s insert: row=%d rec=%d T%d\n", self->t_name, (int) row_id, (int) rec_info.ri_rec_id, (int) self->st_xact_data->xd_start_xn_id);
 
3229
#endif
 
3230
        if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_ADD_REC, row_id, rec_info.ri_rec_id))
 
3231
                goto failed_1;
 
3232
        XT_DISABLED_TRACE(("set new tx=%d row=%d rec=%d\n", (int) self->st_xact_data->xd_start_xn_id, (int) row_id, (int) rec_info.ri_rec_id));
2682
3233
 
2683
3234
        /* Add the index references: */
2684
3235
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2685
 
                if (!xt_idx_insert(ot, *ind, rec_info.ri_rec_address, rec_buf, NULL)) {
 
3236
                if (!xt_idx_insert(ot, *ind, rec_info.ri_rec_id, rec_buf, NULL, FALSE)) {
2686
3237
                        ot->ot_err_index_no = (*ind)->mi_index_no;
2687
3238
                        goto failed_2;
2688
3239
                }
2689
3240
        }
2690
3241
 
2691
 
        /* Log this change: */
2692
 
        if (!xt_xn_log_update(ot, rec_info.ri_rec_address, XT_XN_STATUS_INSERT, rec_info.ri_fix_rec_buf->tr_rec_type_1))
2693
 
                goto failed_2;
2694
 
 
2695
3242
        /* Reference the BLOBs in the row: */
2696
3243
        if (tab->tab_dic.dic_blob_count) {
2697
 
                if (!myxt_retain_blobs(ot, mybs_table, rec_info.ri_rec_address)) {
 
3244
                if (!myxt_retain_blobs(ot, mybs_table, rec_info.ri_rec_id)) {
2698
3245
                        mybs_table = NULL;
2699
3246
                        goto failed_2;
2700
3247
                }
2704
3251
        /* Do the foreign key stuff: */
2705
3252
        if (ot->ot_table->tab_dic.dic_table->dt_fkeys.size() > 0) {
2706
3253
                if (!ot->ot_table->tab_dic.dic_table->insertRow(ot, rec_buf))
2707
 
                        return FAILED;
 
3254
                        goto failed_2;
2708
3255
        }
2709
3256
 
2710
 
        return OK;
 
3257
        return OK;      
2711
3258
 
2712
3259
        failed_2:
2713
 
        tab_remove_record_on_fail(ot, rec_info.ri_rec_address, (XTTabRecHeadDPtr) rec_info.ri_fix_rec_buf, rec_buf, idx_cnt);
 
3260
        /* Once the row has been inserted, it is too late to remove it!
 
3261
         * Now all we can do is delete it!
 
3262
         */
 
3263
        tab_delete_record_on_fail(ot, row_id, rec_info.ri_rec_id, (XTTabRecHeadDPtr) rec_info.ri_fix_rec_buf, rec_buf, idx_cnt);
 
3264
        goto failed_0;
2714
3265
 
2715
3266
        failed_1:
2716
3267
        tab_free_row_on_fail(ot, tab, row_id);
2721
3272
        return FAILED;
2722
3273
}
2723
3274
 
 
3275
/* We cannot remove a change we have made to a row while a transaction
 
3276
 * is running, so we have to undo what we have done by
 
3277
 * overwriting the record we just created with
 
3278
 * the before image!
 
3279
 */
 
3280
static xtBool tab_overwrite_record_on_fail(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, xtWord1 *before_buf, xtWord1 *after_buf, u_int idx_cnt)
 
3281
{
 
3282
        register XTTableHPtr    tab = ot->ot_table;
 
3283
        XTTabRecHeadDRec                prev_rec_head;
 
3284
        u_int                                   i;
 
3285
        XTIndexPtr                              *ind;
 
3286
        XTThreadPtr                             thread = ot->ot_thread;
 
3287
        xtLogID                                 log_id;
 
3288
        xtLogOffset                             log_offset;
 
3289
        xtRecordID                              rec_id = rec_info->ri_rec_id;
 
3290
 
 
3291
        /* Remove the new extended record: */
 
3292
        if (rec_info->ri_ext_rec)
 
3293
                tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) rec_info->ri_fix_rec_buf, TRUE);
 
3294
 
 
3295
        /* Undo index entries of the new record: */
 
3296
        if (after_buf) {
 
3297
                for (i=0, ind=tab->tab_dic.dic_keys; i<idx_cnt; i++, ind++) {
 
3298
                        if (!xt_idx_delete(ot, *ind, rec_id, after_buf))
 
3299
                                return FAILED;
 
3300
                }
 
3301
        }
 
3302
 
 
3303
        memcpy(&prev_rec_head, rec_info->ri_fix_rec_buf, sizeof(XTTabRecHeadDRec));
 
3304
 
 
3305
        /* Restore the previous record! */
 
3306
        if (!myxt_store_row(ot, rec_info, (char *) before_buf))
 
3307
                return FAILED;
 
3308
 
 
3309
        memcpy(rec_info->ri_fix_rec_buf, &prev_rec_head, sizeof(XTTabRecHeadDRec));
 
3310
 
 
3311
        if (rec_info->ri_ext_rec) {
 
3312
                /* Determine where the overflow will go... */
 
3313
                if (!thread->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, rec_info->ri_log_data_size + offsetof(XTactExtRecEntryDRec, er_data)))
 
3314
                        return FAILED;
 
3315
                XT_SET_LOG_REF(rec_info->ri_ext_rec, log_id, log_offset);
 
3316
        }
 
3317
 
 
3318
        if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_MODIFIED, 0, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
 
3319
                return FAILED;
 
3320
 
 
3321
        if (rec_info->ri_ext_rec) {
 
3322
                /* Write the log buffer overflow: */            
 
3323
                rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK;
 
3324
                XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size);
 
3325
                XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab->tab_id);
 
3326
                XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id);
 
3327
                if (!thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf))
 
3328
                        return FAILED;
 
3329
        }
 
3330
 
 
3331
        /* Put the index entries back: */
 
3332
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
 
3333
                if (!xt_idx_insert(ot, *ind, rec_id, before_buf, after_buf, TRUE))
 
3334
                        /* Incomplete restore, there will be a rollback... */
 
3335
                        return FAILED;
 
3336
        }
 
3337
 
 
3338
        return OK;
 
3339
}
 
3340
 
2724
3341
/*
2725
3342
 * GOTCHA:
2726
3343
 * If a transaction updates the same record over again, we should update
2727
 
 * in place. This prevents producting unnecessary variations!
 
3344
 * in place. This prevents producing unnecessary variations!
2728
3345
 */
2729
3346
static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWord1 *after_buf)
2730
3347
{
2731
3348
        register XTTableHPtr    tab = ot->ot_table;
2732
 
        xtWord4                                 row_id = ot->ot_curr_row_id;
 
3349
        xtRowID                                 row_id = ot->ot_curr_row_id;
2733
3350
        register XTThreadPtr    self = ot->ot_thread;
2734
 
        off_t                                   rec_address = ot->ot_curr_rec;
2735
 
        xtWord1                                 rec_head[offsetof(XTTabRecExtDRec, re_data)];
 
3351
        xtRecordID                              rec_id = ot->ot_curr_rec_id;
 
3352
        XTTabRecExtDRec                 prev_rec_head;
2736
3353
        XTTabRecInfoRec                 rec_info;
2737
3354
        u_int                                   idx_cnt = 0, i;
2738
3355
        XTIndexPtr                              *ind;
2739
 
        xtWord4                                 log_id;
2740
 
        off_t                                   log_offset;
 
3356
        xtLogID                                 log_id;
 
3357
        xtLogOffset                             log_offset;
2741
3358
        void                                    *mybs_table;
 
3359
        xtBool                                  prev_ext_rec;
2742
3360
 
2743
3361
        if (tab->tab_dic.dic_blob_count) {
2744
3362
                if (!myxt_use_blobs(ot, &mybs_table, after_buf))
2749
3367
                goto failed_0;
2750
3368
 
2751
3369
        /* Read before we overwrite! */
 
3370
        if (!xt_tab_get_rec_data(ot, rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &prev_rec_head))
 
3371
                goto failed_0;
 
3372
 
 
3373
        prev_ext_rec = prev_rec_head.tr_rec_type_1 & XT_TAB_STATUS_EXT_DLOG;
 
3374
 
2752
3375
        if (rec_info.ri_ext_rec) {
2753
3376
                /* Determine where the overflow will go... */
2754
 
                if (!xt_tab_get_record(ot, rec_address, offsetof(XTTabRecExtDRec, re_data), rec_head))
2755
 
                        goto failed_0;
2756
 
 
2757
 
                if (!xt_dl_get_log_offset(ot, &log_id, &log_offset, rec_info.ri_log_data_size + XT_LOG_REC_HEADER_SIZE))
2758
 
                        goto failed_0;
2759
 
                XT_SET_LOG_REF_6(rec_info.ri_ext_rec->re_log_rec_6, log_id, log_offset);
2760
 
        }
2761
 
        else {
2762
 
                if (!xt_tab_get_record(ot, rec_address, sizeof(XTTabRecHeadDRec), rec_head))
2763
 
                        goto failed_0;
 
3377
                if (!self->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size))
 
3378
                        goto failed_0;
 
3379
                XT_SET_LOG_REF(rec_info.ri_ext_rec, log_id, log_offset);
2764
3380
        }
2765
3381
 
2766
3382
        rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id;
2767
3383
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2768
 
        XT_COPY_DISK_6(rec_info.ri_fix_rec_buf->tr_prev_var_6, ((XTTabRecHeadDPtr) rec_head)->tr_prev_var_6);
2769
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_xact_id_6, self->st_xact_data->xd_start_id);
 
3384
        XT_COPY_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, prev_rec_head.tr_prev_rec_id_4);
 
3385
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id);
2770
3386
 
2771
3387
        /* Remove the index references, that have changed: */
2772
3388
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2773
 
                if (!xt_idx_delete(ot, *ind, rec_address, before_buf)) {
 
3389
                if (!xt_idx_delete(ot, *ind, rec_id, before_buf)) {
2774
3390
                        goto failed_0;
2775
3391
                }
2776
3392
        }
2777
3393
 
 
3394
#ifdef TRACE_VARIATIONS
 
3395
        xt_trace("%s overwrite: row=%d rec=%d T%d\n", self->t_name, (int) row_id, (int) rec_id, (int) self->st_xact_data->xd_start_xn_id);
 
3396
#endif
2778
3397
        /* Overwrite the record: */
2779
 
        if (!xt_tab_put_data(ot, rec_address, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
2780
 
                goto undo_0;
 
3398
        if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_MODIFIED, 0, rec_id, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
 
3399
                goto failed_0;
2781
3400
 
2782
3401
        if (rec_info.ri_ext_rec) {
2783
3402
                /* Write the log buffer overflow: */            
2784
 
                rec_info.ri_log_buf->lb_status_1 = XT_DL_STATUS_RECORD;
2785
 
                XT_SET_DISK_4(rec_info.ri_log_buf->lb_data_size_4, rec_info.ri_log_data_size);
2786
 
                XT_SET_DISK_6(rec_info.ri_log_buf->lb_record_6, rec_address);
2787
 
                if (!xt_dl_append_log(ot, log_id, log_offset, rec_info.ri_log_data_size + XT_LOG_REC_HEADER_SIZE, rec_info.ri_log_buf))
2788
 
                        goto undo_1;
 
3403
                rec_info.ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK;
 
3404
                XT_SET_DISK_4(rec_info.ri_log_buf->er_data_size_4, rec_info.ri_log_data_size);
 
3405
                XT_SET_DISK_4(rec_info.ri_log_buf->er_tab_id_4, tab->tab_id);
 
3406
                XT_SET_DISK_4(rec_info.ri_log_buf->er_rec_id_4, rec_id);
 
3407
                if (!self->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size, (xtWord1 *) rec_info.ri_log_buf))
 
3408
                        goto failed_1;
2789
3409
        }
2790
3410
 
2791
3411
        /* Add the index references that have changed: */
2792
3412
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2793
 
                if (!xt_idx_insert(ot, *ind, rec_address, after_buf, before_buf)) {
 
3413
                if (!xt_idx_insert(ot, *ind, rec_id, after_buf, before_buf, FALSE)) {
2794
3414
                        ot->ot_err_index_no = (*ind)->mi_index_no;
2795
 
                        goto undo_2;
2796
 
                }
2797
 
        }
2798
 
 
2799
 
        /* Reference the BLOBs in the row: */
2800
 
        if (tab->tab_dic.dic_blob_count) {
2801
 
                if (!myxt_retain_blobs(ot, mybs_table, rec_address)) {
2802
 
                        mybs_table = NULL;
2803
 
                        goto undo_2;
2804
 
                }
2805
 
                myxt_release_blobs(ot, before_buf, rec_address);
 
3415
                        goto failed_2;
 
3416
                }
2806
3417
        }
2807
3418
 
2808
3419
        /* Do the foreign key stuff: */
2809
3420
        if (ot->ot_table->tab_dic.dic_table->dt_trefs || ot->ot_table->tab_dic.dic_table->dt_fkeys.size() > 0) {
2810
3421
                if (!ot->ot_table->tab_dic.dic_table->updateRow(ot, before_buf, after_buf))
2811
 
                        goto undo_2;
 
3422
                        goto failed_2;
2812
3423
        }
2813
3424
        
2814
3425
        /* Delete the previous overflow area: */
 
3426
        if (prev_ext_rec)
 
3427
                tab_free_ext_record_on_fail(ot, rec_id, &prev_rec_head, TRUE);
 
3428
 
 
3429
        if (tab->tab_dic.dic_blob_count) {
 
3430
                /* Retain the BLOBs of the new record: */
 
3431
                if (!myxt_retain_blobs(ot, mybs_table, rec_id))
 
3432
                        return FAILED;
 
3433
                /* Release the BLOBs in the old record: */
 
3434
                myxt_release_blobs(ot, before_buf, rec_id);
 
3435
        }
 
3436
 
 
3437
        return OK;
 
3438
 
 
3439
        failed_2:
 
3440
        /* Remove the new extended record: */
2815
3441
        if (rec_info.ri_ext_rec)
2816
 
                tab_free_extended_record(ot, rec_address, (XTTabRecExtDPtr) rec_head);
2817
 
 
2818
 
        return OK;
2819
 
 
2820
 
        undo_2:
 
3442
                tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) rec_info.ri_fix_rec_buf, TRUE);
 
3443
 
 
3444
        /* Restore the previous record! */
2821
3445
        /* Undo index entries: */
2822
3446
        for (i=0, ind=tab->tab_dic.dic_keys; i<idx_cnt; i++, ind++) {
2823
 
                if (!xt_idx_delete(ot, *ind, rec_address, after_buf))
2824
 
                        xt_log_and_clear_exception_ns();
 
3447
                if (!xt_idx_delete(ot, *ind, rec_id, after_buf))
 
3448
                        goto failed_1;
2825
3449
        }
2826
3450
 
2827
 
        if (rec_info.ri_ext_rec)
2828
 
                tab_free_extended_record(ot, rec_address, (XTTabRecExtDPtr) rec_info.ri_fix_rec_buf);
2829
 
 
2830
 
        undo_1:
2831
3451
        /* Restore the record: */
2832
3452
        if (!myxt_store_row(ot, &rec_info, (char *) before_buf))
2833
 
                return FAILED;
 
3453
                goto failed_1;
2834
3454
 
2835
3455
        if (rec_info.ri_ext_rec)
2836
 
                memcpy(rec_info.ri_fix_rec_buf, rec_head, offsetof(XTTabRecExtDRec, re_data));
 
3456
                memcpy(rec_info.ri_fix_rec_buf, &prev_rec_head, XT_REC_EXT_HEADER_SIZE);
2837
3457
        else
2838
 
                memcpy(rec_info.ri_fix_rec_buf, rec_head, sizeof(XTTabRecHeadDRec));
 
3458
                memcpy(rec_info.ri_fix_rec_buf, &prev_rec_head, sizeof(XTTabRecHeadDRec));
2839
3459
 
2840
 
        if (!xt_tab_put_data(ot, rec_address, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
2841
 
                return FAILED;
 
3460
        if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_MODIFIED, 0, rec_id, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
 
3461
                goto failed_1;
2842
3462
 
2843
3463
        /* Put the index entries back: */
2844
 
        undo_0:
2845
3464
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2846
 
                if (!xt_idx_insert(ot, *ind, rec_address, before_buf, after_buf))
2847
 
                        return FAILED;
 
3465
                if (!xt_idx_insert(ot, *ind, rec_id, before_buf, after_buf, TRUE))
 
3466
                        /* Incomplete restore, there will be a rollback... */
 
3467
                        goto failed_0;
2848
3468
        }
2849
3469
 
 
3470
        /* The previous record has now been restored. */
 
3471
        goto failed_0;
 
3472
 
 
3473
        failed_1:
 
3474
        /* The old record is overwritten, I must free the previous extended record: */
 
3475
        if (prev_ext_rec)
 
3476
                tab_free_ext_record_on_fail(ot, rec_id, &prev_rec_head, TRUE);
 
3477
 
2850
3478
        failed_0:
 
3479
        /* Unuse the BLOBs of the new record: */
2851
3480
        if (tab->tab_dic.dic_blob_count && mybs_table)
2852
3481
                myxt_unuse_blobs(ot, mybs_table);
2853
3482
        return FAILED;
2856
3485
xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWord1 *after_buf)
2857
3486
{
2858
3487
        register XTTableHPtr    tab;
2859
 
        xtWord4                                 row_id;
 
3488
        xtRowID                                 row_id;
2860
3489
        register XTThreadPtr    self;
2861
 
        off_t                                   curr_variation;
 
3490
        xtRecordID                              curr_var_rec_id;
2862
3491
        XTTabRecInfoRec                 rec_info;
2863
3492
        u_int                                   idx_cnt = 0;
2864
3493
        XTIndexPtr                              *ind;
2874
3503
        row_id = ot->ot_curr_row_id;
2875
3504
        self = ot->ot_thread;
2876
3505
 
2877
 
        if (!xt_xn_log_begin(ot))
2878
 
                return FAILED;
2879
 
 
2880
3506
        /* MyBS: Reference BLOBs!? */
2881
3507
        if (tab->tab_dic.dic_blob_count) {
2882
3508
                if (!myxt_use_blobs(ot, &mybs_table, after_buf))
2888
3514
 
2889
3515
        rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id;
2890
3516
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2891
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_prev_var_6, ot->ot_curr_rec);
2892
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_xact_id_6, self->st_xact_data->xd_start_id);
 
3517
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, ot->ot_curr_rec_id);
 
3518
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id);
2893
3519
 
2894
3520
        /* Create the new record: */
2895
 
        if (!tab_add_record(ot, &rec_info))
 
3521
        if (!tab_add_record(ot, &rec_info, XT_LOG_ENT_UPDATE))
2896
3522
                goto failed_0;
2897
3523
 
2898
3524
        /* Link the new variation into the list: */
2899
 
        xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3525
        xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2900
3526
 
2901
 
        if (!xt_tab_get_row(ot, row_id, &curr_variation))
 
3527
        if (!xt_tab_get_row(ot, row_id, &curr_var_rec_id))
2902
3528
                goto failed_1;
2903
3529
 
2904
 
        if (curr_variation != ot->ot_curr_rec) {
2905
 
                if (!tab_wait_for_rollback(ot, row_id, ot->ot_curr_rec))
2906
 
                        goto failed_1;          
 
3530
        if (curr_var_rec_id != ot->ot_curr_rec_id) {
 
3531
                /* If the transaction does not rollback, I will get an
 
3532
                 * exception here:
 
3533
                 */
 
3534
                if (!tab_wait_for_rollback(ot, row_id, ot->ot_curr_rec_id))
 
3535
                        goto failed_1;
 
3536
                /* [(4)] This is the situation when we overwrite the
 
3537
                 * reference to curr_var_rec_id!
 
3538
                 * When curr_var_rec_id is cleaned up by the sweeper, the
 
3539
                 * sweeper will notice that the record is no longer in
 
3540
                 * the row list.
 
3541
                 */
2907
3542
        }
2908
3543
 
2909
 
        if (!xt_tab_set_row(ot, row_id, rec_info.ri_rec_address, FALSE))
 
3544
#ifdef TRACE_VARIATIONS
 
3545
        xt_trace("%s update: row=%d rec=%d T%d\n", self->t_name, (int) row_id, (int) rec_info.ri_rec_id, (int) self->st_xact_data->xd_start_xn_id);
 
3546
#endif
 
3547
        if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_ADD_REC, row_id, rec_info.ri_rec_id))
2910
3548
                goto failed_1;
2911
 
        XT_DISABLED_TRACE(("set upd tx=%d row=%d rec=%d\n", (int) self->st_xact_data->xd_start_id, (int) row_id, (int) rec_info.ri_rec_address));
 
3549
        XT_DISABLED_TRACE(("set upd tx=%d row=%d rec=%d\n", (int) self->st_xact_data->xd_start_xn_id, (int) row_id, (int) rec_info.ri_rec_id));
2912
3550
 
2913
 
        xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3551
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2914
3552
 
2915
3553
        /* Add the index references: */
2916
3554
        for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2917
 
                if (!xt_idx_insert(ot, *ind, rec_info.ri_rec_address, after_buf, before_buf)) {
 
3555
                if (!xt_idx_insert(ot, *ind, rec_info.ri_rec_id, after_buf, before_buf, FALSE)) {
2918
3556
                        ot->ot_err_index_no = (*ind)->mi_index_no;
2919
3557
                        goto failed_2;
2920
3558
                }
2921
3559
        }
2922
3560
 
2923
 
        /* Log this change: */
2924
 
        if (!xt_xn_log_update(ot, rec_info.ri_rec_address, XT_XN_STATUS_UPDATE, rec_info.ri_fix_rec_buf->tr_rec_type_1))
2925
 
                goto failed_2;
2926
 
 
2927
3561
        /* Reference the BLOBs in the row: */
2928
3562
        if (tab->tab_dic.dic_blob_count) {
2929
 
                if (!myxt_retain_blobs(ot, mybs_table, rec_info.ri_rec_address)) {
 
3563
                if (!myxt_retain_blobs(ot, mybs_table, rec_info.ri_rec_id)) {
2930
3564
                        mybs_table = NULL;
2931
3565
                        goto failed_2;
2932
3566
                }
2935
3569
 
2936
3570
        if (ot->ot_table->tab_dic.dic_table->dt_trefs || ot->ot_table->tab_dic.dic_table->dt_fkeys.size() > 0) {
2937
3571
                if (!ot->ot_table->tab_dic.dic_table->updateRow(ot, before_buf, after_buf))
2938
 
                        return FAILED;
 
3572
                        goto failed_2;
2939
3573
        }
2940
3574
 
2941
3575
        return OK;
2942
3576
 
2943
3577
        failed_2:
2944
 
        xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2945
 
        tab_set_row_on_fail(ot, row_id, ot->ot_curr_rec, FALSE);
 
3578
        tab_overwrite_record_on_fail(ot, &rec_info, before_buf, after_buf, idx_cnt);
 
3579
        goto failed_0;
2946
3580
 
2947
3581
        failed_1:
2948
 
        xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2949
 
 
2950
 
        tab_remove_record_on_fail(ot, rec_info.ri_rec_address, (XTTabRecHeadDPtr) rec_info.ri_fix_rec_buf, after_buf, idx_cnt);
 
3582
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2951
3583
 
2952
3584
        failed_0:
2953
3585
        if (tab->tab_dic.dic_blob_count && mybs_table)
2958
3590
xtPublic xtBool xt_tab_delete_record(XTOpenTablePtr ot, xtWord1 *rec_buf)
2959
3591
{
2960
3592
        register XTTableHPtr    tab = ot->ot_table;
2961
 
        xtWord4                                 row_id = ot->ot_curr_row_id;
2962
 
        off_t                                   curr_variation;
 
3593
        xtRowID                                 row_id = ot->ot_curr_row_id;
 
3594
        xtRecordID                              curr_var_rec_id;
2963
3595
        XTTabRecInfoRec                 rec_info;
2964
3596
 
2965
 
        if (!xt_xn_log_begin(ot))
2966
 
                return FAILED;
2967
 
 
2968
3597
        /* Setup a delete record: */
2969
3598
        rec_info.ri_fix_rec_buf = (XTTabRecFixDPtr) ot->ot_row_wbuffer;
2970
3599
        rec_info.ri_rec_buf_size = offsetof(XTTabRecFixDRec, rf_data);
2972
3601
        rec_info.ri_fix_rec_buf->tr_rec_type_1 = XT_TAB_STATUS_DELETE;
2973
3602
        rec_info.ri_fix_rec_buf->tr_stat_id_1 = 0;
2974
3603
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2975
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_prev_var_6, ot->ot_curr_rec);
2976
 
        XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_xact_id_6, ot->ot_thread->st_xact_data->xd_start_id);
 
3604
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, ot->ot_curr_rec_id);
 
3605
        XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, ot->ot_thread->st_xact_data->xd_start_xn_id);
2977
3606
 
2978
 
        if (!tab_add_record(ot, &rec_info))
 
3607
        if (!tab_add_record(ot, &rec_info, XT_LOG_ENT_DELETE))
2979
3608
                return FAILED;
2980
3609
 
2981
 
        xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
 
3610
        xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2982
3611
 
2983
 
        if (!xt_tab_get_row(ot, row_id, &curr_variation))
 
3612
        if (!xt_tab_get_row(ot, row_id, &curr_var_rec_id))
2984
3613
                goto failed_1;
2985
3614
 
2986
 
        if (curr_variation != ot->ot_curr_rec) {
2987
 
                if (!tab_wait_for_rollback(ot, row_id, ot->ot_curr_rec))
 
3615
        if (curr_var_rec_id != ot->ot_curr_rec_id) {
 
3616
                if (!tab_wait_for_rollback(ot, row_id, ot->ot_curr_rec_id))
2988
3617
                        goto failed_1;          
2989
3618
        }
2990
3619
 
2991
 
        if (!xt_tab_set_row(ot, row_id, rec_info.ri_rec_address, FALSE))
 
3620
#ifdef TRACE_VARIATIONS
 
3621
        xt_trace("%s update: row=%d rec=%d T%d\n", ot->ot_thread->t_name, (int) row_id, (int) rec_info.ri_rec_id, (int) ot->ot_thread->st_xact_data->xd_start_xn_id);
 
3622
#endif
 
3623
        if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_ADD_REC, row_id, rec_info.ri_rec_id))
2992
3624
                goto failed_1;
2993
 
        XT_DISABLED_TRACE(("del row tx=%d row=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_id, (int) row_id, (int) rec_info.ri_rec_address));
2994
 
 
2995
 
        xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2996
 
 
2997
 
        /* Log this change (after this point rollback will remove the record): */
2998
 
        if (!xt_xn_log_update(ot, rec_info.ri_rec_address, XT_XN_STATUS_DELETE, rec_info.ri_fix_rec_buf->tr_rec_type_1))
2999
 
                goto failed_2;
 
3625
        XT_DISABLED_TRACE(("del row tx=%d row=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) row_id, (int) rec_info.ri_rec_id));
 
3626
 
 
3627
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
3000
3628
 
3001
3629
        if (ot->ot_table->tab_dic.dic_table->dt_trefs) {
3002
3630
                if (!ot->ot_table->tab_dic.dic_table->deleteRow(ot, rec_buf))
3003
 
                        return FAILED;
 
3631
                        goto failed_2;
3004
3632
        }
3005
3633
 
3006
3634
        return OK;
3007
3635
 
3008
3636
        failed_2:
3009
 
        xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3010
 
        tab_set_row_on_fail(ot, row_id, ot->ot_curr_rec, FALSE);
 
3637
        tab_overwrite_record_on_fail(ot, &rec_info, rec_buf, NULL, 0);
 
3638
        return FAILED;
3011
3639
 
3012
3640
        failed_1:
3013
 
        xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3014
 
 
3015
 
        tab_remove_record_on_fail(ot, rec_info.ri_rec_address, (XTTabRecHeadDPtr) rec_info.ri_fix_rec_buf, rec_buf, 0);
 
3641
        xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
3016
3642
        return FAILED;
3017
3643
}
3018
3644
 
3019
3645
xtPublic xtBool xt_tab_seq_init(XTOpenTablePtr ot)
3020
3646
{
3021
3647
        register XTTableHPtr tab = ot->ot_table;
3022
 
 
3023
 
        if (!ot->ot_thread->st_xact_data) {
3024
 
                /* MySQL ingores this error, so we
3025
 
                 * setup the sequential scan so that it will
3026
 
                 * deliver nothing!
3027
 
                 */
3028
 
                ot->ot_seq_pos = sizeof(XTTabDataHeadDRec);
3029
 
                ot->ot_seq_eof = ot->ot_seq_pos;
3030
 
                ot->ot_buf_pos = ot->ot_seq_pos + 1;
3031
 
                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
3032
 
                return FAILED;
3033
 
        }
3034
 
 
3035
 
        ot->ot_seq_pos = sizeof(XTTabDataHeadDRec);
3036
3648
        
 
3649
        ot->ot_seq_page = NULL;
 
3650
        ot->ot_seq_offset = 0;
 
3651
 
 
3652
        ot->ot_curr_rec_id = 0;                 // 0 is an invalid position!
 
3653
        ot->ot_curr_row_id = 0;                 // 0 is an invalid row ID!
 
3654
        ot->ot_curr_updated = FALSE;
 
3655
 
3037
3656
        /* We note the current EOF before we start a sequential scan.
3038
3657
         * It is basically possible to update the same record more than
3039
3658
         * once because an updated record creates a new record which
3041
3660
         * still to be scanned.
3042
3661
         *
3043
3662
         * By noting the EOF before we start a sequential scan we
3044
 
         * reduce this posibility of this.
 
3663
         * reduce the possibility of this.
3045
3664
         *
3046
 
         * However, the possibility still remains, however it should
 
3665
         * However, the possibility still remains, but it should
3047
3666
         * not be a problem because a record is not modified
3048
3667
         * if there is nothing to change, which is the case
3049
3668
         * if the record has already been changed!
 
3669
         *
 
3670
         * NOTE (2008-01-29) There is no longer a problem with updating a
 
3671
         * record twice because records are marked by an update.
 
3672
         *
 
3673
         * [(10)] I have changed this (see below). I now check the
 
3674
         * current EOF of the table.
 
3675
         *
 
3676
         * The reason is that committed read must be able to see the
 
3677
         * changes that occur during the table scan.
3050
3678
         */
3051
 
        if (tab->tab_buf_offset) {
3052
 
                xt_rwlock_rdlock(&tab->tab_buf_rwlock);
3053
 
                ot->ot_seq_eof = tab->tab_data_eof + tab->tab_buf_offset;
3054
 
                xt_rwlock_unlock(&tab->tab_buf_rwlock);
 
3679
        ot->ot_seq_eof_id = tab->tab_rec_eof_id;
 
3680
 
 
3681
        if (!ot->ot_thread->st_xact_data) {
 
3682
                /* MySQL ignores this error, so we
 
3683
                 * setup the sequential scan so that it will
 
3684
                 * deliver nothing!
 
3685
                 */
 
3686
                ot->ot_seq_rec_id = ot->ot_seq_eof_id;
 
3687
                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
 
3688
                return FAILED;
3055
3689
        }
3056
 
        else
3057
 
                ot->ot_seq_eof = tab->tab_data_eof;
3058
 
        ot->ot_curr_rec = 0;                    // 0 is an invalid position!
3059
 
        ot->ot_curr_row_id = 0;                 // 0 is an invalid row ID!
3060
 
        ot->ot_curr_updated = FALSE;
3061
 
        /*
3062
 
         * This will make sure that the test below fails so that the
3063
 
         * execution jumps immendiately to load_buffer.
3064
 
         */
3065
 
        ot->ot_buf_pos = ot->ot_seq_pos + 1;
3066
 
        ot->ot_buf_len = 0;
 
3690
 
 
3691
        ot->ot_seq_rec_id = 1;
3067
3692
        
3068
3693
        return OK;
3069
3694
}
3070
3695
 
 
3696
xtPublic void xt_tab_seq_reset(XTOpenTablePtr ot)
 
3697
{
 
3698
        ot->ot_seq_rec_id = 0;
 
3699
        ot->ot_seq_eof_id = 0;
 
3700
        ot->ot_seq_page = NULL;
 
3701
        ot->ot_seq_offset = 0;
 
3702
}
 
3703
 
 
3704
xtPublic void xt_tab_seq_exit(XTOpenTablePtr ot)
 
3705
{
 
3706
        register XTTableHPtr    tab = ot->ot_table;
 
3707
 
 
3708
        if (ot->ot_seq_page) {
 
3709
                tab->tab_recs.xt_tc_unlock_page(ot->ot_seq_page);
 
3710
                ot->ot_seq_page = NULL;
 
3711
        }
 
3712
}
 
3713
 
3071
3714
xtPublic xtBool xt_tab_seq_next(XTOpenTablePtr ot, xtWord1 *buffer, xtBool *eof)
3072
3715
{
3073
3716
        register XTTableHPtr    tab = ot->ot_table;
3074
 
        register size_t                 size = tab->tab_dic.dic_rec_size;
3075
 
        size_t                                  boff, tfer;
3076
 
        xtBool                                  head_read = FALSE;
 
3717
        register size_t                 rec_size = tab->tab_dic.dic_rec_size;
3077
3718
        xtWord1                                 *buff_ptr;
3078
 
        u_int                                   block_type;
3079
 
 
3080
 
        if (ot->ot_seq_pos < ot->ot_buf_pos)
3081
 
                goto load_buffer;
3082
 
 
3083
 
        copy_buffer:
3084
 
        /* Start read position is on or after buffer position */
3085
 
        if (ot->ot_seq_pos + size <= ot->ot_buf_pos + ot->ot_buf_len) {
3086
 
                /* Read is completely in the buffer (this is the fast track): */
3087
 
                boff = (size_t) (ot->ot_seq_pos - ot->ot_buf_pos);
3088
 
                
3089
 
                // Check for deleted record:
3090
 
                if (!head_read) {
3091
 
                        ot->ot_curr_rec = ot->ot_seq_pos;
3092
 
                        ot->ot_seq_pos += size;
3093
 
 
3094
 
                        block_type = ot->ot_read_buf[boff];
3095
 
                        if (XT_REC_NOT_VISIBLE(block_type))
3096
 
                                goto copy_buffer;
3097
 
                        switch (tab_visible(ot, (XTTabRecHeadDPtr) (ot->ot_read_buf + boff))) {
3098
 
                                case FALSE:
3099
 
                                        goto copy_buffer;
3100
 
                                case XT_ERR:
3101
 
                                        return FAILED;
3102
 
                        }
3103
 
                        switch (block_type) {
3104
 
                                case XT_TAB_STATUS_FIXED:
3105
 
                                case XT_TAB_STATUS_FIX_CLEAN:
3106
 
                                        memcpy(buffer, ot->ot_read_buf + boff + XT_REC_FIX_HEADER_SIZE, size - XT_REC_FIX_HEADER_SIZE);
3107
 
                                        break;
3108
 
                                case XT_TAB_STATUS_VARIABLE:
3109
 
                                case XT_TAB_STATUS_VAR_CLEAN:
3110
 
                                        if (!myxt_load_row(ot, ot->ot_read_buf + boff + XT_REC_FIX_HEADER_SIZE, buffer))
3111
 
                                                return FAILED;
3112
 
                                        break;
3113
 
                                case XT_TAB_STATUS_EXTENDED:
3114
 
                                case XT_TAB_STATUS_EXT_CLEAN:
3115
 
                                        memcpy(ot->ot_row_rbuffer, ot->ot_read_buf + boff, size);
3116
 
                                        if (!tab_load_ext_data(ot, ot->ot_curr_rec, buffer))
3117
 
                                                return FAILED;
3118
 
                                        break;
3119
 
                                default:
3120
 
                                        /* Unknown record type? */
3121
 
                                        goto copy_buffer;
3122
 
                        }
3123
 
                }
3124
 
                else {
3125
 
                        ot->ot_seq_pos += size;
3126
 
                        memcpy(buff_ptr, ot->ot_read_buf + boff, size);
3127
 
                        switch (tab_visible(ot, (XTTabRecHeadDPtr) ot->ot_row_rbuffer)) {
3128
 
                                case FALSE:
3129
 
                                        goto copy_buffer;
3130
 
                                case XT_ERR:
3131
 
                                        return FAILED;
3132
 
                        }
3133
 
                        switch (block_type) {
3134
 
                                case XT_TAB_STATUS_FIXED:
3135
 
                                case XT_TAB_STATUS_FIX_CLEAN:
3136
 
                                        memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, tab->tab_dic.dic_rec_size - XT_REC_FIX_HEADER_SIZE);
3137
 
                                        break;
3138
 
                                case XT_TAB_STATUS_VARIABLE:
3139
 
                                case XT_TAB_STATUS_VAR_CLEAN:
3140
 
                                        if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer))
3141
 
                                                return FAILED;
3142
 
                                        break;
3143
 
                                case XT_TAB_STATUS_EXTENDED:
3144
 
                                case XT_TAB_STATUS_EXT_CLEAN:
3145
 
                                        if (!tab_load_ext_data(ot, ot->ot_curr_rec, buffer))
3146
 
                                                return FAILED;
3147
 
                                        break;
3148
 
                        }
3149
 
                }
3150
 
 
3151
 
                *eof = FALSE;
 
3719
        xtRecordID                              new_rec_id;
 
3720
 
 
3721
        next_page:
 
3722
        if (!ot->ot_seq_page) {
 
3723
                if (!(ot->ot_seq_page = tab->tab_recs.xt_tc_lock_page(ot->ot_rec_file, ot->ot_seq_rec_id, &ot->ot_seq_offset)))
 
3724
                        return FAILED;
 
3725
        }
 
3726
 
 
3727
        next_record:
 
3728
        /* [(10)] The current EOF is used: */
 
3729
        if (ot->ot_seq_rec_id >= ot->ot_seq_eof_id) {
 
3730
                *eof = TRUE;
3152
3731
                return OK;
3153
3732
        }
3154
3733
 
3155
 
        if (ot->ot_seq_pos < ot->ot_buf_pos + ot->ot_buf_len) {
3156
 
                /* Partially in the buffer: */
3157
 
                boff = (size_t) (ot->ot_seq_pos - ot->ot_buf_pos);
3158
 
 
3159
 
                // Check for deleted record:
3160
 
                if (!head_read) {
3161
 
                        block_type = ot->ot_read_buf[boff];
3162
 
 
3163
 
                        /* Skip these records, they are deleted, or will be deleted. */
3164
 
                        if (XT_REC_NOT_VISIBLE(block_type)) {
3165
 
                                ot->ot_seq_pos += size;
3166
 
                                goto load_buffer;
3167
 
                        }
3168
 
 
3169
 
                        /* We have found a valid record. */
3170
 
                        head_read = TRUE;
3171
 
                        ot->ot_curr_rec = ot->ot_seq_pos;
 
3734
        if (ot->ot_seq_offset >= tab->tab_recs.tci_page_size) {
 
3735
                tab->tab_recs.xt_tc_unlock_page(ot->ot_seq_page);
 
3736
                ot->ot_seq_page = NULL;
 
3737
                goto next_page;
 
3738
        }
 
3739
 
 
3740
        buff_ptr = (xtWord1 *) &ot->ot_seq_page->tcp_data[ot->ot_seq_offset];
 
3741
 
 
3742
        /* This is the current record: */
 
3743
        ot->ot_curr_rec_id = ot->ot_seq_rec_id;
 
3744
        ot->ot_curr_row_id = 0;
 
3745
 
 
3746
        /* Move to the next record: */
 
3747
        ot->ot_seq_rec_id++;
 
3748
        ot->ot_seq_offset += rec_size;
 
3749
 
 
3750
        retry:
 
3751
        switch (tab_visible(ot, (XTTabRecHeadDPtr) buff_ptr, &new_rec_id)) {
 
3752
                case FALSE:
 
3753
                        goto next_record;
 
3754
                case XT_ERR:
 
3755
                        return FAILED;
 
3756
                case XT_DEL:
 
3757
                        /* Skip the records that are deleted, or will be deleted. */
 
3758
                        goto next_record;
 
3759
                case XT_NEW:
3172
3760
                        buff_ptr = ot->ot_row_rbuffer;
3173
 
                }
3174
 
 
3175
 
                tfer = (size_t) ((ot->ot_buf_pos + ot->ot_buf_len) - ot->ot_seq_pos);
3176
 
                memcpy(buff_ptr, ot->ot_read_buf + boff, tfer);
3177
 
                buff_ptr += tfer;
3178
 
                size -= tfer;
3179
 
                ot->ot_seq_pos += tfer;
3180
 
                
3181
 
                /* Continue to get the rest... */
 
3761
                        if (!xt_tab_get_rec_data(ot, new_rec_id, rec_size, ot->ot_row_rbuffer))
 
3762
                                return XT_ERR;
 
3763
                        ot->ot_curr_rec_id = new_rec_id;
 
3764
                        break;
 
3765
                case XT_RETRY:
 
3766
                        goto retry;
3182
3767
        }
3183
3768
 
3184
 
        load_buffer:
3185
 
 
3186
 
        /* The start read position is not in the buffer, read as much as we can */
3187
 
        ASSERT_NS(ot->ot_seq_pos <= tab->tab_data_eof + tab->tab_buf_offset);
3188
 
        
3189
 
        tfer = tab->tab_buf_size;
3190
 
        if (ot->ot_seq_pos + tfer > ot->ot_seq_eof) {
3191
 
                if (ot->ot_seq_pos >= ot->ot_seq_eof) {
3192
 
                        if (!ot->ot_thread->st_xact_data) {
3193
 
                                /* If MySQL ingores this error above, then
3194
 
                                 * we generate the error again here
3195
 
                                 * (I just want to avoid doing this on
3196
 
                                 * each xt_tab_seq_next() call)!
3197
 
                                 * [ Every little instruction is bad! ]
3198
 
                                 */
3199
 
                                xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
 
3769
        switch (*buff_ptr) {
 
3770
                case XT_TAB_STATUS_FIXED:
 
3771
                case XT_TAB_STATUS_FIX_CLEAN:
 
3772
                        memcpy(buffer, buff_ptr + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
 
3773
                        break;
 
3774
                case XT_TAB_STATUS_VARIABLE:
 
3775
                case XT_TAB_STATUS_VAR_CLEAN:
 
3776
                        if (!myxt_load_row(ot, buff_ptr + XT_REC_FIX_HEADER_SIZE, buffer, ot->ot_cols_req))
3200
3777
                                return FAILED;
3201
 
                        }
 
3778
                        break;
 
3779
                case XT_TAB_STATUS_EXT_DLOG:
 
3780
                case XT_TAB_STATUS_EXT_CLEAN: {
 
3781
                        u_int cols_req = ot->ot_cols_req;
3202
3782
 
3203
 
                        *eof = TRUE;
3204
 
                        return OK;
 
3783
                        ASSERT_NS(cols_req);
 
3784
                        if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
 
3785
                                if (!myxt_load_row(ot, buff_ptr + XT_REC_EXT_HEADER_SIZE, buffer, cols_req))
 
3786
                                        return FAILED;
 
3787
                        }
 
3788
                        else {
 
3789
                                if (buff_ptr != ot->ot_row_rbuffer)
 
3790
                                        memcpy(ot->ot_row_rbuffer, buff_ptr, rec_size);
 
3791
                                if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, buffer, cols_req))
 
3792
                                        return FAILED;
 
3793
                        }
 
3794
                        break;
3205
3795
                }
3206
 
                tfer = (size_t) (ot->ot_seq_eof - ot->ot_seq_pos);
3207
 
        }
3208
 
 
3209
 
        if (!xt_tab_get_data(ot, ot->ot_seq_pos, tfer, ot->ot_read_buf, &ot->ot_buf_len))
3210
 
                return FAILED;
3211
 
        if (!ot->ot_buf_len) {
3212
 
                *eof = TRUE;
3213
 
                return OK;
3214
 
        }
3215
 
 
3216
 
        ot->ot_buf_pos = ot->ot_seq_pos;
3217
 
        goto copy_buffer;
3218
 
}
3219
 
 
3220
 
xtPublic void xt_tab_get_stats(XTThreadPtr self, XTTableHPtr tab, xtWord4 *file_size, xtWord4 *free_count)
3221
 
{
3222
 
}
3223
 
 
3224
 
xtPublic void xt_tab_io_failed(XTOpenFilePtr of)
3225
 
{
3226
 
}
3227
 
 
3228
 
/*
3229
 
 * -----------------------------------------------------------------------
3230
 
 * OPEN TABLE POOL
3231
 
 */
3232
 
 
3233
 
/* Wait until all open tables are closed. */
3234
 
 
3235
 
xtPublic XTOpenTablePtr xt_open_table_from_pool(XTTableHPtr tab, XTThreadPtr thread)
3236
 
{
3237
 
        XTOpenTablePtr ot;
3238
 
 
3239
 
        xt_mutex_lock(&tab->tab_open_lock);
3240
 
        if (tab->tab_will_close) {
3241
 
                xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_TABLE_LOCKED, tab->tab_name);
3242
 
                ot = NULL;
3243
 
        }
3244
 
        else if (tab->tab_open_pool) {
3245
 
                ot = tab->tab_open_pool;
3246
 
                tab->tab_open_pool = ot->ot_pool_next;
3247
 
        }
3248
 
        else if ((ot = tab_open_table(tab))) {
3249
 
                tab->tab_open_count++;
3250
 
        }
3251
 
        xt_mutex_unlock(&tab->tab_open_lock);
3252
 
        ot->ot_thread = thread;
3253
 
        return ot;
3254
 
}
3255
 
 
3256
 
xtPublic void xt_return_table_to_pool(XTOpenTablePtr ot)
3257
 
{
3258
 
        XTTableHPtr tab = ot->ot_table;
3259
 
 
3260
 
        ot->ot_thread = NULL;
3261
 
        xt_mutex_lock(&tab->tab_open_lock);
3262
 
        if (tab->tab_will_close) {
3263
 
                ASSERT_NS(tab->tab_open_count > 0);
3264
 
                tab->tab_open_count--;
3265
 
                tab_close_table(ot);
3266
 
        }
3267
 
        else {
3268
 
                ot->ot_pool_next = tab->tab_open_pool;
3269
 
                tab->tab_open_pool = ot;
3270
 
        }
3271
 
        xt_mutex_unlock(&tab->tab_open_lock);
3272
 
}
3273
 
 
3274
 
xtPublic void xt_close_all_open_tables(XTThreadPtr self, XTTableHPtr tab)
3275
 
{
3276
 
        XTDatabaseHPtr  db = tab->tab_db;
3277
 
 
3278
 
        enter_();
3279
 
        /* Lock order: TABLE, SWEEPER, COMPACTOR! */
3280
 
        /* Force the sweeper to close all tables: */
3281
 
        xt_sw_lock_sweeper(self, db);
3282
 
        pushr_(xt_sw_unlock_sweeper, db);
3283
 
        /* Force the compactor to close all files */
3284
 
        xt_dl_lock_compactor(self, db);
3285
 
        pushr_(xt_dl_unlock_compactor, db);
3286
 
 
3287
 
        /* Wait for all open tables to close: */
3288
 
        tab_wait_for_open_tables(self, tab);
3289
 
 
3290
 
        freer_(); // xt_dl_unlock_compactor(db)
3291
 
        freer_(); // xt_sw_unlock_sweeper(db)
3292
 
        exit_();
3293
 
}
 
3796
        }
 
3797
 
 
3798
        *eof = FALSE;
 
3799
        return OK;
 
3800
}
 
3801