~tsarev/percona-server/replication_slave_skip_columns-mysqlbinlog

« back to all changes in this revision

Viewing changes to HandlerSocket-Plugin-for-MySQL/handlersocket/database.cpp

  • Committer: Aleksandr Kuzminsky
  • Date: 2010-11-24 08:58:24 UTC
  • mto: (135.19.7 percona-server)
  • mto: This revision was merged to the branch mainline in revision 173.
  • Revision ID: aleksandr.kuzminsky@percona.com-20101124085824-9742ytdt2x50djw3
Added HandlerSocket-Plugin-for-MySQL sources

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
// vim:sw=2:ai
 
3
 
 
4
/*
 
5
 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
 
6
 * See COPYRIGHT.txt for details.
 
7
 */
 
8
 
 
9
#include <stdlib.h>
 
10
#include <stdio.h>
 
11
#include <string.h>
 
12
 
 
13
#include "database.hpp"
 
14
#include "string_util.hpp"
 
15
#include "escape.hpp"
 
16
#include "mysql_incl.hpp"
 
17
 
 
18
#define DBG_KEY(x)
 
19
#define DBG_SHUT(x)
 
20
#define DBG_LOCK(x)
 
21
#define DBG_THR(x)
 
22
#define DBG_CMP(x)
 
23
#define DBG_FLD(x)
 
24
#define DBG_REFCNT(x)
 
25
#define DBG_DELETED
 
26
 
 
27
/* status variables */
 
28
unsigned long long int open_tables_count;
 
29
unsigned long long int close_tables_count;
 
30
unsigned long long int lock_tables_count;
 
31
unsigned long long int unlock_tables_count;
 
32
unsigned long long int index_exec_count;
 
33
 
 
34
namespace dena {
 
35
 
 
36
prep_stmt::prep_stmt()
 
37
  : dbctx(0), table_id(static_cast<size_t>(-1)),
 
38
    idxnum(static_cast<size_t>(-1))
 
39
{
 
40
}
 
41
prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx,
 
42
  const retfields_type& rf)
 
43
  : dbctx(c), table_id(tbl), idxnum(idx), retfields(rf)
 
44
{
 
45
  if (dbctx) {
 
46
    dbctx->table_addref(table_id);
 
47
  }
 
48
}
 
49
prep_stmt::~prep_stmt()
 
50
{
 
51
  if (dbctx) {
 
52
    dbctx->table_release(table_id);
 
53
  }
 
54
}
 
55
 
 
56
prep_stmt::prep_stmt(const prep_stmt& x)
 
57
  : dbctx(x.dbctx), table_id(x.table_id), idxnum(x.idxnum),
 
58
  retfields(x.retfields)
 
59
{
 
60
  if (dbctx) {
 
61
    dbctx->table_addref(table_id);
 
62
  }
 
63
}
 
64
 
 
65
prep_stmt&
 
66
prep_stmt::operator =(const prep_stmt& x)
 
67
{
 
68
  if (this != &x) {
 
69
    if (dbctx) {
 
70
      dbctx->table_release(table_id);
 
71
    }
 
72
    dbctx = x.dbctx;
 
73
    table_id = x.table_id;
 
74
    idxnum = x.idxnum;
 
75
    retfields = x.retfields;
 
76
    if (dbctx) {
 
77
      dbctx->table_addref(table_id);
 
78
    }
 
79
  }
 
80
  return *this;
 
81
}
 
82
 
 
83
struct database : public database_i, private noncopyable {
 
84
  database(const config& c);
 
85
  virtual ~database();
 
86
  virtual dbcontext_ptr create_context(bool for_write) volatile;
 
87
  virtual void stop() volatile;
 
88
  virtual const config& get_conf() const volatile;
 
89
 public:
 
90
  int child_running;
 
91
 private:
 
92
  config conf;
 
93
};
 
94
 
 
95
struct tablevec_entry {
 
96
  TABLE *table;
 
97
  size_t refcount;
 
98
  tablevec_entry() : table(0), refcount(0) { }
 
99
};
 
100
 
 
101
struct expr_user_lock : private noncopyable {
 
102
  expr_user_lock(THD *thd, int timeout)
 
103
    : lck_key("handlersocket_wr", 16, &my_charset_latin1),
 
104
      lck_timeout(timeout),
 
105
      lck_func_get_lock(&lck_key, &lck_timeout),
 
106
      lck_func_release_lock(&lck_key)
 
107
  {
 
108
    lck_key.fix_fields(thd, 0);
 
109
    lck_timeout.fix_fields(thd, 0);
 
110
    lck_func_get_lock.fix_fields(thd, 0);
 
111
    lck_func_release_lock.fix_fields(thd, 0);
 
112
  }
 
113
  long long get_lock() {
 
114
    return lck_func_get_lock.val_int();
 
115
  }
 
116
  long long release_lock() {
 
117
    return lck_func_release_lock.val_int();
 
118
  }
 
119
 private:
 
120
  Item_string lck_key;
 
121
  Item_int lck_timeout;
 
122
  Item_func_get_lock lck_func_get_lock;
 
123
  Item_func_release_lock lck_func_release_lock;
 
124
};
 
