~percona-dev/percona-server/5.1.57-fix-bug723050

« back to all changes in this revision

Viewing changes to HandlerSocket-Plugin-for-MySQL/handlersocket/database.cpp

  • Committer: Alexey Kopytov
  • Date: 2011-04-20 08:52:35 UTC
  • mfrom: (222.1.3 5.1.56-tmp)
  • Revision ID: akopytov@gmail.com-20110420085235-vh7g8vx04y4m6cno
MergeĀ fromĀ lp:percona-server

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
#define DBG_THR(x)
22
22
#define DBG_CMP(x)
23
23
#define DBG_FLD(x)
 
24
#define DBG_FILTER(x)
24
25
#define DBG_REFCNT(x)
 
26
#define DBG_KEYLEN(x)
25
27
#define DBG_DELETED
26
28
 
27
29
/* status variables */
39
41
{
40
42
}
41
43
prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx,
42
 
  const retfields_type& rf)
43
 
  : dbctx(c), table_id(tbl), idxnum(idx), retfields(rf)
 
44
  const fields_type& rf, const fields_type& ff)
 
45
  : dbctx(c), table_id(tbl), idxnum(idx), ret_fields(rf), filter_fields(ff)
44
46
{
45
47
  if (dbctx) {
46
48
    dbctx->table_addref(table_id);
55
57
 
56
58
prep_stmt::prep_stmt(const prep_stmt& x)
57
59
  : dbctx(x.dbctx), table_id(x.table_id), idxnum(x.idxnum),
58
 
  retfields(x.retfields)
 
60
  ret_fields(x.ret_fields), filter_fields(x.filter_fields)
59
61
{
60
62
  if (dbctx) {
61
63
    dbctx->table_addref(table_id);
72
74
    dbctx = x.dbctx;
73
75
    table_id = x.table_id;
74
76
    idxnum = x.idxnum;
75
 
    retfields = x.retfields;
 
77
    ret_fields = x.ret_fields;
 
78
    filter_fields = x.filter_fields;
76
79
    if (dbctx) {
77
80
      dbctx->table_addref(table_id);
78
81
    }
95
98
struct tablevec_entry {
96
99
  TABLE *table;
97
100
  size_t refcount;
98
 
  tablevec_entry() : table(0), refcount(0) { }
 
101
  bool modified;
 
102
  tablevec_entry() : table(0), refcount(0), modified(false) { }
99
103
};
100
104
 
101
105
struct expr_user_lock : private noncopyable {
137
141
  virtual void close_tables_if();
138
142
  virtual void table_addref(size_t tbl_id);
139
143
  virtual void table_release(size_t tbl_id);
140
 
  virtual void cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
141
 
    const char *tbl, const char *idx, const char *retflds);
142
 
  virtual void cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args);
 
144
  virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args);
 
145
  virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args);
143
146
  virtual void set_statistics(size_t num_conns, size_t num_active);
144
147
 private:
145
148
  int set_thread_message(const char *fmt, ...)
146
149
    __attribute__((format (printf, 2, 3)));
 
150
  bool parse_fields(TABLE *const table, const char *str,
 
151
    prep_stmt::fields_type& flds);
147
152
  void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
148
153
    const string_ref *fvals, size_t fvalslen);
149
154
  void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
150
155
    const string_ref *fvals, size_t fvalslen);
151
156
  void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
152
157
    ha_rkey_function find_flag, const cmd_exec_args& args);
 
158
  size_t calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
 
159
    const record_filter *filters);
 
160
  bool fill_filter_buf(TABLE *table, const prep_stmt& pst,
 
161
    const record_filter *filters, uchar *filter_buf, size_t len);
 
162
  int check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
 
163
    const record_filter *filters, const uchar *filter_buf);
153
164
  void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
154
165
  void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
155
166
  int modify_record(dbcallback_i& cb, TABLE *const table,
156
167
    const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
157
 
    size_t& success_count);
 
168
    size_t& modified_count);
158
169
 private:
159
170
  typedef std::vector<tablevec_entry> table_vec_type;
160
171
  typedef std::pair<std::string, std::string> table_name_type;
172
183
  std::vector<char> info_message_buf;
173
184
  table_vec_type table_vec;
174
185
  table_map_type table_map;
175
 
  #if MYSQL_VERSION_ID >= 50505
176
 
  MDL_request *mdl_request;
177
 
  #else
178
 
  void *mdl_request;
179
 
  #endif
180
186
};
181
187
 
182
188
database::database(const config& c)
215
221
dbcontext::dbcontext(volatile database *d, bool for_write)
216
222
  : dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
