~vadim-tk/percona-server/percona-galera-5.1.57

« back to all changes in this revision

Viewing changes to storage/ndb/test/ndbapi/testDeadlock.cpp

  • Committer: root
  • Date: 2011-07-10 16:09:24 UTC
  • Revision ID: root@r815.office.percona.com-20110710160924-fyffqsbaclgu6vui
Initial port

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include <NdbMain.h>
 
18
#include <NdbApi.hpp>
 
19
#include <NdbOut.hpp>
 
20
#include <NdbMutex.h>
 
21
#include <NdbCondition.h>
 
22
#include <NdbThread.h>
 
23
#include <NdbTest.hpp>
 
24
 
 
25
struct Opt {
 
26
  bool m_dbg;
 
27
  const char* m_scan;
 
28
  const char* m_tname;
 
29
  const char* m_xname;
 
30
  Opt() :
 
31
    m_dbg(true),
 
32
    m_scan("tx"),
 
33
    m_tname("T"),
 
34
    m_xname("X")
 
35
    {}
 
36
};
 
37
 
 
38
static void
 
39
printusage()
 
40
{
 
41
  Opt d;
 
42
  ndbout
 
43
    << "usage: testDeadlock" << endl
 
44
    << "-scan tx        scan table, index [" << d.m_scan << "]" << endl
 
45
    ;
 
46
}
 
47
 
 
48
static Opt g_opt;
 
49
 
 
50
static NdbMutex *ndbout_mutex= NULL;
 
51
static Ndb_cluster_connection *g_cluster_connection= 0;
 
52
#define DBG(x) \
 
53
  do { \
 
54
    if (! g_opt.m_dbg) break; \
 
55
    NdbMutex_Lock(ndbout_mutex); \
 
56
    ndbout << "line " << __LINE__ << " " << x << endl; \
 
57
    NdbMutex_Unlock(ndbout_mutex); \
 
58
  } while (0)
 
59
 
 
60
#define CHK(x) \
 
61
  do { \
 
62
    if (x) break; \
 
63
    ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
 
64
    return -1; \
 
65
  } while (0)
 
66
 
 
67
#define CHN(p, x) \
 
68
  do { \
 
69
    if (x) break; \
 
70
    ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
 
71
    ndbout << (p)->getNdbError() << endl; \
 
72
    return -1; \
 
73
  } while (0)
 
74
 
 
75
// threads
 
76
 
 
77
typedef int (*Runstep)(struct Thr& thr);
 
78
 
 
79
struct Thr {
 
80
  enum State { Wait, Start, Stop, Stopped, Exit };
 
81
  State m_state;
 
82
  int m_no;
 
83
  Runstep m_runstep;
 
84
  int m_ret;
 
85
  NdbMutex* m_mutex;
 
86
  NdbCondition* m_cond;
 
87
  NdbThread* m_thread;
 
88
  void* m_status;
 
89
  Ndb* m_ndb;
 
90
  NdbConnection* m_con;
 
91
  NdbScanOperation* m_scanop;
 
92
  NdbIndexScanOperation* m_indexscanop;
 
93
  //
 
94
  Thr(int no);
 
95
  ~Thr();
 
96
  int run();
 
97
  void start(Runstep runstep);
 
98
  void stop();
 
99
  void stopped();
 
100
  void lock() { NdbMutex_Lock(m_mutex); }
 
101
  void unlock() { NdbMutex_Unlock(m_mutex); }
 
102
  void wait() { NdbCondition_Wait(m_cond, m_mutex); }
 
103
  void signal() { NdbCondition_Signal(m_cond); }
 
104
  void exit();
 
105
  void join() { NdbThread_WaitFor(m_thread, &m_status); }
 
106
};
 
107
 
 
108
static NdbOut&
 
109
operator<<(NdbOut& out, const Thr& thr) {
 
110
  out << "thr " << thr.m_no;
 
111
  return out;
 
112
}
 
113
 
 
114
extern "C" { static void* runthread(void* arg); }
 
115
 
 
116
Thr::Thr(int no)
 