125
 
 
126
struct dbcontext : public dbcontext_i, private noncopyable {
 
127
  dbcontext(volatile database *d, bool for_write);
 
128
  virtual ~dbcontext();
 
129
  virtual void init_thread(const void *stack_botton,
 
130
    volatile int& shutdown_flag);
 
131
  virtual void term_thread();
 
132
  virtual bool check_alive();
 
133
  virtual void lock_tables_if();
 
134
  virtual void unlock_tables_if();
 
135
  virtual bool get_commit_error();
 
136
  virtual void clear_error();
 
137
  virtual void close_tables_if();
 
138
  virtual void table_addref(size_t tbl_id);
 
139
  virtual void table_release(size_t tbl_id);
 
140
  virtual void cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
 
141
    const char *tbl, const char *idx, const char *retflds);
 
142
  virtual void cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args);
 
143
  virtual void set_statistics(size_t num_conns, size_t num_active);
 
144
 private:
 
145
  int set_thread_message(const char *fmt, ...)
 
146
    __attribute__((format (printf, 2, 3)));
 
147
  void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
 
148
    const string_ref *fvals, size_t fvalslen);
 
149
  void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
 
150
    const string_ref *fvals, size_t fvalslen);
 
151
  void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
 
152
    ha_rkey_function find_flag, const cmd_exec_args& args);
 
153
  void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
 
154
  void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
 
155
  int modify_record(dbcallback_i& cb, TABLE *const table,
 
156
    const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
 
157
    size_t& success_count);
 
158
 private:
 
159
  typedef std::vector<tablevec_entry> table_vec_type;
 
160
  typedef std::pair<std::string, std::string> table_name_type;
 
161
  typedef std::map<table_name_type, size_t> table_map_type;
 
162
 private:
 
163
  volatile database *const dbref;
 
164
  bool for_write_flag;
 
165
  THD *thd;
 
166
  MYSQL_LOCK *lock;
 
167
  bool lock_failed;
 
168
  std::auto_ptr<expr_user_lock> user_lock;
 
169
  int user_level_lock_timeout;
 
170
  bool user_level_lock_locked;
 
171
  bool commit_error;
 
172
  std::vector<char> info_message_buf;
 
173
  table_vec_type table_vec;
 
174
  table_map_type table_map;
 
175
  #if MYSQL_VERSION_ID >= 50505
 
176
  MDL_request *mdl_request;
 
177
  #else
 
178
  void *mdl_request;
 
179
  #endif
 
180
};
 
181
 
 
182
database::database(const config& c)
 
183
  : child_running(1), conf(c)
 
184
{
 
185
}
 
186
 
 
187
database::~database()
 
188
{
 
189
}
 
190
 
 
191
dbcontext_ptr
 
192
database::create_context(bool for_write) volatile
 
193
{
 
194
  return dbcontext_ptr(new dbcontext(this, for_write));
 
195
}
 
196
 
 
197
void
 
198
database::stop() volatile
 
199
{
 
200
  child_running = false;
 
201
}
 
202
 
 
203
const config&
 
204
database::get_conf() const volatile
 
205
{
 
206
  return const_cast<const config&>(conf);
 
207
}
 
208
 
 
209
database_ptr
 
210
database_i::create(const config& conf)
 
211
{
 
212
  return database_ptr(new database(conf));
 
213
}
 
214
 
 
215
dbcontext::dbcontext(volatile database *d, bool for_write)
 
216
  : dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
 
217
    user_level_lock_timeout(0), user_level_lock_locked(false),
 
218
    commit_error(false), mdl_request(0)
 
219
{
 
220
  info_message_buf.resize(8192);
 
221
  user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
 
222
}
 
223
 
 
224
dbcontext::~dbcontext()
 
225
{
 
226
}
 
