5
* Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6
* See COPYRIGHT.txt for details.
13
#include "database.hpp"
14
#include "string_util.hpp"
16
#include "mysql_incl.hpp"
27
/* status variables */
28
/* Counters maintained by this file; the visible code bumps each of them with
   statistic_increment(<counter>, &LOCK_status). */
/* Incremented in cmd_open_index after a table has been opened. */
unsigned long long int open_tables_count;
29
/* Incremented in close_tables_if after close_thread_tables(). */
unsigned long long int close_tables_count;
30
/* Incremented in lock_tables_if after mysql_lock_tables(). */
unsigned long long int lock_tables_count;
31
/* Incremented in unlock_tables_if after mysql_unlock_tables(). */
unsigned long long int unlock_tables_count;
32
/* Incremented in cmd_find_internal once the index has been initialised. */
unsigned long long int index_exec_count;
36
/* Default constructor: no owning context (dbctx is null) and both table_id
   and idxnum set to static_cast<size_t>(-1). cmd_exec_on_index rejects a
   statement whose table id still has that value with "stmtnum". */
prep_stmt::prep_stmt()
37
: dbctx(0), table_id(static_cast<size_t>(-1)),
38
idxnum(static_cast<size_t>(-1))
41
/* Builds a statement bound to context c, table slot tbl, index number idx and
   the list of returned fields rf, then takes a reference on the table slot. */
prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx,
42
const retfields_type& rf)
43
: dbctx(c), table_id(tbl), idxnum(idx), retfields(rf)
46
/* NOTE(review): any guard around this call is not visible in this view;
   confirm c cannot be null here. */
dbctx->table_addref(table_id);
49
/* Destructor: gives back the reference this statement holds on its table
   slot. */
prep_stmt::~prep_stmt()
52
/* NOTE(review): a default-constructed prep_stmt has dbctx == 0; the null
   check that must protect this call is not visible in this view - confirm. */
dbctx->table_release(table_id);
56
/* Copy constructor: copies context, table slot, index number and returned
   fields from x, and takes an additional reference on the table slot so both
   copies can release it independently. */
prep_stmt::prep_stmt(const prep_stmt& x)
57
: dbctx(x.dbctx), table_id(x.table_id), idxnum(x.idxnum),
58
retfields(x.retfields)
61
/* NOTE(review): x may be default-constructed (dbctx == 0); the guard for that
   case is not visible in this view - confirm. */
dbctx->table_addref(table_id);
66
/* Copy assignment: releases the reference held on the current table slot,
   takes over the fields of x, then acquires a reference on the new slot.
   NOTE(review): the self-assignment test, the null-dbctx guards and the
   copies of dbctx/idxnum are not visible in this view - confirm they exist,
   since release-before-addref is only safe when this != &x. */
prep_stmt::operator =(const prep_stmt& x)
70
dbctx->table_release(table_id);
73
table_id = x.table_id;
75
retfields = x.retfields;
77
dbctx->table_addref(table_id);
83
/* Concrete, non-copyable implementation of database_i. It keeps the
   configuration and hands out dbcontext objects. */