217
223
    user_level_lock_timeout(0), user_level_lock_locked(false),
218
 
    commit_error(false), mdl_request(0)
 
224
    commit_error(false)
219
225
{
220
226
  info_message_buf.resize(8192);
221
227
  user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
262
268
 
263
269
}; // namespace
264
270
 
 
271
#define DENA_THR_OFFSETOF(fld) ((char *)(&thd->fld) - (char *)thd)
 
272
 
265
273
void
266
274
dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
267
275
{
270
278
    my_thread_init();
271
279
    thd = new THD;
272
280
    thd->thread_stack = (char *)stack_bottom;
273
 
    DBG_THR(const size_t of = (char *)(&thd->thread_stack) - (char *)thd);
274
 
    DBG_THR(fprintf(stderr, "thread_stack = %p sz=%zu of=%zu\n",
275
 
      thd->thread_stack, sizeof(THD), of));
 
281
    DBG_THR(fprintf(stderr,
 
282
      "thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
 
283
      "O: %zu %zu %zu %zu %zu %zu %zu\n",
 
284
      thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
 
285
      DENA_THR_OFFSETOF(mdl_context),
 
286
      DENA_THR_OFFSETOF(net),
 
287
      DENA_THR_OFFSETOF(LOCK_thd_data),
 
288
      DENA_THR_OFFSETOF(mysys_var),
 
289
      DENA_THR_OFFSETOF(stmt_arena),
 
290
      DENA_THR_OFFSETOF(limit_found_rows),
 
291
      DENA_THR_OFFSETOF(locked_tables_list)));
276
292
    thd->store_globals();
277
293
    thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
278
294
    const NET v = { 0 };
306
322
  set_thread_message("hs:listening");
307
323
  DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));
308
324
 
309
 
  #if MYSQL_VERSION_ID >= 50505
310
 
  mdl_request = MDL_request::create(MDL_key::TABLE, "", "", for_write_flag ?
311
 
                  MDL_SHARED_WRITE : MDL_SHARED_READ, thd->mem_root);
312
 
  #endif
313
 
 
314
325
  lex_start(thd);
315
326
 
316
327
  user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
374
385
  }
375
386
  if (lock == 0) {
376
387
    const size_t num_max = table_vec.size();
377
 
    TABLE *tables[num_max ? num_max : 1]; /* GNU */
 
388
    TABLE **const tables = DENA_ALLOCA_ALLOCATE(TABLE *, num_max + 1);
378
389
    size_t num_open = 0;
379
390
    for (size_t i = 0; i < num_max; ++i) {
380
391
      if (table_vec[i].refcount > 0) {
381
392
        tables[num_open++] = table_vec[i].table;
382
393
      }
 
394
      table_vec[i].modified = false;
383
395
    }
384
396
    #if MYSQL_VERSION_ID >= 50505
385
397
    lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0);
404
416
      thd->current_stmt_binlog_row_based = 1;
405
417
      #endif
406
418
    }
 
419
    DENA_ALLOCA_FREE(tables);
407
420
  }
408
421
  DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
409
422
}
412
425
dbcontext::unlock_tables_if()
413
426
{
414
427
  if (lock != 0) {
415
 
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables\n"));
 
428
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables %p %p\n",
 
429
      thd, thd->lock));
416
430
    if (for_write_flag) {
 
431
      for (size_t i = 0; i < table_vec.size(); ++i) {
 
432
        if (table_vec[i].modified) {
 
433
          query_cache_invalidate3(thd, table_vec[i].table, 1);
 
434
          table_vec[i].table->file->ha_release_auto_increment();
 
435
        }
 
436
      }
 
437
    }
 