227
 
 
228
namespace {
 
229
 
 
230
int
 
231
wait_server_to_start(THD *thd, volatile int& shutdown_flag)
 
232
{
 
233
  int r = 0;
 
234
  DBG_SHUT(fprintf(stderr, "HNDSOCK wsts\n"));
 
235
  pthread_mutex_lock(&LOCK_server_started);
 
236
  while (!mysqld_server_started) {
 
237
    timespec abstime = { };
 
238
    set_timespec(abstime, 1);
 
239
    pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
 
240
      &abstime);
 
241
    pthread_mutex_unlock(&LOCK_server_started);
 
242
    pthread_mutex_lock(&thd->mysys_var->mutex);
 
243
    THD::killed_state st = thd->killed;
 
244
    pthread_mutex_unlock(&thd->mysys_var->mutex);
 
245
    DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d\n", (int)st));
 
246
    pthread_mutex_lock(&LOCK_server_started);
 
247
    if (st != THD::NOT_KILLED) {
 
248
      DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d break\n", (int)st));
 
249
      r = -1;
 
250
      break;
 
251
    }
 
252
    if (shutdown_flag) {
 
253
      DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst shut break\n"));
 
254
      r = -1;
 
255
      break;
 
256
    }
 
257
  }
 
258
  pthread_mutex_unlock(&LOCK_server_started);
 
259
  DBG_SHUT(fprintf(stderr, "HNDSOCK wsts done\n"));
 
260
  return r;
 
261
}
 
262
 
 
263
}; // namespace
 
264
 
 
265
void
 
266
dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
 
267
{
 
268
  DBG_THR(fprintf(stderr, "HNDSOCK init thread\n"));
 
269
  {
 
270
    my_thread_init();
 
271
    thd = new THD;
 
272
    thd->thread_stack = (char *)stack_bottom;
 
273
    DBG_THR(const size_t of = (char *)(&thd->thread_stack) - (char *)thd);
 
274
    DBG_THR(fprintf(stderr, "thread_stack = %p sz=%zu of=%zu\n",
 
275
      thd->thread_stack, sizeof(THD), of));
 
276
    thd->store_globals();
 
277
    thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
 
278
    const NET v = { 0 };
 
279
    thd->net = v;
 
280
    if (for_write_flag) {
 
281
      #if MYSQL_VERSION_ID >= 50505
 
282
      thd->variables.option_bits |= OPTION_BIN_LOG;
 
283
      #else
 
284
      thd->options |= OPTION_BIN_LOG;
 
285
      #endif
 
286
      safeFree(thd->db);
 
287
      thd->db = 0;
 
288
      thd->db = my_strdup("handlersocket", MYF(0));
 
289
    }
 
290
    my_pthread_setspecific_ptr(THR_THD, thd);
 
291
    DBG_THR(fprintf(stderr, "HNDSOCK x0 %p\n", thd));
 
292
  }
 
293
  {
 
294
    pthread_mutex_lock(&LOCK_thread_count);
 
295
    thd->thread_id = thread_id++;
 
296
    threads.append(thd);
 
297
    ++thread_count;
 
298
    pthread_mutex_unlock(&LOCK_thread_count);
 
299
  }
 
300
 
 
301
  DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n"));
 
302
  wait_server_to_start(thd, shutdown_flag);
 
303
  DBG_THR(fprintf(stderr, "HNDSOCK init thread done\n"));
 
304
 
 
305
  thd_proc_info(thd, &info_message_buf[0]);
 
306
  set_thread_message("hs:listening");
 
307
  DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));
 
308
 
 
309
  #if MYSQL_VERSION_ID >= 50505
 
310
  mdl_request = MDL_request::create(MDL_key::TABLE, "", "", for_write_flag ?
 
311
                  MDL_SHARED_WRITE : MDL_SHARED_READ, thd->mem_root);
 
312
  #endif
 
313
 
 
314
  lex_start(thd);
 
315
 
 
316
  user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
 
317
}
 
318
 
 
319
int
 
320
dbcontext::set_thread_message(const char *fmt, ...)
 
321
{
 
322
  va_list ap;
 
323
  va_start(ap, fmt);
 
324
  const int n = vsnprintf(&info_message_buf[0], info_message_buf.size(),
 
325
    fmt, ap);
 
326
  va_end(ap);
 
327
  return n;
 
328
}
 
329
 
 
330
void
 
331
dbcontext::term_thread()
 
332
{
 
333
  DBG_THR(fprintf(stderr, "HNDSOCK thread end %p\n", thd));
 
334
  unlock_tables_if();
 
335
  my_pthread_setspecific_ptr(THR_THD, 0);
 
336
  {
 
337
    pthread_mutex_lock(&LOCK_thread_count);
 
338
    delete thd;
 
339
    thd = 0;
 
340
    --thread_count;
 
341
    pthread_mutex_unlock(&LOCK_thread_count);
 
342
    my_thread_end();
 
343
  }
 
344
}
 
345
 
 
346
bool
 
347
dbcontext::check_alive()
 
