~ubuntu-branches/ubuntu/trusty/mariadb-5.5/trusty-proposed

« back to all changes in this revision

Viewing changes to plugin/handler_socket/client/hslongrun.cpp

  • Committer: Package Import Robot
  • Author(s): Otto Kekäläinen
  • Date: 2013-12-22 10:27:05 UTC
  • Revision ID: package-import@ubuntu.com-20131222102705-mndw7s12mz0szrcn
Tags: upstream-5.5.32
Import upstream version 5.5.32

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
// vim:sw=2:ai
 
3
 
 
4
#include <signal.h>
 
5
#include <sys/time.h>
 
6
#include <stdio.h>
 
7
#include <string.h>
 
8
#include <vector>
 
9
#include <map>
 
10
#include <stdlib.h>
 
11
#include <memory>
 
12
#include <errno.h>
 
13
#include <mysql.h>
 
14
#include <time.h>
 
15
#include <sys/types.h>
 
16
#include <sys/stat.h>
 
17
#include <fcntl.h>
 
18
 
 
19
#include "util.hpp"
 
20
#include "auto_ptrcontainer.hpp"
 
21
#include "socket.hpp"
 
22
#include "hstcpcli.hpp"
 
23
#include "string_util.hpp"
 
24
#include "mutex.hpp"
 
25
 
 
26
namespace dena {
 
27
 
 
28
struct auto_mysql : private noncopyable {
 
29
  auto_mysql() : db(0) {
 
30
    reset();
 
31
  }
 
32
  ~auto_mysql() {
 
33
    if (db) {
 
34
      mysql_close(db);
 
35
    }
 
36
  }
 
37
  void reset() {
 
38
    if (db) {
 
39
      mysql_close(db);
 
40
    }
 
41
    if ((db = mysql_init(0)) == 0) {
 
42
      fatal_exit("failed to initialize mysql client");
 
43
    }
 
44
  }
 
45
  operator MYSQL *() const { return db; }
 
46
 private:
 
47
  MYSQL *db;
 
48
};
 
49
 
 
50
struct auto_mysql_res : private noncopyable {
 
51
  auto_mysql_res(MYSQL *db) {
 
52
    res = mysql_store_result(db);
 
53
  }
 
54
  ~auto_mysql_res() {
 
55
    if (res) {
 
56
      mysql_free_result(res);
 
57
    }
 
58
  }
 
59
  operator MYSQL_RES *() const { return res; }
 
60
 private:
 
61
  MYSQL_RES *res;
 
62
};
 
63
 
 
64
struct auto_mysql_stmt : private noncopyable {
 
65
  auto_mysql_stmt(MYSQL *db) {
 
66
    stmt = mysql_stmt_init(db);
 
67
  }
 
68
  ~auto_mysql_stmt() {
 
69
    if (stmt) {
 
70
      mysql_stmt_close(stmt);
 
71
    }
 
72
  }
 
73
  operator MYSQL_STMT *() const { return stmt; }
 
74
 private:
 
75
  MYSQL_STMT *stmt;
 
76
};
 
77
 
 
78
double
 
79
gettimeofday_double()
 
80
{
 
81
  struct timeval tv = { };
 
82
  if (gettimeofday(&tv, 0) != 0) {
 
83
    fatal_abort("gettimeofday");
 
84
  }
 
85
  return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
 
86
}
 
87
 
 
88
struct record_value {
 
89
  mutex lock;
 
90
  bool deleted;
 
91
  bool unknown_state;
 
92
  std::string key;
 
93
  std::vector<std::string> values;
 
94
  record_value() : deleted(true), unknown_state(false) { }
 
95
};
 
96
 
 
97
struct hs_longrun_shared {
 
98
  config conf;
 
99
  socket_args arg;
 
100
  int verbose;
 
101
  long num_threads;
 
102
  int usleep;
 
103
  volatile mutable int running;
 
104
  auto_ptrcontainer< std::vector<record_value *> > records;
 
105
  hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { }
 
106
};
 
107
 
 
108
struct thread_base {
 
109
  thread_base() : need_join(false), stack_size(256 * 1024) { }
 
110
  virtual ~thread_base() {
 
111
    join();
 
112
  }
 
113
  virtual void run() = 0;
 
114
  void start() {
 
115
    if (!start_nothrow()) {
 
116
      fatal_abort("thread::start");
 
117
    }
 
118
  }
 
119
  bool start_nothrow() {
 
120
    if (need_join) {
 
121
      return need_join; /* true */
 
122
    }
 
123
    void *const arg = this;
 
124
    pthread_attr_t attr;
 
125
    if (pthread_attr_init(&attr) != 0) {
 
126
      fatal_abort("pthread_attr_init");
 
127
    }
 
128
    if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
 
129
      fatal_abort("pthread_attr_setstacksize");
 
130
    }
 
131
    const int r = pthread_create(&thr, &attr, thread_main, arg);
 
132
    if (pthread_attr_destroy(&attr) != 0) {
 
133
      fatal_abort("pthread_attr_destroy");
 
134
    }
 
135
    if (r != 0) {
 
136
      return need_join; /* false */
 
137
    }
 
138
    need_join = true;
 
139
    return need_join; /* true */
 
140
  }
 
141
  void join() {
 
142
    if (!need_join) {
 
143
      return;
 
144
    }
 
145
    int e = 0;
 
146
    if ((e = pthread_join(thr, 0)) != 0) {
 
147
      fatal_abort("pthread_join");
 
148
    }
 
149
    need_join = false;
 
150
  }
 
151
 private:
 
152
  static void *thread_main(void *arg) {
 
153
    thread_base *p = static_cast<thread_base *>(arg);
 
154
    p->run();
 
155
    return 0;
 
156
  }
 
157
 private:
 
158
  pthread_t thr;
 
159
  bool need_join;
 
160
  size_t stack_size;
 
161
};
 