438
    {
417
439
      bool suc = true;
418
440
      #if MYSQL_VERSION_ID >= 50505
419
441
      suc = (trans_commit_stmt(thd) == 0);
454
476
dbcontext::close_tables_if()
455
477
{
456
478
  unlock_tables_if();
 
479
  DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
 
480
  close_thread_tables(thd);
 
481
  #if MYSQL_VERSION_ID >= 50505
 
482
  thd->mdl_context.release_transactional_locks();
 
483
  #endif
457
484
  if (!table_vec.empty()) {
458
 
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
459
 
    close_thread_tables(thd);
460
485
    statistic_increment(close_tables_count, &LOCK_status);
461
486
    table_vec.clear();
462
487
    table_map.clear();
485
510
{
486
511
  char rwpstr_buf[64];
487
512
  String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
488
 
  const prep_stmt::retfields_type& rf = pst.get_retfields();
 
513
  const prep_stmt::fields_type& rf = pst.get_ret_fields();
489
514
  const size_t n = rf.size();
490
515
  for (size_t i = 0; i < n; ++i) {
491
516
    uint32_t fn = rf[i];
515
540
{
516
541
  char rwpstr_buf[64];
517
542
  String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
518
 
  const prep_stmt::retfields_type& rf = pst.get_retfields();
 
543
  const prep_stmt::fields_type& rf = pst.get_ret_fields();
519
544
  const size_t n = rf.size();
520
545
  for (size_t i = 0; i < n; ++i) {
521
546
    uint32_t fn = rf[i];
522
547
    Field *const fld = table->field[fn];
523
548
    if (fld->is_null()) {
524
549
      /* null */
525
 
      cb.dbcb_resp_entry(0, 0);
526
550
      fprintf(stderr, "NULL");
527
551
    } else {
528
552
      fld->val_str(&rwpstr, &rwpstr);
536
560
int
537
561
dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
538
562
  const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
539
 
  size_t& success_count)
 
563
  size_t& modified_count)
540
564
{
541
565
  if (mod_op == 'U') {
542
566
    /* update */
543
567
    handler *const hnd = table->file;
544
568
    uchar *const buf = table->record[0];
545
569
    store_record(table, record[1]);
546
 
    const prep_stmt::retfields_type& rf = pst.get_retfields();
 
570
    const prep_stmt::fields_type& rf = pst.get_ret_fields();
547
571
    const size_t n = rf.size();
548
572
    for (size_t i = 0; i < n; ++i) {
549
573
      const string_ref& nv = args.uvals[i];
550
574
      uint32_t fn = rf[i];
551
575
      Field *const fld = table->field[fn];
552
 
      fld->store(nv.begin(), nv.size(), &my_charset_bin);
 
576
      if (nv.begin() == 0) {
 
577
        fld->set_null();
 
578
      } else {
 
579
        fld->set_notnull();
 
580
        fld->store(nv.begin(), nv.size(), &my_charset_bin);
 
581
      }
553
582
    }
554
 
    memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
 
583
    table_vec[pst.get_table_id()].modified = true;
555
584
    const int r = hnd->ha_update_row(table->record[1], buf);
556
585
    if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
557
586
      return r;
558
587
    }
559
 
    ++success_count;
 
588
    ++modified_count; /* TODO: HA_ERR_RECORD_IS_THE_SAME? */
560
589
  } else if (mod_op == 'D') {
561
590
    /* delete */
562
591
    handler *const hnd = table->file;
 
592
    table_vec[pst.get_table_id()].modified = true;
563
593
    const int r = hnd->ha_delete_row(table->record[0]);
564
594
    if (r != 0) {
565
595
      return r;
566
596
    }
567
 
    ++success_count;
 
597
    ++modified_count;
 
598
  } else if (mod_op == '+' || mod_op == '-') {
 
599
    /* increment/decrement */
 
600
    handler *const hnd = table->file;
 
601
    uchar *const buf = table->record[0];
 
602
    store_record(table, record[1]);
 
603
    const prep_stmt::fields_type& rf = pst.get_ret_fields();
 
604
    const size_t n = rf.size();
 
605
    size_t i = 0;
 
606
    for (i = 0; i < n; ++i) {
 
607
      const string_ref& nv = args.uvals[i];
 
608
      uint32_t fn = rf[i];
 
609
      Field *const fld = table->field[fn];
 
610
      if (fld->is_null() || nv.begin() == 0) {
 
611
        continue;
 
612
      }
 
613
      const long long pval = fld->val_int();
 
614
      const long long llv = atoll_nocheck(nv.begin(), nv.end());
 
615
      /* TODO: llv == 0? */
 
616
      long long nval = 0;
 
617
      if (mod_op == '+') {
 
618
        /* increment */
 
619
        nval = pval + llv;
 
620
      } else {
 
621
        /* decrement */
 
622
        nval = pval - llv;
 
623
        if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) {
 
624
          break; /* don't modify */
 
625
        }
 
626
      }
 
627
      fld->store(nval, false);
 
628
    }
 
629
    if (i == n) {
 
630
      /* modify */
 
631
      table_vec[pst.get_table_id()].modified = true;
 
632
      const int r = hnd->ha_update_row(table->record[1], buf);
 
633
      if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
 
634
        return r;
 
635
      }
 
636
      ++modified_count;
 
637
    }
568
638
  }
569
639
  return 0;
570
640
}
578
648
  }
579
649
  lock_tables_if();
580
650
  if (lock == 0) {
581
 
    return cb.dbcb_resp_short(2, "lock_tables");
 
651
    return cb.dbcb_resp_short(1, "lock_tables");
582
652
  }
583
653
  if (pst.get_table_id() >= table_vec.size()) {
584
654
    return cb.dbcb_resp_short(2, "tblnum");
587
657
  handler *const hnd = table->file;
588
658
  uchar *const buf = table->record[0];
589
659
  empty_record(table);
590
 
  Field **fld = table->field;
591
 
  size_t i = 0;
592
 
  for (; *fld && i < fvalslen; ++fld, ++i) {
593
 
    (*fld)->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
 
660
  memset(buf, 0, table->s->null_bytes); /* clear null flags */
 
661
  const prep_stmt::fields_type& rf = pst.get_ret_fields();
 
662
  const size_t n = rf.size();
 
663
  for (size_t i = 0; i < n; ++i) {
 
664
    uint32_t fn = rf[i];
 
665
    Field *const fld = table->field[fn];
 
666
    if (fvals[i].begin() == 0) {
 
667
      fld->set_null();
 
668
    } else {
 
669
      fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
 
670
    }
594
671
  }
595
 
  memset(buf, 0, table->s->null_bytes); /* TODO: allow NULL */
 
672
  table->next_number_field = table->found_next_number_field;
 
673
    /* FIXME: test */
596
674
  const int r = hnd->ha_write_row(buf);
597
 
  return cb.dbcb_resp_short(r != 0 ? 1 : 0, "");
 
675
  const ulonglong insert_id = table->file->insert_id_for_cur_row;
 
676
  table->next_number_field = 0;
 
677
  table_vec[pst.get_table_id()].modified = true;
 
678
  if (r == 0 && table->found_next_number_field != 0) {
 
679
    return cb.dbcb_resp_short_num64(0, insert_id);
 
680
  }
 
681
  if (r != 0) {
 
682
    return cb.dbcb_resp_short_num(1, r);
 
683
  }
 
684
  return cb.dbcb_resp_short(0, "");
598
685
}
599
686
 
600
687
void
607
694
  return cb.dbcb_resp_short(2, "notimpl");
608
695
}
609
696
 
 
697
static size_t
 
698
prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table,
 
699
  KEY& kinfo, size_t invalues_index)
 
700
{
 
701
  size_t kplen_sum = 0;
 
702
  DBG_KEY(fprintf(stderr, "SLOW\n"));
 
703
  for (size_t i = 0; i < args.kvalslen; ++i) {
 
704
    const KEY_PART_INFO & kpt = kinfo.key_part[i];
 
705
    string_ref kval = args.kvals[i];
 
706
    if (args.invalues_keypart >= 0 &&
 
707
      static_cast<size_t>(args.invalues_keypart) == i) {
 
708
      kval = args.invalues[invalues_index];
 
709
    }
 
710
    if (kval.begin() == 0) {
 
711
      kpt.field->set_null();
 
712
    } else {
 
713
      kpt.field->set_notnull();
 
714
    }
 
715
    kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
 
716
    kplen_sum += kpt.store_length;
 
717
    DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length,
 
718
      kpt.store_length));
 
719
  }
 