348
{
 
349
  pthread_mutex_lock(&thd->mysys_var->mutex);
 
350
  THD::killed_state st = thd->killed;
 
351
  pthread_mutex_unlock(&thd->mysys_var->mutex);
 
352
  DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %p %p %d %zu\n", thd, &thd->killed,
 
353
    (int)st, sizeof(*thd)));
 
354
  if (st != THD::NOT_KILLED) {
 
355
    DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %d break\n", (int)st));
 
356
    return false;
 
357
  }
 
358
  return true;
 
359
}
 
360
 
 
361
void
 
362
dbcontext::lock_tables_if()
 
363
{
 
364
  if (lock_failed) {
 
365
    return;
 
366
  }
 
367
  if (for_write_flag && !user_level_lock_locked) {
 
368
    if (user_lock->get_lock()) {
 
369
      user_level_lock_locked = true;
 
370
    } else {
 
371
      lock_failed = true;
 
372
      return;
 
373
    }
 
374
  }
 
375
  if (lock == 0) {
 
376
    const size_t num_max = table_vec.size();
 
377
    TABLE *tables[num_max ? num_max : 1]; /* GNU */
 
378
    size_t num_open = 0;
 
379
    for (size_t i = 0; i < num_max; ++i) {
 
380
      if (table_vec[i].refcount > 0) {
 
381
        tables[num_open++] = table_vec[i].table;
 
382
      }
 
383
    }
 
384
    #if MYSQL_VERSION_ID >= 50505
 
385
    lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0);
 
386
    #else
 
387
    bool need_reopen= false;
 
388
    lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open,
 
389
      MYSQL_LOCK_NOTIFY_IF_NEED_REOPEN, &need_reopen);
 
390
    #endif
 
391
    statistic_increment(lock_tables_count, &LOCK_status);
 
392
    thd_proc_info(thd, &info_message_buf[0]);
 
393
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK lock tables %p %p %zu %zu\n",
 
394
      thd, lock, num_max, num_open));
 
395
    if (lock == 0) {
 
396
      lock_failed = true;
 
397
      DENA_VERBOSE(10, fprintf(stderr, "HNDSOCK failed to lock tables %p\n",
 
398
        thd));
 
399
    }
 
400
    if (for_write_flag) {
 
401
      #if MYSQL_VERSION_ID >= 50505
 
402
      thd->set_current_stmt_binlog_format_row();
 
403
      #else
 
404
      thd->current_stmt_binlog_row_based = 1;
 
405
      #endif
 
406
    }
 
407
  }
 
408
  DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
 
409
}
 
410
 
 
411
void
 
412
dbcontext::unlock_tables_if()
 
413
{
 
414
  if (lock != 0) {
 
415
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables\n"));
 
416
    if (for_write_flag) {
 
417
      bool suc = true;
 
418
      #if MYSQL_VERSION_ID >= 50505
 
419
      suc = (trans_commit_stmt(thd) == 0);
 
420
      #else
 
421
      suc = (ha_autocommit_or_rollback(thd, 0) == 0);
 
422
      #endif
 
423
      if (!suc) {
 
424
        commit_error = true;
 
425
        DENA_VERBOSE(10, fprintf(stderr,
 
426
          "HNDSOCK unlock tables: commit failed\n"));
 
427
      }
 
428
    }
 
429
    mysql_unlock_tables(thd, lock);
 
430
    lock = thd->lock = 0;
 
431
    statistic_increment(unlock_tables_count, &LOCK_status);
 
432
  }
 
433
  if (user_level_lock_locked) {
 
434
    if (user_lock->release_lock()) {
 
435
      user_level_lock_locked = false;
 
436
    }
 
437
  }
 
438
}
 
439
 
 
440
bool
 
441
dbcontext::get_commit_error()
 
442
{
 
443
  return commit_error;
 
444
}
 
445
 
 
446
void
 
447
dbcontext::clear_error()
 
448
{
 
449
  lock_failed = false;
 
450
  commit_error = false;
 
451
}
 
452
 
 
453
void
 
454
dbcontext::close_tables_if()
 
455
{
 
456
  unlock_tables_if();
 
457
  if (!table_vec.empty()) {
 
458
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
 
459
    close_thread_tables(thd);
 
460
    statistic_increment(close_tables_count, &LOCK_status);
 
461
    table_vec.clear();
 
462
    table_map.clear();
 
463
  }
 
464
}
 
465
 
 
466
void
 
467
dbcontext::table_addref(size_t tbl_id)
 
468
{
 
469
  table_vec[tbl_id].refcount += 1;
 
470
  DBG_REFCNT(fprintf(stderr, "%p %zu %zu addref\n", this, tbl_id,
 
471
    table_vec[tbl_id].refcount));
 
472
}
 
473
 
 
474
void
 
475
dbcontext::table_release(size_t tbl_id)
 