162
 
 
163
struct hs_longrun_stat {
 
164
  unsigned long long verify_error_count;
 
165
  unsigned long long runtime_error_count;
 
166
  unsigned long long unknown_count;
 
167
  unsigned long long success_count;
 
168
  hs_longrun_stat()
 
169
    : verify_error_count(0), runtime_error_count(0),
 
170
      unknown_count(0), success_count(0) { }
 
171
  void add(const hs_longrun_stat& x) {
 
172
    verify_error_count += x.verify_error_count;
 
173
    runtime_error_count += x.runtime_error_count;
 
174
    unknown_count += x.unknown_count;
 
175
    success_count += x.success_count;
 
176
  }
 
177
};
 
178
 
 
179
struct hs_longrun_thread_base : public thread_base {
 
180
  struct arg_type {
 
181
    int id;
 
182
    std::string worker_type;
 
183
    char op;
 
184
    int lock_flag;
 
185
    const hs_longrun_shared& sh;
 
186
    arg_type(int id, const std::string& worker_type, char op, int lock_flag,
 
187
      const hs_longrun_shared& sh)
 
188
      : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag),
 
189
        sh(sh) { }
 
190
  };
 
191
  arg_type arg;
 
192
  hs_longrun_stat stat;
 
193
  drand48_data randbuf;
 
194
  unsigned int seed;
 
195
  hs_longrun_thread_base(const arg_type& arg)
 
196
    : arg(arg), seed(0) {
 
197
    seed = time(0) + arg.id + 1;
 
198
    srand48_r(seed, &randbuf);
 
199
  }
 
200
  virtual ~hs_longrun_thread_base() { }
 
201
  virtual void run() = 0;
 
202
  size_t rand_record() {
 
203
    double v = 0;
 
204
    drand48_r(&randbuf, &v);
 
205
    const size_t sz = arg.sh.records.size();
 
206
    size_t r = size_t(v * sz);
 
207
    if (r >= sz) {
 
208
      r = 0;
 
209
    }
 
210
    return r;
 
211
  }
 
212
  int verify_update(const std::string& k, const std::string& v1,
 
213
    const std::string& v2, const std::string& v3, record_value& rec,
 
214
    uint32_t num_rows, bool cur_unknown_state);
 
215
  int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds,
 
216
    const std::string rrec[4], record_value& rec);
 
217
  int verify_readnolock(const std::string& k, uint32_t num_rows,
 
218
    uint32_t num_flds, const std::string rrec[4]);
 
219
};
 
220
 
 
221
int
 
222
hs_longrun_thread_base::verify_update(const std::string& k,
 
223
  const std::string& v1, const std::string& v2, const std::string& v3,
 
224
  record_value& rec, uint32_t num_rows, bool cur_unknown_state)
 
225
{
 
226
  const bool op_success = num_rows == 1;
 
227
  int ret = 0;
 
228
  if (!rec.unknown_state) {
 
229
    if (!rec.deleted && !op_success) {
 
230
      ++stat.verify_error_count;
 
231
      if (arg.sh.verbose > 0) {
 
232
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
233
          "unexpected_update_failure\n",
 
234
          arg.worker_type.c_str(), arg.id, k.c_str());
 
235
      }
 
236
      ret = 1;
 
237
    } else if (rec.deleted && op_success) {
 
238
      ++stat.verify_error_count;
 
239
      if (arg.sh.verbose > 0) {
 
240
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
241
          "unexpected_update_success\n",
 
242
          arg.worker_type.c_str(), arg.id, k.c_str());
 
243
      }
 
244
      ret = 1;
 
245
    }
 
246
  }
 
247
  if (op_success) {
 
248
    rec.values.resize(4);
 
249
    rec.values[0] = k;
 
250
    rec.values[1] = v1;
 
251
    rec.values[2] = v2;
 
252
    rec.values[3] = v3;
 
253
    if (ret == 0 && !rec.unknown_state) {
 
254
      ++stat.success_count;
 
255
    }
 
256
  }
 
257
  rec.unknown_state = cur_unknown_state;
 