720
  key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
 
721
  DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum,
 
722
    kinfo.key_length));
 
723
  return kplen_sum;
 
724
}
 
725
 
610
726
void
611
727
dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
612
728
  ha_rkey_function find_flag, const cmd_exec_args& args)
613
729
{
614
730
  const bool debug_out = (verbose_level >= 100);
615
 
  const bool modify_op_flag = (args.mod_op.size() != 0);
 
731
  bool need_resp_record = true;
616
732
  char mod_op = 0;
617
 
  if (modify_op_flag && !for_write_flag) {
618
 
    return cb.dbcb_resp_short(2, "readonly");
619
 
  }
620
 
  if (modify_op_flag) {
621
 
    mod_op = args.mod_op.begin()[0];
622
 
    if (mod_op != 'U' && mod_op != 'D') {
 
733
  const string_ref& mod_op_str = args.mod_op;
 
734
  if (mod_op_str.size() != 0) {
 
735
    if (!for_write_flag) {
 
736
      return cb.dbcb_resp_short(2, "readonly");
 
737
    }
 
738
    mod_op = mod_op_str.begin()[0];
 
739
    need_resp_record = mod_op_str.size() > 1 && mod_op_str.begin()[1] == '?';
 
740
    switch (mod_op) {
 
741
    case 'U': /* update */
 
742
    case 'D': /* delete */
 
743
    case '+': /* increment */
 
744
    case '-': /* decrement */
 
745
      break;
 
746
    default:
 
747
      if (debug_out) {
 
748
        fprintf(stderr, "unknown modop: %c\n", mod_op);
 
749
      }
623
750
      return cb.dbcb_resp_short(2, "modop");
624
751
    }
625
752
  }
626
753
  lock_tables_if();
627
754
  if (lock == 0) {
628
 
    return cb.dbcb_resp_short(2, "lock_tables");
 
755
    return cb.dbcb_resp_short(1, "lock_tables");
629
756
  }
630
757
  if (pst.get_table_id() >= table_vec.size()) {
631
758
    return cb.dbcb_resp_short(2, "tblnum");
632
759
  }
633
760
  TABLE *const table = table_vec[pst.get_table_id()].table;
 
761
  /* keys */
634
762
  if (pst.get_idxnum() >= table->s->keys) {
635
763
    return cb.dbcb_resp_short(2, "idxnum");
636
764
  }
638
766
  if (args.kvalslen > kinfo.key_parts) {
639
767
    return cb.dbcb_resp_short(2, "kpnum");
640
768
  }
641
 
  uchar key_buf[kinfo.key_length]; /* GNU */
642
 
  size_t kplen_sum = 0;
643
 
  {
644
 
    DBG_KEY(fprintf(stderr, "SLOW\n"));
645
 
    for (size_t i = 0; i < args.kvalslen; ++i) {
646
 
      const KEY_PART_INFO & kpt = kinfo.key_part[i];
647
 
      const string_ref& kval = args.kvals[i];
648
 
      if (kval.begin() == 0) {
649
 
        kpt.field->set_null();
650
 
      } else {
651
 
        kpt.field->set_notnull();
652
 
      }
653
 
      kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
654
 
      kplen_sum += kpt.length;
 
769
  uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length);
 
770
  size_t invalues_idx = 0;
 
771
  size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
 
772
  /* filters */
 
773
  uchar *filter_buf = 0;
 
774
  if (args.filters != 0) {
 
775
    const size_t filter_buf_len = calc_filter_buf_size(table, pst,
 
776
      args.filters);
 
777
    filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len);
 
778
    if (!fill_filter_buf(table, pst, args.filters, filter_buf,
 
779
      filter_buf_len)) {
 
780
      return cb.dbcb_resp_short(2, "filterblob");
655
781
    }
656
 
    key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
657
782
  }
 
783
  /* handler */
658
784
  table->read_set = &table->s->all_set;
659
785
  handler *const hnd = table->file;
660
786
  if (!for_write_flag) {
662
788
  }
663
789
  hnd->ha_index_or_rnd_end();
664
790
  hnd->ha_index_init(pst.get_idxnum(), 1);
665
 
  #if 0
666
 
  statistic_increment(index_exec_count, &LOCK_status);
667
 
  #endif
668
 
  if (!modify_op_flag) {
669
 
    cb.dbcb_resp_begin(pst.get_retfields().size());
670
 
  } else {
671
 
    /* nothing to do */
 
791
  if (need_resp_record) {
 
792
    cb.dbcb_resp_begin(pst.get_ret_fields().size());
672
793
  }
673
794
  const uint32_t limit = args.limit ? args.limit : 1;
674
795
  uint32_t skip = args.skip;
675
 
  size_t mod_success_count = 0;
 
796
  size_t modified_count = 0;
676
797
  int r = 0;
677
 
  for (uint32_t i = 0; i < limit + skip; ++i) {
678
 
    if (i == 0) {
 
798
  bool is_first = true;
 
799
  for (uint32_t cnt = 0; cnt < limit + skip;) {
 
800
    if (is_first) {
 
801
      is_first = false;
 
802
      const key_part_map kpm = (1U << args.kvalslen) - 1;
 
803
      r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
 
804
    } else if (args.invalues_keypart >= 0) {
 
805
      if (++invalues_idx >= args.invalueslen) {
 
806
        break;
 
807
      }
 
808
      kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
679
809
      const key_part_map kpm = (1U << args.kvalslen) - 1;
680
810
      r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
681
811
    } else {
702
832
        dump_record(cb, table, pst);
703
833
      }
704
834
    }
 
835
    int filter_res = 0;
705
836
    if (r != 0) {
706
837
      /* no-count */
 
838
    } else if (args.filters != 0 && (filter_res = check_filter(cb, table, 
 
839
      pst, args.filters, filter_buf)) != 0) {
 
840
      if (filter_res < 0) {
 
841
        break;
 
842
      }
707
843
    } else if (skip > 0) {
708
844
      --skip;
709
845
    } else {
710
 
      if (!modify_op_flag) {
 
846
      /* hit */
 
847
      if (need_resp_record) {
711
848
        resp_record(cb, table, pst);
712
 
      } else {
713
 
        r = modify_record(cb, table, pst, args, mod_op, mod_success_count);
714
 
      }
 
849
      }
 
850
      if (mod_op != 0) {
 
851
        r = modify_record(cb, table, pst, args, mod_op, modified_count);
 
852
      }
 
853
      ++cnt;
 
854
    }
 
855
    if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) {
 
856
      continue;
715
857
    }
716
858
    if (r != 0 && r != HA_ERR_RECORD_DELETED) {
717
859
      break;
721
863
  if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
722
864
    r != HA_ERR_END_OF_FILE) {
723
865
    /* failed */
724
 
    if (!modify_op_flag) {
 
866
    if (need_resp_record) {
725
867
      /* revert dbcb_resp_begin() and dbcb_resp_entry() */
726
868
      cb.dbcb_resp_cancel();
727
869
    }
728
 
    cb.dbcb_resp_short_num(2, r);
 
870
    cb.dbcb_resp_short_num(1, r);
729
871
  } else {
730
872
    /* succeeded */
731
 
    if (!modify_op_flag) {
 
873
    if (need_resp_record) {
732
874
      cb.dbcb_resp_end();
733
875
    } else {
734
 
      cb.dbcb_resp_short_num(0, mod_success_count);
735
 
    }
736
 
  }
 
876
      cb.dbcb_resp_short_num(0, modified_count);
 
877
    }
 
878
  }
 
879
  DENA_ALLOCA_FREE(filter_buf);
 
880
  DENA_ALLOCA_FREE(key_buf);
 
881
}
 
882
 
 
883
size_t
 
884
dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
 
885
  const record_filter *filters)
 
886
{
 
887
  size_t filter_buf_len = 0;
 
888
  for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
 
889
    if (f->val.begin() == 0) {
 
890
      continue;
 
891
    }
 
892
    const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
 
893
    filter_buf_len += table->field[fn]->pack_length();
 
894
  }
 
895
  ++filter_buf_len;
 
896
    /* Field_medium::cmp() calls uint3korr(), which may read 4 bytes.
 
897
       Allocate 1 more byte for safety. */
 
898
  return filter_buf_len;
 
899
}
 
900
 
 
901
bool
 
902
dbcontext::fill_filter_buf(TABLE *table, const prep_stmt& pst,
 
903
  const record_filter *filters, uchar *filter_buf, size_t len)
 
904
{
 
905
  memset(filter_buf, 0, len);
 
906
  size_t pos = 0;
 
907
  for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
 
908
    if (f->val.begin() == 0) {
 
909
      continue;
 
910
    }
 
911
    const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
 
912
    Field *const fld = table->field[fn];
 
913
    if ((fld->flags & BLOB_FLAG) != 0) {
 
914
      return false;
 
915
    }
 
916
    fld->store(f->val.begin(), f->val.size(), &my_charset_bin);
 
917
    const size_t packlen = fld->pack_length();
 
918
    memcpy(filter_buf + pos, fld->ptr, packlen);
 
919
    pos += packlen;
 
920
  }
 
921
  return true;
 
922
}
 
923
 
 
924
int
 
925
dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
 
926
  const record_filter *filters, const uchar *filter_buf)
 