476
{
 
477
  table_vec[tbl_id].refcount -= 1;
 
478
  DBG_REFCNT(fprintf(stderr, "%p %zu %zu release\n", this, tbl_id,
 
479
    table_vec[tbl_id].refcount));
 
480
}
 
481
 
 
482
void
 
483
dbcontext::resp_record(dbcallback_i& cb, TABLE *const table,
 
484
  const prep_stmt& pst)
 
485
{
 
486
  char rwpstr_buf[64];
 
487
  String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
 
488
  const prep_stmt::retfields_type& rf = pst.get_retfields();
 
489
  const size_t n = rf.size();
 
490
  for (size_t i = 0; i < n; ++i) {
 
491
    uint32_t fn = rf[i];
 
492
    Field *const fld = table->field[fn];
 
493
    DBG_FLD(fprintf(stderr, "fld=%p %zu\n", fld, fn));
 
494
    if (fld->is_null()) {
 
495
      /* null */
 
496
      cb.dbcb_resp_entry(0, 0);
 
497
    } else {
 
498
      fld->val_str(&rwpstr, &rwpstr);
 
499
      const size_t len = rwpstr.length();
 
500
      if (len != 0) {
 
501
        /* non-empty */
 
502
        cb.dbcb_resp_entry(rwpstr.ptr(), rwpstr.length());
 
503
      } else {
 
504
        /* empty */
 
505
        static const char empty_str[] = "";
 
506
        cb.dbcb_resp_entry(empty_str, 0);
 
507
      }
 
508
    }
 
509
  }
 
510
}
 
511
 
 
512
void
 
513
dbcontext::dump_record(dbcallback_i& cb, TABLE *const table,
 
514
  const prep_stmt& pst)
 
515
{
 
516
  char rwpstr_buf[64];
 
517
  String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
 
518
  const prep_stmt::retfields_type& rf = pst.get_retfields();
 
519
  const size_t n = rf.size();
 
520
  for (size_t i = 0; i < n; ++i) {
 
521
    uint32_t fn = rf[i];
 
522
    Field *const fld = table->field[fn];
 
523
    if (fld->is_null()) {
 
524
      /* null */
 
525
      cb.dbcb_resp_entry(0, 0);
 
526
      fprintf(stderr, "NULL");
 
527
    } else {
 
528
      fld->val_str(&rwpstr, &rwpstr);
 
529
      const std::string s(rwpstr.ptr(), rwpstr.length());
 
530
      fprintf(stderr, "[%s]", s.c_str());
 
531
    }
 
532
  }
 
533
  fprintf(stderr, "\n");
 
534
}
 
535
 
 
536
int
 
537
dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
 
538
  const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
 
539
  size_t& success_count)
 
540
{
 
541
  if (mod_op == 'U') {
 
542
    /* update */
 
543
    handler *const hnd = table->file;
 
544
    uchar *const buf = table->record[0];
 
545
    store_record(table, record[1]);
 
546
    const prep_stmt::retfields_type& rf = pst.get_retfields();
 
547
    const size_t n = rf.size();
 
548
    for (size_t i = 0; i < n; ++i) {
 
549
      const string_ref& nv = args.uvals[i];
 
550
      uint32_t fn = rf[i];
 
551
      Field *const fld = table->field[fn];
 
552
      fld->store(nv.begin(), nv.size(), &my_charset_bin);
 
553
    }
 
554
    memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
 
555
    const int r = hnd->ha_update_row(table->record[1], buf);
 
556
    if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
 
557
      return r;
 
558
    }
 
559
    ++success_count;
 
560
  } else if (mod_op == 'D') {
 
561
    /* delete */
 
562
    handler *const hnd = table->file;
 
563
    const int r = hnd->ha_delete_row(table->record[0]);
 
564
    if (r != 0) {
 
565
      return r;
 
566
    }
 
567
    ++success_count;
 
568
  }
 
569
  return 0;
 
570
}
 
571
 
 
572
void
 
573
dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
 
574
  const string_ref *fvals, size_t fvalslen)
 
575
{
 
576
  if (!for_write_flag) {
 
577
    return cb.dbcb_resp_short(2, "readonly");
 
578
  }
 
579
  lock_tables_if();
 
580
  if (lock == 0) {
 
581
    return cb.dbcb_resp_short(2, "lock_tables");
 
582
  }
 
583
  if (pst.get_table_id() >= table_vec.size()) {
 
584
    return cb.dbcb_resp_short(2, "tblnum");
 
585
  }
 
586
  TABLE *const table = table_vec[pst.get_table_id()].table;
 
587
  handler *const hnd = table->file;
 
588
  uchar *const buf = table->record[0];
 
589
  empty_record(table);
 
590
  Field **fld = table->field;
 
591
  size_t i = 0;
 
592
  for (; *fld && i < fvalslen; ++fld, ++i) {
 
593
    (*fld)->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
 
594
  }
 
595
  memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
 
596
  const int r = hnd->ha_write_row(buf);
 
597
  return cb.dbcb_resp_short(r != 0 ? 1 : 0, "");
 
598
}
 