258
  if (arg.sh.verbose >= 100 && ret == 0) {
 
259
    fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(),
 
260
      k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
 
261
  }
 
262
  return ret;
 
263
}
 
264
 
 
265
int
 
266
hs_longrun_thread_base::verify_read(const std::string& k,
 
267
  uint32_t num_rows, uint32_t num_flds, const std::string rrec[4],
 
268
  record_value& rec)
 
269
{
 
270
  const bool op_success = num_rows != 0;
 
271
  int ret = 0;
 
272
  if (!rec.unknown_state) {
 
273
    if (!rec.deleted && !op_success) {
 
274
      ++stat.verify_error_count;
 
275
      if (arg.sh.verbose > 0) {
 
276
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
277
          "unexpected_read_failure\n",
 
278
          arg.worker_type.c_str(), arg.id, k.c_str());
 
279
      }
 
280
      ret = 1;
 
281
    } else if (rec.deleted && op_success) {
 
282
      ++stat.verify_error_count;
 
283
      if (arg.sh.verbose > 0) {
 
284
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
285
          "unexpected_read_success\n",
 
286
          arg.worker_type.c_str(), arg.id, k.c_str());
 
287
      }
 
288
      ret = 1;
 
289
    } else if (num_flds != 4) {
 
290
      ++stat.verify_error_count;
 
291
      if (arg.sh.verbose > 0) {
 
292
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
293
          "unexpected_read_fldnum %d\n",
 
294
          arg.worker_type.c_str(), arg.id, k.c_str(),
 
295
          static_cast<int>(num_flds));
 
296
      }
 
297
      ret = 1;
 
298
    } else if (rec.deleted) {
 
299
      /* nothing to verify */
 
300
    } else {
 
301
      int diff = 0;
 
302
      for (size_t i = 0; i < 4; ++i) {
 
303
        if (rec.values[i] == rrec[i]) {
 
304
          /* ok */
 
305
        } else {
 
306
          diff = 1;
 
307
        }
 
308
      }
 
309
      if (diff) {
 
310
        std::string mess;
 
311
        for (size_t i = 0; i < 4; ++i) {
 
312
          const std::string& expected = rec.values[i];
 
313
          const std::string& val = rrec[i];
 
314
          mess += " " + val + "/" + expected;
 
315
        }
 
316
        if (arg.sh.verbose > 0) {
 
317
          fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
318
            "unexpected_read_value %s\n",
 
319
            arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str());
 
320
        }
 
321
        ret = 1;
 
322
      }
 
323
    }
 
324
  }
 
325
  if (arg.sh.verbose >= 100 && ret == 0) {
 
326
    fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str());
 
327
  }
 
328
  if (ret == 0 && !rec.unknown_state) {
 
329
    ++stat.success_count;
 
330
  }
 
331
  return ret;
 
332
}
 
333
 
 
334
int
 
335
hs_longrun_thread_base::verify_readnolock(const std::string& k,
 
336
  uint32_t num_rows, uint32_t num_flds, const std::string rrec[4])
 
337
{
 
338
  int ret = 0;
 
339
  if (num_rows != 1 || num_flds != 4) {
 
340
    ++stat.verify_error_count;
 
341
    if (arg.sh.verbose > 0) {
 
342
      fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
343
        "unexpected_read_failure\n",
 
344
        arg.worker_type.c_str(), arg.id, k.c_str());
 
345
    }
 
346
    ret = 1;
 
347
  }
 
348
  if (arg.sh.verbose >= 100 && ret == 0) {
 
349
    fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(),
 
350
      k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(),
 
351
      rrec[3].c_str());
 
352
  }
 
353
  if (ret == 0) {
 
354
    ++stat.success_count;
 
355
  }
 
356
  return ret;
 
357
}
 
358
 
 
359
struct hs_longrun_thread_hs : public hs_longrun_thread_base {
 
360
  hs_longrun_thread_hs(const arg_type& arg)
 
361
    : hs_longrun_thread_base(arg) { }
 
362
  void run();
 
363
  int check_hs_error(const char *mess, record_value *rec);
 
364
  int op_insert(record_value& rec);
 
365
  int op_delete(record_value& rec);
 
366
  int op_update(record_value& rec);
 
367
  int op_read(record_value& rec);
 
368
  int op_readnolock(int k);
 
369
  hstcpcli_ptr cli;
 
370
  socket_args sockargs;
 
371
};
 
372
 
 
373
struct lock_guard : noncopyable {
 
374
  lock_guard(mutex& mtx) : mtx(mtx) {
 
375
    mtx.lock();
 
376
  }
 
377
  ~lock_guard() {
 
378
    mtx.unlock();
 
379
  }
 
380
  mutex& mtx;
 
381
};
 
382
 
 
383
string_ref
 
384
to_string_ref(const std::string& s)
 
385
{
 
386
  return string_ref(s.data(), s.size());
 
387
}
 
388
 
 
389
std::string
 
390
to_string(const string_ref& s)
 
391
{
 
392
  return std::string(s.begin(), s.size());
 
393
}
 