struct database : public database_i, private noncopyable {
84
database(const config& c);
86
/* Returns a new dbcontext bound to this database; for_write is forwarded to
   the context constructor. */
virtual dbcontext_ptr create_context(bool for_write) volatile;
87
/* Sets child_running to false. */
virtual void stop() volatile;
88
/* Returns the stored configuration with the volatile qualifier cast away. */
virtual const config& get_conf() const volatile;
95
/* Element type of dbcontext::table_vec: a table pointer together with the
   count maintained by table_addref()/table_release(). */
struct tablevec_entry {
98
/* Starts with no table and no references. */
tablevec_entry() : table(0), refcount(0) { }
101
/* Bundles an Item_func_get_lock / Item_func_release_lock pair that share the
   key item "handlersocket_wr" and a timeout item. dbcontext uses it as the
   user-level lock taken by write contexts (see lock_tables_if and
   unlock_tables_if). */
struct expr_user_lock : private noncopyable {
102
/* thd:     passed to fix_fields() of every item below.
   timeout: value wrapped in lck_timeout and handed to the get-lock item. */
expr_user_lock(THD *thd, int timeout)
103
/* 16 is the length of the literal "handlersocket_wr". */
: lck_key("handlersocket_wr", 16, &my_charset_latin1),
104
lck_timeout(timeout),
105
lck_func_get_lock(&lck_key, &lck_timeout),
106
lck_func_release_lock(&lck_key)
108
/* Resolve each item against thd before any val_int() call is made. */
lck_key.fix_fields(thd, 0);
109
lck_timeout.fix_fields(thd, 0);
110
lck_func_get_lock.fix_fields(thd, 0);
111
lck_func_release_lock.fix_fields(thd, 0);
113
/* Evaluates the get-lock item; callers treat a non-zero result as success. */
long long get_lock() {
114
return lck_func_get_lock.val_int();
116
/* Evaluates the release-lock item; callers treat non-zero as released. */
long long release_lock() {
117
return lck_func_release_lock.val_int();
121
Item_int lck_timeout;
122
Item_func_get_lock lck_func_get_lock;
123
Item_func_release_lock lck_func_release_lock;
126
/* Non-copyable implementation of dbcontext_i. One instance carries the THD
   pointer it works with, the tables opened so far (table_vec / table_map),
   the table lock state and, for write contexts, the user-level lock. */
struct dbcontext : public dbcontext_i, private noncopyable {
127
dbcontext(volatile database *d, bool for_write);
128
virtual ~dbcontext();
129
/* Sets up the THD for the calling thread and waits for the server to start.
   (sic) "stack_botton": the out-of-line definition spells it stack_bottom. */
virtual void init_thread(const void *stack_botton,
130
volatile int& shutdown_flag);
131
virtual void term_thread();
132
/* Inspects thd->killed under thd->mysys_var->mutex. */
virtual bool check_alive();
133
/* Takes the user-level lock (write contexts) and locks every table whose
   refcount is positive. */
virtual void lock_tables_if();
134
/* Commits (write contexts), unlocks the tables and releases the user-level
   lock. */
virtual void unlock_tables_if();
135
virtual bool get_commit_error();
136
/* Resets commit_error to false. */
virtual void clear_error();
137
/* Calls close_thread_tables() when table_vec is not empty. */
virtual void close_tables_if();
138
/* Reference counting on a table_vec slot; used by prep_stmt. */
virtual void table_addref(size_t tbl_id);
139
virtual void table_release(size_t tbl_id);
140
/* Opens (or reuses) table dbn.tbl, resolves index idx and the comma separated
   field list retflds, and registers the resulting prep_stmt under pst_id. */
virtual void cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
141
const char *tbl, const char *idx, const char *retflds);
142
/* Decodes args.op and dispatches to the find / insert / sql helpers. */
virtual void cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args);
143
/* Publishes connection counts through the thread's proc-info message. */
virtual void set_statistics(size_t num_conns, size_t num_active);
145
/* printf-style formatter into info_message_buf; the attribute indices are 2
   and 3 because the implicit this pointer counts as argument 1. */
int set_thread_message(const char *fmt, ...)
146
__attribute__((format (printf, 2, 3)));
147
void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
148
const string_ref *fvals, size_t fvalslen);
149
void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
150
const string_ref *fvals, size_t fvalslen);
151
void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
152
ha_rkey_function find_flag, const cmd_exec_args& args);
153
/* Sends the fields selected by pst for the current row to cb. */
void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
154
/* Prints the fields selected by pst for the current row to stderr. */
void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
155
/* Applies mod_op ('U' or 'D' are the values accepted by the caller) to the
   current row. */
int modify_record(dbcallback_i& cb, TABLE *const table,
156
const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
157
size_t& success_count);
159
typedef std::vector<tablevec_entry> table_vec_type;
160
/* Key built as (dbn, tbl) in cmd_open_index. */
typedef std::pair<std::string, std::string> table_name_type;
161
/* Maps a table name to its index in table_vec. */
typedef std::map<table_name_type, size_t> table_map_type;
163
volatile database *const dbref;
168
/* Created in init_thread(); used only when for_write_flag is set. */
std::auto_ptr<expr_user_lock> user_lock;
169
/* Read from the "wrlock_timeout" configuration entry in the constructor. */
int user_level_lock_timeout;
170
/* True while this context holds the user-level lock. */
bool user_level_lock_locked;
172
/* Backing storage for the text handed to thd_proc_info(). */
std::vector<char> info_message_buf;
173
table_vec_type table_vec;
174
table_map_type table_map;
175
#if MYSQL_VERSION_ID >= 50505
176
/* Created in init_thread() and attached to the TABLE_LIST in
   cmd_open_index(). */
MDL_request *mdl_request;
182
/* Stores a copy of the configuration and marks the child as running
   (child_running = 1; stop() later sets it to false). */
database::database(const config& c)
183
: child_running(1), conf(c)
187
/* Destructor of the concrete database object (body not visible in this
   view). */
database::~database()
192
/* Allocates a new dbcontext that refers back to this database and wraps it
   in a dbcontext_ptr; for_write is forwarded to the context unchanged. */
database::create_context(bool for_write) volatile
194
return dbcontext_ptr(new dbcontext(this, for_write));
198
/* Clears the child_running flag that the constructor set to 1. */
database::stop() volatile
200
child_running = false;
204
/* Returns a plain const reference to the stored configuration. */
database::get_conf() const volatile
206
/* The member is seen as volatile through this; the cast strips that. */
return const_cast<const config&>(conf);
210
/* Factory: builds the concrete database for conf and returns it wrapped in a
   database_ptr. */
database_i::create(const config& conf)
212
return database_ptr(new database(conf));
215
/* Initialises a context for database d. No THD, lock or MDL request exists
   yet (those members start at 0); for_write selects the write mode. */
dbcontext::dbcontext(volatile database *d, bool for_write)
216
: dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
217
user_level_lock_timeout(0), user_level_lock_locked(false),
218
commit_error(false), mdl_request(0)
220
/* Fixed-size buffer later passed to thd_proc_info() and filled by
   set_thread_message(). */
info_message_buf.resize(8192);
221
/* Overrides the 0 set in the initialiser list; 12 is presumably the fallback
   used when "wrlock_timeout" is not configured - TODO confirm get_int(). */
user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
224
/* Destructor of the context (body not visible in this view). */
dbcontext::~dbcontext()
231
/* Waits until mysqld_server_started is set. The loop sleeps on
   COND_server_started with a timeout and, between waits, samples thd->killed
   so that a killed thread can leave early.
   NOTE(review): the return type/value and the statements executed on the
   kill / shutdown_flag exits are not visible in this view. */
wait_server_to_start(THD *thd, volatile int& shutdown_flag)
234
DBG_SHUT(fprintf(stderr, "HNDSOCK wsts\n"));
235
/* LOCK_server_started is held whenever the loop condition is evaluated. */
pthread_mutex_lock(&LOCK_server_started);
236
while (!mysqld_server_started) {
237
timespec abstime = { };
238
/* Presumably an absolute deadline one second from now - TODO confirm the
   unit of set_timespec(). */
set_timespec(abstime, 1);
239
pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
241
pthread_mutex_unlock(&LOCK_server_started);
242
/* thd->killed is read under the THD's own mutex, with LOCK_server_started
   released first so the two mutexes are never held together. */
pthread_mutex_lock(&thd->mysys_var->mutex);
243
THD::killed_state st = thd->killed;
244
pthread_mutex_unlock(&thd->mysys_var->mutex);
245
DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d\n", (int)st));
246
/* Re-acquire before going back to the loop condition. */
pthread_mutex_lock(&LOCK_server_started);
247
if (st != THD::NOT_KILLED) {
248
DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d break\n", (int)st));
253
DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst shut break\n"));
258
pthread_mutex_unlock(&LOCK_server_started);
259
DBG_SHUT(fprintf(stderr, "HNDSOCK wsts done\n"));
266
/* Prepares the THD used by this context on the calling thread: records the
   stack base, registers the THD as the thread's current one, assigns a thread
   id, waits for the server to finish starting and creates the per-context
   lock helpers.
   stack_bottom:  address stored as thd->thread_stack.
   shutdown_flag: forwarded to wait_server_to_start().
   NOTE(review): the creation of thd itself is not visible in this view. */
dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
268
DBG_THR(fprintf(stderr, "HNDSOCK init thread\n"));
272
thd->thread_stack = (char *)stack_bottom;
273
DBG_THR(const size_t of = (char *)(&thd->thread_stack) - (char *)thd);
274
DBG_THR(fprintf(stderr, "thread_stack = %p sz=%zu of=%zu\n",
275
thd->thread_stack, sizeof(THD), of));
276
thd->store_globals();
277
/* NOTE(review): 1<<30 is cast into enum_thread_type directly; presumably a
   private marker that matches no server-defined thread type - confirm. */
thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
280
/* Write contexts turn on OPTION_BIN_LOG; the field holding the option bits
   differs between server versions. */
if (for_write_flag) {
281
#if MYSQL_VERSION_ID >= 50505
282
thd->variables.option_bits |= OPTION_BIN_LOG;
284
thd->options |= OPTION_BIN_LOG;
288
/* Heap-allocated copy of the name, stored as the THD's current database. */
thd->db = my_strdup("handlersocket", MYF(0));
290
my_pthread_setspecific_ptr(THR_THD, thd);
291
DBG_THR(fprintf(stderr, "HNDSOCK x0 %p\n", thd));
294
/* The global thread_id counter is read and advanced under
   LOCK_thread_count. */
pthread_mutex_lock(&LOCK_thread_count);
295
thd->thread_id = thread_id++;
298
pthread_mutex_unlock(&LOCK_thread_count);
301
DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n"));
302
wait_server_to_start(thd, shutdown_flag);
303
DBG_THR(fprintf(stderr, "HNDSOCK init thread done\n"));
305
/* Point the THD's proc-info at our buffer, then write the initial text. */
thd_proc_info(thd, &info_message_buf[0]);
306
set_thread_message("hs:listening");
307
DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));
309
#if MYSQL_VERSION_ID >= 50505
310
/* One MDL request allocated on thd->mem_root with empty db/table names;
   shared-write for write contexts, shared-read otherwise. */
mdl_request = MDL_request::create(MDL_key::TABLE, "", "", for_write_flag ?
311
MDL_SHARED_WRITE : MDL_SHARED_READ, thd->mem_root);
316
/* Build the user-level lock items now that a THD is available. */
user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
320
/* Formats fmt and its variadic arguments into info_message_buf, the buffer
   other methods register with thd_proc_info(). vsnprintf is bounded by the
   buffer size, so overlong messages are truncated rather than overflowing.
   NOTE(review): the va_list handling and the returned value are not visible
   in this view. */
dbcontext::set_thread_message(const char *fmt, ...)
324
const int n = vsnprintf(&info_message_buf[0], info_message_buf.size(),
331
/* Thread teardown counterpart of init_thread(): detaches the THD from the
   thread-specific THR_THD slot and performs further cleanup while holding
   LOCK_thread_count.
   NOTE(review): the statements executed inside the locked region are not
   visible in this view. */
dbcontext::term_thread()
333
DBG_THR(fprintf(stderr, "HNDSOCK thread end %p\n", thd));
335
my_pthread_setspecific_ptr(THR_THD, 0);
337
pthread_mutex_lock(&LOCK_thread_count);
341
pthread_mutex_unlock(&LOCK_thread_count);
347
/* Samples thd->killed under thd->mysys_var->mutex and branches on whether the
   state differs from THD::NOT_KILLED. Declared to return bool; presumably
   false when killed and true otherwise - the return statements are not
   visible in this view, TODO confirm. */
dbcontext::check_alive()
349
pthread_mutex_lock(&thd->mysys_var->mutex);
350
THD::killed_state st = thd->killed;
351
pthread_mutex_unlock(&thd->mysys_var->mutex);
352
DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %p %p %d %zu\n", thd, &thd->killed,
353
(int)st, sizeof(*thd)));
354
if (st != THD::NOT_KILLED) {
355
DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %d break\n", (int)st));
362
/* Acquires what a batch of commands needs: the user-level lock for write
   contexts (once, until released) and a MySQL table lock covering every table
   slot that is still referenced.
   NOTE(review): the outer conditions (for example an early exit when a lock
   is already held or lock_failed is set) are not visible in this view. */
dbcontext::lock_tables_if()
367
if (for_write_flag && !user_level_lock_locked) {
368
/* A non-zero result from the get-lock item is treated as success. */
if (user_lock->get_lock()) {
369
user_level_lock_locked = true;
376
const size_t num_max = table_vec.size();
377
/* Variable-length array (GNU extension); at least one element so the
   declaration stays valid when no table has been opened. */
TABLE *tables[num_max ? num_max : 1]; /* GNU */
379
/* Only slots with a positive refcount are locked. */
for (size_t i = 0; i < num_max; ++i) {
380
if (table_vec[i].refcount > 0) {
381
tables[num_open++] = table_vec[i].table;
384
#if MYSQL_VERSION_ID >= 50505
385
lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0);
387
/* Older servers take a flags argument plus a need_reopen out-parameter. */
bool need_reopen= false;
388
lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open,
389
MYSQL_LOCK_NOTIFY_IF_NEED_REOPEN, &need_reopen);
391
statistic_increment(lock_tables_count, &LOCK_status);
392
thd_proc_info(thd, &info_message_buf[0]);
393
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK lock tables %p %p %zu %zu\n",
394
thd, lock, num_max, num_open));
397
DENA_VERBOSE(10, fprintf(stderr, "HNDSOCK failed to lock tables %p\n",
400
/* Write contexts switch the statement to row-based binary logging; the API
   differs between server versions. */
if (for_write_flag) {
401
#if MYSQL_VERSION_ID >= 50505
402
thd->set_current_stmt_binlog_format_row();
404
thd->current_stmt_binlog_row_based = 1;
408
/* NOTE(review): tblnum is not declared in the visible part of this function;
   this line presumably only compiles while DBG_LOCK expands to nothing. */
DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
412
/* Reverse of lock_tables_if(): for write contexts commits the statement, then
   unlocks the tables and finally gives back the user-level lock.
   NOTE(review): the surrounding conditions (for example "only if lock != 0")
   and the place where commit_error is set are not visible in this view. */
dbcontext::unlock_tables_if()
415
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables\n"));
416
if (for_write_flag) {
418
/* suc is true when the version-specific commit call returns 0. */
#if MYSQL_VERSION_ID >= 50505
419
suc = (trans_commit_stmt(thd) == 0);
421
suc = (ha_autocommit_or_rollback(thd, 0) == 0);
425
DENA_VERBOSE(10, fprintf(stderr,
426
"HNDSOCK unlock tables: commit failed\n"));
429
mysql_unlock_tables(thd, lock);
430
/* Forget the lock in both this context and the THD. */
lock = thd->lock = 0;
431
statistic_increment(unlock_tables_count, &LOCK_status);
433
if (user_level_lock_locked) {
434
/* The flag is cleared only when the release item reports non-zero. */
if (user_lock->release_lock()) {
435
user_level_lock_locked = false;
441
/* Declared to return bool; presumably reports the commit_error member (body
   not visible in this view) - TODO confirm. */
dbcontext::get_commit_error()
447
/* Resets the commit_error flag to false. */
dbcontext::clear_error()
450
commit_error = false;
454
/* Closes the tables this context has opened, if any: when table_vec is not
   empty it calls close_thread_tables() on the THD and counts the event.
   NOTE(review): clearing of table_vec / table_map afterwards is not visible
   in this view. */
dbcontext::close_tables_if()
457
if (!table_vec.empty()) {
458
DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
459
close_thread_tables(thd);
460
statistic_increment(close_tables_count, &LOCK_status);
467
/* Adds one reference to table slot tbl_id. The index is used unchecked, so
   the caller must pass a valid position in table_vec. */
dbcontext::table_addref(size_t tbl_id)
469
table_vec[tbl_id].refcount += 1;
470
DBG_REFCNT(fprintf(stderr, "%p %zu %zu addref\n", this, tbl_id,
471
table_vec[tbl_id].refcount));
475
/* Drops one reference from table slot tbl_id. The index is used unchecked and
   the count is decremented unconditionally; lock_tables_if() skips slots
   whose refcount is not positive. */
dbcontext::table_release(size_t tbl_id)
477
table_vec[tbl_id].refcount -= 1;
478
DBG_REFCNT(fprintf(stderr, "%p %zu %zu release\n", this, tbl_id,
479
table_vec[tbl_id].refcount));
483
/* Emits one response entry per field listed in pst.get_retfields(), reading
   the values from the row currently loaded in table.
   cb:    receives the entries through dbcb_resp_entry().
   table: table whose field array is indexed by the stored field numbers.
   pst:   prepared statement providing the list of field numbers. */
dbcontext::resp_record(dbcallback_i& cb, TABLE *const table,
484
const prep_stmt& pst)
487
/* String object constructed over the local buffer rwpstr_buf. */
String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
488
const prep_stmt::retfields_type& rf = pst.get_retfields();
489
const size_t n = rf.size();
490
for (size_t i = 0; i < n; ++i) {
492
Field *const fld = table->field[fn];
493
DBG_FLD(fprintf(stderr, "fld=%p %zu\n", fld, fn));
494
if (fld->is_null()) {
496
/* NULL is reported as a null pointer with length 0. */
cb.dbcb_resp_entry(0, 0);
498
fld->val_str(&rwpstr, &rwpstr);
499
const size_t len = rwpstr.length();
502
cb.dbcb_resp_entry(rwpstr.ptr(), rwpstr.length());
505
/* Presumably the len == 0 case: a non-null pointer with length 0 keeps an
   empty string distinguishable from NULL - TODO confirm the branch. */
static const char empty_str[] = "";
506
cb.dbcb_resp_entry(empty_str, 0);
513
/* Debug helper: prints the fields selected by pst for the current row of
   table to stderr, "NULL" for null fields and "[value]" otherwise, followed
   by a newline. Called from cmd_find_internal next to the "r=%d" trace. */
dbcontext::dump_record(dbcallback_i& cb, TABLE *const table,
514
const prep_stmt& pst)
517
String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
518
const prep_stmt::retfields_type& rf = pst.get_retfields();
519
const size_t n = rf.size();
520
for (size_t i = 0; i < n; ++i) {
522
Field *const fld = table->field[fn];
523
if (fld->is_null()) {
525
/* NOTE(review): this pushes a NULL entry into the real response from what is
   otherwise a print-only routine; the non-NULL path below does not call cb.
   Looks unintended (a dumped NULL field would be reported twice once
   resp_record() runs) - confirm and consider removing. */
cb.dbcb_resp_entry(0, 0);
526
fprintf(stderr, "NULL");
528
fld->val_str(&rwpstr, &rwpstr);
529
/* Copy into std::string to obtain a NUL-terminated buffer for %s. */
const std::string s(rwpstr.ptr(), rwpstr.length());
530
fprintf(stderr, "[%s]", s.c_str());
533
fprintf(stderr, "\n");
537
/* Applies a modification to the row currently held in table->record[0].
   The visible code shows an update path that stores the new values and calls
   ha_update_row(), and a mod_op == 'D' path that calls ha_delete_row().
   cb, pst, args: callback, prepared statement (list of fields to write) and
                  request arguments (args.uvals holds the new values).
   success_count: declared as an in/out counter; its update is not visible in
                  this view.
   Returns an int; presumably the handler result code - TODO confirm. */
dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
538
const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
539
size_t& success_count)
543
handler *const hnd = table->file;
544
uchar *const buf = table->record[0];
545
/* Keep the old row image in record[1]; it is passed to ha_update_row()
   below as the first argument. */
store_record(table, record[1]);
546
const prep_stmt::retfields_type& rf = pst.get_retfields();
547
const size_t n = rf.size();
548
/* NOTE(review): args.uvals[i] is indexed up to rf.size() with no visible
   check against the number of supplied values - confirm the caller
   guarantees enough entries. */
for (size_t i = 0; i < n; ++i) {
549
const string_ref& nv = args.uvals[i];
551
Field *const fld = table->field[fn];
552
fld->store(nv.begin(), nv.size(), &my_charset_bin);
554
/* Zeroes the row's null-flag bytes. */
memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
555
const int r = hnd->ha_update_row(table->record[1], buf);
556
/* HA_ERR_RECORD_IS_THE_SAME is not treated as a failure. */
if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
560
} else if (mod_op == 'D') {
562
handler *const hnd = table->file;
563
const int r = hnd->ha_delete_row(table->record[0]);
573
/* Inserts one row into the table referenced by pst.
   fvals / fvalslen: values assigned to the table's fields in declaration
                     order.
   Responses: (2, "readonly") for read contexts, (2, "lock_tables") presumably
   when no table lock is held (condition not visible), (2, "tblnum") for an
   out-of-range table id, otherwise code 1 when ha_write_row() fails and 0
   when it succeeds. */
dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
574
const string_ref *fvals, size_t fvalslen)
576
if (!for_write_flag) {
577
return cb.dbcb_resp_short(2, "readonly");
581
return cb.dbcb_resp_short(2, "lock_tables");
583
if (pst.get_table_id() >= table_vec.size()) {
584
return cb.dbcb_resp_short(2, "tblnum");
586
TABLE *const table = table_vec[pst.get_table_id()].table;
587
handler *const hnd = table->file;
588
uchar *const buf = table->record[0];
590
Field **fld = table->field;
592
/* Stops at whichever ends first: the null-terminated field array or the
   supplied values; extra values are ignored and fields without a value are
   not stored to here. */
for (; *fld && i < fvalslen; ++fld, ++i) {
593
(*fld)->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
595
/* Zeroes the row's null-flag bytes. */
memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
596
const int r = hnd->ha_write_row(buf);
597
return cb.dbcb_resp_short(r != 0 ? 1 : 0, "");
601
/* Handler for the SQL write operation. The visible code only produces error
   responses: (2, "syntax"), presumably for malformed arguments (the condition
   is not visible in this view), and (2, "notimpl") otherwise. */
dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
602
const string_ref *fvals, size_t fvalslen)
605
return cb.dbcb_resp_short(2, "syntax");
607
return cb.dbcb_resp_short(2, "notimpl");
611
/* Index lookup with optional modification of the rows found.
   cb:        response sink.
   pst:       prepared statement (table slot, index number, returned fields).
   find_flag: comparison mode used for the first index read.
   args:      key values (kvals/kvalslen), limit, skip and, when mod_op is
              non-empty, the modification to apply ('U' or 'D').
   Error responses use code 2 with "readonly", "modop", "lock_tables",
   "tblnum", "idxnum" or "kpnum", or a numeric handler error. On success a
   read sends the rows and a modification sends mod_success_count. */
dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
612
ha_rkey_function find_flag, const cmd_exec_args& args)
614
const bool debug_out = (verbose_level >= 100);
615
const bool modify_op_flag = (args.mod_op.size() != 0);
617
if (modify_op_flag && !for_write_flag) {
618
return cb.dbcb_resp_short(2, "readonly");
620
if (modify_op_flag) {
621
/* Only the first character of mod_op is examined. */
mod_op = args.mod_op.begin()[0];
622
if (mod_op != 'U' && mod_op != 'D') {
623
return cb.dbcb_resp_short(2, "modop");
628
return cb.dbcb_resp_short(2, "lock_tables");
630
if (pst.get_table_id() >= table_vec.size()) {
631
return cb.dbcb_resp_short(2, "tblnum");
633
TABLE *const table = table_vec[pst.get_table_id()].table;
634
if (pst.get_idxnum() >= table->s->keys) {
635
return cb.dbcb_resp_short(2, "idxnum");
637
KEY& kinfo = table->key_info[pst.get_idxnum()];
638
/* More key values than the index has parts is an error; fewer is allowed. */
if (args.kvalslen > kinfo.key_parts) {
639
return cb.dbcb_resp_short(2, "kpnum");
641
/* Variable-length array (GNU extension) sized to the full key. */
uchar key_buf[kinfo.key_length]; /* GNU */
642
/* Accumulates the lengths of the key parts actually supplied. */
size_t kplen_sum = 0;
644
DBG_KEY(fprintf(stderr, "SLOW\n"));
645
/* Store each supplied key value into its field in record[0]; a value whose
   begin() is null marks the key part as NULL. */
for (size_t i = 0; i < args.kvalslen; ++i) {
646
const KEY_PART_INFO & kpt = kinfo.key_part[i];
647
const string_ref& kval = args.kvals[i];
648
if (kval.begin() == 0) {
649
kpt.field->set_null();
651
kpt.field->set_notnull();
653
kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
654
kplen_sum += kpt.length;
656
/* Pack the key image from record[0] into key_buf, limited to kplen_sum. */
key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
658
table->read_set = &table->s->all_set;
659
handler *const hnd = table->file;
660
if (!for_write_flag) {
661
hnd->init_table_handle_for_HANDLER();
663
/* End any scan still active on the handler before starting on this index. */
hnd->ha_index_or_rnd_end();
664
hnd->ha_index_init(pst.get_idxnum(), 1);
666
statistic_increment(index_exec_count, &LOCK_status);
668
if (!modify_op_flag) {
669
cb.dbcb_resp_begin(pst.get_retfields().size());
673
/* A limit of 0 is treated as 1. */
const uint32_t limit = args.limit ? args.limit : 1;
674
uint32_t skip = args.skip;
675
size_t mod_success_count = 0;
677
/* NOTE(review): limit + skip is evaluated in uint32_t and can wrap for very
   large inputs - confirm that callers bound these values. */
for (uint32_t i = 0; i < limit + skip; ++i) {
679
/* Bitmap with the low kvalslen bits set: one bit per supplied key part. */
const key_part_map kpm = (1U << args.kvalslen) - 1;
680
r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
683
/* Later iterations continue in the direction implied by find_flag. */
case HA_READ_BEFORE_KEY:
684
case HA_READ_KEY_OR_PREV:
685
r = hnd->index_prev(table->record[0]);
687
case HA_READ_AFTER_KEY:
688
case HA_READ_KEY_OR_NEXT:
689
r = hnd->index_next(table->record[0]);
691
case HA_READ_KEY_EXACT:
692
r = hnd->index_next_same(table->record[0], key_buf, kplen_sum);
695
r = HA_ERR_END_OF_FILE; /* to finish the loop */
700
/* Presumably guarded by debug_out (the condition is not visible here). */
fprintf(stderr, "r=%d\n", r);
701
if (r == 0 || r == HA_ERR_RECORD_DELETED) {
702
dump_record(cb, table, pst);
707
} else if (skip > 0) {
710
if (!modify_op_flag) {
711
resp_record(cb, table, pst);
713
r = modify_record(cb, table, pst, args, mod_op, mod_success_count);
716
/* HA_ERR_RECORD_DELETED does not count as an error here. */
if (r != 0 && r != HA_ERR_RECORD_DELETED) {
720
hnd->ha_index_or_rnd_end();
721
/* "Key not found" and "end of file" are normal ways for the scan to finish;
   anything else is reported as an error. */
if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
722
r != HA_ERR_END_OF_FILE) {
724
if (!modify_op_flag) {
725
/* revert dbcb_resp_begin() and dbcb_resp_entry() */
726
cb.dbcb_resp_cancel();
728
cb.dbcb_resp_short_num(2, r);
731
if (!modify_op_flag) {
734
cb.dbcb_resp_short_num(0, mod_success_count);
740
/* Resolves a (database, table, index, fields) request into a prep_stmt and
   registers it with the callback under pst_id.
   dbn, tbl: database and table name; the pair is the cache key in table_map.
   idx:      index given by number (leading digit) or by name; an empty string
             selects "PRIMARY".
   retflds:  comma separated field names; every name must exist in the table.
   Responses: (2, "open_table"), (2, "idxnum"), (2, "fld") on failure,
   (0, "") on success. */
dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
741
const char *tbl, const char *idx, const char *retflds)
744
const table_name_type k = std::make_pair(std::string(dbn), std::string(tbl));
745
const table_map_type::const_iterator iter = table_map.find(k);
747
/* Reuse the slot if this context has already opened the table. */
if (iter != table_map.end()) {
748
tblnum = iter->second;
749
/* NOTE(review): k is a std::pair and has no c_str(); this presumably only
   compiles while DBG_CMP expands to nothing - confirm. */
DBG_CMP(fprintf(stderr, "HNDSOCK k=%s tblnum=%d\n", k.c_str(),
755
const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
756
#if MYSQL_VERSION_ID >= 50505
757
tables.init_one_table(dbn, strlen(dbn), tbl, strlen(tbl), tbl,
759
/* Attach the MDL request created in init_thread(). */
tables.mdl_request = mdl_request;
760
Open_table_context ot_act(thd, MYSQL_OPEN_REOPEN);
761
/* In this API a false return means the table was opened. */
if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
762
table = tables.table;
765
tables.init_one_table(dbn, tbl, lock_type);
766
table = open_table(thd, &tables, thd->mem_root, &refresh,
770
DENA_VERBOSE(10, fprintf(stderr,
771
"HNDSOCK failed to open %p [%s] [%s] [%d]\n",
772
thd, dbn, tbl, static_cast<int>(refresh)));
773
return cb.dbcb_resp_short(2, "open_table");
775
statistic_increment(open_tables_count, &LOCK_status);
776
table->reginfo.lock_type = lock_type;
777
table->use_all_columns();
778
/* The new slot goes to the end of table_vec; remember its index in the map. */
tblnum = table_vec.size();
781
table_vec.push_back(e);
782
table_map[k] = tblnum;
784
/* size_t(-1) means "index not resolved yet". */
size_t idxnum = static_cast<size_t>(-1);
785
/* A leading digit selects the index by number. */
if (idx[0] >= '0' && idx[0] <= '9') {
787
TABLE *const table = table_vec[tblnum].table;
789
if (idxnum >= table->s->keys) {
790
return cb.dbcb_resp_short(2, "idxnum");
793
/* Otherwise look the index up by name. */
const char *const idx_name_to_open = idx[0] == '\0' ? "PRIMARY" : idx;
794
TABLE *const table = table_vec[tblnum].table;
795
for (uint i = 0; i < table->s->keys; ++i) {
796
KEY& kinfo = table->key_info[i];
797
if (strcmp(kinfo.name, idx_name_to_open) == 0) {
803
if (idxnum == size_t(-1)) {
804
return cb.dbcb_resp_short(2, "idxnum");
806
string_ref retflds_sr(retflds, strlen(retflds));
807
std::vector<string_ref> fldnms;
808
/* An empty retflds leaves fldnms empty, giving a statement with no returned
   fields. */
if (retflds_sr.size() != 0) {
809
split(',', retflds_sr, fldnms);
811
prep_stmt::retfields_type rf;
812
/* Translate each requested name into its position in table->field. */
for (size_t i = 0; i < fldnms.size(); ++i) {
813
TABLE *const table = table_vec[tblnum].table;
816
for (fld = table->field; *fld; ++fld, ++j) {
817
DBG_FLD(fprintf(stderr, "f %s\n", (*fld)->field_name));
818
string_ref fn((*fld)->field_name, strlen((*fld)->field_name));
819
if (fn == fldnms[i]) {
824
DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds,
825
std::string(fldnms[i].begin(), fldnms[i].size()).c_str()));
826
return cb.dbcb_resp_short(2, "fld");
828
DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name, j));
831
/* Constructing p takes a reference on the table slot; the callback stores its
   own copy under pst_id. */
prep_stmt p(this, tblnum, idxnum, rf);
832
cb.dbcb_set_prep_stmt(pst_id, p);
833
return cb.dbcb_resp_short(0, "");
837
/* Enumerators of db_write_op, the selector cmd_exec_on_index() switches on:
   "none" leads to the find path, "insert" to the insert path. */
db_write_op_none = 0,
838
db_write_op_insert = 1,
843
/* Entry point for executing a request against a prepared statement: decodes
   args.op into either an index search mode or a write operation and
   dispatches to the matching helper.
   Responses produced here: (2, "stmtnum") when the statement was never
   opened, (1, "op") for an unrecognised operator, (2, "klen") when no key
   value is supplied.
   NOTE(review): the case labels mapping operator characters to the
   assignments below are not visible in this view. */
dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
845
const prep_stmt& p = *args.pst;
846
/* size_t(-1) is the table id of a default-constructed prep_stmt. */
if (p.get_table_id() == static_cast<size_t>(-1)) {
847
return cb.dbcb_resp_short(2, "stmtnum");
849
ha_rkey_function find_flag = HA_READ_KEY_EXACT;
850
db_write_op wrop = db_write_op_none;
851
/* One-character operators. */
if (args.op.size() == 1) {
852
switch (args.op.begin()[0]) {
854
find_flag = HA_READ_KEY_EXACT;
857
find_flag = HA_READ_AFTER_KEY;
860
find_flag = HA_READ_BEFORE_KEY;
863
wrop = db_write_op_insert;
866
wrop = db_write_op_sql;
869
return cb.dbcb_resp_short(1, "op");
871
/* Two-character operators whose second character is '='. */
} else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
872
switch (args.op.begin()[0]) {
874
find_flag = HA_READ_KEY_OR_NEXT;
877
find_flag = HA_READ_KEY_OR_PREV;
880
return cb.dbcb_resp_short(1, "op");
883
return cb.dbcb_resp_short(1, "op");
885
/* At least one key/field value is required by every operation. */
if (args.kvalslen <= 0) {
886
return cb.dbcb_resp_short(2, "klen");
889
case db_write_op_none:
890
return cmd_find_internal(cb, p, find_flag, args);
891
/* For the write operations the key values carry the row data. */
case db_write_op_insert:
892
return cmd_insert_internal(cb, p, args.kvals, args.kvalslen);
893
case db_write_op_sql:
894
return cmd_sql_internal(cb, p, args.kvals, args.kvalslen);
899
/* Publishes the connection counters in the thread's proc-info text.
   num_conns:  value printed as "conns".
   num_active: value printed as "active".
   The message states the context's mode: "wr" for write contexts, "rd"
   otherwise. */
dbcontext::set_statistics(size_t num_conns, size_t num_active)
901
/* Make sure the THD's proc-info points at our buffer before rewriting it. */
thd_proc_info(thd, &info_message_buf[0]);
902
if (for_write_flag) {
903
set_thread_message("handlersocket: mode=wr, %zu conns, %zu active",
904
num_conns, num_active);
906
set_thread_message("handlersocket: mode=rd, %zu conns, %zu active",
907
num_conns, num_active);