599
 
 
600
void
 
601
dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
 
602
  const string_ref *fvals, size_t fvalslen)
 
603
{
 
604
  if (fvalslen < 1) {
 
605
    return cb.dbcb_resp_short(2, "syntax");
 
606
  }
 
607
  return cb.dbcb_resp_short(2, "notimpl");
 
608
}
 
609
 
 
610
void
 
611
dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
 
612
  ha_rkey_function find_flag, const cmd_exec_args& args)
 
613
{
 
614
  const bool debug_out = (verbose_level >= 100);
 
615
  const bool modify_op_flag = (args.mod_op.size() != 0);
 
616
  char mod_op = 0;
 
617
  if (modify_op_flag && !for_write_flag) {
 
618
    return cb.dbcb_resp_short(2, "readonly");
 
619
  }
 
620
  if (modify_op_flag) {
 
621
    mod_op = args.mod_op.begin()[0];
 
622
    if (mod_op != 'U' && mod_op != 'D') {
 
623
      return cb.dbcb_resp_short(2, "modop");
 
624
    }
 
625
  }
 
626
  lock_tables_if();
 
627
  if (lock == 0) {
 
628
    return cb.dbcb_resp_short(2, "lock_tables");
 
629
  }
 
630
  if (pst.get_table_id() >= table_vec.size()) {
 
631
    return cb.dbcb_resp_short(2, "tblnum");
 
632
  }
 
633
  TABLE *const table = table_vec[pst.get_table_id()].table;
 
634
  if (pst.get_idxnum() >= table->s->keys) {
 
635
    return cb.dbcb_resp_short(2, "idxnum");
 
636
  }
 
637
  KEY& kinfo = table->key_info[pst.get_idxnum()];
 
638
  if (args.kvalslen > kinfo.key_parts) {
 
639
    return cb.dbcb_resp_short(2, "kpnum");
 
640
  }
 
641
  uchar key_buf[kinfo.key_length]; /* GNU */
 
642
  size_t kplen_sum = 0;
 
643
  {
 
644
    DBG_KEY(fprintf(stderr, "SLOW\n"));
 
645
    for (size_t i = 0; i < args.kvalslen; ++i) {
 
646
      const KEY_PART_INFO & kpt = kinfo.key_part[i];
 
647
      const string_ref& kval = args.kvals[i];
 
648
      if (kval.begin() == 0) {
 
649
        kpt.field->set_null();
 
650
      } else {
 
651
        kpt.field->set_notnull();
 
652
      }
 
653
      kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
 
654
      kplen_sum += kpt.length;
 
655
    }
 
656
    key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
 
657
  }
 
658
  table->read_set = &table->s->all_set;
 
659
  handler *const hnd = table->file;
 
660
  if (!for_write_flag) {
 
661
    hnd->init_table_handle_for_HANDLER();
 
662
  }
 
663
  hnd->ha_index_or_rnd_end();
 
664
  hnd->ha_index_init(pst.get_idxnum(), 1);
 
665
  #if 0
 
666
  statistic_increment(index_exec_count, &LOCK_status);
 
667
  #endif
 
668
  if (!modify_op_flag) {
 
669
    cb.dbcb_resp_begin(pst.get_retfields().size());
 
670
  } else {
 
671
    /* nothing to do */
 
672
  }
 
673
  const uint32_t limit = args.limit ? args.limit : 1;
 
674
  uint32_t skip = args.skip;
 
675
  size_t mod_success_count = 0;
 
676
  int r = 0;
 
677
  for (uint32_t i = 0; i < limit + skip; ++i) {
 
678
    if (i == 0) {
 
679
      const key_part_map kpm = (1U << args.kvalslen) - 1;
 
680
      r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
 
681
    } else {
 
682
      switch (find_flag) {
 
683
      case HA_READ_BEFORE_KEY:
 
684
      case HA_READ_KEY_OR_PREV:
 
685
        r = hnd->index_prev(table->record[0]);
 
686
        break;
 
687
      case HA_READ_AFTER_KEY:
 
688
      case HA_READ_KEY_OR_NEXT:
 
689
        r = hnd->index_next(table->record[0]);
 
690
        break;
 
691
      case HA_READ_KEY_EXACT:
 
692
        r = hnd->index_next_same(table->record[0], key_buf, kplen_sum);
 
693
        break;
 
694
      default:
 
695
        r = HA_ERR_END_OF_FILE; /* to finish the loop */
 
696
        break;
 
697
      }
 
698
    }
 
699
    if (debug_out) {
 
700
      fprintf(stderr, "r=%d\n", r);
 
701
      if (r == 0 || r == HA_ERR_RECORD_DELETED) { 
 
702
        dump_record(cb, table, pst);
 
703
      }
 
704
    }
 
705
    if (r != 0) {
 
706
      /* no-count */
 
707
    } else if (skip > 0) {
 
708
      --skip;
 
709
    } else {
 
710
      if (!modify_op_flag) {
 
711
        resp_record(cb, table, pst);
 
712
      } else {
 
713
        r = modify_record(cb, table, pst, args, mod_op, mod_success_count);
 
714
      }
 
715
    }
 
716
    if (r != 0 && r != HA_ERR_RECORD_DELETED) {
 
717
      break;
 
718
    }
 
719
  }
 
720
  hnd->ha_index_or_rnd_end();
 
721
  if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
 
722
    r != HA_ERR_END_OF_FILE) {
 
723
    /* failed */
 
724
    if (!modify_op_flag) {
 
725
      /* revert dbcb_resp_begin() and dbcb_resp_entry() */
 
726
      cb.dbcb_resp_cancel();
 
727
    }
 
728
    cb.dbcb_resp_short_num(2, r);
 
729
  } else {
 
730
    /* succeeded */
 
731
    if (!modify_op_flag) {
 
732
      cb.dbcb_resp_end();
 
733
    } else {
 
734
      cb.dbcb_resp_short_num(0, mod_success_count);
 
735
    }
 
736
  }
 