394
 
 
395
void
 
396
hs_longrun_thread_hs::run()
 
397
{
 
398
  config c = arg.sh.conf;
 
399
  if (arg.op == 'R' || arg.op == 'N') {
 
400
    c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998));
 
401
  } else {
 
402
    c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999));
 
403
  }
 
404
  sockargs.set(c);
 
405
 
 
406
  while (arg.sh.running) {
 
407
    if (cli.get() == 0 || !cli->stable_point()) {
 
408
      cli = hstcpcli_i::create(sockargs);
 
409
      if (check_hs_error("connect", 0) != 0) {
 
410
        cli.reset();
 
411
        continue;
 
412
      }
 
413
      cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY",
 
414
        "k,v1,v2,v3", "k,v1,v2,v3");
 
415
      cli->request_send();
 
416
      if (check_hs_error("openindex_send", 0) != 0) {
 
417
        cli.reset();
 
418
        continue;
 
419
      }
 
420
      size_t num_flds = 0;
 
421
      cli->response_recv(num_flds);
 
422
      if (check_hs_error("openindex_recv", 0) != 0) {
 
423
        cli.reset();
 
424
        continue;
 
425
      }
 
426
      cli->response_buf_remove();
 
427
    }
 
428
    const size_t rec_id = rand_record();
 
429
    if (arg.lock_flag) {
 
430
      record_value& rec = *arg.sh.records[rec_id];
 
431
      lock_guard g(rec.lock);
 
432
      int e = 0;
 
433
      switch (arg.op) {
 
434
      case 'I':
 
435
        e = op_insert(rec);
 
436
        break;
 
437
      case 'D':
 
438
        e = op_delete(rec);
 
439
        break;
 
440
      case 'U':
 
441
        e = op_update(rec);
 
442
        break;
 
443
      case 'R':
 
444
        e = op_read(rec);
 
445
        break;
 
446
      default:
 
447
        break;
 
448
      }
 
449
    } else {
 
450
      int e = 0;
 
451
      switch (arg.op) {
 
452
      case 'N':
 
453
        e = op_readnolock(rec_id);
 
454
        break;
 
455
      default:
 
456
        break;
 
457
      }
 
458
    }
 
459
  }
 
460
}
 
461
 
 
462
int
 
463
hs_longrun_thread_hs::op_insert(record_value& rec)
 
464
{
 
465
  const std::string k = rec.key;
 
466
  const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id);
 
467
  const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id);
 
468
  const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id);
 
469
  const string_ref op_ref("+", 1);
 
470
  const string_ref op_args[4] = {
 
471
    to_string_ref(k),
 
472
    to_string_ref(v1),
 
473
    to_string_ref(v2),
 
474
    to_string_ref(v3)
 
475
  };
 
476
  cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0,
 
477
    string_ref(), 0, 0, 0, 0);
 
478
  cli->request_send();
 
479
  if (check_hs_error("op_insert_send", &rec) != 0) { return 1; }
 
480
  size_t numflds = 0;
 
481
  cli->response_recv(numflds);
 
482
  if (arg.sh.verbose > 10) {
 
483
    const string_ref *row = cli->get_next_row();
 
484
    fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(),
 
485
      row ? to_string(row[0]).c_str() : "");
 
486
  }
 
487
  const bool op_success = cli->get_error_code() == 0;
 
488
  int ret = 0;
 
489
  if (!rec.unknown_state) {
 
490
    if (rec.deleted && !op_success) {
 
491
      ++stat.verify_error_count;
 
492
      if (arg.sh.verbose > 0) {
 
493
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
494
          "unexpected_insert_failure\n",
 
495
          arg.worker_type.c_str(), arg.id, k.c_str());
 
496
      }
 
497
      ret = 1;
 
498
    } else if (!rec.deleted && op_success) {
 
499
      ++stat.verify_error_count;
 
500
      if (arg.sh.verbose > 0) {
 
501
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
502
          "unexpected_insert_success\n",
 
503
          arg.worker_type.c_str(), arg.id, k.c_str());
 
504
      }
 
505
      ret = 1;
 
506
    }
 
507
  } else {
 
508
    ++stat.unknown_count;
 
509
  }
 
510
  if (op_success) {
 
511
    rec.values.resize(4);
 
512
    rec.values[0] = k;
 
513
    rec.values[1] = v1;
 
514
    rec.values[2] = v2;
 
515
    rec.values[3] = v3;
 
516
    rec.deleted = false;
 
517
    if (arg.sh.verbose >= 100 && ret == 0) {
 
518
      fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(),
 
519
        v2.c_str(), v3.c_str());
 
520
    }
 
521
    if (ret == 0 && !rec.unknown_state) {
 
522
      ++stat.success_count;
 
523
    }
 
524
    rec.unknown_state = false;
 
525
  }
 
526
  cli->response_buf_remove();
 
527
  return ret;
 
528
}
 
529
 
 
530
int
 
531
hs_longrun_thread_hs::op_delete(record_value& rec)
 