927
{
 
928
  DBG_FILTER(fprintf(stderr, "check_filter\n"));
 
929
  size_t pos = 0;
 
930
  for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
 
931
    const string_ref& op = f->op;
 
932
    const string_ref& val = f->val;
 
933
    const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
 
934
    Field *const fld = table->field[fn];
 
935
    const size_t packlen = fld->pack_length();
 
936
    const uchar *const bval = filter_buf + pos;
 
937
    int cv = 0;
 
938
    if (fld->is_null()) {
 
939
      cv = (val.begin() == 0) ? 0 : -1;
 
940
    } else {
 
941
      cv = (val.begin() == 0) ? 1 : fld->cmp(bval);
 
942
    }
 
943
    DBG_FILTER(fprintf(stderr, "check_filter cv=%d\n", cv));
 
944
    bool cond = true;
 
945
    if (op.size() == 1) {
 
946
      switch (op.begin()[0]) {
 
947
      case '>':
 
948
        DBG_FILTER(fprintf(stderr, "check_filter op: >\n"));
 
949
        cond = (cv > 0);
 
950
        break;
 
951
      case '<':
 
952
        DBG_FILTER(fprintf(stderr, "check_filter op: <\n"));
 
953
        cond = (cv < 0);
 
954
        break;
 
955
      case '=':
 
956
        DBG_FILTER(fprintf(stderr, "check_filter op: =\n"));
 
957
        cond = (cv == 0);
 
958
        break;
 
959
      default:
 
960
        DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
 
961
        cond = false; /* FIXME: error */
 
962
        break;
 
963
      }
 
964
    } else if (op.size() == 2 && op.begin()[1] == '=') {
 
965
      switch (op.begin()[0]) {
 
966
      case '>':
 
967
        DBG_FILTER(fprintf(stderr, "check_filter op: >=\n"));
 
968
        cond = (cv >= 0);
 
969
        break;
 
970
      case '<':
 
971
        DBG_FILTER(fprintf(stderr, "check_filter op: <=\n"));
 
972
        cond = (cv <= 0);
 
973
        break;
 
974
      case '!':
 
975
        DBG_FILTER(fprintf(stderr, "check_filter op: !=\n"));
 
976
        cond = (cv != 0);
 
977
        break;
 
978
      default:
 
979
        DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
 
980
        cond = false; /* FIXME: error */
 
981
        break;
 
982
      }
 
983
    }
 
984
    DBG_FILTER(fprintf(stderr, "check_filter cond: %d\n", (int)cond));
 
985
    if (!cond) {
 
986
      return (f->filter_type == record_filter_type_skip) ? 1 : -1;
 
987
    }
 
988
    if (val.begin() != 0) {
 
989
      pos += packlen;
 
990
    }
 
991
  }
 