737
}
 
738
 
 
739
void
 
740
dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
 
741
  const char *tbl, const char *idx, const char *retflds)
 
742
{
 
743
  unlock_tables_if();
 
744
  const table_name_type k = std::make_pair(std::string(dbn), std::string(tbl));
 
745
  const table_map_type::const_iterator iter = table_map.find(k);
 
746
  uint32_t tblnum = 0;
 
747
  if (iter != table_map.end()) {
 
748
    tblnum = iter->second;
 
749
    DBG_CMP(fprintf(stderr, "HNDSOCK k=%s tblnum=%d\n", k.c_str(),
 
750
      (int)tblnum));
 
751
  } else {
 
752
    TABLE_LIST tables;
 
753
    TABLE *table = 0;
 
754
    bool refresh = true;
 
755
    const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
 
756
    #if MYSQL_VERSION_ID >= 50505
 
757
    tables.init_one_table(dbn, strlen(dbn), tbl, strlen(tbl), tbl,
 
758
      lock_type);
 
759
    tables.mdl_request = mdl_request;
 
760
    Open_table_context ot_act(thd, MYSQL_OPEN_REOPEN);
 
761
    if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
 
762
      table = tables.table;
 
763
    }
 
764
    #else
 
765
    tables.init_one_table(dbn, tbl, lock_type);
 
766
    table = open_table(thd, &tables, thd->mem_root, &refresh,
 
767
      OPEN_VIEW_NO_PARSE);
 
768
    #endif
 
769
    if (table == 0) {
 
770
      DENA_VERBOSE(10, fprintf(stderr,
 
771
        "HNDSOCK failed to open %p [%s] [%s] [%d]\n",
 
772
        thd, dbn, tbl, static_cast<int>(refresh)));
 
773
      return cb.dbcb_resp_short(2, "open_table");
 
774
    }
 
775
    statistic_increment(open_tables_count, &LOCK_status);
 
776
    table->reginfo.lock_type = lock_type;
 
777
    table->use_all_columns();
 
778
    tblnum = table_vec.size();
 
779
    tablevec_entry e;
 
780
    e.table = table;
 
781
    table_vec.push_back(e);
 
782
    table_map[k] = tblnum;
 
783
  }
 
784
  size_t idxnum = static_cast<size_t>(-1);
 
785
  if (idx[0] >= '0' && idx[0] <= '9') {
 
786
    /* numeric */
 
787
    TABLE *const table = table_vec[tblnum].table;
 
788
    idxnum = atoi(idx);
 
789
    if (idxnum >= table->s->keys) {
 
790
      return cb.dbcb_resp_short(2, "idxnum");
 
791
    }
 
792
  } else {
 
793
    const char *const idx_name_to_open = idx[0]  == '\0' ? "PRIMARY" : idx;
 
794
    TABLE *const table = table_vec[tblnum].table;
 
795
    for (uint i = 0; i < table->s->keys; ++i) {
 
796
      KEY& kinfo = table->key_info[i];
 
797
      if (strcmp(kinfo.name, idx_name_to_open) == 0) {
 
798
        idxnum = i;
 
799
        break;
 
800
      }
 
801
    }
 
802
  }
 