532
{
 
533
  const std::string k = rec.key;
 
534
  const string_ref op_ref("=", 1);
 
535
  const string_ref op_args[1] = {
 
536
    to_string_ref(k),
 
537
  };
 
538
  const string_ref modop_ref("D", 1);
 
539
  cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
 
540
    modop_ref, 0, 0, 0, 0);
 
541
  cli->request_send();
 
542
  if (check_hs_error("op_delete_send", &rec) != 0) { return 1; }
 
543
  size_t numflds = 0;
 
544
  cli->response_recv(numflds);
 
545
  if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; }
 
546
  const string_ref *row = cli->get_next_row();
 
547
  const bool op_success = (numflds > 0 && row != 0 &&
 
548
    to_string(row[0]) == "1");
 
549
  int ret = 0;
 
550
  if (!rec.unknown_state) {
 
551
    if (!rec.deleted && !op_success) {
 
552
      ++stat.verify_error_count;
 
553
      if (arg.sh.verbose > 0) {
 
554
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
555
          "unexpected_delete_failure\n",
 
556
          arg.worker_type.c_str(), arg.id, k.c_str());
 
557
      }
 
558
      ret = 1;
 
559
    } else if (rec.deleted && op_success) {
 
560
      ++stat.verify_error_count;
 
561
      if (arg.sh.verbose > 0) {
 
562
        fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
 
563
          "unexpected_delete_success\n",
 
564
          arg.worker_type.c_str(), arg.id, k.c_str());
 
565
      }
 
566
      ret = 1;
 
567
    }
 
568
  }
 
569
  cli->response_buf_remove();
 
570
  if (op_success) {
 
571
    rec.deleted = true;
 
572
    if (ret == 0 && !rec.unknown_state) {
 
573
      ++stat.success_count;
 
574
    }
 
575
    rec.unknown_state = false;
 
576
  }
 
577
  if (arg.sh.verbose >= 100 && ret == 0) {
 
578
    fprintf(stderr, "HS_DELETE %s\n", k.c_str());
 
579
  }
 
580
  return ret;
 
581
}
 
582
 
 
583
int
 
584
hs_longrun_thread_hs::op_update(record_value& rec)
 
585
{
 
586
  const std::string k = rec.key;
 
587
  const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id);
 
588
  const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id);
 
589
  const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id);
 
590
  const string_ref op_ref("=", 1);
 
591
  const string_ref op_args[1] = {
 
592
    to_string_ref(k),
 
593
  };
 
594
  const string_ref modop_ref("U", 1);
 
595
  const string_ref modop_args[4] = {
 
596
    to_string_ref(k),
 
597
    to_string_ref(v1),
 
598
    to_string_ref(v2),
 
599
    to_string_ref(v3)
 
600
  };
 
601
  cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
 
602
    modop_ref, modop_args, 4, 0, 0);
 
603
  cli->request_send();
 
604
  if (check_hs_error("op_update_send", &rec) != 0) { return 1; }
 
605
  size_t numflds = 0;
 
606
  cli->response_recv(numflds);
 
607
  if (check_hs_error("op_update_recv", &rec) != 0) { return 1; }
 
608
  const string_ref *row = cli->get_next_row();
 
609
  uint32_t num_rows = row
 
610
    ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0;
 
611
  cli->response_buf_remove();
 
612
  const bool cur_unknown_state = (num_rows == 1);
 
613
  return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state);
 
614
}
 
615
 
 
616
int
 
617
hs_longrun_thread_hs::op_read(record_value& rec)
 
618
{
 
619
  const std::string k = rec.key;
 
620
  const string_ref op_ref("=", 1);
 
621
  const string_ref op_args[1] = {
 
622
    to_string_ref(k),
 
623
  };
 
624
  cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
 
625
    string_ref(), 0, 0, 0, 0);
 
626
  cli->request_send();
 
627
  if (check_hs_error("op_read_send", 0) != 0) { return 1; }
 
628
  size_t num_flds = 0;
 
629
  size_t num_rows = 0;
 
630
  cli->response_recv(num_flds);
 
631
  if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
 
632
  const string_ref *row = cli->get_next_row();
 
633
  std::string rrec[4];
 
634
  if (row != 0 && num_flds == 4) {
 
635
    for (int i = 0; i < 4; ++i) {
 
636
      rrec[i] = to_string(row[i]);
 
637
    }
 
638
    ++num_rows;
 
639
  }
 
640
  row = cli->get_next_row();
 
641
  if (row != 0) {
 
642
    ++num_rows;
 
643
  }
 
644
  cli->response_buf_remove();
 
645
  return verify_read(k, num_rows, num_flds, rrec, rec);
 
646
}
 
647
 
 
648
int
 
649
hs_longrun_thread_hs::op_readnolock(int key)
 
650
{
 
651
  const std::string k = to_stdstring(key);
 
652
  const string_ref op_ref("=", 1);
 
653
  const string_ref op_args[1] = {
 
654
    to_string_ref(k),
 
655
  };
 
656
  cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
 
657
    string_ref(), 0, 0, 0, 0);
 