992
  return 0;
737
993
}
738
994
 
739
995
void
740
 
dbcontext::cmd_open_index(dbcallback_i& cb, size_t pst_id, const char *dbn,
741
 
  const char *tbl, const char *idx, const char *retflds)
 
996
dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg)
742
997
{
743
998
  unlock_tables_if();
744
 
  const table_name_type k = std::make_pair(std::string(dbn), std::string(tbl));
 
999
  const table_name_type k = std::make_pair(std::string(arg.dbn),
 
1000
    std::string(arg.tbl));
745
1001
  const table_map_type::const_iterator iter = table_map.find(k);
746
1002
  uint32_t tblnum = 0;
747
1003
  if (iter != table_map.end()) {
754
1010
    bool refresh = true;
755
1011
    const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
756
1012
    #if MYSQL_VERSION_ID >= 50505
757
 
    tables.init_one_table(dbn, strlen(dbn), tbl, strlen(tbl), tbl,
758
 
      lock_type);
759
 
    tables.mdl_request = mdl_request;
760
 
    Open_table_context ot_act(thd, MYSQL_OPEN_REOPEN);
 
1013
    tables.init_one_table(arg.dbn, strlen(arg.dbn), arg.tbl, strlen(arg.tbl),
 
1014
      arg.tbl, lock_type);
 
1015
    tables.mdl_request.init(MDL_key::TABLE, arg.dbn, arg.tbl,
 
1016
      for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION);
 
1017
    Open_table_context ot_act(thd, 0);
761
1018
    if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
762
1019
      table = tables.table;
763
1020
    }
764
1021
    #else
765
 
    tables.init_one_table(dbn, tbl, lock_type);
 
1022
    tables.init_one_table(arg.dbn, arg.tbl, lock_type);
766
1023
    table = open_table(thd, &tables, thd->mem_root, &refresh,
767
1024
      OPEN_VIEW_NO_PARSE);
768
1025
    #endif
769
1026
    if (table == 0) {
770
1027
      DENA_VERBOSE(10, fprintf(stderr,
771
1028
        "HNDSOCK failed to open %p [%s] [%s] [%d]\n",
772
 
        thd, dbn, tbl, static_cast<int>(refresh)));
773
 
      return cb.dbcb_resp_short(2, "open_table");
 
1029
        thd, arg.dbn, arg.tbl, static_cast<int>(refresh)));
 
