120
120
#include "hatoku_defines.h"
121
121
#include "hatoku_cmp.h"
123
static inline void *thd_data_get(THD *thd, int slot) {
124
return thd->ha_data[slot].ha_ptr;
127
static inline void thd_data_set(THD *thd, int slot, void *data) {
128
thd->ha_data[slot].ha_ptr = data;
131
123
// Forward declaration: returns a uint computed from the given KEY descriptor.
// Declared up front so that code appearing before the definition can call it.
static inline uint get_key_parts(const KEY *key);
1017
1008
static int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx) {
1019
tokudb_trx_data* trx = NULL;
1020
trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
1010
tokudb_trx_data* trx = (tokudb_trx_data *) tokudb_my_malloc(sizeof(*trx), MYF(MY_ZEROFILL));
1022
1012
error = ENOMEM;
1614
1604
DB_TXN* txn = NULL;
1615
1605
bool do_commit = false;
1616
1606
THD* thd = ha_thd();
1617
tokudb_trx_data *trx = NULL;
1618
trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
1607
tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
1619
1608
if (thd_sql_command(thd) == SQLCOM_CREATE_TABLE && trx && trx->sub_sp_level) {
1620
1609
txn = trx->sub_sp_level;
1728
1717
share->ref_length = ref_length;
1730
error = estimate_num_rows(share->file,&num_rows, txn);
1719
error = estimate_num_rows(share->file, &num_rows, txn);
1732
1721
// estimate_num_rows should not fail under normal conditions
1951
1939
txn_to_use = txn;
1942
error = db->stat64(db, txn_to_use, &dict_stats);
1959
1943
if (error) { goto cleanup; }
1961
1945
*num_rows = dict_stats.bt_ndata;
1965
int r = crsr->c_close(crsr);
1969
1948
if (do_commit) {
1970
1949
commit_txn(txn_to_use, 0);
1971
1950
txn_to_use = NULL;
3271
3250
TOKUDB_HANDLER_DBUG_ENTER("%llu txn %p", (unsigned long long) rows, transaction);
3273
3252
THD* thd = ha_thd();
3274
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
3253
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
3275
3254
delay_updating_ai_metadata = true;
3276
3255
ai_metadata_update_required = false;
3277
3256
abort_loader = false;
3281
3260
num_DBs_locked_in_bulk = true;
3282
3261
lock_count = 0;
3284
if (share->try_table_lock) {
3263
if ((rows == 0 || rows > 1) && share->try_table_lock) {
3285
3264
if (get_prelock_empty(thd) && may_table_be_empty(transaction)) {
3286
3265
if (using_ignore || is_insert_ignore(thd) || thd->lex->duplicates != DUP_ERROR
3287
3266
|| table->s->next_number_key_offset) {
3340
3319
TOKUDB_HANDLER_DBUG_ENTER("");
3342
3321
THD* thd = ha_thd();
3343
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
3322
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
3344
3323
bool using_loader = (loader != NULL);
3345
3324
if (ai_metadata_update_required) {
3346
3325
tokudb_pthread_mutex_lock(&share->mutex);
3355
3334
if (!abort_loader && !thd->killed) {
3356
3335
DBUG_EXECUTE_IF("tokudb_end_bulk_insert_sleep", {
3357
const char *old_proc_info = tokudb_thd_get_proc_info(thd);
3336
const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
3358
3337
thd_proc_info(thd, "DBUG sleep");
3359
3338
my_sleep(20000000);
3360
thd_proc_info(thd, old_proc_info);
3339
thd_proc_info(thd, orig_proc_info);
3362
3341
error = loader->close(loader);
3374
3353
if (i == primary_key && !share->pk_has_string) {
3377
error = is_index_unique(
3356
error = is_index_unique(&is_unique, transaction, share->key_file[i], &table->key_info[i],
3357
DB_PRELOCKED_WRITE);
3383
3358
if (error) goto cleanup;
3384
3359
if (!is_unique) {
3385
3360
error = HA_ERR_FOUND_DUPP_KEY;
3426
3402
return end_bulk_insert( false );
3429
int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info) {
3405
int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info, int lock_flags) {
3431
3407
DBC* tmp_cursor1 = NULL;
3432
3408
DBC* tmp_cursor2 = NULL;
3434
3410
uint64_t cnt = 0;
3435
3411
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
3436
3412
THD* thd = ha_thd();
3437
const char *old_proc_info = tokudb_thd_get_proc_info(thd);
3413
const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
3438
3414
memset(&key1, 0, sizeof(key1));
3439
3415
memset(&key2, 0, sizeof(key2));
3440
3416
memset(&val, 0, sizeof(val));
3442
3418
memset(&packed_key2, 0, sizeof(packed_key2));
3443
3419
*is_unique = true;
3451
if (error) { goto cleanup; }
3459
if (error) { goto cleanup; }
3421
error = db->cursor(db, txn, &tmp_cursor1, DB_SERIALIZABLE);
3422
if (error) { goto cleanup; }
3424
error = db->cursor(db, txn, &tmp_cursor2, DB_SERIALIZABLE);
3425
if (error) { goto cleanup; }
3462
error = tmp_cursor1->c_get(
3427
error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
3468
3428
if (error == DB_NOTFOUND) {
3469
3429
*is_unique = true;
3473
3433
else if (error) { goto cleanup; }
3474
error = tmp_cursor2->c_get(
3434
error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
3480
3435
if (error) { goto cleanup; }
3482
error = tmp_cursor2->c_get(
3437
error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
3488
3438
if (error == DB_NOTFOUND) {
3489
3439
*is_unique = true;
3496
3446
bool has_null1;
3497
3447
bool has_null2;
3499
place_key_into_mysql_buff(
3502
(uchar *) key1.data + 1
3504
place_key_into_mysql_buff(
3507
(uchar *) key2.data + 1
3449
place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key1.data + 1);
3450
place_key_into_mysql_buff(key_info, table->record[1], (uchar *) key2.data + 1);
3510
create_dbt_key_for_lookup(
3517
create_dbt_key_for_lookup(
3452
create_dbt_key_for_lookup(&packed_key1, key_info, key_buff, table->record[0], &has_null1);
3453
create_dbt_key_for_lookup(&packed_key2, key_info, key_buff2, table->record[1], &has_null2);
3525
3455
if (!has_null1 && !has_null2) {
3526
3456
cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2);
3527
3457
if (cmp == 0) {
3528
3458
memcpy(key_buff, key1.data, key1.size);
3529
place_key_into_mysql_buff(
3532
(uchar *) key_buff + 1
3459
place_key_into_mysql_buff(key_info, table->record[0], (uchar *) key_buff + 1);
3534
3460
*is_unique = false;
3539
error = tmp_cursor1->c_get(
3465
error = tmp_cursor1->c_get(tmp_cursor1, &key1, &val, DB_NEXT + lock_flags);
3545
3466
if (error) { goto cleanup; }
3546
error = tmp_cursor2->c_get(
3467
error = tmp_cursor2->c_get(tmp_cursor2, &key2, &val, DB_NEXT + lock_flags);
3552
3468
if (error && (error != DB_NOTFOUND)) { goto cleanup; }
4304
4220
THD* thd = ha_thd();
4305
4221
uint curr_num_DBs;
4306
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
4222
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
4308
4224
ha_statistic_increment(&SSV::ha_delete_count);
4871
4787
uint32_t flags = 0;
4872
4788
THD* thd = ha_thd();
4873
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
4789
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
4874
4790
struct smart_dbt_info info;
4875
4791
struct index_read_info ir_info;
5501
5417
struct smart_dbt_info info;
5502
5418
uint32_t flags = SET_PRELOCK_FLAG(0);
5503
5419
THD* thd = ha_thd();
5504
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
5420
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
5505
5421
HANDLE_INVALID_CURSOR();
5507
5423
ha_statistic_increment(&SSV::ha_read_first_count);
5544
5460
struct smart_dbt_info info;
5545
5461
uint32_t flags = SET_PRELOCK_FLAG(0);
5546
5462
THD* thd = ha_thd();
5547
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
5463
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);;
5548
5464
HANDLE_INVALID_CURSOR();
5550
5466
ha_statistic_increment(&SSV::ha_read_last_count);
5637
5553
void ha_tokudb::track_progress(THD* thd) {
5638
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
5554
tokudb_trx_data* trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
5640
5556
ulonglong num_written = trx->stmt_progress.inserted + trx->stmt_progress.updated + trx->stmt_progress.deleted;
5641
5557
bool update_status =
6228
tokudb_trx_data *trx = NULL;
6229
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
6144
tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
6231
6146
error = create_tokudb_trx_data_instance(&trx);
6232
6147
if (error) { goto cleanup; }
6233
thd_data_set(thd, tokudb_hton->slot, trx);
6148
thd_set_ha_data(thd, tokudb_hton, trx);
6235
6150
if (trx->all == NULL) {
6236
6151
trx->sp_level = NULL;
6304
6219
TOKUDB_HANDLER_TRACE("q %s", thd->query());
6307
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
6222
tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
6308
6223
DBUG_ASSERT(trx);
6404
6319
lock (if we don't want to use MySQL table locks at all) or add locks
6405
6320
for many tables (like we do when we are using a MERGE handler).
6407
Tokudb DB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
6322
TokuDB changes all WRITE locks to TL_WRITE_ALLOW_WRITE (which
6408
6323
signals that we are doing WRITES, but we are still allowing other
6409
6324
readers and writers).
6428
6343
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
6429
// if creating a hot index
6430
if (thd_sql_command(thd)== SQLCOM_CREATE_INDEX && get_create_index_online(thd)) {
6431
rw_rdlock(&share->num_DBs_lock);
6432
if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
6433
lock_type = TL_WRITE_ALLOW_WRITE;
6435
lock.type = lock_type;
6436
rw_unlock(&share->num_DBs_lock);
6439
// 5.5 supports reads concurrent with alter table. just use the default lock type.
6440
#if MYSQL_VERSION_ID < 50500
6441
else if (thd_sql_command(thd)== SQLCOM_CREATE_INDEX ||
6442
thd_sql_command(thd)== SQLCOM_ALTER_TABLE ||
6443
thd_sql_command(thd)== SQLCOM_DROP_INDEX) {
6444
// force alter table to lock out other readers
6445
lock_type = TL_WRITE;
6446
lock.type = lock_type;
6450
// If we are not doing a LOCK TABLE, then allow multiple writers
6451
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
6452
!thd->in_lock_tables && thd_sql_command(thd) != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
6453
lock_type = TL_WRITE_ALLOW_WRITE;
6455
lock.type = lock_type;
6344
enum_sql_command sql_command = (enum_sql_command) thd_sql_command(thd);
6345
if (!thd->in_lock_tables) {
6346
if (sql_command == SQLCOM_CREATE_INDEX && get_create_index_online(thd)) {
6348
rw_rdlock(&share->num_DBs_lock);
6349
if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
6350
lock_type = TL_WRITE_ALLOW_WRITE;
6352
rw_unlock(&share->num_DBs_lock);
6353
} else if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) &&
6354
sql_command != SQLCOM_TRUNCATE && !thd_tablespace_op(thd)) {
6355
// allow concurrent writes
6356
lock_type = TL_WRITE_ALLOW_WRITE;
6357
} else if (sql_command == SQLCOM_OPTIMIZE && lock_type == TL_READ_NO_INSERT) {
6358
// hot optimize table
6359
lock_type = TL_READ;
6362
lock.type = lock_type;
6459
6365
if (tokudb_debug & TOKUDB_DEBUG_LOCK)
6903
6809
newname = (char *)tokudb_my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
6904
6810
if (newname == NULL){ error = ENOMEM; goto cleanup;}
6906
trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
6812
trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
6907
6813
if (trx && trx->sub_sp_level && thd_sql_command(thd) == SQLCOM_CREATE_TABLE) {
6908
6814
txn = trx->sub_sp_level;
7094
7000
DB_TXN *parent_txn = NULL;
7095
7001
tokudb_trx_data *trx = NULL;
7096
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
7002
trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
7097
7003
if (thd_sql_command(ha_thd()) == SQLCOM_CREATE_TABLE && trx && trx->sub_sp_level) {
7098
7004
parent_txn = trx->sub_sp_level;
7534
7440
DBC* tmp_cursor = NULL;
7535
7441
int cursor_ret_val = 0;
7536
7442
DBT curr_pk_key, curr_pk_val;
7537
THD* thd = ha_thd();
7443
THD* thd = ha_thd();
7538
7444
DB_LOADER* loader = NULL;
7539
7445
DB_INDEXER* indexer = NULL;
7540
7446
bool loader_save_space = get_load_save_space(thd);
7573
7479
// status message to be shown in "show process list"
7575
const char *old_proc_info = tokudb_thd_get_proc_info(thd);
7481
const char *orig_proc_info = tokudb_thd_get_proc_info(thd);
7576
7482
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
7577
7483
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
7578
7484
thd_proc_info(thd, "Adding indexes");
7798
7704
num_processed++;
7800
7706
if ((num_processed % 1000) == 0) {
7801
sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.", num_processed, (long long unsigned) share->rows);
7707
sprintf(status_msg, "Adding indexes: Fetched %llu of about %llu rows, loading of data still remains.",
7708
num_processed, (long long unsigned) share->rows);
7802
7709
thd_proc_info(thd, status_msg);
7804
7711
#ifdef HA_TOKUDB_HAS_THD_PROGRESS
7830
7737
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
7831
7738
if (key_info[i].flags & HA_NOSAME) {
7832
7739
bool is_unique;
7833
error = is_index_unique(
7836
share->key_file[curr_index],
7740
error = is_index_unique(&is_unique, txn, share->key_file[curr_index], &key_info[i],
7741
creating_hot_index ? 0 : DB_PRELOCKED_WRITE);
7839
7742
if (error) goto cleanup;
7840
7743
if (!is_unique) {
7841
7744
error = HA_ERR_FOUND_DUPP_KEY;
7893
7796
another transaction has accessed the table. \
7894
7797
To add indexes, make sure no transactions touch the table.", share->table_name);
7896
thd_proc_info(thd, old_proc_info);
7799
thd_proc_info(thd, orig_proc_info);
7897
7800
TOKUDB_HANDLER_DBUG_RETURN(error ? error : loader_error);
8247
8150
void ha_tokudb::add_to_trx_handler_list() {
8248
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
8151
tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
8249
8152
trx->handlers = list_add(trx->handlers, &trx_handler_list);
8252
8155
void ha_tokudb::remove_from_trx_handler_list() {
8253
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(ha_thd(), tokudb_hton->slot);
8156
tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(ha_thd(), tokudb_hton);
8254
8157
trx->handlers = list_delete(trx->handlers, &trx_handler_list);