117
{
 
118
  m_state = Wait;
 
119
  m_no = no;
 
120
  m_runstep = 0;
 
121
  m_ret = 0;
 
122
  m_mutex = NdbMutex_Create();
 
123
  m_cond = NdbCondition_Create();
 
124
  assert(m_mutex != 0 && m_cond != 0);
 
125
  const unsigned stacksize = 256 * 1024;
 
126
  const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
 
127
  m_thread = NdbThread_Create(runthread, (void**)this, stacksize, "me", prio);
 
128
  if (m_thread == 0) {
 
129
    DBG("create thread failed: errno=" << errno);
 
130
    m_ret = -1;
 
131
  }
 
132
  m_status = 0;
 
133
  m_ndb = 0;
 
134
  m_con = 0;
 
135
  m_scanop = 0;
 
136
  m_indexscanop = 0;
 
137
}
 
138
 
 
139
Thr::~Thr()
 
140
{
 
141
  if (m_thread != 0)
 
142
    NdbThread_Destroy(&m_thread);
 
143
  if (m_cond != 0)
 
144
    NdbCondition_Destroy(m_cond);
 
145
  if (m_mutex != 0)
 
146
    NdbMutex_Destroy(m_mutex);
 
147
}
 
148
 
 
149
static void*
 
150
runthread(void* arg) {
 
151
  Thr& thr = *(Thr*)arg;
 
152
  thr.run();
 
153
  return 0;
 
154
}
 
155
 
 
156
int
 
157
Thr::run()
 
158
{
 
159
  DBG(*this << " run");
 
160
  while (true) {
 
161
    lock();
 
162
    while (m_state != Start && m_state != Exit) {
 
163
      wait();
 
164
    }
 
165
    if (m_state == Exit) {
 
166
      DBG(*this << " exit");
 
167
      unlock();
 
168
      break;
 
169
    }
 
170
    m_ret = (*m_runstep)(*this);
 
171
    m_state = Stopped;
 
172
    signal();
 
173
    unlock();
 
174
    if (m_ret != 0) {
 
175
      DBG(*this << " error exit");
 
176
      break;
 
177
    }
 
178
  }
 
179
  delete m_ndb;
 
180
  m_ndb = 0;
 
181
  return 0;
 
182
}
 
183
 
 
184
void
 
185
Thr::start(Runstep runstep)
 
186
{
 
187
  lock();
 
188
  m_state = Start;
 
189
  m_runstep = runstep;
 
190
  signal();
 
191
  unlock();
 
192
}
 
193
 
 
194
void
 
195
Thr::stopped()
 
196
{
 
197
  lock();
 
198
  while (m_state != Stopped) {
 
199
    wait();
 
200
  }
 
201
  m_state = Wait;
 
202
  unlock();
 
203
}
 
204
 
 
205
void
 
206
Thr::exit()
 
207
{
 
208
  lock();
 
209
  m_state = Exit;
 
210
  signal();
 
211
  unlock();
 
212
}
 
213
 
 
214
// general
 
215
 
 
216
static int
 
217
runstep_connect(Thr& thr)
 
218
{
 
219
  Ndb* ndb = thr.m_ndb = new Ndb(g_cluster_connection, "TEST_DB");
 
220
  CHN(ndb, ndb->init() == 0);
 
221
  CHN(ndb, ndb->waitUntilReady() == 0);
 
222
  DBG(thr << " connected");
 
223
  return 0;
 
224
}
 
225
 
 
226
static int
 
227
runstep_starttx(Thr& thr)
 
228
{
 
229
  Ndb* ndb = thr.m_ndb;
 
230
  assert(ndb != 0);
 
231
  CHN(ndb, (thr.m_con = ndb->startTransaction()) != 0);
 
232
  DBG("thr " << thr.m_no << " tx started");
 
233
  return 0;
 
234
}
 
