137
141
virtual void close_tables_if();
138
142
virtual void table_addref(size_t tbl_id);
139
143
virtual void table_release(size_t tbl_id);
140
virtual void cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
141
const char *tbl, const char *idx, const char *retflds);
142
virtual void cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args);
144
virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args);
145
virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args);
143
146
virtual void set_statistics(size_t num_conns, size_t num_active);
145
148
int set_thread_message(const char *fmt, ...)
146
149
__attribute__((format (printf, 2, 3)));
150
bool parse_fields(TABLE *const table, const char *str,
151
prep_stmt::fields_type& flds);
147
152
void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
148
153
const string_ref *fvals, size_t fvalslen);
149
154
void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
150
155
const string_ref *fvals, size_t fvalslen);
151
156
void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
152
157
ha_rkey_function find_flag, const cmd_exec_args& args);
158
size_t calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
159
const record_filter *filters);
160
bool fill_filter_buf(TABLE *table, const prep_stmt& pst,
161
const record_filter *filters, uchar *filter_buf, size_t len);
162
int check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
163
const record_filter *filters, const uchar *filter_buf);
153
164
void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
154
165
void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
155
166
int modify_record(dbcallback_i& cb, TABLE *const table,
156
167
const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
157
size_t& success_count);
168
size_t& modified_count);
159
170
typedef std::vector<tablevec_entry> table_vec_type;
160
171
typedef std::pair<std::string, std::string> table_name_type;
270
278
my_thread_init();
272
280
thd->thread_stack = (char *)stack_bottom;
273
DBG_THR(const size_t of = (char *)(&thd->thread_stack) - (char *)thd);
274
DBG_THR(fprintf(stderr, "thread_stack = %p sz=%zu of=%zu\n",
275
thd->thread_stack, sizeof(THD), of));
281
DBG_THR(fprintf(stderr,
282
"thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
283
"O: %zu %zu %zu %zu %zu %zu %zu\n",
284
thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
285
DENA_THR_OFFSETOF(mdl_context),
286
DENA_THR_OFFSETOF(net),
287
DENA_THR_OFFSETOF(LOCK_thd_data),
288
DENA_THR_OFFSETOF(mysys_var),
289
DENA_THR_OFFSETOF(stmt_arena),
290
DENA_THR_OFFSETOF(limit_found_rows),
291
DENA_THR_OFFSETOF(locked_tables_list)));
276
292
thd->store_globals();
277
293
thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
278
294
const NET v = { 0 };
537
561
dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
538
562
const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
539
size_t& success_count)
563
size_t& modified_count)
541
565
if (mod_op == 'U') {
543
567
handler *const hnd = table->file;
544
568
uchar *const buf = table->record[0];
545
569
store_record(table, record[1]);
546
const prep_stmt::retfields_type& rf = pst.get_retfields();
570
const prep_stmt::fields_type& rf = pst.get_ret_fields();
547
571
const size_t n = rf.size();
548
572
for (size_t i = 0; i < n; ++i) {
549
573
const string_ref& nv = args.uvals[i];
550
574
uint32_t fn = rf[i];
551
575
Field *const fld = table->field[fn];
552
fld->store(nv.begin(), nv.size(), &my_charset_bin);
576
if (nv.begin() == 0) {
580
fld->store(nv.begin(), nv.size(), &my_charset_bin);
554
memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
583
table_vec[pst.get_table_id()].modified = true;
555
584
const int r = hnd->ha_update_row(table->record[1], buf);
556
585
if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
588
++modified_count; /* TODO: HA_ERR_RECORD_IS_THE_SAME? */
560
589
} else if (mod_op == 'D') {
562
591
handler *const hnd = table->file;
592
table_vec[pst.get_table_id()].modified = true;
563
593
const int r = hnd->ha_delete_row(table->record[0]);
598
} else if (mod_op == '+' || mod_op == '-') {
599
/* increment/decrement */
600
handler *const hnd = table->file;
601
uchar *const buf = table->record[0];
602
store_record(table, record[1]);
603
const prep_stmt::fields_type& rf = pst.get_ret_fields();
604
const size_t n = rf.size();
606
for (i = 0; i < n; ++i) {
607
const string_ref& nv = args.uvals[i];
609
Field *const fld = table->field[fn];
610
if (fld->is_null() || nv.begin() == 0) {
613
const long long pval = fld->val_int();
614
const long long llv = atoll_nocheck(nv.begin(), nv.end());
615
/* TODO: llv == 0? */
623
if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) {
624
break; /* don't modify */
627
fld->store(nval, false);
631
table_vec[pst.get_table_id()].modified = true;
632
const int r = hnd->ha_update_row(table->record[1], buf);
633
if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
587
657
handler *const hnd = table->file;
588
658
uchar *const buf = table->record[0];
589
659
empty_record(table);
590
Field **fld = table->field;
592
for (; *fld && i < fvalslen; ++fld, ++i) {
593
(*fld)->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
660
memset(buf, 0, table->s->null_bytes); /* clear null flags */
661
const prep_stmt::fields_type& rf = pst.get_ret_fields();
662
const size_t n = rf.size();
663
for (size_t i = 0; i < n; ++i) {
665
Field *const fld = table->field[fn];
666
if (fvals[i].begin() == 0) {
669
fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
595
memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
672
table->next_number_field = table->found_next_number_field;
596
674
const int r = hnd->ha_write_row(buf);
597
return cb.dbcb_resp_short(r != 0 ? 1 : 0, "");
675
const ulonglong insert_id = table->file->insert_id_for_cur_row;
676
table->next_number_field = 0;
677
table_vec[pst.get_table_id()].modified = true;
678
if (r == 0 && table->found_next_number_field != 0) {
679
return cb.dbcb_resp_short_num64(0, insert_id);
682
return cb.dbcb_resp_short_num(1, r);
684
return cb.dbcb_resp_short(0, "");
607
694
return cb.dbcb_resp_short(2, "notimpl");
698
prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table,
699
KEY& kinfo, size_t invalues_index)
701
size_t kplen_sum = 0;
702
DBG_KEY(fprintf(stderr, "SLOW\n"));
703
for (size_t i = 0; i < args.kvalslen; ++i) {
704
const KEY_PART_INFO & kpt = kinfo.key_part[i];
705
string_ref kval = args.kvals[i];
706
if (args.invalues_keypart >= 0 &&
707
static_cast<size_t>(args.invalues_keypart) == i) {
708
kval = args.invalues[invalues_index];
710
if (kval.begin() == 0) {
711
kpt.field->set_null();
713
kpt.field->set_notnull();
715
kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
716
kplen_sum += kpt.store_length;
717
DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length,
720
key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
721
DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum,
611
727
dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
612
728
ha_rkey_function find_flag, const cmd_exec_args& args)
614
730
const bool debug_out = (verbose_level >= 100);
615
const bool modify_op_flag = (args.mod_op.size() != 0);
731
bool need_resp_record = true;
617
if (modify_op_flag && !for_write_flag) {
618
return cb.dbcb_resp_short(2, "readonly");
620
if (modify_op_flag) {
621
mod_op = args.mod_op.begin()[0];
622
if (mod_op != 'U' && mod_op != 'D') {
733
const string_ref& mod_op_str = args.mod_op;
734
if (mod_op_str.size() != 0) {
735
if (!for_write_flag) {
736
return cb.dbcb_resp_short(2, "readonly");
738
mod_op = mod_op_str.begin()[0];
739
need_resp_record = mod_op_str.size() > 1 && mod_op_str.begin()[1] == '?';
741
case 'U': /* update */
742
case 'D': /* delete */
743
case '+': /* increment */
744
case '-': /* decrement */
748
fprintf(stderr, "unknown modop: %c\n", mod_op);
623
750
return cb.dbcb_resp_short(2, "modop");
626
753
lock_tables_if();
628
return cb.dbcb_resp_short(2, "lock_tables");
755
return cb.dbcb_resp_short(1, "lock_tables");
630
757
if (pst.get_table_id() >= table_vec.size()) {
631
758
return cb.dbcb_resp_short(2, "tblnum");
633
760
TABLE *const table = table_vec[pst.get_table_id()].table;
634
762
if (pst.get_idxnum() >= table->s->keys) {
635
763
return cb.dbcb_resp_short(2, "idxnum");
638
766
if (args.kvalslen > kinfo.key_parts) {
639
767
return cb.dbcb_resp_short(2, "kpnum");
641
uchar key_buf[kinfo.key_length]; /* GNU */
642
size_t kplen_sum = 0;
644
DBG_KEY(fprintf(stderr, "SLOW\n"));
645
for (size_t i = 0; i < args.kvalslen; ++i) {
646
const KEY_PART_INFO & kpt = kinfo.key_part[i];
647
const string_ref& kval = args.kvals[i];
648
if (kval.begin() == 0) {
649
kpt.field->set_null();
651
kpt.field->set_notnull();
653
kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
654
kplen_sum += kpt.length;
769
uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length);
770
size_t invalues_idx = 0;
771
size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
773
uchar *filter_buf = 0;
774
if (args.filters != 0) {
775
const size_t filter_buf_len = calc_filter_buf_size(table, pst,
777
filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len);
778
if (!fill_filter_buf(table, pst, args.filters, filter_buf,
780
return cb.dbcb_resp_short(2, "filterblob");
656
key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
658
784
table->read_set = &table->s->all_set;
659
785
handler *const hnd = table->file;
660
786
if (!for_write_flag) {
663
789
hnd->ha_index_or_rnd_end();
664
790
hnd->ha_index_init(pst.get_idxnum(), 1);
666
statistic_increment(index_exec_count, &LOCK_status);
668
if (!modify_op_flag) {
669
cb.dbcb_resp_begin(pst.get_retfields().size());
791
if (need_resp_record) {
792
cb.dbcb_resp_begin(pst.get_ret_fields().size());
673
794
const uint32_t limit = args.limit ? args.limit : 1;
674
795
uint32_t skip = args.skip;
675
size_t mod_success_count = 0;
796
size_t modified_count = 0;
677
for (uint32_t i = 0; i < limit + skip; ++i) {
798
bool is_first = true;
799
for (uint32_t cnt = 0; cnt < limit + skip;) {
802
const key_part_map kpm = (1U << args.kvalslen) - 1;
803
r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
804
} else if (args.invalues_keypart >= 0) {
805
if (++invalues_idx >= args.invalueslen) {
808
kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
679
809
const key_part_map kpm = (1U << args.kvalslen) - 1;
680
810
r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
702
832
dump_record(cb, table, pst);
838
} else if (args.filters != 0 && (filter_res = check_filter(cb, table,
839
pst, args.filters, filter_buf)) != 0) {
840
if (filter_res < 0) {
707
843
} else if (skip > 0) {
710
if (!modify_op_flag) {
847
if (need_resp_record) {
711
848
resp_record(cb, table, pst);
713
r = modify_record(cb, table, pst, args, mod_op, mod_success_count);
851
r = modify_record(cb, table, pst, args, mod_op, modified_count);
855
if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) {
716
858
if (r != 0 && r != HA_ERR_RECORD_DELETED) {
721
863
if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
722
864
r != HA_ERR_END_OF_FILE) {
724
if (!modify_op_flag) {
866
if (need_resp_record) {
725
867
/* revert dbcb_resp_begin() and dbcb_resp_entry() */
726
868
cb.dbcb_resp_cancel();
728
cb.dbcb_resp_short_num(2, r);
870
cb.dbcb_resp_short_num(1, r);
731
if (!modify_op_flag) {
873
if (need_resp_record) {
732
874
cb.dbcb_resp_end();
734
cb.dbcb_resp_short_num(0, mod_success_count);
876
cb.dbcb_resp_short_num(0, modified_count);
879
DENA_ALLOCA_FREE(filter_buf);
880
DENA_ALLOCA_FREE(key_buf);
884
dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
885
const record_filter *filters)
887
size_t filter_buf_len = 0;
888
for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
889
if (f->val.begin() == 0) {
892
const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
893
filter_buf_len += table->field[fn]->pack_length();
896
/* Field_medium::cmp() calls uint3korr(), which may read 4 bytes.
897
Allocate 1 more byte for safety. */
898
return filter_buf_len;
902
dbcontext::fill_filter_buf(TABLE *table, const prep_stmt& pst,
903
const record_filter *filters, uchar *filter_buf, size_t len)
905
memset(filter_buf, 0, len);
907
for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
908
if (f->val.begin() == 0) {
911
const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
912
Field *const fld = table->field[fn];
913
if ((fld->flags & BLOB_FLAG) != 0) {
916
fld->store(f->val.begin(), f->val.size(), &my_charset_bin);
917
const size_t packlen = fld->pack_length();
918
memcpy(filter_buf + pos, fld->ptr, packlen);
925
dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
926
const record_filter *filters, const uchar *filter_buf)
928
DBG_FILTER(fprintf(stderr, "check_filter\n"));
930
for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
931
const string_ref& op = f->op;
932
const string_ref& val = f->val;
933
const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
934
Field *const fld = table->field[fn];
935
const size_t packlen = fld->pack_length();
936
const uchar *const bval = filter_buf + pos;
938
if (fld->is_null()) {
939
cv = (val.begin() == 0) ? 0 : -1;
941
cv = (val.begin() == 0) ? 1 : fld->cmp(bval);
943
DBG_FILTER(fprintf(stderr, "check_filter cv=%d\n", cv));
945
if (op.size() == 1) {
946
switch (op.begin()[0]) {
948
DBG_FILTER(fprintf(stderr, "check_filter op: >\n"));
952
DBG_FILTER(fprintf(stderr, "check_filter op: <\n"));
956
DBG_FILTER(fprintf(stderr, "check_filter op: =\n"));
960
DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
961
cond = false; /* FIXME: error */
964
} else if (op.size() == 2 && op.begin()[1] == '=') {
965
switch (op.begin()[0]) {
967
DBG_FILTER(fprintf(stderr, "check_filter op: >=\n"));
971
DBG_FILTER(fprintf(stderr, "check_filter op: <=\n"));
975
DBG_FILTER(fprintf(stderr, "check_filter op: !=\n"));
979
DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
980
cond = false; /* FIXME: error */
984
DBG_FILTER(fprintf(stderr, "check_filter cond: %d\n", (int)cond));
986
return (f->filter_type == record_filter_type_skip) ? 1 : -1;
988
if (val.begin() != 0) {
740
dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
741
const char *tbl, const char *idx, const char *retflds)
996
dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg)
743
998
unlock_tables_if();
744
const table_name_type k = std::make_pair(std::string(dbn), std::string(tbl));
999
const table_name_type k = std::make_pair(std::string(arg.dbn),
1000
std::string(arg.tbl));
745
1001
const table_map_type::const_iterator iter = table_map.find(k);
746
1002
uint32_t tblnum = 0;
747
1003
if (iter != table_map.end()) {
754
1010
bool refresh = true;
755
1011
const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
756
1012
#if MYSQL_VERSION_ID >= 50505
757
tables.init_one_table(dbn, strlen(dbn), tbl, strlen(tbl), tbl,
759
tables.mdl_request = mdl_request;
760
Open_table_context ot_act(thd, MYSQL_OPEN_REOPEN);
1013
tables.init_one_table(arg.dbn, strlen(arg.dbn), arg.tbl, strlen(arg.tbl),
1014
arg.tbl, lock_type);
1015
tables.mdl_request.init(MDL_key::TABLE, arg.dbn, arg.tbl,
1016
for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION);
1017
Open_table_context ot_act(thd, 0);
761
1018
if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
762
1019
table = tables.table;
765
tables.init_one_table(dbn, tbl, lock_type);
1022
tables.init_one_table(arg.dbn, arg.tbl, lock_type);
766
1023
table = open_table(thd, &tables, thd->mem_root, &refresh,
767
1024
OPEN_VIEW_NO_PARSE);
769
1026
if (table == 0) {
770
1027
DENA_VERBOSE(10, fprintf(stderr,
771
1028
"HNDSOCK failed to open %p [%s] [%s] [%d]\n",
772
thd, dbn, tbl, static_cast<int>(refresh)));
773
return cb.dbcb_resp_short(2, "open_table");
1029
thd, arg.dbn, arg.tbl, static_cast<int>(refresh)));
1030
return cb.dbcb_resp_short(1, "open_table");
775
1032
statistic_increment(open_tables_count, &LOCK_status);
776
1033
table->reginfo.lock_type = lock_type;
803
1061
if (idxnum == size_t(-1)) {
804
1062
return cb.dbcb_resp_short(2, "idxnum");
806
string_ref retflds_sr(retflds, strlen(retflds));
1064
prep_stmt::fields_type rf;
1065
prep_stmt::fields_type ff;
1066
if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) {
1067
return cb.dbcb_resp_short(2, "fld");
1069
if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) {
1070
return cb.dbcb_resp_short(2, "fld");
1072
prep_stmt p(this, tblnum, idxnum, rf, ff);
1073
cb.dbcb_set_prep_stmt(arg.pst_id, p);
1074
return cb.dbcb_resp_short(0, "");
1078
dbcontext::parse_fields(TABLE *const table, const char *str,
1079
prep_stmt::fields_type& flds)
1081
string_ref flds_sr(str, strlen(str));
807
1082
std::vector<string_ref> fldnms;
808
if (retflds_sr.size() != 0) {
809
split(',', retflds_sr, fldnms);
1083
if (flds_sr.size() != 0) {
1084
split(',', flds_sr, fldnms);
811
prep_stmt::retfields_type rf;
812
1086
for (size_t i = 0; i < fldnms.size(); ++i) {
813
TABLE *const table = table_vec[tblnum].table;
814
1087
Field **fld = 0;
816
1089
for (fld = table->field; *fld; ++fld, ++j) {