658
  cli->request_send();
 
659
  if (check_hs_error("op_read_send", 0) != 0) { return 1; }
 
660
  size_t num_flds = 0;
 
661
  size_t num_rows = 0;
 
662
  cli->response_recv(num_flds);
 
663
  if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
 
664
  const string_ref *row = cli->get_next_row();
 
665
  std::string rrec[4];
 
666
  if (row != 0 && num_flds == 4) {
 
667
    for (int i = 0; i < 4; ++i) {
 
668
      rrec[i] = to_string(row[i]);
 
669
    }
 
670
    ++num_rows;
 
671
  }
 
672
  row = cli->get_next_row();
 
673
  if (row != 0) {
 
674
    ++num_rows;
 
675
  }
 
676
  cli->response_buf_remove();
 
677
  return verify_readnolock(k, num_rows, num_flds, rrec);
 
678
}
 
679
 
 
680
int
 
681
hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec)
 
682
{
 
683
  const int err = cli->get_error_code();
 
684
  if (err == 0) {
 
685
    return 0;
 
686
  }
 
687
  ++stat.runtime_error_count;
 
688
  if (arg.sh.verbose > 0) {
 
689
    const std::string estr = cli->get_error();
 
690
    fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n",
 
691
      arg.op, arg.id, mess, err, estr.c_str());
 
692
  }
 
693
  if (rec) {
 
694
    rec->unknown_state = true;
 
695
  }
 
696
  return 1;
 
697
}
 
698
 
 
699
struct hs_longrun_thread_my : public hs_longrun_thread_base {
 
700
  hs_longrun_thread_my(const arg_type& arg)
 
701
    : hs_longrun_thread_base(arg), connected(false) { }
 
702
  void run();
 
703
  void show_mysql_error(const char *mess, record_value *rec);
 
704
  int op_insert(record_value& rec);
 
705
  int op_delete(record_value& rec);
 
706
  int op_update(record_value& rec);
 
707
  int op_delins(record_value& rec);
 
708
  int op_read(record_value& rec);
 
709
  auto_mysql db;
 
710
  bool connected;
 
711
};
 
712
 
 
713
void
 
714
hs_longrun_thread_my::run()
 
715
{
 
716
  const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
 
717
  const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
 
718
  const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
 
719
  const std::string mysql_dbname = "hstestdb";
 
720
 
 
721
  while (arg.sh.running) {
 
722
    if (!connected) {
 
723
      if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
 
724
        mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
 
725
        show_mysql_error("mysql_real_connect", 0);
 
726
        continue;
 
727
      }
 
728
    }
 
729
    connected = true;
 
730
    const size_t rec_id = rand_record();
 
731
    record_value& rec = *arg.sh.records[rec_id];
 
732
    lock_guard g(rec.lock);
 
733
    int e = 0;
 
734
    switch (arg.op) {
 
735
    #if 0
 
736
    case 'I':
 
737
      e = op_insert(rec);
 
738
      break;
 
739
    case 'D':
 
740
      e = op_delete(rec);
 
741
      break;
 
742
    case 'U':
 
743
      e = op_update(rec);
 
744
      break;
 
745
    #endif
 
746
    case 'T':
 
747
      e = op_delins(rec);
 
748
      break;
 
749
    case 'R':
 
750
      e = op_read(rec);
 
751
      break;
 
752
    default:
 
753
      break;
 
754
    }
 
755
  }
 
756
}
 
757
 
 
758
int
 
759
hs_longrun_thread_my::op_delins(record_value& rec)
 
760
{
 
761
  const std::string k = rec.key;
 
762
  const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id);
 
763
  const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id);
 
764
  const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id);
 
765
  int success = 0;
 
766
  bool cur_unknown_state = false;
 
767
  do {
 
768
    char query[1024];
 
769
    #if 1
 
770
    if (mysql_query(db, "begin") != 0) {
 
771
      if (arg.sh.verbose >= 20) {
 
772
        fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin");
 
773
      }
 
774
      break;
 
775
    }
 
776
    #endif
 
777
    cur_unknown_state = true;
 
778
    snprintf(query, 1024,
 
779
      "delete from hstesttbl where k = '%s'", k.c_str());
 
780
    if (mysql_query(db, query) != 0) {
 
781
      if (arg.sh.verbose >= 20) {
 
782
        fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
 
783
      }
 
784
      break;
 
785
    }
 
786
    if (mysql_affected_rows(db) != 1) {
 
787
      if (arg.sh.verbose >= 20) {
 
788
        fprintf(stderr, "mysql: notfound: [%s]\n", query);
 
789
      }
 
790
      break;
 
791
    }
 
792
    snprintf(query, 1024,
 
793
      "insert into hstesttbl values ('%s', '%s', '%s', '%s')",
 
794
      k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
 
795
    if (mysql_query(db, query) != 0) {
 
796
      if (arg.sh.verbose >= 20) {
 
797
        fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
 
798
      }
 
799
      break;
 
800
    }
 
801
    #if 1
 
802
    if (mysql_query(db, "commit") != 0) {
 
803
      if (arg.sh.verbose >= 20) {
 
804
        fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit");
 
805
      }
 
806
      break;
 
807
    }
 
808
    #endif
 
809
    success = true;
 
810
    cur_unknown_state = false;
 
811
  } while (false);
 
812
  return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state);
 