235
 
 
236
/*
 
237
 * WL1822 flush locks
 
238
 *
 
239
 * Table T with 3 tuples X, Y, Z.
 
240
 * Two transactions (* = lock wait).
 
241
 *
 
242
 * - tx1 reads and locks Z
 
243
 * - tx2 scans X, Y, *Z
 
244
 * - tx2 returns X, Y before lock wait on Z
 
245
 * - tx1 reads and locks *X
 
246
 * - api asks for next tx2 result
 
247
 * - LQH unlocks X via ACC or TUX [*]
 
248
 * - tx1 gets lock on X
 
249
 * - tx1 returns X to api
 
250
 * - api commits tx1
 
251
 * - tx2 gets lock on Z
 
252
 * - tx2 returs Z to api
 
253
 *
 
254
 * The point is deadlock is avoided due to [*].
 
255
 * The test is for 1 db node and 1 fragment table.
 
256
 */
 
257
 
 
258
static char wl1822_scantx = 0;
 
259
 
 
260
static const Uint32 wl1822_valA[3] = { 0, 1, 2 };
 
261
static const Uint32 wl1822_valB[3] = { 3, 4, 5 };
 
262
 
 
263
static Uint32 wl1822_bufA = ~0;
 
264
static Uint32 wl1822_bufB = ~0;
 
265
 
 
266
// map scan row to key (A) and reverse
 
267
static unsigned wl1822_r2k[3] = { 0, 0, 0 };
 
268
static unsigned wl1822_k2r[3] = { 0, 0, 0 };
 
269
 
 
270
static int
 
271
wl1822_createtable(Thr& thr)
 
272
{
 
273
  Ndb* ndb = thr.m_ndb;
 
274
  assert(ndb != 0);
 
275
  NdbDictionary::Dictionary* dic = ndb->getDictionary();
 
276
  // drop T
 
277
  if (dic->getTable(g_opt.m_tname) != 0)
 
278
    CHN(dic, dic->dropTable(g_opt.m_tname) == 0);
 
279
  // create T
 
280
  NdbDictionary::Table tab(g_opt.m_tname);
 
281
  tab.setFragmentType(NdbDictionary::Object::FragAllSmall);
 
282
  { NdbDictionary::Column col("A");
 
283
    col.setType(NdbDictionary::Column::Unsigned);
 
284
    col.setPrimaryKey(true);
 
285
    tab.addColumn(col);
 
286
  }
 
287
  { NdbDictionary::Column col("B");
 
288
    col.setType(NdbDictionary::Column::Unsigned);
 
289
    col.setPrimaryKey(false);
 
290
    tab.addColumn(col);
 
291
  }
 
292
  CHN(dic, dic->createTable(tab) == 0);
 
293
  // create X
 
294
  NdbDictionary::Index ind(g_opt.m_xname);
 
295
  ind.setTable(g_opt.m_tname);
 
296
  ind.setType(NdbDictionary::Index::OrderedIndex);
 
297
  ind.setLogging(false);
 
298
  ind.addColumn("B");
 
299
  CHN(dic, dic->createIndex(ind) == 0);
 
300
  DBG("created " << g_opt.m_tname << ", " << g_opt.m_xname);
 
301
  return 0;
 
302
}
 
303
 
 
304
static int
 
305
wl1822_insertrows(Thr& thr)
 
306
{
 
307
  // insert X, Y, Z
 
308
  Ndb* ndb = thr.m_ndb;
 
309
  assert(ndb != 0);
 
310
  NdbConnection* con;
 
311
  NdbOperation* op;
 
312
  for (unsigned k = 0; k < 3; k++) {
 
313
    CHN(ndb, (con = ndb->startTransaction()) != 0);
 
314
    CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
 
315
    CHN(op, op->insertTuple() == 0);
 
316
    CHN(op, op->equal("A", (char*)&wl1822_valA[k]) == 0);
 
317
    CHN(op, op->setValue("B", (char*)&wl1822_valB[k]) == 0);
 
318
    CHN(con, con->execute(Commit) == 0);
 
319
    ndb->closeTransaction(con);
 
320
  }
 
321
  DBG("inserted X, Y, Z");
 
322
  return 0;
 
323
}
 
324
 
 
325
static int
 
326
wl1822_getscanorder(Thr& thr)
 
327
{
 
328
  // cheat, table order happens to be key order in my test
 
329
  wl1822_r2k[0] = 0;
 
330
  wl1822_r2k[1] = 1;
 
331
  wl1822_r2k[2] = 2;
 
332
  wl1822_k2r[0] = 0;
 
333
  wl1822_k2r[1] = 1;
 
334
  wl1822_k2r[2] = 2;
 
335
  DBG("scan order determined");
 
336
  return 0;
 
337
}
 