1030
      return cb.dbcb_resp_short(1, "open_table");
774
1031
    }
775
1032
    statistic_increment(open_tables_count, &LOCK_status);
776
1033
    table->reginfo.lock_type = lock_type;
782
1039
    table_map[k] = tblnum;
783
1040
  }
784
1041
  size_t idxnum = static_cast<size_t>(-1);
785
 
  if (idx[0] >= '0' && idx[0] <= '9') {
 
1042
  if (arg.idx[0] >= '0' && arg.idx[0] <= '9') {
786
1043
    /* numeric */
787
1044
    TABLE *const table = table_vec[tblnum].table;
788
 
    idxnum = atoi(idx);
 
1045
    idxnum = atoi(arg.idx);
789
1046
    if (idxnum >= table->s->keys) {
790
1047
      return cb.dbcb_resp_short(2, "idxnum");
791
1048
    }
792
1049
  } else {
793
 
    const char *const idx_name_to_open = idx[0]  == '\0' ? "PRIMARY" : idx;
 
1050
    const char *const idx_name_to_open =
 
1051
      arg.idx[0]  == '\0' ? "PRIMARY" : arg.idx;
794
1052
    TABLE *const table = table_vec[tblnum].table;
795
1053
    for (uint i = 0; i < table->s->keys; ++i) {
796
1054
      KEY& kinfo = table->key_info[i];
803
1061
  if (idxnum == size_t(-1)) {
804
1062
    return cb.dbcb_resp_short(2, "idxnum");
805
1063
  }
806
 
  string_ref retflds_sr(retflds, strlen(retflds));
 
1064
  prep_stmt::fields_type rf;
 
1065
  prep_stmt::fields_type ff;
 
1066
  if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) {
 
1067
    return cb.dbcb_resp_short(2, "fld");
 
1068
  }
 
1069
  if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) {
 
1070
    return cb.dbcb_resp_short(2, "fld");
 
1071
  }
 
1072
  prep_stmt p(this, tblnum, idxnum, rf, ff);
 
1073
  cb.dbcb_set_prep_stmt(arg.pst_id, p);
 
1074
  return cb.dbcb_resp_short(0, "");
 
1075
}
 