813
}
 
814
 
 
815
int
 
816
hs_longrun_thread_my::op_read(record_value& rec)
 
817
{
 
818
  const std::string k = rec.key;
 
819
  char query[1024] = { 0 };
 
820
  const int len = snprintf(query, 1024,
 
821
    "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str());
 
822
  const int r = mysql_real_query(db, query, len > 0 ? len : 0);
 
823
  if (r != 0) {
 
824
    show_mysql_error(query, 0);
 
825
    return 1;
 
826
  }
 
827
  MYSQL_ROW row = 0;
 
828
  unsigned long *lengths = 0;
 
829
  unsigned int num_rows = 0;
 
830
  unsigned int num_flds = 0;
 
831
  auto_mysql_res res(db);
 
832
  std::string rrec[4];
 
833
  if (res != 0) {
 
834
    num_flds = mysql_num_fields(res);
 
835
    row = mysql_fetch_row(res);
 
836
    if (row != 0) {
 
837
      lengths = mysql_fetch_lengths(res);
 
838
      if (num_flds == 4) {
 
839
        for (int i = 0; i < 4; ++i) {
 
840
          rrec[i] = std::string(row[i], lengths[i]);
 
841
        }
 
842
      }
 
843
      ++num_rows;
 
844
      row = mysql_fetch_row(res);
 
845
      if (row != 0) {
 
846
        ++num_rows;
 
847
      }
 
848
    }
 
849
  }
 
850
  return verify_read(k, num_rows, num_flds, rrec, rec);
 
851
}
 
852
 
 
853
void
 
854
hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec)
 
855
{
 
856
  ++stat.runtime_error_count;
 
857
  if (arg.sh.verbose > 0) {
 
858
    fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n",
 
859
      arg.op, arg.id, mess, mysql_error(db));
 
860
  }
 
861
  if (rec) {
 
862
    rec->unknown_state = true;
 
863
  }
 
864
  db.reset();
 
865
  connected = false;
 
866
}
 
867
 
 
868
void
 
869
mysql_do(MYSQL *db, const char *query)
 
870
{
 
871
  if (mysql_real_query(db, query, strlen(query)) != 0) {
 
872
    fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
 
873
    fatal_exit("mysql_do");
 
874
  }
 
875
}
 
876
 
 
877
void
 
878
hs_longrun_init_table(const config& conf, int num_prepare,
 
879
  hs_longrun_shared& shared)
 
880
{
 
881
  const std::string mysql_host = conf.get_str("host", "localhost");
 
882
  const std::string mysql_user = conf.get_str("mysqluser", "root");
 
883
  const std::string mysql_passwd = conf.get_str("mysqlpass", "");
 
884
  const std::string mysql_dbname = "";
 
885
  auto_mysql db;
 
886
  if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
 
887
    mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
 
888
    fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db));
 
889
    fatal_exit("hs_longrun_init_table");
 
890
  }
 
891
  mysql_do(db, "drop database if exists hstestdb");
 
892
  mysql_do(db, "create database hstestdb");
 
893
  mysql_do(db, "use hstestdb");
 
894
  mysql_do(db,
 
895
    "create table hstesttbl ("
 
896
    "k int primary key,"
 
897
    "v1 varchar(32) not null,"
 
898
    "v2 varchar(32) not null,"
 
899
    "v3 varchar(32) not null"
 
900
    ") character set utf8 collate utf8_bin engine = innodb");
 
901
  for (int i = 0; i < num_prepare; ++i) {
 
902
    const std::string i_str = to_stdstring(i);
 
903
    const std::string v1 = "pv1_" + i_str;
 
904
    const std::string v2 = "pv2_" + i_str;
 
905
    const std::string v3 = "pv3_" + i_str;
 
906
    char buf[1024];
 
907
    snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values"
 
908
      "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str());
 
909
    mysql_do(db, buf);
 
910
    record_value *rec = shared.records[i];
 
911
    rec->key = i_str;
 
912
    rec->values.resize(4);
 
913
    rec->values[0] = i_str;
 
914
    rec->values[1] = v1;
 
915
    rec->values[2] = v2;
 
916
    rec->values[3] = v3;
 
917
    rec->deleted = false;
 
918
  }
 
919
}
 
920
 
 
921
int
 
922
hs_longrun_main(int argc, char **argv)
 