338
 
 
339
static int
 
340
wl1822_tx1_readZ(Thr& thr)
 
341
{
 
342
  // tx1 read Z with exclusive lock
 
343
  NdbConnection* con = thr.m_con;
 
344
  assert(con != 0);
 
345
  NdbOperation* op;
 
346
  CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
 
347
  CHN(op, op->readTupleExclusive() == 0);
 
348
  CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
 
349
  wl1822_bufB = ~0;
 
350
  CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
 
351
  CHN(con, con->execute(NoCommit) == 0);
 
352
  CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
 
353
  DBG("tx1 locked Z");
 
354
  return 0;
 
355
}
 
356
 
 
357
static int
 
358
wl1822_tx2_scanXY(Thr& thr)
 
359
{
 
360
  // tx2 scan X, Y with exclusive lock
 
361
  NdbConnection* con = thr.m_con;
 
362
  assert(con != 0);
 
363
  NdbScanOperation* scanop;
 
364
  NdbIndexScanOperation* indexscanop;
 
365
  NdbResultSet* rs;
 
366
  if (wl1822_scantx == 't') {
 
367
    CHN(con, (scanop = thr.m_scanop = con->getNdbScanOperation(g_opt.m_tname)) != 0);
 
368
    DBG("tx2 scan exclusive " << g_opt.m_tname);
 
369
  }
 
370
  if (wl1822_scantx == 'x') {
 
371
    CHN(con, (scanop = thr.m_scanop = indexscanop = thr.m_indexscanop = con->getNdbIndexScanOperation(g_opt.m_xname, g_opt.m_tname)) != 0);
 
372
    DBG("tx2 scan exclusive " << g_opt.m_xname);
 
373
  }
 
374
  CHN(scanop, scanop->readTuplesExclusive(16) == 0);
 
375
  CHN(scanop, scanop->getValue("A", (char*)&wl1822_bufA) != 0);
 
376
  CHN(scanop, scanop->getValue("B", (char*)&wl1822_bufB) != 0);
 
377
  CHN(con, con->execute(NoCommit) == 0);
 
378
  unsigned row = 0;
 
379
  while (row < 2) {
 
380
    DBG("before row " << row);
 
381
    int ret;
 
382
    wl1822_bufA = wl1822_bufB = ~0;
 
383
    CHN(con, (ret = scanop->nextResult(true)) == 0);
 
384
    DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
 
385
    CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
 
386
    CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
 
387
    row++;
 
388
  }
 
389
  return 0;
 
390
}
 
391
 
 
392
static int
 
393
wl1822_tx1_readX_commit(Thr& thr)
 
394
{
 
395
  // tx1 read X with exclusive lock and commit
 
396
  NdbConnection* con = thr.m_con;
 
397
  assert(con != 0);
 
398
  NdbOperation* op;
 
399
  CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
 
400
  CHN(op, op->readTupleExclusive() == 0);
 
401
  CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
 
402
  wl1822_bufB = ~0;
 
403
  CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
 
404
  CHN(con, con->execute(NoCommit) == 0);
 
405
  CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
 
406
  DBG("tx1 locked X");
 
407
  CHN(con, con->execute(Commit) == 0);
 
408
  DBG("tx1 commit");
 
409
  return 0;
 
410
}
 
411
 
 
412
static int
 
413
wl1822_tx2_scanZ_close(Thr& thr)
 
414
{
 
415
  // tx2 scan Z with exclusive lock and close scan
 
416
  Ndb* ndb = thr.m_ndb;
 
417
  NdbConnection* con = thr.m_con;
 
418
  NdbScanOperation* scanop = thr.m_scanop;
 
419
  assert(ndb != 0 && con != 0 && scanop != 0);
 
420
  unsigned row = 2;
 
421
  while (true) {
 
422
    DBG("before row " << row);
 
423
    int ret;
 
424
    wl1822_bufA = wl1822_bufB = ~0;
 
425
    CHN(con, (ret = scanop->nextResult(true)) == 0 || ret == 1);
 
426
    if (ret == 1)
 
427
      break;
 
428
    DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
 
429
    CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
 
430
    CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
 
431
    row++;
 
432
  }
 
433
  ndb->closeTransaction(con);
 
434
  CHK(row == 3);
 
435
  return 0;
 
436
}
 