803
  if (idxnum == size_t(-1)) {
 
804
    return cb.dbcb_resp_short(2, "idxnum");
 
805
  }
 
806
  string_ref retflds_sr(retflds, strlen(retflds));
 
807
  std::vector<string_ref> fldnms;
 
808
  if (retflds_sr.size() != 0) {
 
809
    split(',', retflds_sr, fldnms);
 
810
  }
 
811
  prep_stmt::retfields_type rf;
 
812
  for (size_t i = 0; i < fldnms.size(); ++i) {
 
813
    TABLE *const table = table_vec[tblnum].table;
 
814
    Field **fld = 0;
 
815
    size_t j = 0;
 
816
    for (fld = table->field; *fld; ++fld, ++j) {
 
817
      DBG_FLD(fprintf(stderr, "f %s\n", (*fld)->field_name));
 
818
      string_ref fn((*fld)->field_name, strlen((*fld)->field_name));
 
819
      if (fn == fldnms[i]) {
 
820
        break;
 
821
      }
 
822
    }
 
823
    if (*fld == 0) {
 
824
      DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds,
 
825
        std::string(fldnms[i].begin(), fldnms[i].size()).c_str()));
 
826
      return cb.dbcb_resp_short(2, "fld");
 
827
    }
 
828
    DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name, j));
 
829
    rf.push_back(j);
 
830
  }
 
831
  prep_stmt p(this, tblnum, idxnum, rf);
 
832
  cb.dbcb_set_prep_stmt(pst_id, p);
 
833
  return cb.dbcb_resp_short(0, "");
 
834
}
 
835
 
 
836
enum db_write_op {
 
837
  db_write_op_none = 0,
 
838
  db_write_op_insert = 1,
 
839
  db_write_op_sql = 2,
 
840
};
 
841
 
 
842
void
 
843
dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
 
844
{
 
845
  const prep_stmt& p = *args.pst;
 
846
  if (p.get_table_id() == static_cast<size_t>(-1)) {
 
847
    return cb.dbcb_resp_short(2, "stmtnum");
 
848
  }
 
849
  ha_rkey_function find_flag = HA_READ_KEY_EXACT;
 
850
  db_write_op wrop = db_write_op_none;
 
851
  if (args.op.size() == 1) {
 
852
    switch (args.op.begin()[0]) {
 
853
    case '=':
 
854
      find_flag = HA_READ_KEY_EXACT;
 
855
      break;
 
856
    case '>':
 
857
      find_flag = HA_READ_AFTER_KEY;
 
858
      break;
 
859
    case '<':
 
860
      find_flag = HA_READ_BEFORE_KEY;
 
861
      break;
 
862
    case '+':
 
863
      wrop = db_write_op_insert;
 
864
      break;
 
865
    case 'S':
 
866
      wrop = db_write_op_sql;
 
867
      break;
 
868
    default:
 
869
      return cb.dbcb_resp_short(1, "op");
 
870
    }
 
871
  } else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
 
872
    switch (args.op.begin()[0]) {
 
873
    case '>':
 
874
      find_flag = HA_READ_KEY_OR_NEXT;
 
875
      break;
 
876
    case '<':
 
877
      find_flag = HA_READ_KEY_OR_PREV;
 
878
      break;
 
879
    default:
 
880
      return cb.dbcb_resp_short(1, "op");
 
881
    }
 
882
  } else {
 
883
    return cb.dbcb_resp_short(1, "op");
 
884
  }
 
885
  if (args.kvalslen <= 0) {
 
886
    return cb.dbcb_resp_short(2, "klen");
 
887
  }
 
888
  switch (wrop) {
 
889
  case db_write_op_none:
 
890
    return cmd_find_internal(cb, p, find_flag, args);
 
891
  case db_write_op_insert:
 
892
    return cmd_insert_internal(cb, p, args.kvals, args.kvalslen);
 
893
  case db_write_op_sql:
 
894
    return cmd_sql_internal(cb, p, args.kvals, args.kvalslen);
 
895
  }
 
896
}
 
897
 
 
898
void
 
899
dbcontext::set_statistics(size_t num_conns, size_t num_active)
 
900
{
 
901
  thd_proc_info(thd, &info_message_buf[0]);
 
902
  if (for_write_flag) {
 
903
    set_thread_message("handlersocket: mode=wr, %zu conns, %zu active",
 
904
      num_conns, num_active);
 
905
  } else {
 
906
    set_thread_message("handlersocket: mode=rd, %zu conns, %zu active",
 
907
      num_conns, num_active);
 
908
  }
 
909
}
 
910
 
 
911
};
 
912