492
497
dic->dic_bad_ind_ver = FALSE;
494
499
*ret_head_size = head_size;
502
static void tab_load_index_header(XTThreadPtr self, XTTableHPtr tab, XTOpenFilePtr file)
504
u_int rec_size = tab->tab_index_head_size;
508
/* Load the pointers: */
509
if (tab->tab_index_head)
510
xt_free_ns(tab->tab_index_head);
511
tab->tab_index_head = (XTIndexHeadDPtr) xt_malloc(self, rec_size);
513
if (!xt_pread_file(file, 0, rec_size, rec_size, tab->tab_index_head, NULL))
516
tab->tab_ind_rec_log_id = XT_GET_DISK_4(tab->tab_index_head->tp_rec_log_id_4);
517
tab->tab_ind_rec_log_offset = XT_GET_DISK_6(tab->tab_index_head->tp_rec_log_offs_6);
519
tab->tab_ind_eof = XT_GET_DISK_6(tab->tab_index_head->tp_ind_eof_6);
520
tab->tab_ind_free = XT_GET_DISK_6(tab->tab_index_head->tp_ind_free_6);
522
data = tab->tab_index_head->tp_data;
523
ind = tab->tab_dic.dic_keys;
524
for (u_int i=0; i<tab->tab_dic.dic_key_count; i++, ind++) {
525
(*ind)->mi_root = XT_GET_NODE_REF(data);
526
data += XT_NODE_REF_SIZE;
530
static xtBool tab_store_index_header(XTTableHPtr tab, XTOpenFilePtr file)
536
/* This flag is set without locking, so we need to
537
* set it to FALSE before we actually write the data.
539
tab->tab_ind_head_dirty = FALSE;
541
XT_SET_DISK_4(tab->tab_index_head->tp_rec_log_id_4, tab->tab_ind_rec_log_id);
542
XT_SET_DISK_6(tab->tab_index_head->tp_rec_log_offs_6, tab->tab_ind_rec_log_offset);
544
XT_SET_DISK_6(tab->tab_index_head->tp_ind_eof_6, tab->tab_ind_eof);
545
XT_SET_DISK_6(tab->tab_index_head->tp_ind_free_6, tab->tab_ind_free);
547
data = tab->tab_index_head->tp_data;
548
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) {
549
XT_SET_NODE_REF(data, (*ind)->mi_root);
550
data += XT_NODE_REF_SIZE;
552
if (!xt_pwrite_file(file, 0, tab->tab_index_head_size, tab->tab_index_head)) {
553
tab->tab_ind_head_dirty = TRUE;
559
static void tab_load_table_format(XTThreadPtr self, XTOpenFilePtr file, char *table_name, size_t *ret_head_size, XTDictionaryPtr dic)
561
XTDiskValue4 size_buf;
563
XTTableFormatDRec tab_fmt;
566
if (!xt_pread_file(file, 0, 4, 4, &size_buf, NULL))
569
head_size = XT_GET_DISK_4(size_buf);
571
/* Load the table format information: */
572
if (!xt_pread_file(file, head_size, offsetof(XTTableFormatDRec, tf_definition), offsetof(XTTableFormatDRec, tf_tab_version_2) + 2, &tab_fmt, NULL))
575
/* If the table version is less than or equal to an incompatible (unsupported
576
* version), or greater than the current version, then we cannot open this table
578
if (XT_GET_DISK_2(tab_fmt.tf_tab_version_2) <= XT_TAB_INCOMPATIBLE_VERSION ||
579
XT_GET_DISK_2(tab_fmt.tf_tab_version_2) > XT_TAB_CURRENT_VERSION) {
580
switch (XT_GET_DISK_2(tab_fmt.tf_tab_version_2)) {
582
xt_throw_i2xterr(XT_CONTEXT, XT_ERR_UPGRADE_TABLE, table_name, "0.9.91 Beta");
585
xt_throw_i2xterr(XT_CONTEXT, XT_ERR_UPGRADE_TABLE, table_name, "0.9.85 Beta");
588
xt_throw_ixterr(XT_CONTEXT, XT_ERR_BAD_TABLE_VERSION, table_name);
495
594
fmt_size = XT_GET_DISK_4(tab_fmt.tf_format_size_4);
595
*ret_head_size = XT_GET_DISK_4(tab_fmt.tf_tab_head_size_4);
496
596
dic->dic_rec_size = XT_GET_DISK_4(tab_fmt.tf_rec_size_4);
497
597
dic->dic_rec_fixed = XT_GET_DISK_1(tab_fmt.tf_rec_fixed_1);
498
if (fmt_size >= offsetof(XTTabFormatDRec, tf_min_auto_inc_8) + 8)
499
dic->dic_min_auto_inc = XT_GET_DISK_8(tab_fmt.tf_min_auto_inc_8);
501
dic->dic_min_auto_inc = 0;
502
if (fmt_size > offsetof(XTTabFormatDRec, tf_definition)) {
503
size_t def_size = fmt_size - offsetof(XTTabFormatDRec, tf_definition);
598
dic->dic_min_auto_inc = XT_GET_DISK_8(tab_fmt.tf_min_auto_inc_8);
599
if (fmt_size > offsetof(XTTableFormatDRec, tf_definition)) {
600
size_t def_size = fmt_size - offsetof(XTTableFormatDRec, tf_definition);
506
603
pushsr_(def_sql, xt_free, (char *) xt_malloc(self, def_size));
507
if (!xt_pread_file(file, head_size+offsetof(XTTabFormatDRec, tf_definition), def_size, def_size, def_sql, NULL)) {
604
if (!xt_pread_file(file, head_size+offsetof(XTTableFormatDRec, tf_definition), def_size, def_size, def_sql, NULL))
510
606
dic->dic_table = xt_ri_create_table(self, false, def_sql, myxt_create_table_from_table(self, dic->dic_my_table));
511
607
freer_(); // xt_free(def_sql)
514
610
dic->dic_table = myxt_create_table_from_table(self, dic->dic_my_table);
517
static void tab_load_table_pointers(XTThreadPtr self, XTTableHPtr tab, XTOpenFilePtr file)
519
u_int rec_size = tab->tab_head_size;
521
/* Load the pointers: */
522
if (tab->tab_pointers)
523
xt_sys_free(tab->tab_pointers);
524
tab->tab_pointers = (XTTabPointersDPtr) xt_malloc(self, rec_size);
526
if (!xt_pread_file(file, 0, rec_size, rec_size, tab->tab_pointers, NULL))
529
tab->tab_row_eof = XT_GET_DISK_4(tab->tab_pointers->tp_tab_eof_4) << XT_TAB_ROW_SHIFTS;
530
tab->tab_row_free = XT_GET_DISK_4(tab->tab_pointers->tp_tab_free_4) << XT_TAB_ROW_SHIFTS;
531
tab->tab_row_fnum = XT_GET_DISK_4(tab->tab_pointers->tp_tab_fnum_4);
533
tab->tab_data_eof = XT_GET_DISK_6(tab->tab_pointers->tp_data_eof_6);
534
tab->tab_data_free = XT_GET_DISK_6(tab->tab_pointers->tp_data_free_6);
535
// ASSERT(tab->tab_data_free <= tab->tab_data_eof);
536
tab->tab_data_fnum = XT_GET_DISK_4(tab->tab_pointers->tp_data_fnum_4);
538
tab->tab_ind_eof = XT_GET_DISK_6(tab->tab_pointers->tp_ind_eof_6);
539
tab->tab_ind_free = XT_GET_DISK_6(tab->tab_pointers->tp_ind_free_6);
541
xt_tab_load_index_roots(tab);
544
xtPublic void xt_tab_load_index_roots(XTTableHPtr tab)
550
data = tab->tab_pointers->tp_data;
551
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) {
552
(*ind)->mi_root = XT_GET_NODE_REF(data);
553
data += XT_NODE_REF_SIZE;
557
static xtBool tab_store_table_header(XTTableHPtr tab, XTOpenFilePtr file)
563
tab->tab_head_dirty = FALSE;
565
XT_SET_DISK_4(tab->tab_pointers->tp_tab_eof_4, tab->tab_row_eof >> XT_TAB_ROW_SHIFTS);
566
XT_SET_DISK_4(tab->tab_pointers->tp_tab_free_4, tab->tab_row_free >> XT_TAB_ROW_SHIFTS);
567
XT_SET_DISK_4(tab->tab_pointers->tp_tab_fnum_4, tab->tab_row_fnum);
569
XT_SET_DISK_6(tab->tab_pointers->tp_data_eof_6, tab->tab_data_eof);
570
XT_SET_DISK_6(tab->tab_pointers->tp_data_free_6, tab->tab_data_free);
571
XT_SET_DISK_4(tab->tab_pointers->tp_data_fnum_4, tab->tab_data_fnum);
573
XT_SET_DISK_6(tab->tab_pointers->tp_ind_eof_6, tab->tab_ind_eof);
574
XT_SET_DISK_6(tab->tab_pointers->tp_ind_free_6, tab->tab_ind_free);
576
data = tab->tab_pointers->tp_data;
577
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) {
578
XT_SET_NODE_REF(data, (*ind)->mi_root);
579
data += XT_NODE_REF_SIZE;
581
return xt_pwrite_file(file, 0, tab->tab_head_size, tab->tab_pointers);
584
static void tab_alloc_data_buf(XTThreadPtr self, XTTableHPtr tab, size_t rec_size)
589
/* Make the buffer size a multiple of the record size: */
590
recs_in_buf = ((XT_MIN_TAB_BUFFER_SIZE - 1) / rec_size) + 1;
591
buf_size = recs_in_buf * rec_size;
592
tab->tab_buf_size = buf_size;
593
tab->tab_data_buf = (xtWord1 *) xt_malloc(self, buf_size);
613
static void tab_load_table_header(XTThreadPtr self, XTTableHPtr tab, XTOpenFilePtr file)
615
XTTableHeadDRec rec_head;
617
if (!xt_pread_file(file, 0, sizeof(XTTableHeadDRec), sizeof(XTTableHeadDRec), (xtWord1 *) &rec_head, NULL))
620
tab->tab_head_op_seq = XT_GET_DISK_4(rec_head.th_op_seq_4);
621
tab->tab_head_row_free_id = (xtRowID) XT_GET_DISK_6(rec_head.th_row_free_6);
622
tab->tab_head_row_eof_id = XT_GET_DISK_6(rec_head.th_row_eof_6);
623
tab->tab_head_row_fnum = XT_GET_DISK_6(rec_head.th_row_fnum_6);
624
tab->tab_head_rec_free_id = XT_GET_DISK_6(rec_head.th_rec_free_6);
625
tab->tab_head_rec_eof_id = XT_GET_DISK_6(rec_head.th_rec_eof_6);
626
tab->tab_head_rec_fnum = XT_GET_DISK_6(rec_head.th_rec_fnum_6);
629
xtPublic void xt_tab_store_header(struct XTThread *self, XTOpenTablePtr ot)
631
XTTableHPtr tab = ot->ot_table;
632
XTTableHeadDRec rec_head;
634
XT_SET_DISK_4(rec_head.th_op_seq_4, tab->tab_head_op_seq);
635
XT_SET_DISK_6(rec_head.th_row_free_6, tab->tab_head_row_free_id);
636
XT_SET_DISK_6(rec_head.th_row_eof_6, tab->tab_head_row_eof_id);
637
XT_SET_DISK_6(rec_head.th_row_fnum_6, tab->tab_head_row_fnum);
638
XT_SET_DISK_6(rec_head.th_rec_free_6, tab->tab_head_rec_free_id);
639
XT_SET_DISK_6(rec_head.th_rec_eof_6, tab->tab_head_rec_eof_id);
640
XT_SET_DISK_6(rec_head.th_rec_fnum_6, tab->tab_head_rec_fnum);
641
if (!xt_pwrite_file(ot->ot_rec_file, offsetof(XTTableHeadDRec, th_op_seq_4), 40, (xtWord1 *) &rec_head.th_op_seq_4))
635
tab->tab_ind_file = xt_dup_string(self, path);
637
of_ind = xt_open_file(self, path, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT);
639
freer_(); // xt_heap_release(tab)
640
return_(XT_TAB_NOT_FOUND);
642
pushr_(xt_close_file, of_ind);
644
tab_load_table_format(self, of_ind, table_name, &head_size, &tab->tab_dic);
646
tab->tab_dic.dic_table->dt_table = tab;
647
tab->tab_head_size = head_size;
648
tab_alloc_data_buf(self, tab, tab->tab_dic.dic_rec_size);
649
xt_heap_set_release_callback(self, tab, tab_onrelease);
651
xt_remove_last_name_of_path(path);
652
tab_get_row_file_name(file_name, table_name, tab_id);
653
xt_strcat(PATH_MAX, path, file_name);
654
tab->tab_row_file = xt_dup_string(self, path);
680
tab->tab_seq.xt_op_seq_init(self);
656
681
xt_init_mutex_tr(self, &tab->tab_ainc_lock);
657
682
xt_init_mutex(self, &tab->dic_field_lock);
658
xt_init_mutex(self, &tab->tab_open_lock);
659
xt_init_cond(self, &tab->tab_open_cond);
660
683
xt_init_mutex(self, &tab->tab_row_lock);
661
684
xt_init_mutex(self, &tab->tab_ind_lock);
662
xt_init_mutex(self, &tab->tab_log_lock);
663
xt_init_mutex(self, &tab->tab_free_lock);
664
xt_init_rwlock(self, &tab->tab_buf_rwlock);
665
for (u_int i=0; i<XT_ROW_LOCK_TABLE_SIZE; i++)
666
xt_init_rwlock(self, &tab->tab_row_locks[i]);
685
xt_init_mutex(self, &tab->tab_rec_lock);
686
for (u_int i=0; i<XT_ROW_RWLOCKS; i++)
687
xt_init_rwlock(self, &tab->tab_row_rwlock[i]);
667
688
tab->tab_free_locks = TRUE;
690
xt_strcpy(PATH_MAX, path, db->db_path);
691
xt_add_dir_char(PATH_MAX, path);
692
tab_get_row_file_name(file_name, table_name, tab_id);
693
xt_strcat(PATH_MAX, path, file_name);
694
tab->tab_row_file = xt_fs_get_file(self, path);
669
696
xt_remove_last_name_of_path(path);
670
697
tab_get_data_file_name(file_name, table_name, tab_id);
671
698
xt_strcat(PATH_MAX, path, file_name);
672
tab->tab_data_file = xt_dup_string(self, path);
674
tab_load_table_pointers(self, tab, of_ind);
699
tab->tab_rec_file = xt_fs_get_file(self, path);
701
xt_remove_last_name_of_path(path);
702
tab_get_index_file_name(file_name, table_name, tab_id);
703
xt_strcat(PATH_MAX, path, file_name);
704
tab->tab_ind_file = xt_fs_get_file(self, path);
706
of_ind = xt_open_file(self, tab->tab_ind_file->fil_path, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT);
708
freer_(); // xt_heap_release(tab)
709
return_(XT_TAB_NOT_FOUND);
711
pushr_(xt_close_file, of_ind);
712
tab_load_index_format(self, of_ind, table_name, &ind_head_size, &tab->tab_dic);
713
tab->tab_index_head_size = ind_head_size;
714
tab_load_index_header(self, tab, of_ind);
676
715
freer_(); // xt_close_file(of_ind)
678
xt_dl_init_tab(self, tab);
717
of_rec = xt_open_file(self, tab->tab_rec_file->fil_path, missing_ok ? XT_FS_MISSING_OK : XT_FS_DEFAULT);
719
freer_(); // xt_heap_release(tab)
720
return_(XT_TAB_NOT_FOUND);
722
pushr_(xt_close_file, of_rec);
723
tab_load_table_format(self, of_rec, table_name, &tab_head_size, &tab->tab_dic);
724
tab->tab_table_head_size = tab_head_size;
725
tab->tab_dic.dic_table->dt_table = tab;
726
tab_load_table_header(self, tab, of_rec);
727
freer_(); // xt_close_file(of_rec)
729
tab->tab_seq.xt_op_seq_set(self, tab->tab_head_op_seq+1);
730
tab->tab_row_eof_id = tab->tab_head_row_eof_id;
731
tab->tab_row_free_id = tab->tab_head_row_free_id;
732
tab->tab_row_fnum = tab->tab_head_row_fnum;
733
tab->tab_rec_eof_id = tab->tab_head_rec_eof_id;
734
tab->tab_rec_free_id = tab->tab_head_rec_free_id;
735
tab->tab_rec_fnum = tab->tab_head_rec_fnum;
737
tab->tab_rows.xt_tc_setup(tab, sizeof(XTTabRowHeadDRec), sizeof(XTTabRowRefDRec));
738
tab->tab_recs.xt_tc_setup(tab, tab_head_size, tab->tab_dic.dic_rec_size);
740
xt_xres_init_tab(self, tab);
742
if (!xt_init_row_locks(&tab->tab_locks))
680
745
if (tab->tab_dic.dic_table)
681
tab->tab_dic.dic_table->attachReferences(self);
746
tab->tab_dic.dic_table->attachReferences(self, db);
748
xt_heap_set_release_callback(self, tab, tab_onrelease);
683
750
popr_(); // Discard xt_heap_release(tab)
842
852
xt_strcat(XT_DATABASE_NAME_SIZE + XT_TABLE_NAME_SIZE, url, name);
843
853
xt_mybs_close_all_tables(url);
845
if ((tab = xt_use_table_no_lock(self, db, name, no_load, FALSE, NULL)))
846
/* Wait for all open tables to close: */
847
tab_wait_for_open_tables(self, tab);
849
popr_(); // Discard xt_ht_unlock(db->db_tables)
851
freer_(); // xt_dl_unlock_compactor(db)
852
freer_(); // xt_sw_unlock_sweeper(db)
855
/* Wait for all open tables to close: */
856
xt_db_wait_for_open_tables(self, table_pool);
858
popr_(); // Discard xt_db_unlock_table_pool(table_pool)
857
863
* Return the ID of the table. 0 if table not found.
859
static xtWord4 tab_lock_table_entry(XTThreadPtr self, char *name, xtBool missing_ok)
865
static XTOpenTablePoolPtr tab_lock_table_entry(XTThreadPtr self, char *name, xtBool missing_ok, xtTableID *tab_id)
861
XTDatabaseHPtr db = self->st_database;
867
XTDatabaseHPtr db = self->st_database;
868
XTOpenTablePoolPtr table_pool;
865
/* Lock order: TABLE, SWEEPER, COMPACTOR! */
866
/* Force the sweeper to close all tables: */
867
xt_sw_lock_sweeper(self, db);
868
pushr_(xt_sw_unlock_sweeper, db);
869
/* Force the compactor to close all files */
870
xt_dl_lock_compactor(self, db);
871
pushr_(xt_dl_unlock_compactor, db);
873
xt_ht_lock(self, db->db_tables);
874
pushr_(xt_ht_unlock, db->db_tables);
876
if (!tab_find_table(self, db, name, NULL, &tab_id)) {
878
xt_throw_ixterr(XT_CONTEXT, XT_ERR_TABLE_NOT_FOUND, name);
882
popr_(); // Discard xt_ht_unlock(db->db_tables)
884
freer_(); // xt_dl_unlock_compactor(db)
885
freer_(); // xt_sw_unlock_sweeper(db)
889
static void tab_unlock_table(XTThreadPtr self, XTTableHPtr tab)
891
XTDatabaseHPtr db = self->st_database;
894
xt_lock_mutex(self, &tab->tab_open_lock);
895
tab->tab_will_close--;
896
xt_unlock_mutex(self, &tab->tab_open_lock);
897
xt_heap_release(self, tab);
899
xt_ht_unlock(self, db->db_tables);
902
static void tab_delete_table_files(XTThreadPtr self, XTDatabaseHPtr db, char *tab_name, xtWord4 tab_id)
872
/* Lock the table, and close all references: */
873
pushsr_(table_pool, xt_db_unlock_table_pool, xt_db_lock_table_pool_by_name(self, db, name, FALSE, TRUE, missing_ok, FALSE, &tab));
875
freer_(); // xt_db_unlock_table_pool(db)
879
*tab_id = tab->tab_id;
880
xt_heap_release(self, tab);
882
popr_(); // Discard xt_db_unlock_table_pool(table_pool)
886
static void tab_delete_table_files(XTThreadPtr self, XTDatabaseHPtr db, char *tab_name, xtTableID tab_id)
904
888
XTFilesOfTableRec ft;
1003
XTOpenFilePtr of, of_ind;
997
XTOpenFilePtr of_row, of_rec, of_ind;
1006
1001
tab = (XTTableHPtr) xt_heap_new(self, sizeof(XTTableHRec), tab_finalize);
1007
1002
pushr_(xt_heap_release, tab);
1009
pnt_size = offsetof(XTTabPointersDRec, tp_data) + dic->dic_key_count * XT_NODE_REF_SIZE;
1010
tab->tab_head_size = pnt_size;
1011
if (!(tab->tab_pointers = (XTTabPointersDPtr) xt_sys_calloc(tab->tab_head_size)))
1004
/* This is the size of the index header: */
1005
index_head_size = offsetof(XTIndexHeadDRec, tp_data) + dic->dic_key_count * XT_NODE_REF_SIZE;
1006
tab->tab_index_head_size = index_head_size;
1007
if (!(tab->tab_index_head = (XTIndexHeadDPtr) xt_calloc_ns(tab->tab_index_head_size)))
1012
1008
xt_throw(self);
1010
/* The length of the foreign key definition: */
1011
if (dic->dic_table) {
1012
dic->dic_table->loadString(self, &tab_def);
1013
def_len = tab_def.sb_len + 1;
1017
tab->tab_head_op_seq = 0xFFFFFFFF - 12;
1019
tab->tab_head_op_seq = 0;
1022
/* ------- ROW FILE: */
1015
1023
xt_strcpy(PATH_MAX, path, db->db_path);
1016
1024
xt_add_dir_char(PATH_MAX, path);
1017
1025
tab_get_row_file_name(table_name, name, tab_id);
1018
1026
xt_strcat(PATH_MAX, path, table_name);
1020
of = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
1021
pushr_(xt_close_file, of);
1028
of_row = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
1029
pushr_(xt_close_file, of_row);
1022
1030
XT_SET_DISK_4(row_head.rh_magic_4, XT_TAB_ROW_MAGIC);
1023
XT_SET_DISK_4(row_head.rh_reserved_4, 0);
1024
if (!xt_pwrite_file(of, 0, sizeof(row_head), &row_head))
1031
if (!xt_pwrite_file(of_row, 0, sizeof(row_head), &row_head))
1025
1032
xt_throw(self);
1026
freer_(); // xt_close_file(of)
1033
freer_(); // xt_close_file(of_row)
1028
1035
(void) ASSERT(sizeof(XTTabRowHeadDRec) == sizeof(XTTabRowRefDRec));
1029
1036
(void) ASSERT(sizeof(XTTabRowRefDRec) == 1 << XT_TAB_ROW_SHIFTS);
1030
tab->tab_row_eof = sizeof(row_head);
1031
tab->tab_row_free = 0;
1038
tab->tab_row_eof_id = 1;
1039
tab->tab_row_free_id = 0;
1032
1040
tab->tab_row_fnum = 0;
1042
tab->tab_head_row_eof_id = 1;
1043
tab->tab_head_row_free_id = 0;
1044
tab->tab_head_row_fnum = 0;
1046
/* ------------ DATA FILE: */
1047
xt_remove_last_name_of_path(path);
1048
tab_get_data_file_name(table_name, name, tab_id);
1049
xt_strcat(PATH_MAX, path, table_name);
1050
of_rec = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
1051
pushr_(xt_close_file, of_rec);
1053
/* Calculate the offset of the first record in the data handle file. */
1054
eof = sizeof(XTTableHeadDRec) + offsetof(XTTableFormatDRec, tf_definition) + def_len + XT_FORMAT_DEF_SPACE;
1055
eof = (eof + 1024 - 1) / 1024 * 1024; // Round to a value divisible by 1024
1057
tab->tab_table_head_size = eof;
1059
tab->tab_rec_eof_id = 1; // This is the first record ID!
1060
tab->tab_rec_free_id = 0;
1061
tab->tab_rec_fnum = 0;
1063
tab->tab_head_rec_eof_id = 1; // The first record ID
1064
tab->tab_head_rec_free_id = 0;
1065
tab->tab_head_rec_fnum = 0;
1067
tab->tab_dic.dic_rec_size = dic->dic_rec_size;
1068
tab->tab_dic.dic_rec_fixed = dic->dic_rec_fixed;
1069
tab->tab_dic.dic_min_auto_inc = dic->dic_min_auto_inc;
1071
XT_SET_DISK_4(rec_head.th_head_size_4, sizeof(XTTableHeadDRec));
1072
XT_SET_DISK_4(rec_head.th_op_seq_4, tab->tab_head_op_seq);
1073
XT_SET_DISK_6(rec_head.th_row_free_6, tab->tab_head_row_free_id);
1074
XT_SET_DISK_6(rec_head.th_row_eof_6, tab->tab_head_row_eof_id);
1075
XT_SET_DISK_6(rec_head.th_row_fnum_6, tab->tab_head_row_fnum);
1076
XT_SET_DISK_6(rec_head.th_rec_free_6, tab->tab_head_rec_free_id);
1077
XT_SET_DISK_6(rec_head.th_rec_eof_6, tab->tab_head_rec_eof_id);
1078
XT_SET_DISK_6(rec_head.th_rec_fnum_6, tab->tab_head_rec_fnum);
1080
if (!xt_pwrite_file(of_rec, 0, sizeof(XTTableHeadDRec), &rec_head))
1083
/* Store the table format: */
1084
memset(&table_fmt, 0, offsetof(XTTableFormatDRec, tf_definition));
1085
XT_SET_DISK_4(table_fmt.tf_format_size_4, offsetof(XTTableFormatDRec, tf_definition) + def_len);
1086
XT_SET_DISK_4(table_fmt.tf_tab_head_size_4, eof);
1087
XT_SET_DISK_2(table_fmt.tf_tab_version_2, XT_TAB_CURRENT_VERSION);
1088
XT_SET_DISK_4(table_fmt.tf_rec_size_4, tab->tab_dic.dic_rec_size);
1089
XT_SET_DISK_1(table_fmt.tf_rec_fixed_1, tab->tab_dic.dic_rec_fixed);
1090
XT_SET_DISK_8(table_fmt.tf_min_auto_inc_8, tab->tab_dic.dic_min_auto_inc);
1092
if (!xt_pwrite_file(of_rec, sizeof(XTTableHeadDRec), offsetof(XTTableFormatDRec, tf_definition), &table_fmt))
1095
if (!xt_pwrite_file(of_rec, sizeof(XTTableHeadDRec) + offsetof(XTTableFormatDRec, tf_definition), def_len, tab_def.sb_cstring))
1099
freer_(); // xt_close_file(of_rec)
1101
/* ----------- INDEX FILE: */
1035
1102
xt_remove_last_name_of_path(path);
1036
1103
tab_get_index_file_name(table_name, name, tab_id);
1037
1104
xt_strcat(PATH_MAX, path, table_name);
1043
1110
* blocks. This is important to ensure that the writing of the cache
1044
1111
* blocks does not conflict with the writing of the header.
1048
if (dic->dic_table) {
1049
dic->dic_table->loadString(self, &tab_def);
1050
def_len = tab_def.sb_len + 1;
1053
eof = tab->tab_head_size + offsetof(XTTabFormatDRec, tf_definition) + def_len;
1054
eof = (eof + XT_DC_BLOCK_SIZE - 1) / XT_DC_BLOCK_SIZE * XT_DC_BLOCK_SIZE;
1055
eof = (eof + XT_IDX_PAGE_SIZE - 1) / XT_IDX_PAGE_SIZE * XT_IDX_PAGE_SIZE;
1114
eof = tab->tab_index_head_size + sizeof(XTIndexFormatDRec);
1115
eof = (eof + XT_INDEX_PAGE_SIZE - 1) / XT_INDEX_PAGE_SIZE * XT_INDEX_PAGE_SIZE;
1116
eof = (eof + XT_INDEX_PAGE_SIZE - 1) / XT_INDEX_PAGE_SIZE * XT_INDEX_PAGE_SIZE;
1118
xt_lock_mutex_ns(&db->db_wr_lock);
1119
tab->tab_ind_rec_log_id = db->db_xlog.xl_flush_log_id;
1120
tab->tab_ind_rec_log_offset = db->db_xlog.xl_flush_log_offset;
1121
xt_unlock_mutex_ns(&db->db_wr_lock);
1057
1122
tab->tab_ind_eof = eof;
1058
1123
tab->tab_ind_free = 0;
1061
xt_remove_last_name_of_path(path);
1062
tab_get_data_file_name(table_name, name, tab_id);
1063
xt_strcat(PATH_MAX, path, table_name);
1064
of = xt_open_file(self, path, XT_FS_CREATE | XT_FS_EXCLUSIVE);
1065
pushr_(xt_close_file, of);
1066
XT_SET_DISK_4(data_head.dh_magic_4, XT_TAB_DATA_MAGIC);
1067
XT_SET_DISK_4(data_head.dh_reserved_4, 0);
1068
if (!xt_pwrite_file(of, 0, sizeof(row_head), &row_head))
1070
freer_(); // xt_close_file(of)
1072
tab->tab_data_eof = sizeof(XTTabDataHeadDRec);
1073
tab->tab_data_free = 0;
1074
tab->tab_data_fnum = 0;
1076
tab->tab_dic.dic_rec_size = dic->dic_rec_size;
1077
tab->tab_dic.dic_rec_fixed = dic->dic_rec_fixed;
1078
tab->tab_dic.dic_min_auto_inc = dic->dic_min_auto_inc;
1080
XT_SET_DISK_4(tab->tab_pointers->tp_head_size_4, tab->tab_head_size);
1125
XT_SET_DISK_4(tab->tab_index_head->tp_head_size_4, tab->tab_index_head_size);
1082
1127
/* Save the header: */
1083
if (!tab_store_table_header(tab, of_ind))
1086
/* Store the table format: */
1087
memset(&head_fmt, 0, offsetof(XTTabFormatDRec, tf_definition));
1088
XT_SET_DISK_4(head_fmt.tf_format_size_4, offsetof(XTTabFormatDRec, tf_definition) + def_len);
1089
XT_SET_DISK_2(head_fmt.tf_tab_version_2, XT_TAB_CURRENT_VERSION);
1090
XT_SET_DISK_2(head_fmt.tf_ind_version_2, XT_IND_CURRENT_VERSION);
1091
XT_SET_DISK_1(head_fmt.tf_node_ref_size_1, XT_NODE_REF_SIZE);
1092
XT_SET_DISK_1(head_fmt.tf_rec_ref_size_1, XT_RECORD_REF_SIZE);
1093
XT_SET_DISK_4(head_fmt.tf_rec_size_4, tab->tab_dic.dic_rec_size);
1094
XT_SET_DISK_1(head_fmt.tf_rec_fixed_1, tab->tab_dic.dic_rec_fixed);
1095
XT_SET_DISK_8(head_fmt.tf_min_auto_inc_8, tab->tab_dic.dic_min_auto_inc);
1096
XT_SET_DISK_2(head_fmt.tf_no_of_data_logs_2, XT_NO_OF_DATA_LOGS);
1098
if (!xt_pwrite_file(of_ind, pnt_size, offsetof(XTTabFormatDRec, tf_definition), &head_fmt))
1102
if (!xt_pwrite_file(of_ind, pnt_size + offsetof(XTTabFormatDRec, tf_definition), def_len, tab_def.sb_cstring))
1128
if (!tab_store_index_header(tab, of_ind))
1131
/* Store the index format: */
1132
memset(&index_fmt, 0, sizeof(XTIndexFormatDRec));
1133
XT_SET_DISK_4(index_fmt.if_format_size_4, sizeof(XTIndexFormatDRec));
1134
XT_SET_DISK_2(index_fmt.if_tab_version_2, XT_TAB_CURRENT_VERSION);
1135
XT_SET_DISK_2(index_fmt.if_ind_version_2, XT_IND_CURRENT_VERSION);
1136
XT_SET_DISK_1(index_fmt.if_node_ref_size_1, XT_NODE_REF_SIZE);
1137
XT_SET_DISK_1(index_fmt.if_rec_ref_size_1, XT_RECORD_REF_SIZE);
1139
if (!xt_pwrite_file(of_ind, tab->tab_index_head_size, sizeof(XTIndexFormatDRec), &index_fmt))
1142
freer_(); // xt_close_file(of_ind)
1106
1145
/* Log the new table ID! */
1107
1146
db->db_curr_tab_id = tab_id;
1108
if (!xt_xn_log_ids(self, db)) {
1147
if (!xt_xn_log_tab_id(self, tab_id)) {
1109
1148
db->db_curr_tab_id = tab_id - 1;
1110
1149
xt_throw(self);
1113
freer_(); // xt_close_file(of_ind)
1114
1152
freer_(); // xt_heap_release(tab)
1164
1203
xtPublic void xt_drop_table(XTThreadPtr self, char *tab_name)
1166
XTDatabaseHPtr db = self->st_database;
1205
XTDatabaseHPtr db = self->st_database;
1206
XTOpenTablePoolPtr table_pool;
1207
xtTableID tab_id = 0;
1171
tab_id = tab_lock_table_entry(self, tab_name, TRUE);
1172
pushr_(tab_unlock_table, NULL);
1174
xt_ht_del(self, db->db_tables, tab_name);
1176
tab_delete_table_files(self, db, tab_name, tab_id);
1211
table_pool = tab_lock_table_entry(self, tab_name, TRUE, &tab_id);
1212
pushr_(xt_db_unlock_table_pool, table_pool);
1213
xt_ht_lock(self, db->db_tables);
1214
pushr_(xt_ht_unlock, db->db_tables);
1179
1217
XTTableHPtr tab;
1219
if ((tab = xt_use_table_no_lock(self, db, tab_name, TRUE, TRUE, NULL))) {
1220
xt_dl_delete_ext_data(self, tab, TRUE);
1221
xt_heap_release(self, tab);
1223
tab_delete_table_files(self, db, tab_name, tab_id);
1181
1224
xt_sl_delete(self, db->db_table_by_id, &tab_id);
1182
if ((tab = xt_use_table_no_lock(self, db, tab_name, TRUE, TRUE, NULL))) {
1183
xt_dl_logs_deleted(self, tab, FALSE);
1184
xt_heap_release(self, tab);
1188
/* Release the lock on the table directory: */
1189
freer_(); // tab_unlock_table(NULL)
1227
xt_ht_del(self, db->db_tables, tab_name);
1229
freer_(); // xt_ht_unlock(db->db_tables)
1230
freer_(); // xt_db_unlock_table_pool(table_pool)
1234
xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot)
1236
XTTableHPtr tab = ot->ot_table;
1238
XTTabRecExtDRec rec_buf;
1239
xtWord4 free_count = 0, free_count2 = 0;
1240
XTactExtRecEntryDRec ext_rec;
1243
xtLogOffset log_offset;
1245
xtRecordID prev_rec_id;
1249
//*DBG*/xt_dump_xlogs(tab->tab_db);
1250
printf("\nCHECK TABLE: %s\n", tab->tab_name);
1252
xt_lock_mutex(self, &tab->tab_db->db_co_ext_lock);
1253
pushr_(xt_unlock_mutex, &tab->tab_db->db_co_ext_lock);
1255
xt_lock_mutex(self, &tab->tab_rec_lock);
1256
pushr_(xt_unlock_mutex, &tab->tab_rec_lock);
1258
printf("Records:-\n");
1259
printf("Free list: %llu (%llu)\n", (u_llong) tab->tab_rec_free_id, (u_llong) tab->tab_rec_fnum);
1260
printf("EOF: %llu\n", (u_llong) tab->tab_rec_eof_id);
1263
while (rec_id < tab->tab_rec_eof_id) {
1264
if (!xt_tab_get_rec_data(ot, rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &rec_buf))
1267
printf("%-4llu ", (u_llong) rec_id);
1268
switch (rec_buf.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
1269
case XT_TAB_STATUS_FREED:
1270
printf("======== ");
1273
case XT_TAB_STATUS_DELETE:
1276
case XT_TAB_STATUS_FIXED:
1277
printf("record-F ");
1279
case XT_TAB_STATUS_VARIABLE:
1280
printf("record-V ");
1282
case XT_TAB_STATUS_EXT_DLOG:
1283
printf("record-X ");
1286
if (rec_buf.tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT)
1290
prev_rec_id = XT_GET_DISK_4(rec_buf.tr_prev_rec_id_4);
1291
xn_id = XT_GET_DISK_4(rec_buf.tr_xact_id_4);
1292
row_id = XT_GET_DISK_4(rec_buf.tr_row_id_4);
1293
switch (rec_buf.tr_rec_type_1 & XT_TAB_STATUS_MASK) {
1294
case XT_TAB_STATUS_FREED:
1295
printf(" prev=%-3llu (xact=%-3llu row=%lu)\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id);
1297
case XT_TAB_STATUS_EXT_DLOG:
1298
printf(" prev=%-3llu xact=%-3llu row=%lu Xlog=%lu Xoff=%llu Xsiz=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id, (u_long) XT_GET_DISK_2(rec_buf.re_log_id_2), (u_llong) XT_GET_DISK_6(rec_buf.re_log_offs_6), (u_long) XT_GET_DISK_4(rec_buf.re_log_dat_siz_4));
1300
log_size = XT_GET_DISK_4(rec_buf.re_log_dat_siz_4);
1301
XT_GET_LOG_REF(log_id, log_offset, &rec_buf);
1302
if (!self->st_dlog_buf.dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data), (xtWord1 *) &ext_rec))
1303
xt_log_and_clear_exception(self);
1306
xtTableID curr_tab_id;
1307
xtRecordID curr_rec_id;
1309
log_size2 = XT_GET_DISK_4(ext_rec.er_data_size_4);
1310
curr_tab_id = XT_GET_DISK_4(ext_rec.er_tab_id_4);
1311
curr_rec_id = XT_GET_DISK_4(ext_rec.er_rec_id_4);
1312
if (log_size2 != log_size || curr_tab_id != tab->tab_id || curr_rec_id != rec_id) {
1313
xt_logf(XT_INFO, "Table %s: record %llu, extended record %lu:%llu not valid\n", tab->tab_name, (u_llong) rec_id, (u_long) log_id, (u_llong) log_offset);
1318
printf(" prev=%-3llu xact=%-3llu row=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id);
1324
if (tab->tab_rec_fnum != free_count)
1325
xt_logf(XT_INFO, "Table %s: incorrect number of free blocks, %llu, should be: %llu\n", tab->tab_name, (u_llong) free_count, (u_llong) tab->tab_rec_fnum);
1327
/* Checking the free list: */
1329
rec_id = tab->tab_rec_free_id;
1331
if (rec_id >= tab->tab_rec_eof_id) {
1332
xt_logf(XT_INFO, "Table %s: invalid reference on free list: %llu, ", tab->tab_name, (u_llong) rec_id);
1334
xt_logf(XT_INFO, "reference by: %llu\n", (u_llong) prec_id);
1336
xt_logf(XT_INFO, "reference by list head pointer\n");
1339
if (!xt_tab_get_rec_data(ot, rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &rec_buf)) {
1340
xt_log_and_clear_exception(self);
1343
if ((rec_buf.tr_rec_type_1 & XT_TAB_STATUS_MASK) != XT_TAB_STATUS_FREED)
1344
xt_logf(XT_INFO, "Table %s: record, %llu, on free list is not free\n", tab->tab_name, (u_llong) rec_id);
1347
rec_id = XT_GET_DISK_4(rec_buf.tr_prev_rec_id_4);
1349
if (free_count2 < free_count)
1350
xt_logf(XT_INFO, "Table %s: not all free blocks (%llu) on free list: %llu\n", tab->tab_name, (u_llong) free_count, (u_llong) free_count2);
1352
freer_(); // xt_unlock_mutex_ns(&tab->tab_rec_lock);
1354
XTTabRowRefDRec row_buf;
1357
xt_lock_mutex(self, &tab->tab_row_lock);
1358
pushr_(xt_unlock_mutex, &tab->tab_row_lock);
1361
printf("Free list: %llu (%llu)\n", (u_llong) tab->tab_row_free_id, (u_llong) tab->tab_row_fnum);
1362
printf("EOF: %llu\n", (u_llong) tab->tab_row_eof_id);
1365
while (rec_id < tab->tab_row_eof_id) {
1366
if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, rec_id, (xtWord4 *) &row_buf))
1368
printf("%-3llu ", (u_llong) rec_id);
1369
ref_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
1371
printf("====== 0\n");
1373
printf("in use %llu\n", (u_llong) ref_id);
1377
freer_(); // xt_unlock_mutex(&tab->tab_row_lock);
1379
freer_(); // xt_unlock_mutex(&tab->tab_db->db_co_ext_lock);
1193
1382
xtPublic void xt_rename_table(XTThreadPtr self, char *old_name, char *new_name)
1195
1384
XTDatabaseHPtr db = self->st_database;
1385
XTOpenTablePoolPtr table_pool;
1196
1386
XTTableHPtr tab;
1197
1387
char to_path[PATH_MAX];
1198
1388
char table_name[XT_MAX_TABLE_FILE_NAME_SIZE];
1200
1390
XTFilesOfTableRec ft;
1201
1391
XTDictionaryRec dic = { 0, 0, 0, 0, 0, 0, 0, 0 };
1203
1393
XTTableEntryPtr te_ptr;
1204
1394
char *te_new_name;
1345
1541
xt_heap_release(self, tab);
1347
1543
freer_(); // myxt_free_dictionary(&dic)
1348
freer_(); // tab_unlock_table(NULL)
1544
freer_(); // xt_ht_unlock(db->db_tables)
1545
freer_(); // xt_db_unlock_table_pool(table_pool)
1351
xtPublic XTTableHPtr xt_use_table(XTThreadPtr self, char *name, xtBool missing_ok)
1548
xtPublic XTTableHPtr xt_use_table(XTThreadPtr self, char *name, xtBool no_load, xtBool missing_ok)
1353
1550
XTTableHPtr tab;
1354
1551
XTDatabaseHPtr db = self->st_database;
1356
1553
xt_ht_lock(self, db->db_tables);
1357
1554
pushr_(xt_ht_unlock, db->db_tables);
1358
tab = xt_use_table_no_lock(self, db, name, FALSE, missing_ok, NULL);
1555
tab = xt_use_table_no_lock(self, db, name, no_load, missing_ok, NULL);
1560
xtPublic void xt_flush_table(XTThreadPtr self, XTOpenTablePtr ot)
1562
XTTableHPtr tab = ot->ot_table;
1563
XTDatabaseHPtr db = tab->tab_db;
1565
/* Wakeup the sweeper:
1566
* We want the sweeper to check if there is anything to do,
1567
* so we must wake it up.
1568
* Once it has done all it can, it will go back to sleep.
1569
* This should be good enough.
1571
if (db->db_sw_idle) {
1572
u_int check_count = db->db_sw_check_count;
1575
xt_broadcast_cond(NULL, &db->db_xn_wait_cond);
1576
if (!db->db_sw_thread || db->db_sw_idle || check_count != db->db_sw_check_count)
1578
xt_sleep_100th_second(1);
1582
/* Wait for the sweeper to become idle: */
1583
xt_lock_mutex(self, &db->db_xn_wait_lock);
1584
pushr_(xt_unlock_mutex, &db->db_xn_wait_lock);
1585
while (db->db_sw_thread && !db->db_sw_idle) {
1586
xt_timed_wait_cond(self, &db->db_xn_wait_cond, &db->db_xn_wait_lock, 10);
1588
freer_(); // xt_unlock_mutex(&db->db_xn_wait_lock)
1590
/* Wait for the writer to write out all operations on the table: */
1591
while (XTTableSeq::xt_op_is_before(tab->tab_head_op_seq+1, tab->tab_seq.ts_next_seq)) {
1592
/* Flush the log, in case this is holding up the
1595
if (!db->db_xlog.xlog_flush(self))
1598
xt_lock_mutex(self, &db->db_wr_lock);
1599
pushr_(xt_unlock_mutex, &db->db_wr_lock);
1600
db->db_wr_thread_waiting++;
1602
* Wake the writer if it is sleeping. In order to
1603
* flush a table we must wait for the writer to complete
1604
* committing all the changes in the table to the database.
1606
if (db->db_wr_idle) {
1607
if (!xt_broadcast_cond(NULL, &db->db_wr_cond))
1608
xt_log_and_clear_exception_ns();
1611
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
1612
xt_sleep_100th_second(1);
1614
xt_lock_mutex(self, &db->db_wr_lock);
1615
pushr_(xt_unlock_mutex, &db->db_wr_lock);
1616
db->db_wr_thread_waiting--;
1617
freer_(); // xt_unlock_mutex(&db->db_wr_lock)
1620
/* Write the table header: */
1621
if (tab->tab_flush_pending) {
1622
tab->tab_flush_pending = FALSE;
1623
xt_tab_store_header(self, ot);
1625
/* Flush the table data: */
1626
if (!xt_flush_file(ot->ot_rec_file) ||
1627
!xt_flush_file(ot->ot_row_file)) {
1628
tab->tab_flush_pending = TRUE;
1633
if (!xt_flush_table_index(ot))
1363
1637
xtPublic XTOpenTablePtr tab_open_table(XTTableHPtr tab)
1365
1639
volatile XTOpenTablePtr ot;
1366
1640
XTThreadPtr self;
1368
if (!(ot = (XTOpenTablePtr) xt_sys_malloc(offsetof(XTOpenTableRec, ot_read_buf) + tab->tab_buf_size)))
1642
if (!(ot = (XTOpenTablePtr) xt_malloc_ns(sizeof(XTOpenTableRec))))
1370
1644
memset(ot, 0, offsetof(XTOpenTableRec, ot_ind_rbuf));
1495
xtPublic xtBool xt_tab_flush_data(XTOpenTablePtr ot)
1497
register XTTableHPtr tab = ot->ot_table;
1500
ASSERT_NS(((tab->tab_data_eof + tab->tab_buf_offset) - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size == 0);
1501
if (tab->tab_buf_offset) {
1502
xt_rwlock_wrlock(&tab->tab_buf_rwlock);
1503
if (tab->tab_buf_offset) {
1504
if ((ok = xt_pwrite_file(ot->ot_data_file, tab->tab_data_eof, tab->tab_buf_offset, tab->tab_data_buf))) {
1505
tab->tab_data_eof += tab->tab_buf_offset;
1506
tab->tab_buf_offset = 0;
1507
ok = tab_store_table_header(tab, ot->ot_ind_file);
1510
else if (tab->tab_head_dirty)
1511
ok = tab_store_table_header(tab, ot->ot_ind_file);
1512
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1514
else if (tab->tab_head_dirty) {
1515
xt_rwlock_wrlock(&tab->tab_buf_rwlock);
1516
if (tab->tab_head_dirty)
1517
ok = tab_store_table_header(tab, ot->ot_ind_file);
1518
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1523
1742
/* The fixed part of the record is already in the row buffer.
1524
1743
* This function loads the extended part, expanding the row
1525
1744
* buffer if necessary.
1527
static xtBool tab_load_ext_data(XTOpenTablePtr ot, off_t load_rec, xtWord1 *buffer)
1746
xtPublic xtBool xt_tab_load_ext_data(XTOpenTablePtr ot, xtRecordID load_rec_id, xtWord1 *buffer, u_int cols_req)
1532
xtWord1 save_buffer[XT_LOG_REC_HEADER_SIZE];
1533
u_int retry_count = 10;
1534
XTDataLogBufferDPtr ext_data_ptr;
1750
xtLogOffset log_offset;
1751
xtWord1 save_buffer[offsetof(XTactExtRecEntryDRec, er_data)];
1752
xtBool retried = FALSE;
1753
XTactExtRecEntryDPtr ext_data_ptr;
1755
xtTableID curr_tab_id;
1756
xtRecordID curr_rec_id;
1538
1758
log_size = XT_GET_DISK_4(((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_dat_siz_4);
1539
XT_GET_LOG_REF_6(log_id, log_offset, ((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_rec_6);
1759
XT_GET_LOG_REF(log_id, log_offset, (XTTabRecExtDPtr) ot->ot_row_rbuffer);
1541
1761
if (ot->ot_rec_size + log_size > ot->ot_row_rbuf_size) {
1542
if (!xt_sys_realloc((void **) &ot->ot_row_rbuffer, ot->ot_rec_size + log_size))
1762
if (!xt_realloc_ns((void **) &ot->ot_row_rbuffer, ot->ot_rec_size + log_size))
1544
1764
ot->ot_row_rbuf_size = ot->ot_rec_size + log_size;
1547
1767
/* Read the extended part first: */
1548
ext_data_ptr = (XTDataLogBufferDPtr) (ot->ot_row_rbuffer + ot->ot_rec_size - XT_LOG_REC_HEADER_SIZE);
1768
ext_data_ptr = (XTactExtRecEntryDPtr) (ot->ot_row_rbuffer + ot->ot_rec_size - offsetof(XTactExtRecEntryDRec, er_data));
1550
1770
/* Save the data which the header will overwrite: */
1551
memcpy(save_buffer, ext_data_ptr, XT_LOG_REC_HEADER_SIZE);
1771
memcpy(save_buffer, ext_data_ptr, offsetof(XTactExtRecEntryDRec, er_data));
1554
if (!xt_dl_read_log(ot, log_id, log_offset, log_size + XT_LOG_REC_HEADER_SIZE, (xtWord1 *) ext_data_ptr))
1774
if (!ot->ot_thread->st_dlog_buf.dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + log_size, (xtWord1 *) ext_data_ptr))
1555
1775
goto retry_read;
1557
log_size2 = XT_GET_DISK_4(ext_data_ptr->lb_data_size_4);
1558
curr_rec = XT_GET_DISK_6(ext_data_ptr->lb_record_6);
1777
log_size2 = XT_GET_DISK_4(ext_data_ptr->er_data_size_4);
1778
curr_tab_id = XT_GET_DISK_4(ext_data_ptr->er_tab_id_4);
1779
curr_rec_id = XT_GET_DISK_4(ext_data_ptr->er_rec_id_4);
1560
if (log_size2 != log_size || curr_rec != load_rec) {
1781
if (log_size2 != log_size || curr_tab_id != ot->ot_table->tab_id || curr_rec_id != load_rec_id) {
1782
/* [(3)] This can happen in the following circumstances:
1783
* - A new record is created, but the data log is not
1785
* - The server quits.
1786
* - On restart the transaction is rolled back, but the data record
1787
* was not written, so later a new record could be written at this
1789
* - Later the sweeper tries to cleanup this record, and finds
1790
* that a different record has been written at this position.
1792
* NOTE: Index entries can only be written to disk for records
1793
* that have been committed to the disk, because uncommitted
1794
* records may not exist in order to remove the index entry
1561
1797
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_EXT_RECORD);
1562
1798
goto retry_read;
1565
1801
/* Restore the saved area: */
1566
memcpy(ext_data_ptr, save_buffer, XT_LOG_REC_HEADER_SIZE);
1568
return myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer);
1802
memcpy(ext_data_ptr, save_buffer, offsetof(XTactExtRecEntryDRec, er_data));
1805
xt_unlock_mutex_ns(&ot->ot_table->tab_db->db_co_ext_lock);
1806
return myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer, cols_req);
1571
1810
/* (1) It may be that reading the log fails because the garbage collector
1572
1811
* has moved the record since we determined the location.
1573
1812
* We handle this here, by re-reading the data the garbage collector
1588
1827
* part of an uncommitted record (belonging to some other thread/
1589
1828
* transaction).
1594
if (!xt_tab_get_data(ot, load_rec + offsetof(XTTabRecExtDRec, re_log_rec_6), 6,
1595
(xtWord1 *) &((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_rec_6, NULL))
1598
XT_GET_LOG_REF_6(log_id2, log_offset2, ((XTTabRecExtDPtr) ot->ot_row_rbuffer)->re_log_rec_6);
1599
if (log_id != log_id2 || log_offset != log_offset2) {
1601
log_offset = log_offset2;
1604
else if (retry_count) {
1830
XTTabRecExtDRec rec_buf;
1832
xt_lock_mutex_ns(&ot->ot_table->tab_db->db_co_ext_lock);
1835
if (!xt_tab_get_rec_data(ot, load_rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &rec_buf))
1838
XT_GET_LOG_REF(log_id, log_offset, &rec_buf);
1844
xt_unlock_mutex_ns(&ot->ot_table->tab_db->db_co_ext_lock);
1614
static xtBool tab_delete_log_record(XTOpenTablePtr ot, off_t address, xtWord4 curr_log_id, off_t curr_log_offset, size_t curr_log_size)
1619
while (!xt_dl_delete_log(ot, curr_log_id, curr_log_offset, curr_log_size, address)) {
1620
/* It may be that the garbage collector has moved the record, try again: */
1621
if (!xt_tab_get_data(ot, address + offsetof(XTTabRecExtDRec, re_log_rec_6), 6,
1622
(xtWord1 *) &((XTTabRecExtDPtr) ot->ot_row_wbuffer)->re_log_rec_6, NULL))
1625
XT_GET_LOG_REF_6(log_id, log_offset, ((XTTabRecExtDPtr) ot->ot_row_wbuffer)->re_log_rec_6);
1627
if (log_id == curr_log_id && log_offset == curr_log_offset)
1628
/* Nothing has changed, this must be a real failure: */
1631
/* Try with the new log position: */
1632
curr_log_id = log_id;
1633
curr_log_offset = log_offset;
1638
xtPublic xtBool xt_tab_put_data(XTOpenTablePtr ot, off_t address, u_int size, xtWord1 *buffer)
1640
register XTTableHPtr tab = ot->ot_table;
1642
ASSERT_NS((tab->tab_data_eof - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size == 0);
1644
if (!tab->tab_buf_offset || address + (off_t) size <= tab->tab_data_eof)
1645
return xt_rc_write(ot, address, size, buffer);
1647
/* Writing the data includes part of the write buffer: */
1648
xt_rwlock_wrlock(&tab->tab_buf_rwlock);
1649
/* Get the write buffer part first: */
1650
if (!tab->tab_buf_offset || address + (off_t) size <= tab->tab_data_eof) {
1651
/* May have changed between check and lock... */
1652
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1656
/* You cannot put past the current EOF: */
1657
if (address + size > tab->tab_data_eof + tab->tab_buf_offset) {
1658
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1659
return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1662
if (address < tab->tab_data_eof) {
1663
/* Part of the data is in the write buffer (this should not happen): */
1667
tfer = (size_t) (address + (off_t) size - tab->tab_data_eof);
1669
memcpy(tab->tab_data_buf, buffer + boff, tfer);
1670
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1671
return xt_rc_write(ot, address, boff, buffer);
1674
/* Complete copy from the write buffer: */
1675
memcpy(tab->tab_data_buf + (address - tab->tab_data_eof), buffer, size);
1676
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1681
xtPublic xtBool xt_tab_get_record(register XTOpenTablePtr ot, off_t address, u_int size, xtWord1 *buffer)
1683
register XTTableHPtr tab = ot->ot_table;
1686
/* We assume we are reading a whole record, from
1687
* a record boundary: */
1688
ASSERT_NS(size <= tab->tab_dic.dic_rec_size);
1689
ASSERT_NS(tab->tab_buf_size % tab->tab_dic.dic_rec_size == 0);
1690
ASSERT_NS((address - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size == 0);
1691
ASSERT_NS(ot->ot_rec_size == tab->tab_dic.dic_rec_size);
1692
(void) ASSERT_NS(tab->tab_dic.dic_rec_size == tab->tab_dic.dic_rec_size);
1694
if (address < tab->tab_data_eof) {
1695
/* Fast track to read (nothing in the write buffer): */
1696
ASSERT_NS(address + (off_t) tab->tab_dic.dic_rec_size <= tab->tab_data_eof);
1697
return xt_rc_read_record(ot, address, size, buffer);
1700
/* Loading the buffer will include part of the write buffer: */
1701
xt_rwlock_rdlock(&tab->tab_buf_rwlock);
1702
/* Get the write buffer part first: */
1703
if (address < tab->tab_data_eof) {
1704
/* May have changed between check and lock... */
1705
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1706
return xt_rc_read_record(ot, address, size, buffer);
1708
ASSERT_NS(address + tab->tab_dic.dic_rec_size <= tab->tab_data_eof + tab->tab_buf_offset);
1710
boff = (size_t) (address - tab->tab_data_eof);
1711
if (boff + size > tab->tab_buf_offset)
1712
/* Return an empty buffer if no record at this offset! */
1713
memset(buffer, 0, size);
1715
memcpy(buffer, tab->tab_data_buf + boff, size);
1716
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1720
xtPublic xtBool xt_tab_get_data(register XTOpenTablePtr ot, off_t address, u_int size, xtWord1 *buffer, u_int *red_size)
1722
register XTTableHPtr tab = ot->ot_table;
1727
if (!tab->tab_buf_offset) {
1728
/* Fastest track to read (nothing in the write buffer): */
1729
diff = tab->tab_data_eof - address;
1733
if (!(tfer = (size_t) diff)) {
1737
return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1741
if (!xt_rc_read(ot, address, tfer, buffer))
1745
else if (size != tfer)
1746
return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1750
if (address + (off_t) size <= tab->tab_data_eof) {
1751
/* Read is completely before the write buffer: */
1752
/* This code reads from the file system cache:
1753
if (!xt_pread_file(ot->ot_data_file, address, size, size, buffer, NULL))
1848
xtPublic xtBool xt_tab_put_rec_data(XTOpenTablePtr ot, xtRecordID rec_id, size_t size, xtWord1 *buffer, xtOpSeqNo *op_seq)
1850
register XTTableHPtr tab = ot->ot_table;
1854
return tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, size, buffer, op_seq);
1857
xtPublic xtBool xt_tab_put_log_op_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer)
1859
register XTTableHPtr tab = ot->ot_table;
1864
if (status == XT_LOG_ENT_REC_MOVED) {
1865
if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), size, buffer, &op_seq))
1869
if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, size, buffer, &op_seq))
1873
return ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, free_rec_id, rec_id, size, buffer);
1876
xtPublic xtBool xt_tab_put_log_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer, xtOpSeqNo *op_seq)
1878
register XTTableHPtr tab = ot->ot_table;
1882
if (status == XT_LOG_ENT_REC_MOVED) {
1883
if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, offsetof(XTTabRecExtDRec, re_log_id_2), size, buffer, op_seq))
1887
if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, size, buffer, op_seq))
1891
return ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, *op_seq, free_rec_id, rec_id, size, buffer);
1894
xtPublic xtBool xt_tab_get_rec_data(XTOpenTablePtr ot, xtRecordID rec_id, size_t size, xtWord1 *buffer)
1896
register XTTableHPtr tab = ot->ot_table;
1900
return tab->tab_recs.xt_tc_read(ot->ot_rec_file, rec_id, (size_t) size, buffer);
1903
static xtBool tab_wait_for_update(XTThreadPtr thread, register XTOpenTablePtr ot, xtRowID row_id, xtXactID xn_id)
1905
register XTTableHPtr tab = ot->ot_table;
1908
lock_type = tab->tab_locks.xt_release_locks(ot, row_id, &thread->st_lock_list);
1910
/* The variation may be visible, we have to wait for the
1911
* transaction that wrote it to commit!
1756
/* This code reads from row cache: */
1757
if (!xt_rc_read(ot, address, size, buffer))
1764
/* Loading the buffer will include part of the write buffer: */
1765
xt_rwlock_rdlock(&tab->tab_buf_rwlock);
1766
/* Get the write buffer part first: */
1767
if (!tab->tab_buf_offset || address + (off_t) size <= tab->tab_data_eof) {
1768
/* May have changed between check and lock... */
1769
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1773
if (address < tab->tab_data_eof) {
1774
/* Part of the read is in the write buffer (copy this first): */
1775
/* This can happen during a sequential scan, which uses
1776
* a buffer larger than one row in size.
1778
tfer = (size_t) (address + (off_t) size - tab->tab_data_eof);
1779
if (tfer > tab->tab_buf_offset)
1780
tfer = tab->tab_buf_offset;
1781
boff = (size_t) (tab->tab_data_eof - address);
1782
memcpy(buffer + boff, tab->tab_data_buf, tfer);
1783
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1784
if (!xt_rc_read(ot, address, boff, buffer))
1787
*red_size = boff + tfer;
1788
else if (size != boff + tfer)
1789
return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1792
/* Complete load from the write buffer: */
1793
tfer = (size_t) (tab->tab_data_eof + (off_t) tab->tab_buf_offset - address);
1797
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1802
boff = (size_t) (address - tab->tab_data_eof);
1803
memcpy(buffer, tab->tab_data_buf + boff, tfer);
1804
xt_rwlock_unlock(&tab->tab_buf_rwlock);
1807
else if (size != tfer)
1808
return xt_register_ferrno(XT_REG_CONTEXT, ESPIPE, xt_file_path(ot->ot_data_file));
1913
if (!xt_xn_wait_for_xact(thread, xn_id, lock_type == XT_TEMP_LOCK))
1917
if (!tab->tab_locks.xt_regain_locks(ot, &lock_type, &xn_id, &thread->st_lock_list))
1919
} while (lock_type);
1814
1924
* Is a record visible?
1815
* Return TRUE, FALSE or XT_ERR
1925
* Returns TRUE, FALSE, XT_ERR, XT_DEL or XT_NEW.
1927
* TRUE - The record is visible.
1928
* FALSE - The record is not visible.
1929
* XT_ERR - An exception (error) occurred.
1930
* XT_DEL - The record is not valid (a free or delete record). If an
1931
* index is pointing to this record, then it should be deleted.
1932
* XT_NEW - The most recent variation of this row has been returned
1933
* and is to be used instead of the input!
1935
* Basically, a record is visible if it was committed on or before
1936
* the transactions "visible time" (st_visible_time), and there
1937
* are no other visible records before this record in the
1938
* variation chain for the record.
1940
* This holds in general, but you don't always get to see the
1941
* visible record (as defined in this sense).
1943
* On any kind of update (SELECT FOR UPDATE, UPDATE or DELETE), you
1944
* get to see the most recent variation of the row!
1946
* So on update, this function will wait if necessary for a recent
1947
* update to be committed.
1949
* So an update is a kind of "committed read" with a wait for
1950
* uncommitted records.
1953
* - INSERTS may not be seen by the update read, depending on when
1955
* - Records may be returned in non-index order.
1956
* - New records returned must be checked again by an index scan
1957
* to make sure they conform to the condition!
1959
* CREATE TABLE test_tab (ID int primary key, Value int, Name varchar(20),
1960
* index(Value, Name)) ENGINE=pbxt;
1961
* INSERT test_tab values(4, 2, 'D');
1962
* INSERT test_tab values(5, 2, 'E');
1963
* INSERT test_tab values(6, 2, 'F');
1964
* INSERT test_tab values(7, 2, 'G');
1968
* select * from test_tab where id = 6 for update;
1971
* select * from test_tab where value = 2 order by value, name for update;
1973
* update test_tab set Name = 'A' where id = 7;
1976
* Result order D, E, F, A.
1978
* But Jim does it like this, so it should be OK.
1817
static int tab_visible(register XTOpenTablePtr ot, XTTabRecHeadDPtr rec_head)
1980
static int tab_visible(register XTOpenTablePtr ot, XTTabRecHeadDPtr rec_head, xtRecordID *new_rec_id)
1819
XTThreadPtr self = ot->ot_thread;
1982
XTThreadPtr thread = ot->ot_thread;
1820
1984
XTTabRecHeadDRec var_head;
1821
xtWord8 rec_tn_id = 0;
1822
#ifdef XT_REPEATABLE_READ_BLOCKS
1986
xtRecordID var_rec_id;
1987
register XTTableHPtr tab;
1824
1988
xtBool wait = FALSE;
1829
//register XTTableHPtr tab;
1989
xtXactID wait_xn_id = 0;
1830
1990
#ifdef TRACE_VARIATIONS
1831
1991
char t_buf[500];
1997
if (XT_REC_NOT_VALID(rec_head->tr_rec_type_1))
2000
row_id = XT_GET_DISK_4(rec_head->tr_row_id_4);
2002
ASSERT_NS(!ot->ot_curr_row_id || row_id == ot->ot_curr_row_id);
2004
if (ot->ot_curr_row_id && row_id != ot->ot_curr_row_id)
2008
#ifdef TRACE_VARIATIONS
2009
len = sprintf(t_buf, "%s visible: row=%d rec=%d ", thread->t_name, (int) row_id, (int) ot->ot_curr_rec_id);
1835
2011
if (!XT_REC_IS_CLEAN(rec_head->tr_rec_type_1)) {
1836
2012
/* The record is not clean, which means it has not been swept.
1837
2013
* So we have to check if it is visible.
1839
xtBool mine = FALSE;
1841
rec_tn_id = XT_GET_DISK_6(rec_head->tr_xact_id_6);
1842
if (self->st_xact_mode >= XT_XACT_REPEATABLE_READ) {
1843
if (!xt_xn_visible(ot, rec_tn_id, ot->ot_curr_rec, &mine))
1847
#ifdef XT_REPEATABLE_READ_BLOCKS
1848
if (!xt_xn_may_commit(ot, rec_tn_id, ot->ot_curr_rec, &mine, &wait))
1851
wait_tn_id = rec_tn_id;
1853
if (!xt_xn_committed(ot, rec_tn_id, ot->ot_curr_rec, &mine))
1858
/* This is a record written by this transaction. */
1859
/* Check that it was not written by the current update statement: */
1860
if (mine && self->st_is_update) {
1861
if (XT_STAT_ID_MASK(self->st_update_id) == rec_head->tr_stat_id_1)
2016
xn_id = XT_GET_DISK_4(rec_head->tr_xact_id_4);
2017
switch (xt_xn_status(ot, xn_id, ot->ot_curr_rec_id)) {
2020
case XT_XN_NOT_VISIBLE:
2021
if (ot->ot_for_update) {
2022
/* It is visible, only if it is an insert,
2023
* which means it has no previous variation.
2024
* Note, if an insert is updated, the record
2025
* should be overwritten (TODO - check this).
2027
var_rec_id = XT_GET_DISK_4(rec_head->tr_prev_rec_id_4);
2030
#ifdef TRACE_VARIATIONS
2032
len += sprintf(t_buf+len, "OTHER COMMIT (OVERWRITTEN) T%d\n", (int) xn_id);
2033
xt_trace("%s", t_buf);
2036
#ifdef TRACE_VARIATIONS
2039
len += sprintf(t_buf+len, "OTHER COMMIT T%d\n", (int) xn_id);
2040
xt_trace("%s", t_buf);
2045
#ifdef TRACE_VARIATIONS
2047
len += sprintf(t_buf+len, "ABORTED T%d\n", (int) xn_id);
2048
xt_trace("%s", t_buf);
2051
case XT_XN_MY_UPDATE:
2052
/* This is a record written by this transaction. */
2053
if (thread->st_is_update) {
2054
/* Check that it was not written by the current update statement: */
2055
if (XT_STAT_ID_MASK(thread->st_update_id) == rec_head->tr_stat_id_1) {
2056
#ifdef TRACE_VARIATIONS
2058
len += sprintf(t_buf+len, "MY UPDATE IN THIS STATEMENT T%d\n", (int) xn_id);
2059
xt_trace("%s", t_buf);
2064
ot->ot_curr_row_id = row_id;
2065
ot->ot_curr_updated = TRUE;
2066
if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
2068
/* It is visible if it is at the front of the list.
2069
* An update can end up not being at the front of the list
2070
* if it is deleted afterwards!
2072
#ifdef TRACE_VARIATIONS
2074
if (var_rec_id == ot->ot_curr_rec_id)
2075
len += sprintf(t_buf+len, "MY UPDATE T%d\n", (int) xn_id);
2077
len += sprintf(t_buf+len, "MY UPDATE (OVERWRITTEN) T%d\n", (int) xn_id);
2079
xt_trace("%s", t_buf);
2081
return var_rec_id == ot->ot_curr_rec_id;
2082
case XT_XN_OTHER_UPDATE:
2083
if (ot->ot_for_update) {
2084
/* If this is an insert, we are interested!
2085
* Updated values are handled below. This is because
2086
* the changed (new) records returned below are always
2087
* followed (in the version chain) by the record
2088
* we would have returned (if nothing had changed).
2090
* As a result, we only return records here which have
2093
var_rec_id = XT_GET_DISK_4(rec_head->tr_prev_rec_id_4);
2095
#ifdef TRACE_VARIATIONS
2097
len += sprintf(t_buf+len, "OTHER INSERT (WAIT FOR) T%d\n", (int) xn_id);
2098
xt_trace("%s", t_buf);
2100
if (!tab_wait_for_update(thread, ot, row_id, xn_id))
2102
if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
2104
rec_head = &var_head;
2108
#ifdef TRACE_VARIATIONS
2110
len += sprintf(t_buf+len, "OTHER UPDATE T%d\n", (int) xn_id);
2111
xt_trace("%s", t_buf);
1868
2119
* it is not visible at all. If it is not found on the
1869
2120
* variation chain, it is also not visible.
1871
//tab = ot->ot_table;
1872
row_id = XT_GET_DISK_4(rec_head->tr_row_id_4);
1874
/* Not required because records that may have been read by this
1875
* transactions are not free until this transaction
1878
//xt_rwlock_rdlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1880
#ifdef XT_REPEATABLE_READ_BLOCKS
1883
if (!(xt_tab_get_row(ot, row_id, &variation)))
2125
if (ot->ot_for_update)
2126
xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2128
xt_rwlock_rdlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2130
if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
1885
2132
#ifdef TRACE_VARIATIONS
1886
len = sprintf(t_buf, "vis row=%d", (int) row_id);
2133
len += sprintf(t_buf+len, "vis row=%d", (int) row_id);
1888
while (variation != ot->ot_curr_rec) {
2135
while (var_rec_id != ot->ot_curr_rec_id) {
1890
2137
goto not_found;
2138
if (!xt_tab_get_rec_data(ot, var_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
1891
2140
#ifdef TRACE_VARIATIONS
1892
2141
if (len <= 450)
1893
len += sprintf(t_buf+len, " -> %d", (int) variation);
2142
len += sprintf(t_buf+len, " -> %d(%d)", (int) var_rec_id, (int) var_head.tr_rec_type_1);
1895
if (!xt_tab_get_record(ot, variation, sizeof(XTTabRecHeadDRec), (xtWord1 *) &var_head))
1897
2144
/* All clean records are visible, by all transactions: */
1898
2145
if (XT_REC_IS_CLEAN(var_head.tr_rec_type_1))
1899
2146
goto not_found;
1900
if (!XT_TAB_IS_DELETED(var_head.tr_rec_type_1)) {
1901
tn_id = XT_GET_DISK_6(var_head.tr_xact_id_6);
1902
if (self->st_xact_mode >= XT_XACT_REPEATABLE_READ) {
1903
/* XT_XACT_REPEATABLE_READ & XT_XACT_SERIALIZABLE */
1904
if (xt_xn_visible(ot, tn_id, variation, NULL)) {
1905
/* This variation is visible, i.e committed before this
1906
* transaction started, or updated by this transaction.
1908
* We now know that this is the valid variation for
1909
* this record (for this table) for this transaction!
1910
* This will not change, unless the transaction
1911
* updates the record (again).
1913
* So we can store this information as a hint, if
1914
* we see other variations belonging to this record,
1915
* then we can ignore them immediately!
1921
#ifdef XT_REPEATABLE_READ_BLOCKS
1922
/* XT_XACT_UNCOMMITTED_READ & XT_XACT_COMMITTED_READ
1923
* If this record is not committed, then we will have to wait for
1924
* it because, in COMMITTED READ mode, we see committed
1925
* records immediately.
1927
xtBool var_wait = FALSE; /* Assume we won't need to wait. */
1928
if (xt_xn_may_commit(ot, tn_id, variation, NULL, &var_wait)) {
1929
/* This variation is not aborted, i.e. committed or
1930
* not yet committed (uncommitted).
1933
/* The record was committed, or updated by this transaction. */
1935
/* If the variation is not yet committed and was
1936
* updated by some other transaction.
1937
* So we must wait for it (the variation we are reading would
1938
* be valid if this uncommitted variation was not
1941
* If it would not be valid anyway (because there
1942
* is something in the chain before it), then
1943
* we need not wait for this variation to commit or
1944
* abort. So we continue in this loop.
1946
* NOTE: There should only be one uncommitted variation in
2147
if (XT_REC_IS_FREE(var_head.tr_rec_type_1))
2148
/* Should not happen! */
2150
xn_id = XT_GET_DISK_4(var_head.tr_xact_id_4);
2151
/* This variation is visible if committed before this
2152
* transaction started, or updated by this transaction.
2154
* We now know that this is the valid variation for
2155
* this record (for this table) for this transaction!
2156
* This will not change, unless the transaction
2157
* updates the record (again).
2159
* So we can store this information as a hint, if
2160
* we see other variations belonging to this record,
2161
* then we can ignore them immediately!
2163
switch (xt_xn_status(ot, xn_id, var_rec_id)) {
2166
case XT_XN_NOT_VISIBLE:
2167
if (ot->ot_for_update) {
2168
/* Substitute this record for the one we
2171
if (result == TRUE) {
2172
if (XT_REC_IS_DELETE(var_head.tr_rec_type_1))
2175
*new_rec_id = var_rec_id;
1955
if (xt_xn_committed(ot, tn_id, variation, NULL)) {
2182
/* Ignore the record, it will be removed. */
2184
case XT_XN_MY_UPDATE:
2186
case XT_XN_OTHER_UPDATE:
2187
/* Wait for this update to commit or abort: */
2192
#ifdef TRACE_VARIATIONS
2194
len += sprintf(t_buf+len, "-T%d", (int) wait_xn_id);
1961
variation = XT_GET_DISK_6(var_head.tr_prev_var_6);
2198
var_rec_id = XT_GET_DISK_4(var_head.tr_prev_rec_id_4);
1963
2200
#ifdef TRACE_VARIATIONS
1964
2201
if (len <= 450)
1965
sprintf(t_buf+len, " -> %d\n", (int) variation);
2202
sprintf(t_buf+len, " -> %d(%d)\n", (int) var_rec_id, (int) rec_head->tr_rec_type_1);
1967
sprintf(t_buf+len, " ...\n", (int) variation);
1970
//xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
1971
#ifdef XT_REPEATABLE_READ_BLOCKS
1973
/* The variation may be visible, we have to wait for the
1974
* transaction that wrote it to commit!
1976
if (!xt_xn_wait_for_xact(self, wait_tn_id))
1978
wait_tn_id = XT_GET_DISK_6(rec_head->tr_xact_id_6);
1983
#ifdef TRACE_VARIATIONS
2204
sprintf(t_buf+len, " ...\n");
1984
2205
xt_trace("%s", t_buf);
2208
if (ot->ot_for_update) {
2212
#ifdef TRACE_VARIATIONS
2213
xt_trace("%s %d WAIT FOR %d\n", thread->t_name, (int) thread->st_xact_data->xd_start_xn_id, (int) wait_xn_id);
2215
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2216
if (!tab_wait_for_update(thread, ot, row_id, wait_xn_id))
2222
lock_type = tab->tab_locks.xt_set_temp_lock(ot, row_id, &xn_id, &thread->st_lock_list);
2224
#ifdef TRACE_VARIATIONS
2225
xt_trace("%s T%d WAIT FOR LOCK(%D) T%d\n", thread->t_name, (int) thread->st_xact_data->xd_start_xn_id, (int) lock_type, (int) xn_id);
2227
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2229
if (!xt_xn_wait_for_xact(thread, xn_id, lock_type == XT_TEMP_LOCK))
2231
lock_type = tab->tab_locks.xt_is_locked(ot, row_id, &xn_id);
2234
#ifdef TRACE_VARIATIONS
2235
len = sprintf(t_buf, "%s visible (retry): row=%d rec=%d ", thread->t_name, (int) row_id, (int) ot->ot_curr_rec_id);
2238
* Reset the result before we go down the list again, to make sure we
2239
* get the latest record!!
2245
#ifdef TRACE_VARIATIONS
2246
if (result == XT_NEW)
2247
xt_trace("%s RETURN NEW %d\n", thread->t_name, (int) *new_rec_id);
2249
xt_trace("%s RETURN NOT VISIBLE (NEW)\n", thread->t_name);
2251
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
1986
2253
ot->ot_curr_row_id = row_id;
1987
ot->ot_curr_updated = (rec_tn_id == self->st_xact_data->xd_start_id);
2254
ot->ot_curr_updated = FALSE;
1991
//xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2258
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
1995
//xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2262
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2000
2267
* Return TRUE if the record has been read, and is visible.
2001
2268
* Return FALSE if the record is not visible.
2002
2269
* Return XT_ERR if an error occurs.
2003
* Return XT_DEL if the record has been deleted.
2270
* Return XT_DEL if the record is not valid (freed or is a delete record).
2005
2272
xtPublic int xt_tab_visible(XTOpenTablePtr ot)
2007
XTTabRecHeadDRec rec_head;
2009
if (!xt_tab_get_record(ot, ot->ot_curr_rec, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2275
XTTabRecHeadDRec rec_head;
2276
xtRecordID new_rec_id;
2279
if ((row_id = ot->ot_curr_row_id)) {
2280
/* Fast track, do a quick check.
2281
* Row ID is only set if this record has been committed.
2282
* Check if it is the first on the list!
2284
xtRecordID var_rec_id;
2287
if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
2289
if (ot->ot_curr_rec_id == var_rec_id) {
2291
if (ot->ot_for_update) {
2292
XTThreadPtr thread = ot->ot_thread;
2295
XTTableHPtr tab = ot->ot_table;
2297
lock_type = tab->tab_locks.xt_set_temp_lock(ot, row_id, &xn_id, &thread->st_lock_list);
2299
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2301
if (!xt_xn_wait_for_xact(thread, xn_id, lock_type == XT_TEMP_LOCK))
2303
lock_type = tab->tab_locks.xt_is_locked(ot, row_id, &xn_id);
2313
if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2011
if (XT_REC_NOT_VISIBLE(rec_head.tr_rec_type_1))
2013
return tab_visible(ot, &rec_head);
2316
if ((r = tab_visible(ot, &rec_head, &new_rec_id)) == XT_NEW)
2317
ot->ot_curr_rec_id = new_rec_id;
2017
* Return TRUE if the record has been read, and is visible.
2018
* Return FALSE if the record is not visible.
2019
* Return XT_ERR if an error occurs.
2322
* Read a record, and return one of the following:
2323
* TRUE - the record has been read, and is visible.
2324
* FALSE - the record is not visible.
2325
* XT_ERR - an error occurs.
2326
* XT_DEL - The record is invalid, if an index references this
2327
* record, it should be deleted.
2328
* XT_NEW - Means the expected record has been changed.
2329
* When doing an index scan, the conditions must be checked again!
2021
2331
xtPublic int xt_tab_read_record(register XTOpenTablePtr ot, xtWord1 *buffer)
2023
2333
register XTTableHPtr tab = ot->ot_table;
2024
2334
size_t rec_size = tab->tab_dic.dic_rec_size;
2335
xtRecordID new_rec_id;
2026
2338
if (!(ot->ot_thread->st_xact_data)) {
2027
2339
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
2030
if (!xt_tab_get_record(ot, ot->ot_curr_rec, rec_size, ot->ot_row_rbuffer))
2343
if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, rec_size, ot->ot_row_rbuffer))
2032
if (XT_REC_NOT_VISIBLE(ot->ot_row_rbuffer[0]))
2035
switch (tab_visible(ot, (XTTabRecHeadDPtr) ot->ot_row_rbuffer)) {
2346
ASSERT_NS(!XT_REC_NOT_VALID(*ot->ot_row_rbuffer));
2347
switch (tab_visible(ot, (XTTabRecHeadDPtr) ot->ot_row_rbuffer, &new_rec_id)) {
2042
if (ot->ot_rec_fixed)
2043
memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
2044
else if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
2045
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer))
2049
if (!tab_load_ext_data(ot, ot->ot_curr_rec, buffer))
2355
if (!xt_tab_get_rec_data(ot, new_rec_id, rec_size, ot->ot_row_rbuffer))
2357
ot->ot_curr_rec_id = new_rec_id;
2367
if (ot->ot_rec_fixed)
2368
memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
2369
else if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
2370
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer, ot->ot_cols_req))
2374
u_int cols_req = ot->ot_cols_req;
2376
ASSERT_NS(cols_req);
2377
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
2378
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer, cols_req))
2382
if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, buffer, cols_req))
2393
* TRUE/OK - record was read.
2394
* FALSE/FAILED - An error occurred.
2395
* XT_DEL - Record deleted.
2397
xtPublic int xt_tab_dirty_read_record(register XTOpenTablePtr ot, xtWord1 *buffer)
2399
register XTTableHPtr tab = ot->ot_table;
2400
size_t rec_size = tab->tab_dic.dic_rec_size;
2402
if (!xt_tab_get_rec_data(ot, ot->ot_curr_rec_id, rec_size, ot->ot_row_rbuffer))
2405
if (XT_REC_NOT_VALID(ot->ot_row_rbuffer[0]))
2406
/* Should not happen! */
2409
ot->ot_curr_row_id = XT_GET_DISK_4(((XTTabRecHeadDPtr) ot->ot_row_rbuffer)->tr_row_id_4);
2410
ot->ot_curr_updated =
2411
(XT_GET_DISK_4(((XTTabRecHeadDPtr) ot->ot_row_rbuffer)->tr_xact_id_4) == ot->ot_thread->st_xact_data->xd_start_xn_id);
2413
if (ot->ot_rec_fixed)
2414
memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
2415
else if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
2416
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer, ot->ot_cols_req))
2420
u_int cols_req = ot->ot_cols_req;
2422
ASSERT_NS(cols_req);
2423
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
2424
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, buffer, cols_req))
2428
if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, buffer, cols_req))
2056
xtPublic xtBool xt_tab_load_record(register XTOpenTablePtr ot, off_t address, XTInfoBufferPtr rec_buf)
2436
xtPublic void xt_tab_load_row_pointers(XTThreadPtr self, XTOpenTablePtr ot)
2438
XTTableHPtr tab = ot->ot_table;
2439
off_t eof = xt_seek_eof_file(self, ot->ot_row_file);
2442
/* Check if there is enough cache: */
2443
usage = xt_tc_get_usage();
2444
if (xt_tc_get_high() > usage)
2445
usage = xt_tc_get_high();
2446
if (usage + eof < xt_tc_get_size()) {
2448
xtRecordID eof_rec_id;
2450
XTTabCachePagePtr page;
2452
eof_rec_id = xt_row_offset_row_id(tab, eof);
2454
while (rec_id < eof_rec_id) {
2455
if (!(page = tab->tab_rows.xt_tc_lock_page(ot->ot_row_file, rec_id, &offset)))
2457
tab->tab_rows.xt_tc_unlock_page(page);
2458
rec_id += tab->tab_rows.tci_rows_per_page;
2463
xtPublic xtBool xt_tab_load_record(register XTOpenTablePtr ot, xtRecordID rec_id, XTInfoBufferPtr rec_buf)
2058
2465
register XTTableHPtr tab = ot->ot_table;
2059
2466
size_t rec_size = tab->tab_dic.dic_rec_size;
2061
if (!xt_tab_get_record(ot, address, rec_size, ot->ot_row_rbuffer))
2468
if (!xt_tab_get_rec_data(ot, rec_id, rec_size, ot->ot_row_rbuffer))
2064
if (XT_REC_NOT_VISIBLE(ot->ot_row_rbuffer[0])) {
2471
if (XT_REC_NOT_VALID(ot->ot_row_rbuffer[0])) {
2065
2472
/* Should not happen! */
2066
2473
XTThreadPtr self = ot->ot_thread;
2079
2490
if (!xt_ib_alloc(NULL, rec_buf, tab->tab_dic.dic_buf_size))
2081
2492
if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VARIABLE || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_VAR_CLEAN) {
2082
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data))
2493
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, rec_buf->ib_db.db_data, ot->ot_cols_req))
2086
if (!tab_load_ext_data(ot, ot->ot_curr_rec, rec_buf->ib_db.db_data))
2497
u_int cols_req = ot->ot_cols_req;
2499
ASSERT_NS(cols_req);
2500
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
2501
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, rec_buf->ib_db.db_data, cols_req))
2505
if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, rec_buf->ib_db.db_data, cols_req))
2094
xtPublic xtBool xt_tab_free_row(XTOpenTablePtr ot, XTTableHPtr tab, xtWord4 row_id)
2514
xtPublic xtBool xt_tab_free_row(XTOpenTablePtr ot, XTTableHPtr tab, xtRowID row_id)
2096
2516
XTTabRowRefDRec free_row;
2097
off_t offset = (off_t) row_id << XT_TAB_ROW_SHIFTS;
2099
2520
ASSERT_NS(row_id); // Cannot free the header!
2100
free_row.rr_rec_type_1 = XT_TAB_ROW_FREE;
2101
free_row.rr_unused_1 = 0;
2103
xt_mutex_lock(&tab->tab_row_lock);
2104
XT_SET_DISK_6(free_row.rr_variation_6, tab->tab_row_free);
2105
if (!xt_dc_write(ot->ot_row_file, offset, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row)) {
2106
xt_mutex_unlock(&tab->tab_row_lock);
2522
xt_lock_mutex_ns(&tab->tab_row_lock);
2523
prev_row = tab->tab_row_free_id;
2524
XT_SET_DISK_4(free_row.rr_ref_id_4, prev_row);
2525
if (!tab->tab_rows.xt_tc_write(ot->ot_row_file, row_id, 0, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row, &op_seq)) {
2526
xt_unlock_mutex_ns(&tab->tab_row_lock);
2109
tab->tab_row_free = offset;
2529
tab->tab_row_free_id = row_id;
2110
2530
tab->tab_row_fnum++;
2111
tab->tab_head_dirty = TRUE;
2112
xt_mutex_unlock(&tab->tab_row_lock);
2531
xt_unlock_mutex_ns(&tab->tab_row_lock);
2533
if (!ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, XT_LOG_ENT_ROW_FREED, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row))
2116
static void tab_free_extended_record(XTOpenTablePtr ot, off_t address, XTTabRecExtDPtr ext_rec)
2539
static void tab_free_ext_record_on_fail(XTOpenTablePtr ot, xtRecordID rec_id, XTTabRecExtDPtr ext_rec, xtBool log_err)
2118
xtWord4 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
2122
XT_GET_LOG_REF_6(log_id, log_offset, ext_rec->re_log_rec_6);
2123
if (!tab_delete_log_record(ot, address, log_id, log_offset, log_over_size))
2124
xt_log_and_clear_exception_ns();
2541
xtWord4 log_over_size = XT_GET_DISK_4(ext_rec->re_log_dat_siz_4);
2543
xtLogOffset log_offset;
2545
XT_GET_LOG_REF(log_id, log_offset, ext_rec);
2547
if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(log_id, log_offset, log_over_size, ot->ot_table->tab_id, rec_id)) {
2549
xt_log_and_clear_exception_ns();
2127
2553
static void tab_save_exception(XTExceptionPtr e)
2218
2682
if (clean_delete)
2219
2683
ot->ot_row_rbuffer[0] |= XT_TAB_STATUS_CLEANED_BIT;
2220
2684
case XT_TAB_STATUS_DEL_CLEAN:
2225
if (ot->ot_row_rbuffer[0] == XT_TAB_STATUS_EXTENDED || ot->ot_row_rbuffer[0] == XT_TAB_STATUS_EXT_CLEAN)
2226
tab_free_extended_record(ot, address, (XTTabRecExtDPtr) ot->ot_row_rbuffer);
2229
ot->ot_row_rbuffer[0] |= XT_TAB_STATUS_REMOVED_BIT;
2230
if (!xt_tab_put_data(ot, address + offsetof(XTTabRecHeadDRec, tr_rec_type_1), 1, ot->ot_row_rbuffer))
2690
if (XT_REC_IS_EXT_DLOG(ot->ot_row_rbuffer[0])) {
2691
/* [(1)] Lock, and read again to make sure that the
2692
* compactor does not change this record, while
2693
* we are removing it! */
2694
xt_lock_mutex_ns(&tab->tab_db->db_co_ext_lock);
2696
if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_REMOVED_EXT, rec_id, clean_delete)) {
2697
xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
2701
xt_unlock_mutex_ns(&tab->tab_db->db_co_ext_lock);
2705
if (XT_REC_IS_DELETE(ot->ot_row_rbuffer[0])) {
2706
/* No attached resources, does not need to be removed! */
2707
if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_FREED, rec_id, clean_delete))
2711
if (!xt_tab_free_record(ot, XT_LOG_ENT_REC_REMOVED, rec_id, clean_delete))
2236
static xtWord4 tab_new_row(XTOpenTablePtr ot, XTTableHPtr tab)
2717
static xtRowID tab_new_row(XTOpenTablePtr ot, XTTableHPtr tab)
2239
2720
XTTabRowRefDRec row_buf;
2242
xt_mutex_lock(&tab->tab_row_lock);
2243
//D op_seq = tab->tab_seq.ts_get_op_seq();
2244
if ((row_id = tab->tab_row_free)) {
2246
if (!xt_dc_read(ot->ot_row_file, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf)) {
2247
xt_mutex_unlock(&tab->tab_row_lock);
2722
xtRowID next_row_id = 0;
2725
xt_lock_mutex_ns(&tab->tab_row_lock);
2726
if ((row_id = tab->tab_row_free_id)) {
2727
status = XT_LOG_ENT_ROW_NEW_FL;
2729
if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, row_id, (xtWord4 *) &row_buf)) {
2730
xt_unlock_mutex_ns(&tab->tab_row_lock);
2250
tab->tab_row_free = XT_GET_DISK_6(row_buf.rr_variation_6);
2733
next_row_id = XT_GET_DISK_4(row_buf.rr_ref_id_4);
2734
tab->tab_row_free_id = next_row_id;
2251
2735
tab->tab_row_fnum--;
2254
row_id = tab->tab_row_eof;
2255
tab->tab_row_eof += sizeof(XTTabRowRefDRec);
2738
status = XT_LOG_ENT_ROW_NEW;
2739
row_id = tab->tab_row_eof_id;
2740
if (row_id == 0xFFFFFFFF) {
2741
xt_unlock_mutex_ns(&tab->tab_row_lock);
2742
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_MAX_ROW_COUNT);
2745
tab->tab_row_eof_id++;
2257
tab->tab_head_dirty = TRUE;
2258
xt_mutex_unlock(&tab->tab_row_lock);
2261
//D if (!ot->ot_thread.alloc_rec_from_free(&tab->tab_db->db_xlog, op_seq, xtWord8 rec, xtWord8 next_rec))
2265
//D xtBool alloc_rec_from_eof(XTDatabaseLogPtr log, xtWord4 op_seq, xtWord8 rec);
2268
XT_DISABLED_TRACE(("new row tx=%d row=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_id, (int) (row_id >> XT_TAB_ROW_SHIFTS)));
2269
ASSERT_NS(row_id >> XT_TAB_ROW_SHIFTS);
2270
return (xtWord4) (row_id >> XT_TAB_ROW_SHIFTS);
2747
op_seq = tab->tab_seq.ts_get_op_seq();
2748
xt_unlock_mutex_ns(&tab->tab_row_lock);
2750
if (!ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, next_row_id, row_id, 0, NULL))
2753
XT_DISABLED_TRACE(("new row tx=%d row=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) row_id));
2273
xtPublic xtBool xt_tab_get_row(register XTOpenTablePtr ot, xtWord4 row_id, off_t *variation)
2758
xtPublic xtBool xt_tab_get_row(register XTOpenTablePtr ot, xtRowID row_id, xtRecordID *var_rec_id)
2276
XTTabRowRefDRec row_buf;
2280
(void) ASSERT_NS(sizeof(XTTabRowRefDRec) == 8);
2281
if (!xt_dc_read_8(ot->ot_row_file, (off_t) row_id << XT_TAB_ROW_SHIFTS, &x.buff_8))
2760
register XTTableHPtr tab = ot->ot_table;
2761
XTTabRowRefDRec row_buf;
2763
(void) ASSERT_NS(sizeof(XTTabRowRefDRec) == 4);
2765
if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, row_id, (xtWord4 *) &row_buf))
2283
if (x.row_buf.rr_rec_type_1 == XT_TAB_ROW_FREE)
2286
*variation = XT_GET_DISK_6(x.row_buf.rr_variation_6);
2767
*var_rec_id = (xtRecordID) XT_GET_DISK_4(row_buf.rr_ref_id_4);
2290
xtPublic xtBool xt_tab_set_row(XTOpenTablePtr ot, xtWord4 row_id, off_t variation, xtBool write_thru)
2771
xtPublic xtBool xt_tab_set_row(XTOpenTablePtr ot, u_int status, xtRowID row_id, xtRecordID var_rec_id)
2292
XTTabRowRefDRec row_buf;
2294
row_buf.rr_rec_type_1 = XT_TAB_ROW_IN_USE;
2295
XT_SET_DISK_6(row_buf.rr_variation_6, variation);
2296
row_buf.rr_unused_1 = 0;
2298
return xt_dc_write_thru(ot->ot_row_file, (off_t) row_id << XT_TAB_ROW_SHIFTS, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
2299
return xt_dc_write(ot->ot_row_file, (off_t) row_id << XT_TAB_ROW_SHIFTS, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
2773
register XTTableHPtr tab = ot->ot_table;
2774
XTTabRowRefDRec row_buf;
2777
ASSERT_NS(var_rec_id < tab->tab_rec_eof_id);
2778
XT_SET_DISK_4(row_buf.rr_ref_id_4, var_rec_id);
2780
if (!tab->tab_rows.xt_tc_write(ot->ot_row_file, row_id, 0, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &op_seq))
2783
return ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
2302
xtPublic xtBool xt_tab_free_record(XTOpenTablePtr ot, off_t address)
2786
xtPublic xtBool xt_tab_free_record(XTOpenTablePtr ot, u_int status, xtRecordID rec_id, xtBool clean_delete)
2304
2788
register XTTableHPtr tab = ot->ot_table;
2305
XTTabRecFreeDRec rec_buf;
2789
XTTabRecHeadDRec rec_head;
2790
XTactFreeRecEntryDRec free_rec;
2791
xtRecordID prev_rec_id;
2307
2793
/* Don't free the record if it is already free! */
2308
if (!xt_tab_get_record(ot, address, 1, (xtWord1 *) &rec_buf))
2794
if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2311
if (rec_buf.tr_rec_type_1 != XT_TAB_STATUS_FREED) {
2312
rec_buf.tr_rec_type_1 = XT_TAB_STATUS_FREED;
2313
rec_buf.tr_stat_id_1 = 0;
2315
xt_mutex_lock(&tab->tab_free_lock);
2316
XT_SET_DISK_6(rec_buf.rf_next_block_6, tab->tab_data_free);
2317
if (!xt_tab_put_data(ot, address, sizeof(XTTabRecFreeDRec), (xtWord1 *) &rec_buf)) {
2318
xt_mutex_unlock(&tab->tab_free_lock);
2797
if (!XT_REC_IS_FREE(rec_head.tr_rec_type_1)) {
2800
/* This information will be used to determine if the resources of the record
2801
* should be removed.
2803
free_rec.fr_stat_id_1 = rec_head.tr_stat_id_1;
2804
XT_COPY_DISK_4(free_rec.fr_xact_id_4, rec_head.tr_xact_id_4);
2806
/* A record is "clean" deleted if the record was
2807
* XT_TAB_STATUS_DELETE which was committed.
2808
* This makes sure that the record will still invalidate
2809
* following records in a row.
2813
* 1. INSERT A ROW, then DELETE it, assume the sweeper is delayed.
2815
* We now have the sequence row X --> del rec A --> valid rec B.
2817
* 2. A SELECT can still find B. Assume it now goes to check
2818
* if the record is valid, it reads row X, and gets A.
2820
* 3. Now the sweeper gets control and removes X, A and B.
2821
* It frees A with the clean bit.
2823
* 4. Now the SELECT gets control and reads A. Normally a freed record
2824
* would be ignored, and it would go onto B, which would then
2825
* be considered valid (note, even after the free, the next
2826
* pointer is not affected).
2828
* However, because the clean bit has been set, it will stop at A
2829
* and consider B invalid (which is the desired result).
2831
* NOTE: We assume it is not possible for A to be allocated and refer
2832
* to B, because B is freed before A. This means that B may refer to
2833
* A after the next allocation.
2836
ASSERT_NS(sizeof(XTTabRecFreeDRec) == sizeof(XTactFreeRecEntryDRec) - offsetof(XTactFreeRecEntryDRec, fr_rec_type_1));
2837
free_rec.fr_rec_type_1 = XT_TAB_STATUS_FREED | (clean_delete ? XT_TAB_STATUS_CLEANED_BIT : 0);
2838
free_rec.fr_not_used_1 = 0;
2840
xt_lock_mutex_ns(&tab->tab_rec_lock);
2841
prev_rec_id = tab->tab_rec_free_id;
2842
XT_SET_DISK_4(free_rec.fr_next_rec_id_4, prev_rec_id);
2843
if (!xt_tab_put_rec_data(ot, rec_id, sizeof(XTTabRecFreeDRec), &free_rec.fr_rec_type_1, &op_seq)) {
2844
xt_unlock_mutex_ns(&tab->tab_rec_lock);
2321
tab->tab_data_free = address;
2322
ASSERT_NS(tab->tab_data_free < tab->tab_data_eof + tab->tab_buf_offset);
2323
tab->tab_data_fnum++;
2324
tab->tab_head_dirty = TRUE;
2325
xt_mutex_unlock(&tab->tab_free_lock);
2847
tab->tab_rec_free_id = rec_id;
2848
ASSERT_NS(tab->tab_rec_free_id < tab->tab_rec_eof_id);
2849
tab->tab_rec_fnum++;
2850
xt_unlock_mutex_ns(&tab->tab_rec_lock);
2852
if (!ot->ot_thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, rec_id, rec_id, sizeof(XTactFreeRecEntryDRec) - offsetof(XTactFreeRecEntryDRec, fr_stat_id_1), &free_rec.fr_stat_id_1))
2330
/* Functions that cleanup on failure, preserve the exception. */
2331
static void tab_set_row_on_fail(XTOpenTablePtr ot, xtWord4 row_id, off_t variation, xtBool write_thru)
2335
tab_save_exception(&e);
2336
xt_tab_set_row(ot, row_id, variation, write_thru);
2337
tab_restore_exception(&e);
2340
static void tab_free_row_on_fail(XTOpenTablePtr ot, XTTableHPtr tab, xtWord4 row_id)
2858
static void tab_free_row_on_fail(XTOpenTablePtr ot, XTTableHPtr tab, xtRowID row_id)
2342
2860
XTExceptionRec e;
2346
2864
tab_restore_exception(&e);
2349
static void tab_remove_record_on_fail(XTOpenTablePtr ot, off_t address, XTTabRecHeadDPtr row_ptr, xtWord1 *rec_data, u_int key_count)
2867
static xtBool tab_add_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, u_int status)
2869
register XTTableHPtr tab = ot->ot_table;
2870
XTThreadPtr thread = ot->ot_thread;
2873
xtLogOffset log_offset;
2875
xtRecordID next_rec_id = 0;
2877
if (rec_info->ri_ext_rec) {
2878
/* Determine where the overflow will go... */
2879
if (!thread->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, rec_info->ri_log_data_size + offsetof(XTactExtRecEntryDRec, er_data)))
2881
XT_SET_LOG_REF(rec_info->ri_ext_rec, log_id, log_offset);
2884
/* Write the record to disk: */
2885
xt_lock_mutex_ns(&tab->tab_rec_lock);
2886
if ((rec_id = tab->tab_rec_free_id)) {
2887
XTTabRecFreeDRec free_block;
2889
ASSERT_NS(rec_id < tab->tab_rec_eof_id);
2890
if (!xt_tab_get_rec_data(ot, rec_id, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_block)) {
2891
xt_unlock_mutex_ns(&tab->tab_rec_lock);
2894
next_rec_id = XT_GET_DISK_4(free_block.rf_next_rec_id_4);
2895
tab->tab_rec_free_id = next_rec_id;
2897
tab->tab_rec_fnum--;
2899
/* XT_LOG_ENT_UPDATE --> XT_LOG_ENT_UPDATE_FL */
2900
/* XT_LOG_ENT_INSERT --> XT_LOG_ENT_INSERT_FL */
2901
/* XT_LOG_ENT_DELETE --> XT_LOG_ENT_DELETE_FL */
2905
rec_id = tab->tab_rec_eof_id;
2906
tab->tab_rec_eof_id++;
2908
if (!xt_tab_put_rec_data(ot, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, &op_seq)) {
2909
xt_unlock_mutex_ns(&tab->tab_rec_lock);
2912
xt_unlock_mutex_ns(&tab->tab_rec_lock);
2914
if (!thread->st_xact_buf.xbuf_modify_table(ot, status, op_seq, next_rec_id, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
2917
if (rec_info->ri_ext_rec) {
2918
/* Write the log buffer overflow: */
2919
rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK;
2920
XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size);
2921
XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab->tab_id);
2922
XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id);
2923
if (!thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf)) {
2924
/* Failed to write the overflow, free the record allocated above: */
2929
XT_DISABLED_TRACE(("new rec tx=%d val=%d\n", (int) thread->st_xact_data->xd_start_xn_id, (int) rec_id));
2930
rec_info->ri_rec_id = rec_id;
2934
static void tab_delete_record_on_fail(XTOpenTablePtr ot, xtRowID row_id, xtRecordID rec_id, XTTabRecHeadDPtr row_ptr, xtWord1 *rec_data, u_int key_count)
2937
xtBool log_err = TRUE;
2938
XTTabRecInfoRec rec_info;
2353
2940
tab_save_exception(&e);
2942
if (e.e_xt_err == XT_ERR_DUPLICATE_KEY ||
2943
e.e_xt_err == XT_ERR_DUPLICATE_FKEY) {
2944
/* If the error does not cause rollback, then we will ignore the
2945
* error if an error occurs in the UNDO!
2948
tab_restore_exception(&e);
2354
2950
if (key_count) {
2355
2951
XTIndexPtr *ind;
2357
2953
ind = ot->ot_table->tab_dic.dic_keys;
2358
2954
for (u_int i=0; i<key_count; i++, ind++) {
2359
if (!xt_idx_delete(ot, *ind, address, rec_data))
2360
xt_log_and_clear_exception_ns();
2364
if (row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXTENDED || row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_CLEAN)
2365
tab_free_extended_record(ot, address, (XTTabRecExtDPtr) row_ptr);
2367
if (!xt_tab_free_record(ot, address))
2368
xt_log_and_clear_exception_ns();
2370
tab_restore_exception(&e);
2373
static void tab_free_record_on_fail(XTOpenTablePtr ot, off_t address)
2377
tab_save_exception(&e);
2378
if (!xt_tab_free_record(ot, address))
2379
xt_log_and_clear_exception_ns();
2380
tab_restore_exception(&e);
2383
static xtBool tab_add_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info)
2385
register XTTableHPtr tab = ot->ot_table;
2390
if (rec_info->ri_ext_rec) {
2391
/* Determine where the overflow will go... */
2392
if (!xt_dl_get_log_offset(ot, &log_id, &log_offset, rec_info->ri_log_data_size + XT_LOG_REC_HEADER_SIZE))
2394
XT_SET_LOG_REF_6(rec_info->ri_ext_rec->re_log_rec_6, log_id, log_offset);
2397
/* Write the record to disk: */
2398
xt_mutex_lock(&tab->tab_free_lock);
2399
if ((rec_address = tab->tab_data_free)) {
2400
XTTabRecFreeDRec free_block;
2402
if (!xt_tab_get_record(ot, rec_address, sizeof(XTTabRecFreeDRec), (xtWord1 *) &free_block)) {
2403
xt_mutex_unlock(&tab->tab_free_lock);
2406
tab->tab_data_free = XT_GET_DISK_6(free_block.rf_next_block_6);
2407
/* Can happen if, we allocate a record, update the record,
2408
* then crash before we can update the free list pointer.
2411
if (tab->tab_data_free >= tab->tab_data_eof + tab->tab_buf_offset ||
2412
((tab->tab_data_free - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size) != 0) {
2413
/* We have to drop the free list: */
2414
tab->tab_data_free = 0;
2415
tab->tab_data_fnum = 0;
2416
tab->tab_head_dirty = TRUE;
2420
ASSERT_NS(tab->tab_data_free < tab->tab_data_eof + tab->tab_buf_offset);
2421
tab->tab_data_fnum--;
2422
tab->tab_head_dirty = TRUE;
2423
xt_mutex_unlock(&tab->tab_free_lock);
2425
/* Threads can do this together: */
2426
if (!xt_tab_put_data(ot, rec_address, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
2429
goto write_overflow;
2432
xt_mutex_unlock(&tab->tab_free_lock);
2434
/* Fixed length records must always fit into the buffer: */
2435
ASSERT_NS((tab->tab_buf_size % tab->tab_dic.dic_rec_size) == 0);
2436
ASSERT_NS(ot->ot_rec_size == tab->tab_dic.dic_rec_size);
2438
xt_rwlock_wrlock(&tab->tab_buf_rwlock);
2440
rec_address = tab->tab_data_eof + tab->tab_buf_offset;
2441
ASSERT_NS(((rec_address - sizeof(XTTabDataHeadDRec)) % tab->tab_dic.dic_rec_size) == 0);
2443
if (tab->tab_buf_offset + ot->ot_rec_size > tab->tab_buf_size) {
2444
/* Because the buffer size is a multiple of the record size, this must be the case: */
2445
ASSERT_NS(tab->tab_buf_size == tab->tab_buf_offset);
2447
/* TODO: Double buffering will ease this bottleneck: */
2448
if (!xt_pwrite_file(ot->ot_data_file, tab->tab_data_eof, tab->tab_buf_size, tab->tab_data_buf)) {
2449
xt_rwlock_unlock(&tab->tab_buf_rwlock);
2452
tab->tab_data_eof += tab->tab_buf_size;
2453
memcpy(tab->tab_data_buf, ((xtWord1 *) rec_info->ri_fix_rec_buf), rec_info->ri_rec_buf_size);
2454
tab->tab_buf_offset = ot->ot_rec_size;
2457
memcpy(tab->tab_data_buf + tab->tab_buf_offset, rec_info->ri_fix_rec_buf, rec_info->ri_rec_buf_size);
2458
tab->tab_buf_offset += ot->ot_rec_size;
2461
xt_rwlock_unlock(&tab->tab_buf_rwlock);
2464
if (rec_info->ri_ext_rec) {
2465
/* Write the log buffer overflow: */
2466
rec_info->ri_log_buf->lb_status_1 = XT_DL_STATUS_RECORD;
2467
XT_SET_DISK_4(rec_info->ri_log_buf->lb_data_size_4, rec_info->ri_log_data_size);
2468
XT_SET_DISK_6(rec_info->ri_log_buf->lb_record_6, rec_address);
2469
if (!xt_dl_append_log(ot, log_id, log_offset, rec_info->ri_log_data_size + XT_LOG_REC_HEADER_SIZE, rec_info->ri_log_buf)) {
2470
/* Failed to write the overflow, free the record allocated above: */
2471
tab_free_record_on_fail(ot, rec_address);
2476
XT_DISABLED_TRACE(("new rec tx=%d val=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_id, (int) rec_address));
2477
rec_info->ri_rec_address = rec_address;
2955
if (!xt_idx_delete(ot, *ind, rec_id, rec_data)) {
2957
xt_log_and_clear_exception_ns();
2962
if (row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_DLOG || row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_CLEAN)
2963
tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) row_ptr, log_err);
2965
rec_info.ri_fix_rec_buf = (XTTabRecFixDPtr) ot->ot_row_wbuffer;
2966
rec_info.ri_rec_buf_size = offsetof(XTTabRecFixDRec, rf_data);
2967
rec_info.ri_ext_rec = NULL;
2968
rec_info.ri_fix_rec_buf->tr_rec_type_1 = XT_TAB_STATUS_DELETE;
2969
rec_info.ri_fix_rec_buf->tr_stat_id_1 = 0;
2970
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2971
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, rec_id);
2972
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, ot->ot_thread->st_xact_data->xd_start_xn_id);
2974
if (!tab_add_record(ot, &rec_info, XT_LOG_ENT_DELETE))
2977
if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_ADD_REC, row_id, rec_info.ri_rec_id))
2981
tab_restore_exception(&e);
2986
xt_log_and_clear_exception_ns();
2988
tab_restore_exception(&e);
2576
3108
* variation chain, it is also not visible.
2578
3110
row_id = XT_GET_DISK_4(rec_head.tr_row_id_4);
2579
//xt_rwlock_rdlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2581
if (!(xt_tab_get_row(ot, row_id, &variation)))
3113
xt_rwlock_rdlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
3115
if (!(xt_tab_get_row(ot, row_id, &var_rec_id)))
2583
#ifdef TRACE_VARIATIONS
2584
//len = sprintf(t_buf, "dup row=%d", (int) row_id);
3117
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
3118
len = sprintf(t_buf, "dup row=%d", (int) row_id);
2586
while (variation != record) {
3120
while (var_rec_id != rec_id) {
2588
3122
goto not_found;
2589
#ifdef TRACE_VARIATIONS
2591
// len += sprintf(t_buf+len, " -> %d", (int) variation);
3123
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
3125
len += sprintf(t_buf+len, " -> %d", (int) var_rec_id);
2593
if (!xt_tab_get_record(ot, variation, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
3127
if (!xt_tab_get_rec_data(ot, var_rec_id, sizeof(XTTabRecHeadDRec), (xtWord1 *) &rec_head))
2595
3129
/* All clean records are visible, by all transactions: */
2596
3130
if (XT_REC_IS_CLEAN(rec_head.tr_rec_type_1))
2597
3131
goto not_found;
2598
if (!XT_TAB_IS_DELETED(rec_head.tr_rec_type_1)) {
2599
xtBool var_wait = FALSE;
2601
tn_id = XT_GET_DISK_6(rec_head.tr_xact_id_6);
2602
if (xt_xn_may_commit(ot, tn_id, variation, NULL, &var_wait)) {
2605
/* See comment above.
3132
if (XT_REC_IS_FREE(rec_head.tr_rec_type_1))
3133
/* Should not happen: */
3135
xn_id = XT_GET_DISK_4(rec_head.tr_xact_id_4);
3136
switch (xt_xn_status(ot, xn_id, var_rec_id)) {
3138
case XT_XN_NOT_VISIBLE:
3141
/* Ignore the record, it will be removed. */
3142
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
3144
len += sprintf(t_buf+len, "(T%d-A)", (int) xn_id);
3147
case XT_XN_MY_UPDATE:
3149
case XT_XN_OTHER_UPDATE:
3150
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
3152
len += sprintf(t_buf+len, "(T%d-wait)", (int) xn_id);
3154
/* Wait for this update to commit or abort: */
2613
variation = XT_GET_DISK_6(rec_head.tr_prev_var_6);
3161
var_rec_id = XT_GET_DISK_4(rec_head.tr_prev_rec_id_4);
2615
#ifdef TRACE_VARIATIONS
2617
// sprintf(t_buf+len, " -> %d\n", (int) variation);
2619
// sprintf(t_buf+len, " ...\n", (int) variation);
3163
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
3165
sprintf(t_buf+len, " -> %d(T%d-%s)\n", (int) var_rec_id, (int) rec_xn_id, t_type);
3167
sprintf(t_buf+len, " ...(T%d-%s)\n", (int) rec_xn_id, t_type);
2622
//xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3170
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2624
*out_tn_id = wait_tn_id;
3172
*out_xn_id = wait_xn_id;
2625
3173
return XT_MAYBE;
2627
#ifdef TRACE_VARIATIONS
2628
//xt_trace("%s", t_buf);
3175
#ifdef TRACE_VARIATIONS_IN_DUP_CHECK
3176
xt_trace("%s", t_buf);
2630
3178
if (out_rowid) {
2631
3179
*out_rowid = row_id;
2632
*out_updated = (rec_tn_id == ot->ot_thread->st_xact_data->xd_start_id);
3180
*out_updated = (rec_xn_id == ot->ot_thread->st_xact_data->xd_start_xn_id);
2637
//xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3185
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2641
//xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3189
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
3275
/* We cannot remove a change we have made to a row while a transaction
3276
* is running, so we have to undo what we have done by
3277
* overwriting the record we just created with
3280
static xtBool tab_overwrite_record_on_fail(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, xtWord1 *before_buf, xtWord1 *after_buf, u_int idx_cnt)
3282
register XTTableHPtr tab = ot->ot_table;
3283
XTTabRecHeadDRec prev_rec_head;
3286
XTThreadPtr thread = ot->ot_thread;
3288
xtLogOffset log_offset;
3289
xtRecordID rec_id = rec_info->ri_rec_id;
3291
/* Remove the new extended record: */
3292
if (rec_info->ri_ext_rec)
3293
tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) rec_info->ri_fix_rec_buf, TRUE);
3295
/* Undo index entries of the new record: */
3297
for (i=0, ind=tab->tab_dic.dic_keys; i<idx_cnt; i++, ind++) {
3298
if (!xt_idx_delete(ot, *ind, rec_id, after_buf))
3303
memcpy(&prev_rec_head, rec_info->ri_fix_rec_buf, sizeof(XTTabRecHeadDRec));
3305
/* Restore the previous record! */
3306
if (!myxt_store_row(ot, rec_info, (char *) before_buf))
3309
memcpy(rec_info->ri_fix_rec_buf, &prev_rec_head, sizeof(XTTabRecHeadDRec));
3311
if (rec_info->ri_ext_rec) {
3312
/* Determine where the overflow will go... */
3313
if (!thread->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, rec_info->ri_log_data_size + offsetof(XTactExtRecEntryDRec, er_data)))
3315
XT_SET_LOG_REF(rec_info->ri_ext_rec, log_id, log_offset);
3318
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_MODIFIED, 0, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
3321
if (rec_info->ri_ext_rec) {
3322
/* Write the log buffer overflow: */
3323
rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK;
3324
XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size);
3325
XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab->tab_id);
3326
XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id);
3327
if (!thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf))
3331
/* Put the index entries back: */
3332
for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
3333
if (!xt_idx_insert(ot, *ind, rec_id, before_buf, after_buf, TRUE))
3334
/* Incomplete restore, there will be a rollback... */
2726
3343
* If a transaction updates the same record over again, we should update
2727
* in place. This prevents producing unnecessary variations!
3344
* in place. This prevents producing unnecessary variations!
2729
3346
static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWord1 *after_buf)
2731
3348
register XTTableHPtr tab = ot->ot_table;
2732
xtWord4 row_id = ot->ot_curr_row_id;
3349
xtRowID row_id = ot->ot_curr_row_id;
2733
3350
register XTThreadPtr self = ot->ot_thread;
2734
off_t rec_address = ot->ot_curr_rec;
2735
xtWord1 rec_head[offsetof(XTTabRecExtDRec, re_data)];
3351
xtRecordID rec_id = ot->ot_curr_rec_id;
3352
XTTabRecExtDRec prev_rec_head;
2736
3353
XTTabRecInfoRec rec_info;
2737
3354
u_int idx_cnt = 0, i;
2738
3355
XTIndexPtr *ind;
3357
xtLogOffset log_offset;
2741
3358
void *mybs_table;
3359
xtBool prev_ext_rec;
2743
3361
if (tab->tab_dic.dic_blob_count) {
2744
3362
if (!myxt_use_blobs(ot, &mybs_table, after_buf))
2751
3369
/* Read before we overwrite! */
3370
if (!xt_tab_get_rec_data(ot, rec_id, XT_REC_EXT_HEADER_SIZE, (xtWord1 *) &prev_rec_head))
3373
prev_ext_rec = prev_rec_head.tr_rec_type_1 & XT_TAB_STATUS_EXT_DLOG;
2752
3375
if (rec_info.ri_ext_rec) {
2753
3376
/* Determine where the overflow will go... */
2754
if (!xt_tab_get_record(ot, rec_address, offsetof(XTTabRecExtDRec, re_data), rec_head))
2757
if (!xt_dl_get_log_offset(ot, &log_id, &log_offset, rec_info.ri_log_data_size + XT_LOG_REC_HEADER_SIZE))
2759
XT_SET_LOG_REF_6(rec_info.ri_ext_rec->re_log_rec_6, log_id, log_offset);
2762
if (!xt_tab_get_record(ot, rec_address, sizeof(XTTabRecHeadDRec), rec_head))
3377
if (!self->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size))
3379
XT_SET_LOG_REF(rec_info.ri_ext_rec, log_id, log_offset);
2766
3382
rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id;
2767
3383
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2768
XT_COPY_DISK_6(rec_info.ri_fix_rec_buf->tr_prev_var_6, ((XTTabRecHeadDPtr) rec_head)->tr_prev_var_6);
2769
XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_xact_id_6, self->st_xact_data->xd_start_id);
3384
XT_COPY_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, prev_rec_head.tr_prev_rec_id_4);
3385
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id);
2771
3387
/* Remove the index references, that have changed: */
2772
3388
for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2773
if (!xt_idx_delete(ot, *ind, rec_address, before_buf)) {
3389
if (!xt_idx_delete(ot, *ind, rec_id, before_buf)) {
3394
#ifdef TRACE_VARIATIONS
3395
xt_trace("%s overwrite: row=%d rec=%d T%d\n", self->t_name, (int) row_id, (int) rec_id, (int) self->st_xact_data->xd_start_xn_id);
2778
3397
/* Overwrite the record: */
2779
if (!xt_tab_put_data(ot, rec_address, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
3398
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_MODIFIED, 0, rec_id, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
2782
3401
if (rec_info.ri_ext_rec) {
2783
3402
/* Write the log buffer overflow: */
2784
rec_info.ri_log_buf->lb_status_1 = XT_DL_STATUS_RECORD;
2785
XT_SET_DISK_4(rec_info.ri_log_buf->lb_data_size_4, rec_info.ri_log_data_size);
2786
XT_SET_DISK_6(rec_info.ri_log_buf->lb_record_6, rec_address);
2787
if (!xt_dl_append_log(ot, log_id, log_offset, rec_info.ri_log_data_size + XT_LOG_REC_HEADER_SIZE, rec_info.ri_log_buf))
3403
rec_info.ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK;
3404
XT_SET_DISK_4(rec_info.ri_log_buf->er_data_size_4, rec_info.ri_log_data_size);
3405
XT_SET_DISK_4(rec_info.ri_log_buf->er_tab_id_4, tab->tab_id);
3406
XT_SET_DISK_4(rec_info.ri_log_buf->er_rec_id_4, rec_id);
3407
if (!self->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size, (xtWord1 *) rec_info.ri_log_buf))
2791
3411
/* Add the index references that have changed: */
2792
3412
for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2793
if (!xt_idx_insert(ot, *ind, rec_address, after_buf, before_buf)) {
3413
if (!xt_idx_insert(ot, *ind, rec_id, after_buf, before_buf, FALSE)) {
2794
3414
ot->ot_err_index_no = (*ind)->mi_index_no;
2799
/* Reference the BLOBs in the row: */
2800
if (tab->tab_dic.dic_blob_count) {
2801
if (!myxt_retain_blobs(ot, mybs_table, rec_address)) {
2805
myxt_release_blobs(ot, before_buf, rec_address);
2808
3419
/* Do the foreign key stuff: */
2809
3420
if (ot->ot_table->tab_dic.dic_table->dt_trefs || ot->ot_table->tab_dic.dic_table->dt_fkeys.size() > 0) {
2810
3421
if (!ot->ot_table->tab_dic.dic_table->updateRow(ot, before_buf, after_buf))
2814
3425
/* Delete the previous overflow area: */
3427
tab_free_ext_record_on_fail(ot, rec_id, &prev_rec_head, TRUE);
3429
if (tab->tab_dic.dic_blob_count) {
3430
/* Retain the BLOBs new record: */
3431
if (!myxt_retain_blobs(ot, mybs_table, rec_id))
3433
/* Release the BLOBs in the old record: */
3434
myxt_release_blobs(ot, before_buf, rec_id);
3440
/* Remove the new extended record: */
2815
3441
if (rec_info.ri_ext_rec)
2816
tab_free_extended_record(ot, rec_address, (XTTabRecExtDPtr) rec_head);
3442
tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) rec_info.ri_fix_rec_buf, TRUE);
3444
/* Restore the previous record! */
2821
3445
/* Undo index entries: */
2822
3446
for (i=0, ind=tab->tab_dic.dic_keys; i<idx_cnt; i++, ind++) {
2823
if (!xt_idx_delete(ot, *ind, rec_address, after_buf))
2824
xt_log_and_clear_exception_ns();
3447
if (!xt_idx_delete(ot, *ind, rec_id, after_buf))
2827
if (rec_info.ri_ext_rec)
2828
tab_free_extended_record(ot, rec_address, (XTTabRecExtDPtr) rec_info.ri_fix_rec_buf);
2831
3451
/* Restore the record: */
2832
3452
if (!myxt_store_row(ot, &rec_info, (char *) before_buf))
2835
3455
if (rec_info.ri_ext_rec)
2836
memcpy(rec_info.ri_fix_rec_buf, rec_head, offsetof(XTTabRecExtDRec, re_data));
3456
memcpy(rec_info.ri_fix_rec_buf, &prev_rec_head, XT_REC_EXT_HEADER_SIZE);
2838
memcpy(rec_info.ri_fix_rec_buf, rec_head, sizeof(XTTabRecHeadDRec));
3458
memcpy(rec_info.ri_fix_rec_buf, &prev_rec_head, sizeof(XTTabRecHeadDRec));
2840
if (!xt_tab_put_data(ot, rec_address, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
3460
if (!xt_tab_put_log_op_rec_data(ot, XT_LOG_ENT_REC_MODIFIED, 0, rec_id, rec_info.ri_rec_buf_size, (xtWord1 *) rec_info.ri_fix_rec_buf))
2843
3463
/* Put the index entries back: */
2845
3464
for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) {
2846
if (!xt_idx_insert(ot, *ind, rec_address, before_buf, after_buf))
3465
if (!xt_idx_insert(ot, *ind, rec_id, before_buf, after_buf, TRUE))
3466
/* Incomplete restore, there will be a rollback... */
3470
/* The previous record has now been restored. */
3474
/* The old record is overwritten, I must free the previous extended record: */
3476
tab_free_ext_record_on_fail(ot, rec_id, &prev_rec_head, TRUE);
3479
/* Unuse the BLOBs of the new record: */
2851
3480
if (tab->tab_dic.dic_blob_count && mybs_table)
2852
3481
myxt_unuse_blobs(ot, mybs_table);
2972
3601
rec_info.ri_fix_rec_buf->tr_rec_type_1 = XT_TAB_STATUS_DELETE;
2973
3602
rec_info.ri_fix_rec_buf->tr_stat_id_1 = 0;
2974
3603
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id);
2975
XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_prev_var_6, ot->ot_curr_rec);
2976
XT_SET_DISK_6(rec_info.ri_fix_rec_buf->tr_xact_id_6, ot->ot_thread->st_xact_data->xd_start_id);
3604
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, ot->ot_curr_rec_id);
3605
XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, ot->ot_thread->st_xact_data->xd_start_xn_id);
2978
if (!tab_add_record(ot, &rec_info))
3607
if (!tab_add_record(ot, &rec_info, XT_LOG_ENT_DELETE))
2981
xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3610
xt_rwlock_wrlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
2983
if (!xt_tab_get_row(ot, row_id, &curr_variation))
3612
if (!xt_tab_get_row(ot, row_id, &curr_var_rec_id))
2986
if (curr_variation != ot->ot_curr_rec) {
2987
if (!tab_wait_for_rollback(ot, row_id, ot->ot_curr_rec))
3615
if (curr_var_rec_id != ot->ot_curr_rec_id) {
3616
if (!tab_wait_for_rollback(ot, row_id, ot->ot_curr_rec_id))
2991
if (!xt_tab_set_row(ot, row_id, rec_info.ri_rec_address, FALSE))
3620
#ifdef TRACE_VARIATIONS
3621
xt_trace("%s update: row=%d rec=%d T%d\n", ot->ot_thread->t_name, (int) row_id, (int) rec_info.ri_rec_id, (int) ot->ot_thread->st_xact_data->xd_start_xn_id);
3623
if (!xt_tab_set_row(ot, XT_LOG_ENT_ROW_ADD_REC, row_id, rec_info.ri_rec_id))
2993
XT_DISABLED_TRACE(("del row tx=%d row=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_id, (int) row_id, (int) rec_info.ri_rec_address));
2995
xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
2997
/* Log this change (after this point rollback will remove the record): */
2998
if (!xt_xn_log_update(ot, rec_info.ri_rec_address, XT_XN_STATUS_DELETE, rec_info.ri_fix_rec_buf->tr_rec_type_1))
3625
XT_DISABLED_TRACE(("del row tx=%d row=%d rec=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) row_id, (int) rec_info.ri_rec_id));
3627
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
3001
3629
if (ot->ot_table->tab_dic.dic_table->dt_trefs) {
3002
3630
if (!ot->ot_table->tab_dic.dic_table->deleteRow(ot, rec_buf))
3009
xt_rwlock_wrlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3010
tab_set_row_on_fail(ot, row_id, ot->ot_curr_rec, FALSE);
3637
tab_overwrite_record_on_fail(ot, &rec_info, rec_buf, NULL, 0);
3013
xt_rwlock_unlock(&tab->tab_row_locks[row_id % XT_ROW_LOCK_TABLE_SIZE]);
3015
tab_remove_record_on_fail(ot, rec_info.ri_rec_address, (XTTabRecHeadDPtr) rec_info.ri_fix_rec_buf, rec_buf, 0);
3641
xt_rwlock_unlock(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS]);
3019
3645
xtPublic xtBool xt_tab_seq_init(XTOpenTablePtr ot)
3021
3647
register XTTableHPtr tab = ot->ot_table;
3023
if (!ot->ot_thread->st_xact_data) {
3024
/* MySQL ingores this error, so we
3025
* setup the sequential scan so that it will
3028
ot->ot_seq_pos = sizeof(XTTabDataHeadDRec);
3029
ot->ot_seq_eof = ot->ot_seq_pos;
3030
ot->ot_buf_pos = ot->ot_seq_pos + 1;
3031
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
3035
ot->ot_seq_pos = sizeof(XTTabDataHeadDRec);
3649
ot->ot_seq_page = NULL;
3650
ot->ot_seq_offset = 0;
3652
ot->ot_curr_rec_id = 0; // 0 is an invalid position!
3653
ot->ot_curr_row_id = 0; // 0 is an invalid row ID!
3654
ot->ot_curr_updated = FALSE;
3037
3656
/* We note the current EOF before we start a sequential scan.
3038
3657
* It is basically possible to update the same record more than
3039
3658
* once because an updated record creates a new record which
3041
3660
* still to be scanned.
3043
3662
* By noting the EOF before we start a sequential scan we
3044
* reduce this posibility of this.
3663
* reduce the possibility of this.
3046
* However, the possibility still remains, however it should
3665
* However, the possibility still remains, but it should
3047
3666
* not be a problem because a record is not modified
3048
3667
* if there is nothing to change, which is the case
3049
3668
* if the record has already been changed!
3670
* NOTE (2008-01-29) There is no longer a problem with updating a
3671
* record twice because records are marked by an update.
3673
* [(10)] I have changed this (see below). I now check the
3674
* current EOF of the table.
3676
* The reason is that committed read must be able to see the
3677
* changes that occur during the table scan. *
3051
if (tab->tab_buf_offset) {
3052
xt_rwlock_rdlock(&tab->tab_buf_rwlock);
3053
ot->ot_seq_eof = tab->tab_data_eof + tab->tab_buf_offset;
3054
xt_rwlock_unlock(&tab->tab_buf_rwlock);
3679
ot->ot_seq_eof_id = tab->tab_rec_eof_id;
3681
if (!ot->ot_thread->st_xact_data) {
3682
/* MySQL ignores this error, so we
3683
* setup the sequential scan so that it will
3686
ot->ot_seq_rec_id = ot->ot_seq_eof_id;
3687
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
3057
ot->ot_seq_eof = tab->tab_data_eof;
3058
ot->ot_curr_rec = 0; // 0 is an invalid position!
3059
ot->ot_curr_row_id = 0; // 0 is an invalid row ID!
3060
ot->ot_curr_updated = FALSE;
3062
* This will make sure that the test below fails so that the
3063
* execution jumps immendiately to load_buffer.
3065
ot->ot_buf_pos = ot->ot_seq_pos + 1;
3691
ot->ot_seq_rec_id = 1;
3696
xtPublic void xt_tab_seq_reset(XTOpenTablePtr ot)
3698
ot->ot_seq_rec_id = 0;
3699
ot->ot_seq_eof_id = 0;
3700
ot->ot_seq_page = NULL;
3701
ot->ot_seq_offset = 0;
3704
xtPublic void xt_tab_seq_exit(XTOpenTablePtr ot)
3706
register XTTableHPtr tab = ot->ot_table;
3708
if (ot->ot_seq_page) {
3709
tab->tab_recs.xt_tc_unlock_page(ot->ot_seq_page);
3710
ot->ot_seq_page = NULL;
3071
3714
xtPublic xtBool xt_tab_seq_next(XTOpenTablePtr ot, xtWord1 *buffer, xtBool *eof)
3073
3716
register XTTableHPtr tab = ot->ot_table;
3074
register size_t size = tab->tab_dic.dic_rec_size;
3076
xtBool head_read = FALSE;
3717
register size_t rec_size = tab->tab_dic.dic_rec_size;
3077
3718
xtWord1 *buff_ptr;
3080
if (ot->ot_seq_pos < ot->ot_buf_pos)
3084
/* Start read position is on or after buffer position */
3085
if (ot->ot_seq_pos + size <= ot->ot_buf_pos + ot->ot_buf_len) {
3086
/* Read is completely in the buffer (this is the fast track): */
3087
boff = (size_t) (ot->ot_seq_pos - ot->ot_buf_pos);
3089
// Check for deleted record:
3091
ot->ot_curr_rec = ot->ot_seq_pos;
3092
ot->ot_seq_pos += size;
3094
block_type = ot->ot_read_buf[boff];
3095
if (XT_REC_NOT_VISIBLE(block_type))
3097
switch (tab_visible(ot, (XTTabRecHeadDPtr) (ot->ot_read_buf + boff))) {
3103
switch (block_type) {
3104
case XT_TAB_STATUS_FIXED:
3105
case XT_TAB_STATUS_FIX_CLEAN:
3106
memcpy(buffer, ot->ot_read_buf + boff + XT_REC_FIX_HEADER_SIZE, size - XT_REC_FIX_HEADER_SIZE);
3108
case XT_TAB_STATUS_VARIABLE:
3109
case XT_TAB_STATUS_VAR_CLEAN:
3110
if (!myxt_load_row(ot, ot->ot_read_buf + boff + XT_REC_FIX_HEADER_SIZE, buffer))
3113
case XT_TAB_STATUS_EXTENDED:
3114
case XT_TAB_STATUS_EXT_CLEAN:
3115
memcpy(ot->ot_row_rbuffer, ot->ot_read_buf + boff, size);
3116
if (!tab_load_ext_data(ot, ot->ot_curr_rec, buffer))
3120
/* Unknown record type? */
3125
ot->ot_seq_pos += size;
3126
memcpy(buff_ptr, ot->ot_read_buf + boff, size);
3127
switch (tab_visible(ot, (XTTabRecHeadDPtr) ot->ot_row_rbuffer)) {
3133
switch (block_type) {
3134
case XT_TAB_STATUS_FIXED:
3135
case XT_TAB_STATUS_FIX_CLEAN:
3136
memcpy(buffer, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, tab->tab_dic.dic_rec_size - XT_REC_FIX_HEADER_SIZE);
3138
case XT_TAB_STATUS_VARIABLE:
3139
case XT_TAB_STATUS_VAR_CLEAN:
3140
if (!myxt_load_row(ot, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, buffer))
3143
case XT_TAB_STATUS_EXTENDED:
3144
case XT_TAB_STATUS_EXT_CLEAN:
3145
if (!tab_load_ext_data(ot, ot->ot_curr_rec, buffer))
3719
xtRecordID new_rec_id;
3722
if (!ot->ot_seq_page) {
3723
if (!(ot->ot_seq_page = tab->tab_recs.xt_tc_lock_page(ot->ot_rec_file, ot->ot_seq_rec_id, &ot->ot_seq_offset)))
3728
/* [(10)] The current EOF is used: */
3729
if (ot->ot_seq_rec_id >= ot->ot_seq_eof_id) {
3155
if (ot->ot_seq_pos < ot->ot_buf_pos + ot->ot_buf_len) {
3156
/* Partially in the buffer: */
3157
boff = (size_t) (ot->ot_seq_pos - ot->ot_buf_pos);
3159
// Check for deleted record:
3161
block_type = ot->ot_read_buf[boff];
3163
/* Skip these records, they are deleted, or will be deleted. */
3164
if (XT_REC_NOT_VISIBLE(block_type)) {
3165
ot->ot_seq_pos += size;
3169
/* We have found a valid record. */
3171
ot->ot_curr_rec = ot->ot_seq_pos;
3734
if (ot->ot_seq_offset >= tab->tab_recs.tci_page_size) {
3735
tab->tab_recs.xt_tc_unlock_page(ot->ot_seq_page);
3736
ot->ot_seq_page = NULL;
3740
buff_ptr = (xtWord1 *) &ot->ot_seq_page->tcp_data[ot->ot_seq_offset];
3742
/* This is the current record: */
3743
ot->ot_curr_rec_id = ot->ot_seq_rec_id;
3744
ot->ot_curr_row_id = 0;
3746
/* Move to the next record: */
3747
ot->ot_seq_rec_id++;
3748
ot->ot_seq_offset += rec_size;
3751
switch (tab_visible(ot, (XTTabRecHeadDPtr) buff_ptr, &new_rec_id)) {
3757
/* Skip the records that are deleted, or will be deleted. */
3172
3760
buff_ptr = ot->ot_row_rbuffer;
3175
tfer = (size_t) ((ot->ot_buf_pos + ot->ot_buf_len) - ot->ot_seq_pos);
3176
memcpy(buff_ptr, ot->ot_read_buf + boff, tfer);
3179
ot->ot_seq_pos += tfer;
3181
/* Continue to get the rest... */
3761
if (!xt_tab_get_rec_data(ot, new_rec_id, rec_size, ot->ot_row_rbuffer))
3763
ot->ot_curr_rec_id = new_rec_id;
3186
/* The start read position is not in the buffer, read as much as we can */
3187
ASSERT_NS(ot->ot_seq_pos <= tab->tab_data_eof + tab->tab_buf_offset);
3189
tfer = tab->tab_buf_size;
3190
if (ot->ot_seq_pos + tfer > ot->ot_seq_eof) {
3191
if (ot->ot_seq_pos >= ot->ot_seq_eof) {
3192
if (!ot->ot_thread->st_xact_data) {
3193
/* If MySQL ingores this error above, then
3194
* we generate the error again here
3195
* (I just want to avoid doing this on
3196
* each xt_tab_seq_next() call)!
3197
* [ Every little instruction is bad! ]
3199
xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_TRANSACTION);
3769
switch (*buff_ptr) {
3770
case XT_TAB_STATUS_FIXED:
3771
case XT_TAB_STATUS_FIX_CLEAN:
3772
memcpy(buffer, buff_ptr + XT_REC_FIX_HEADER_SIZE, rec_size - XT_REC_FIX_HEADER_SIZE);
3774
case XT_TAB_STATUS_VARIABLE:
3775
case XT_TAB_STATUS_VAR_CLEAN:
3776
if (!myxt_load_row(ot, buff_ptr + XT_REC_FIX_HEADER_SIZE, buffer, ot->ot_cols_req))
3779
case XT_TAB_STATUS_EXT_DLOG:
3780
case XT_TAB_STATUS_EXT_CLEAN: {
3781
u_int cols_req = ot->ot_cols_req;
3783
ASSERT_NS(cols_req);
3784
if (cols_req && cols_req <= tab->tab_dic.dic_fix_col_count) {
3785
if (!myxt_load_row(ot, buff_ptr + XT_REC_EXT_HEADER_SIZE, buffer, cols_req))
3789
if (buff_ptr != ot->ot_row_rbuffer)
3790
memcpy(ot->ot_row_rbuffer, buff_ptr, rec_size);
3791
if (!xt_tab_load_ext_data(ot, ot->ot_curr_rec_id, buffer, cols_req))
3206
tfer = (size_t) (ot->ot_seq_eof - ot->ot_seq_pos);
3209
if (!xt_tab_get_data(ot, ot->ot_seq_pos, tfer, ot->ot_read_buf, &ot->ot_buf_len))
3211
if (!ot->ot_buf_len) {
3216
ot->ot_buf_pos = ot->ot_seq_pos;
3220
xtPublic void xt_tab_get_stats(XTThreadPtr self, XTTableHPtr tab, xtWord4 *file_size, xtWord4 *free_count)
3224
xtPublic void xt_tab_io_failed(XTOpenFilePtr of)
3229
* -----------------------------------------------------------------------
3233
/* Wait until all open tables are closed. */
3235
xtPublic XTOpenTablePtr xt_open_table_from_pool(XTTableHPtr tab, XTThreadPtr thread)
3239
xt_mutex_lock(&tab->tab_open_lock);
3240
if (tab->tab_will_close) {
3241
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_TABLE_LOCKED, tab->tab_name);
3244
else if (tab->tab_open_pool) {
3245
ot = tab->tab_open_pool;
3246
tab->tab_open_pool = ot->ot_pool_next;
3248
else if ((ot = tab_open_table(tab))) {
3249
tab->tab_open_count++;
3251
xt_mutex_unlock(&tab->tab_open_lock);
3252
ot->ot_thread = thread;
3256
xtPublic void xt_return_table_to_pool(XTOpenTablePtr ot)
3258
XTTableHPtr tab = ot->ot_table;
3260
ot->ot_thread = NULL;
3261
xt_mutex_lock(&tab->tab_open_lock);
3262
if (tab->tab_will_close) {
3263
ASSERT_NS(tab->tab_open_count > 0);
3264
tab->tab_open_count--;
3265
tab_close_table(ot);
3268
ot->ot_pool_next = tab->tab_open_pool;
3269
tab->tab_open_pool = ot;
3271
xt_mutex_unlock(&tab->tab_open_lock);
3274
xtPublic void xt_close_all_open_tables(XTThreadPtr self, XTTableHPtr tab)
3276
XTDatabaseHPtr db = tab->tab_db;
3279
/* Lock order: TABLE, SWEEPER, COMPACTOR! */
3280
/* Force the sweeper to close all tables: */
3281
xt_sw_lock_sweeper(self, db);
3282
pushr_(xt_sw_unlock_sweeper, db);
3283
/* Force the compactor to close all files */
3284
xt_dl_lock_compactor(self, db);
3285
pushr_(xt_dl_unlock_compactor, db);
3287
/* Wait for all open tables to close: */
3288
tab_wait_for_open_tables(self, tab);
3290
freer_(); // xt_dl_unlock_compactor(db)
3291
freer_(); // xt_sw_unlock_sweeper(db)