1076
 
 
1077
bool
 
1078
dbcontext::parse_fields(TABLE *const table, const char *str,
 
1079
  prep_stmt::fields_type& flds)
 
1080
{
 
1081
  string_ref flds_sr(str, strlen(str));
807
1082
  std::vector<string_ref> fldnms;
808
 
  if (retflds_sr.size() != 0) {
809
 
    split(',', retflds_sr, fldnms);
 
1083
  if (flds_sr.size() != 0) {
 
1084
    split(',', flds_sr, fldnms);
810
1085
  }
811
 
  prep_stmt::retfields_type rf;
812
1086
  for (size_t i = 0; i < fldnms.size(); ++i) {
813
 
    TABLE *const table = table_vec[tblnum].table;
814
1087
    Field **fld = 0;
815
1088
    size_t j = 0;
816
1089
    for (fld = table->field; *fld; ++fld, ++j) {
823
1096
    if (*fld == 0) {
824
1097
      DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds,
825
1098
        std::string(fldnms[i].begin(), fldnms[i].size()).c_str()));
826
 
      return cb.dbcb_resp_short(2, "fld");
 
1099
      return false;
827
1100
    }
828
1101
    DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name, j));
829
 
    rf.push_back(j);
 
1102
    flds.push_back(j);
830
1103
  }
831
 
  prep_stmt p(this, tblnum, idxnum, rf);
832
 
  cb.dbcb_set_prep_stmt(pst_id, p);
833
 
  return cb.dbcb_resp_short(0, "");
 
1104
  return true;
834
1105
}
835
1106
 
836
1107
enum db_write_op {
840
1111
};
841
1112
 
842
1113
void
843
 
dbcontext::cmd_exec_on_index(dbcallback_i& cb, const cmd_exec_args& args)
 
1114
dbcontext::cmd_exec(dbcallback_i& cb, const cmd_exec_args& args)
844
1115
{
845
1116
  const prep_stmt& p = *args.pst;
846
1117
  if (p.get_table_id() == static_cast<size_t>(-1)) {
866
1137
      wrop = db_write_op_sql;
867
1138
      break;
868
1139
    default:
869
 
      return cb.dbcb_resp_short(1, "op");
 
1140
      return cb.dbcb_resp_short(2, "op");
870
1141
    }
871
1142
  } else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
872
1143
    switch (args.op.begin()[0]) {
877
1148
      find_flag = HA_READ_KEY_OR_PREV;
878
1149
      break;
879
1150
    default:
880
 
      return cb.dbcb_resp_short(1, "op");
 
1151
      return cb.dbcb_resp_short(2, "op");
881
1152
    }
882
1153
  } else {
883
 
    return cb.dbcb_resp_short(1, "op");
 
1154
    return cb.dbcb_resp_short(2, "op");
884
1155
  }
885
1156
  if (args.kvalslen <= 0) {
886
1157
    return cb.dbcb_resp_short(2, "klen");