923
{
 
924
  hs_longrun_shared shared;
 
925
  parse_args(argc, argv, shared.conf);
 
926
  shared.conf["host"] = shared.conf.get_str("host", "localhost");
 
927
  shared.verbose = shared.conf.get_int("verbose", 1);
 
928
  const int table_size = shared.conf.get_int("table_size", 10000);
 
929
  for (int i = 0; i < table_size; ++i) {
 
930
    std::auto_ptr<record_value> rec(new record_value());
 
931
    rec->key = to_stdstring(i);
 
932
    shared.records.push_back_ptr(rec);
 
933
  }
 
934
  mysql_library_init(0, 0, 0);
 
935
  const int duration = shared.conf.get_int("duration", 10);
 
936
  const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10);
 
937
  const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10);
 
938
  const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10);
 
939
  const int num_hsread = shared.conf.get_int("num_hsread", 10);
 
940
  const int num_myread = shared.conf.get_int("num_myread", 10);
 
941
  const int num_mydelins = shared.conf.get_int("num_mydelins", 10);
 
942
  int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10);
 
943
  const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0);
 
944
  if (!always_filled) {
 
945
    num_hsreadnolock = 0;
 
946
  }
 
947
  hs_longrun_init_table(shared.conf, always_filled ? table_size : 0,
 
948
    shared);
 
949
  /* create worker threads */
 
950
  static const struct thrtmpl_type {
 
951
    const char *type; char op; int num; int hs; int lock;
 
952
  } thrtmpl[] = {
 
953
    { "hsinsert", 'I', num_hsinsert, 1, 1 },
 
954
    { "hsdelete", 'D', num_hsdelete, 1, 1 },
 
955
    { "hsupdate", 'U', num_hsupdate, 1, 1 },
 
956
    { "hsread", 'R', num_hsread, 1, 1 },
 
957
    { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 },
 
958
    { "myread", 'R', num_myread, 0, 1 },
 
959
    { "mydelins", 'T', num_mydelins, 0, 1 },
 
960
  };
 
961
  typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type;
 
962
  thrs_type thrs;
 
963
  for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) {
 
964
    const thrtmpl_type& e = thrtmpl[i];
 
965
    for (int j = 0; j < e.num; ++j) {
 
966
      int id = thrs.size();
 
967
      const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock,
 
968
        shared);
 
969
      std::auto_ptr<hs_longrun_thread_base> thr;
 
970
      if (e.hs) {
 
971
        thr.reset(new hs_longrun_thread_hs(arg));
 
972
      } else {
 
973
        thr.reset(new hs_longrun_thread_my(arg));
 
974
      }
 
975
      thrs.push_back_ptr(thr);
 
976
    }
 
977
  }
 
978
  shared.num_threads = thrs.size();
 
979
  /* start threads */
 
980
  fprintf(stderr, "START\n");
 
981
  shared.running = 1;
 
982
  for (size_t i = 0; i < thrs.size(); ++i) {
 
983
    thrs[i]->start();
 
984
  }
 
985
  /* wait */
 
986
  sleep(duration);
 
987
  /* stop thread */
 
988
  shared.running = 0;
 
989
  for (size_t i = 0; i < thrs.size(); ++i) {
 
990
    thrs[i]->join();
 
991
  }
 
992
  fprintf(stderr, "DONE\n");
 
993
  /* summary */
 
994
  typedef std::map<std::string, hs_longrun_stat> stat_map;
 
995
  stat_map sm;
 
996
  for (size_t i = 0; i < thrs.size(); ++i) {
 
997
    hs_longrun_thread_base *const thr = thrs[i];
 
998
    const std::string wt = thr->arg.worker_type;
 
999
    hs_longrun_stat& v = sm[wt];
 
1000
    v.add(thr->stat);
 
1001
  }
 
1002
  hs_longrun_stat total;
 
1003
  for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) {
 
1004
    if (i->second.verify_error_count != 0) {
 
1005
      fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(),
 
1006
        i->second.verify_error_count);
 
1007
    }
 
1008
    if (i->second.runtime_error_count) {
 
1009
      fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(),
 
1010
        i->second.runtime_error_count);
 
1011
    }
 
1012
    if (i->second.unknown_count) {
 
1013
      fprintf(stderr, "%s unknown %llu\n", i->first.c_str(),
 
1014
        i->second.unknown_count);
 
1015
    }
 
1016
    fprintf(stderr, "%s success %llu\n", i->first.c_str(),
 
1017
      i->second.success_count);
 
1018
    total.add(i->second);
 
1019
  }
 
1020
  if (total.verify_error_count != 0) {
 
1021
    fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count);
 
1022
  }
 
1023
  if (total.runtime_error_count != 0) {
 
1024
    fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count);
 
1025
  }
 
1026
  if (total.unknown_count != 0) {
 
1027
    fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count);
 
1028
  }
 
1029
  fprintf(stderr, "TOTAL success %llu\n", total.success_count);
 
1030
  mysql_library_end();
 
1031
  return 0;
 
1032
}
 
1033
 
 
1034
};
 
1035
 
 
1036
int
 
1037
main(int argc, char **argv)
 
1038
{
 
1039
  return dena::hs_longrun_main(argc, argv);
 
1040
}
 
1041