437
 
 
438
// threads are synced between each step
 
439
static Runstep wl1822_step[][2] = {
 
440
  { runstep_connect, runstep_connect },
 
441
  { wl1822_createtable, 0 },
 
442
  { wl1822_insertrows, 0 },
 
443
  { wl1822_getscanorder, 0 },
 
444
  { runstep_starttx, runstep_starttx },
 
445
  { wl1822_tx1_readZ, 0 },
 
446
  { 0, wl1822_tx2_scanXY },
 
447
  { wl1822_tx1_readX_commit, wl1822_tx2_scanZ_close }
 
448
};
 
449
const unsigned wl1822_stepcount = sizeof(wl1822_step)/sizeof(wl1822_step[0]);
 
450
 
 
451
static int
 
452
wl1822_main(char scantx)
 
453
{
 
454
  wl1822_scantx = scantx;
 
455
  static const unsigned thrcount = 2;
 
456
  // create threads for tx1 and tx2
 
457
  Thr* thrlist[2];
 
458
  int n;
 
459
  for (n = 0; n < thrcount; n++) {
 
460
    Thr& thr = *(thrlist[n] = new Thr(1 + n));
 
461
    CHK(thr.m_ret == 0);
 
462
  }
 
463
  // run the steps
 
464
  for (unsigned i = 0; i < wl1822_stepcount; i++) {
 
465
    DBG("step " << i << " start");
 
466
    for (n = 0; n < thrcount; n++) {
 
467
      Thr& thr = *thrlist[n];
 
468
      Runstep runstep = wl1822_step[i][n];
 
469
      if (runstep != 0)
 
470
        thr.start(runstep);
 
471
    }
 
472
    for (n = 0; n < thrcount; n++) {
 
473
      Thr& thr = *thrlist[n];
 
474
      Runstep runstep = wl1822_step[i][n];
 
475
      if (runstep != 0)
 
476
        thr.stopped();
 
477
    }
 
478
  }
 
479
  // delete threads
 
480
  for (n = 0; n < thrcount; n++) {
 
481
    Thr& thr = *thrlist[n];
 
482
    thr.exit();
 
483
    thr.join();
 
484
    delete &thr;
 
485
  }
 
486
  return 0;
 
487
}
 
488
 
 
489
NDB_COMMAND(testOdbcDriver, "testDeadlock", "testDeadlock", "testDeadlock", 65535)
 
490
{
 
491
  ndb_init();
 
492
  if (ndbout_mutex == NULL)
 
493
    ndbout_mutex= NdbMutex_Create();
 
494
  while (++argv, --argc > 0) {
 
495
    const char* arg = argv[0];
 
496
    if (strcmp(arg, "-scan") == 0) {
 
497
      if (++argv, --argc > 0) {
 
498
        g_opt.m_scan = strdup(argv[0]);
 
499
        continue;
 
500
      }
 
501
    }
 
502
    printusage();
 
503
    return NDBT_ProgramExit(NDBT_WRONGARGS);
 
504
  }
 
505
 
 
506
  Ndb_cluster_connection con;
 
507
  if(con.connect(12, 5, 1) != 0)
 
508
  {
 
509
    return NDBT_ProgramExit(NDBT_FAILED);
 
510
  }
 
511
  g_cluster_connection= &con;
 
512
  
 
513
  if (
 
514
      strchr(g_opt.m_scan, 't') != 0 && wl1822_main('t') == -1 ||
 
515
      strchr(g_opt.m_scan, 'x') != 0 && wl1822_main('x') == -1
 
516
      ) {
 
517
    return NDBT_ProgramExit(NDBT_FAILED);
 
518
  }
 
519
  return NDBT_ProgramExit(NDBT_OK);
 
520
}
 
521
 
 
522
// vim: set sw=2 et: