~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/test/ndbapi/testOIBasic.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
/*
 
17
 * testOIBasic - ordered index test
 
18
 */
 
19
 
 
20
#include <ndb_global.h>
 
21
 
 
22
#include <NdbMain.h>
 
23
#include <NdbOut.hpp>
 
24
#include <NdbApi.hpp>
 
25
#include <NdbTest.hpp>
 
26
#include <NdbMutex.h>
 
27
#include <NdbCondition.h>
 
28
#include <NdbThread.h>
 
29
#include <NdbTick.h>
 
30
#include <NdbSleep.h>
 
31
#include <my_sys.h>
 
32
#include <NdbSqlUtil.hpp>
 
33
#include <ndb_version.h>
 
34
 
 
35
// options
 
36
 
 
37
struct Opt {
 
38
  // common options
 
39
  uint m_batch;
 
40
  const char* m_bound;
 
41
  const char* m_case;
 
42
  bool m_collsp;
 
43
  bool m_cont;
 
44
  bool m_core;
 
45
  const char* m_csname;
 
46
  CHARSET_INFO* m_cs;
 
47
  int m_die;
 
48
  bool m_dups;
 
49
  NdbDictionary::Object::FragmentType m_fragtype;
 
50
  const char* m_index;
 
51
  uint m_loop;
 
52
  bool m_msglock;
 
53
  bool m_nologging;
 
54
  bool m_noverify;
 
55
  uint m_pctnull;
 
56
  uint m_rows;
 
57
  uint m_samples;
 
58
  uint m_scanbatch;
 
59
  uint m_scanpar;
 
60
  uint m_scanstop;
 
61
  int m_seed;
 
62
  const char* m_skip;
 
63
  uint m_sloop;
 
64
  uint m_ssloop;
 
65
  const char* m_table;
 
66
  uint m_threads;
 
67
  int m_v;      // int for lint
 
68
  Opt() :
 
69
    m_batch(32),
 
70
    m_bound("01234"),
 
71
    m_case(0),
 
72
    m_collsp(false),
 
73
    m_cont(false),
 
74
    m_core(false),
 
75
    m_csname("random"),
 
76
    m_cs(0),
 
77
    m_die(0),
 
78
    m_dups(false),
 
79
    m_fragtype(NdbDictionary::Object::FragUndefined),
 
80
    m_index(0),
 
81
    m_loop(1),
 
82
    m_msglock(true),
 
83
    m_nologging(false),
 
84
    m_noverify(false),
 
85
    m_pctnull(10),
 
86
    m_rows(1000),
 
87
    m_samples(0),
 
88
    m_scanbatch(0),
 
89
    m_scanpar(0),
 
90
    m_scanstop(0),
 
91
    m_seed(-1),
 
92
    m_skip(0),
 
93
    m_sloop(4),
 
94
    m_ssloop(4),
 
95
    m_table(0),
 
96
    m_threads(4),
 
97
    m_v(1) {
 
98
  }
 
99
};
 
100
 
 
101
static Opt g_opt;
 
102
 
 
103
static void printcases();
 
104
static void printtables();
 
105
 
 
106
static void
 
107
printhelp()
 
108
{
 
109
  Opt d;
 
110
  ndbout
 
111
    << "usage: testOIbasic [options]" << endl
 
112
    << "  -batch N      pk operations in batch [" << d.m_batch << "]" << endl
 
113
    << "  -bound xyz    use only these bound types 0-4 [" << d.m_bound << "]" << endl
 
114
    << "  -case abc     only given test cases (letters a-z)" << endl
 
115
    << "  -collsp       use strnncollsp instead of strnxfrm" << endl
 
116
    << "  -cont         on error continue to next test case [" << d.m_cont << "]" << endl
 
117
    << "  -core         core dump on error [" << d.m_core << "]" << endl
 
118
    << "  -csname S     charset or collation [" << d.m_csname << "]" << endl
 
119
    << "  -die nnn      exit immediately on NDB error code nnn" << endl
 
120
    << "  -dups         allow duplicate tuples from index scan [" << d.m_dups << "]" << endl
 
121
    << "  -fragtype T   fragment type single/small/medium/large" << endl
 
122
    << "  -index xyz    only given index numbers (digits 0-9)" << endl
 
123
    << "  -loop N       loop count full suite 0=forever [" << d.m_loop << "]" << endl
 
124
    << "  -nologging    create tables in no-logging mode" << endl
 
125
    << "  -noverify     skip index verifications" << endl
 
126
    << "  -pctnull N    pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
 
127
    << "  -rows N       rows per thread [" << d.m_rows << "]" << endl
 
128
    << "  -samples N    samples for some timings (0=all) [" << d.m_samples << "]" << endl
 
129
    << "  -scanbatch N  scan batch 0=default [" << d.m_scanbatch << "]" << endl
 
130
    << "  -scanpar N    scan parallel 0=default [" << d.m_scanpar << "]" << endl
 
131
    << "  -seed N       srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
 
132
    << "  -skip abc     skip given test cases (letters a-z)" << endl
 
133
    << "  -sloop N      level 2 (sub)loop count [" << d.m_sloop << "]" << endl
 
134
    << "  -ssloop N     level 3 (sub)loop count [" << d.m_ssloop << "]" << endl
 
135
    << "  -table xyz    only given table numbers (digits 0-9)" << endl
 
136
    << "  -threads N    number of threads [" << d.m_threads << "]" << endl
 
137
    << "  -vN           verbosity [" << d.m_v << "]" << endl
 
138
    << "  -h or -help   print this help text" << endl
 
139
    ;
 
140
  printcases();
 
141
  printtables();
 
142
}
 
143
 
 
144
// not yet configurable
 
145
static const bool g_store_null_key = true;
 
146
 
 
147
// compare NULL like normal value (NULL < not NULL, NULL == NULL)
 
148
static const bool g_compare_null = true;
 
149
 
 
150
static const char* hexstr = "0123456789abcdef";
 
151
 
 
152
// random ints
 
153
 
 
154
static uint
 
155
urandom(uint n)
 
156
{
 
157
  if (n == 0)
 
158
    return 0;
 
159
  uint i = random() % n;
 
160
  return i;
 
161
}
 
162
 
 
163
static int
 
164
irandom(uint n)
 
165
{
 
166
  if (n == 0)
 
167
    return 0;
 
168
  int i = random() % n;
 
169
  if (random() & 0x1)
 
170
    i = -i;
 
171
  return i;
 
172
}
 
173
 
 
174
static bool
 
175
randompct(uint pct)
 
176
{
 
177
  if (pct == 0)
 
178
    return false;
 
179
  if (pct >= 100)
 
180
    return true;
 
181
  return urandom(100) < pct;
 
182
}
 
183
 
 
184
static uint
 
185
random_coprime(uint n)
 
186
{
 
187
    uint prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
 
188
    uint count = sizeof(prime) / sizeof(prime[0]);
 
189
    if (n == 0)
 
190
      return 0;
 
191
    while (1) {
 
192
      uint i = urandom(count);
 
193
      if (n % prime[i] != 0)
 
194
        return prime[i];
 
195
    }
 
196
}
 
197
 
 
198
// random re-sequence of 0...(n-1)
 
199
 
 
200
struct Rsq {
 
201
  Rsq(uint n);
 
202
  uint next();
 
203
private:
 
204
  uint m_n;
 
205
  uint m_i;
 
206
  uint m_start;
 
207
  uint m_prime;
 
208
};
 
209
 
 
210
Rsq::Rsq(uint n)
 
211
{
 
212
  m_n = n;
 
213
  m_i = 0;
 
214
  m_start = urandom(n);
 
215
  m_prime = random_coprime(n);
 
216
}
 
217
 
 
218
uint
 
219
Rsq::next()
 
220
{
 
221
  assert(m_n != 0);
 
222
  return (m_start + m_i++ * m_prime) % m_n;
 
223
}
 
224
 
 
225
// log and error macros
 
226
 
 
227
static NdbMutex *ndbout_mutex = NULL;
 
228
static const char* getthrprefix();
 
229
 
 
230
#define LLN(n, s) \
 
231
  do { \
 
232
    if ((n) > g_opt.m_v) break; \
 
233
    if (g_opt.m_msglock) NdbMutex_Lock(ndbout_mutex); \
 
234
    ndbout << getthrprefix(); \
 
235
    if ((n) > 2) \
 
236
      ndbout << "line " << __LINE__ << ": "; \
 
237
    ndbout << s << endl; \
 
238
    if (g_opt.m_msglock) NdbMutex_Unlock(ndbout_mutex); \
 
239
  } while(0)
 
240
 
 
241
#define LL0(s) LLN(0, s)
 
242
#define LL1(s) LLN(1, s)
 
243
#define LL2(s) LLN(2, s)
 
244
#define LL3(s) LLN(3, s)
 
245
#define LL4(s) LLN(4, s)
 
246
#define LL5(s) LLN(5, s)
 
247
 
 
248
#define HEX(x)  hex << (x) << dec
 
249
 
 
250
// following check a condition and return -1 on failure
 
251
 
 
252
#undef CHK      // simple check
 
253
#undef CHKTRY   // check with action on fail
 
254
#undef CHKCON   // print NDB API errors on failure
 
255
 
 
256
#define CHK(x)  CHKTRY(x, ;)
 
257
 
 
258
#define CHKTRY(x, act) \
 
259
  do { \
 
260
    if (x) break; \
 
261
    LL0("line " << __LINE__ << ": " << #x << " failed"); \
 
262
    if (g_opt.m_core) abort(); \
 
263
    act; \
 
264
    return -1; \
 
265
  } while (0)
 
266
 
 
267
#define CHKCON(x, con) \
 
268
  do { \
 
269
    if (x) break; \
 
270
    LL0("line " << __LINE__ << ": " << #x << " failed"); \
 
271
    (con).printerror(ndbout); \
 
272
    if (g_opt.m_core) abort(); \
 
273
    return -1; \
 
274
  } while (0)
 
275
 
 
276
// method parameters
 
277
 
 
278
class Thr;
 
279
class Con;
 
280
class Tab;
 
281
class ITab;
 
282
class Set;
 
283
class Tmr;
 
284
 
 
285
struct Par : public Opt {
 
286
  uint m_no;
 
287
  Con* m_con;
 
288
  Con& con() const { assert(m_con != 0); return *m_con; }
 
289
  const Tab* m_tab;
 
290
  const Tab& tab() const { assert(m_tab != 0); return *m_tab; }
 
291
  const ITab* m_itab;
 
292
  const ITab& itab() const { assert(m_itab != 0); return *m_itab; }
 
293
  Set* m_set;
 
294
  Set& set() const { assert(m_set != 0); return *m_set; }
 
295
  Tmr* m_tmr;
 
296
  Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
 
297
  char m_currcase[2];
 
298
  uint m_lno;
 
299
  uint m_slno;
 
300
  uint m_totrows;
 
301
  // value calculation
 
302
  uint m_range;
 
303
  uint m_pctrange;
 
304
  uint m_pctbrange;
 
305
  int m_bdir;
 
306
  bool m_noindexkeyupdate;
 
307
  // choice of key
 
308
  bool m_randomkey;
 
309
  // do verify after read
 
310
  bool m_verify;
 
311
  // errors to catch (see Con)
 
312
  bool m_catcherr;
 
313
  // abort percentage
 
314
  uint m_abortpct;
 
315
  NdbOperation::LockMode m_lockmode;
 
316
  // scan options
 
317
  bool m_tupscan;
 
318
  bool m_ordered;
 
319
  bool m_descending;
 
320
  // threads used by current test case
 
321
  uint m_usedthreads;
 
322
  Par(const Opt& opt) :
 
323
    Opt(opt),
 
324
    m_no(0),
 
325
    m_con(0),
 
326
    m_tab(0),
 
327
    m_itab(0),
 
328
    m_set(0),
 
329
    m_tmr(0),
 
330
    m_lno(0),
 
331
    m_slno(0),
 
332
    m_totrows(0),
 
333
    m_range(m_rows),
 
334
    m_pctrange(40),
 
335
    m_pctbrange(80),
 
336
    m_bdir(0),
 
337
    m_noindexkeyupdate(false),
 
338
    m_randomkey(false),
 
339
    m_verify(false),
 
340
    m_catcherr(0),
 
341
    m_abortpct(0),
 
342
    m_lockmode(NdbOperation::LM_Read),
 
343
    m_tupscan(false),
 
344
    m_ordered(false),
 
345
    m_descending(false),
 
346
    m_usedthreads(0)
 
347
  {
 
348
    m_currcase[0] = 0;
 
349
  }
 
350
};
 
351
 
 
352
static bool
 
353
usetable(Par par, uint i)
 
354
{
 
355
  return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
 
356
}
 
357
 
 
358
static bool
 
359
useindex(Par par, uint i)
 
360
{
 
361
  return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
 
362
}
 
363
 
 
364
static uint
 
365
thrrow(Par par, uint j)
 
366
{
 
367
  return par.m_usedthreads * j + par.m_no;
 
368
}
 
369
 
 
370
static bool
 
371
isthrrow(Par par, uint i)
 
372
{
 
373
  return i % par.m_usedthreads == par.m_no;
 
374
}
 
375
 
 
376
// timer
 
377
 
 
378
struct Tmr {
 
379
  void clr();
 
380
  void on();
 
381
  void off(uint cnt = 0);
 
382
  const char* time();
 
383
  const char* pct(const Tmr& t1);
 
384
  const char* over(const Tmr& t1);
 
385
  NDB_TICKS m_on;
 
386
  uint m_ms;
 
387
  uint m_cnt;
 
388
  char m_time[100];
 
389
  char m_text[100];
 
390
  Tmr() { clr(); }
 
391
};
 
392
 
 
393
void
 
394
Tmr::clr()
 
395
{
 
396
  m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
 
397
}
 
398
 
 
399
void
 
400
Tmr::on()
 
401
{
 
402
  assert(m_on == 0);
 
403
  m_on = NdbTick_CurrentMillisecond();
 
404
}
 
405
 
 
406
void
 
407
Tmr::off(uint cnt)
 
408
{
 
409
  NDB_TICKS off = NdbTick_CurrentMillisecond();
 
410
  assert(m_on != 0 && off >= m_on);
 
411
  m_ms += off - m_on;
 
412
  m_cnt += cnt;
 
413
  m_on = 0;
 
414
}
 
415
 
 
416
const char*
 
417
Tmr::time()
 
418
{
 
419
  if (m_cnt == 0) {
 
420
    sprintf(m_time, "%u ms", m_ms);
 
421
  } else {
 
422
    sprintf(m_time, "%u ms per %u ( %u ms per 1000 )", m_ms, m_cnt, (1000 * m_ms) / m_cnt);
 
423
  }
 
424
  return m_time;
 
425
}
 
426
 
 
427
const char*
 
428
Tmr::pct(const Tmr& t1)
 
429
{
 
430
  if (0 < t1.m_ms) {
 
431
    sprintf(m_text, "%u pct", (100 * m_ms) / t1.m_ms);
 
432
  } else {
 
433
    sprintf(m_text, "[cannot measure]");
 
434
  }
 
435
  return m_text;
 
436
}
 
437
 
 
438
const char*
 
439
Tmr::over(const Tmr& t1)
 
440
{
 
441
  if (0 < t1.m_ms) {
 
442
    if (t1.m_ms <= m_ms)
 
443
      sprintf(m_text, "%u pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
 
444
    else
 
445
      sprintf(m_text, "-%u pct", (100 * (t1.m_ms - m_ms)) / t1.m_ms);
 
446
  } else {
 
447
    sprintf(m_text, "[cannot measure]");
 
448
  }
 
449
  return m_text;
 
450
}
 
451
 
 
452
// character sets
 
453
 
 
454
static const uint maxcsnumber = 512;
 
455
static const uint maxcharcount = 32;
 
456
static const uint maxcharsize = 4;
 
457
static const uint maxxmulsize = 8;
 
458
 
 
459
// single mb char
 
460
struct Chr {
 
461
  uchar m_bytes[maxcharsize];
 
462
  uchar m_xbytes[maxxmulsize * maxcharsize];
 
463
  uint m_size;
 
464
  Chr();
 
465
};
 
466
 
 
467
Chr::Chr()
 
468
{
 
469
  memset(m_bytes, 0, sizeof(m_bytes));
 
470
  memset(m_xbytes, 0, sizeof(m_xbytes));
 
471
  m_size = 0;
 
472
}
 
473
 
 
474
// charset and random valid chars to use
 
475
struct Chs {
 
476
  CHARSET_INFO* m_cs;
 
477
  uint m_xmul;
 
478
  Chr* m_chr;
 
479
  Chs(CHARSET_INFO* cs);
 
480
  ~Chs();
 
481
};
 
482
 
 
483
static NdbOut&
 
484
operator<<(NdbOut& out, const Chs& chs);
 
485
 
 
486
Chs::Chs(CHARSET_INFO* cs) :
 
487
  m_cs(cs)
 
488
{
 
489
  m_xmul = m_cs->strxfrm_multiply;
 
490
  if (m_xmul == 0)
 
491
    m_xmul = 1;
 
492
  assert(m_xmul <= maxxmulsize);
 
493
  m_chr = new Chr [maxcharcount];
 
494
  uint i = 0;
 
495
  uint miss1 = 0;
 
496
  uint miss2 = 0;
 
497
  uint miss3 = 0;
 
498
  uint miss4 = 0;
 
499
  while (i < maxcharcount) {
 
500
    uchar* bytes = m_chr[i].m_bytes;
 
501
    uchar* xbytes = m_chr[i].m_xbytes;
 
502
    uint& size = m_chr[i].m_size;
 
503
    bool ok;
 
504
    size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
 
505
    assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
 
506
    // prefer longer chars
 
507
    if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
 
508
      continue;
 
509
    for (uint j = 0; j < size; j++) {
 
510
      bytes[j] = urandom(256);
 
511
    }
 
512
    int not_used;
 
513
    // check wellformed
 
514
    const char* sbytes = (const char*)bytes;
 
515
    if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + size, 1, &not_used) != size) {
 
516
      miss1++;
 
517
      continue;
 
518
    }
 
519
    // check no proper prefix wellformed
 
520
    ok = true;
 
521
    for (uint j = 1; j < size; j++) {
 
522
      if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1, &not_used) == j) {
 
523
        ok = false;
 
524
        break;
 
525
      }
 
526
    }
 
527
    if (!ok) {
 
528
      miss2++;
 
529
      continue;
 
530
    }
 
531
    // normalize
 
532
    memset(xbytes, 0, sizeof(xbytes));
 
533
    // currently returns buffer size always
 
534
    int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
 
535
    // check we got something
 
536
    ok = false;
 
537
    for (uint j = 0; j < xlen; j++) {
 
538
      if (xbytes[j] != 0) {
 
539
        ok = true;
 
540
        break;
 
541
      }
 
542
    }
 
543
    if (!ok) {
 
544
      miss3++;
 
545
      continue;
 
546
    }
 
547
    // check for duplicate (before normalize)
 
548
    ok = true;
 
549
    for (uint j = 0; j < i; j++) {
 
550
      const Chr& chr = m_chr[j];
 
551
      if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
 
552
        ok = false;
 
553
        break;
 
554
      }
 
555
    }
 
556
    if (!ok) {
 
557
      miss4++;
 
558
      continue;
 
559
    }
 
560
    i++;
 
561
  }
 
562
  bool disorder = true;
 
563
  uint bubbles = 0;
 
564
  while (disorder) {
 
565
    disorder = false;
 
566
    for (uint i = 1; i < maxcharcount; i++) {
 
567
      uint len = sizeof(m_chr[i].m_xbytes);
 
568
      if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
 
569
        Chr chr = m_chr[i];
 
570
        m_chr[i] = m_chr[i-1];
 
571
        m_chr[i-1] = chr;
 
572
        disorder = true;
 
573
        bubbles++;
 
574
      }
 
575
    }
 
576
  }
 
577
  LL3("inited charset " << *this << " miss=" << miss1 << "," << miss2 << "," << miss3 << "," << miss4 << " bubbles=" << bubbles);
 
578
}
 
579
 
 
580
Chs::~Chs()
 
581
{
 
582
  delete [] m_chr;
 
583
}
 
584
 
 
585
static NdbOut&
 
586
operator<<(NdbOut& out, const Chs& chs)
 
587
{
 
588
  CHARSET_INFO* cs = chs.m_cs;
 
589
  out << cs->name << "[" << cs->mbminlen << "-" << cs->mbmaxlen << "," << chs.m_xmul << "]";
 
590
  return out;
 
591
}
 
592
 
 
593
static Chs* cslist[maxcsnumber];
 
594
 
 
595
static void
 
596
resetcslist()
 
597
{
 
598
  for (uint i = 0; i < maxcsnumber; i++) {
 
599
    delete cslist[i];
 
600
    cslist[i] = 0;
 
601
  }
 
602
}
 
603
 
 
604
static Chs*
 
605
getcs(Par par)
 
606
{
 
607
  CHARSET_INFO* cs;
 
608
  if (par.m_cs != 0) {
 
609
    cs = par.m_cs;
 
610
  } else {
 
611
    while (1) {
 
612
      uint n = urandom(maxcsnumber);
 
613
      cs = get_charset(n, MYF(0));
 
614
      if (cs != 0) {
 
615
        // prefer complex charsets
 
616
        if (cs->mbmaxlen != 1 || urandom(5) == 0)
 
617
          break;
 
618
      }
 
619
    }
 
620
  }
 
621
  if (cslist[cs->number] == 0)
 
622
    cslist[cs->number] = new Chs(cs);
 
623
  return cslist[cs->number];
 
624
}
 
625
 
 
626
// tables and indexes
 
627
 
 
628
// Col - table column
 
629
 
 
630
struct Col {
 
631
  enum Type {
 
632
    Unsigned = NdbDictionary::Column::Unsigned,
 
633
    Char = NdbDictionary::Column::Char,
 
634
    Varchar = NdbDictionary::Column::Varchar,
 
635
    Longvarchar = NdbDictionary::Column::Longvarchar
 
636
  };
 
637
  const class Tab& m_tab;
 
638
  uint m_num;
 
639
  const char* m_name;
 
640
  bool m_pk;
 
641
  Type m_type;
 
642
  uint m_length;
 
643
  uint m_bytelength;        // multiplied by char width
 
644
  uint m_attrsize;          // base type size
 
645
  uint m_headsize;          // length bytes
 
646
  uint m_bytesize;          // full value size
 
647
  bool m_nullable;
 
648
  const Chs* m_chs;
 
649
  Col(const class Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs);
 
650
  ~Col();
 
651
  bool equal(const Col& col2) const;
 
652
  void wellformed(const void* addr) const;
 
653
};
 
654
 
 
655
Col::Col(const class Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs) :
 
656
  m_tab(tab),
 
657
  m_num(num),
 
658
  m_name(strcpy(new char [strlen(name) + 1], name)),
 
659
  m_pk(pk),
 
660
  m_type(type),
 
661
  m_length(length),
 
662
  m_bytelength(length * (chs == 0 ? 1 : chs->m_cs->mbmaxlen)),
 
663
  m_attrsize(
 
664
      type == Unsigned ? sizeof(Uint32) :
 
665
      type == Char ? sizeof(char) :
 
666
      type == Varchar ? sizeof(char) :
 
667
      type == Longvarchar ? sizeof(char) : ~0),
 
668
  m_headsize(
 
669
      type == Unsigned ? 0 :
 
670
      type == Char ? 0 :
 
671
      type == Varchar ? 1 :
 
672
      type == Longvarchar ? 2 : ~0),
 
673
  m_bytesize(m_headsize + m_attrsize * m_bytelength),
 
674
  m_nullable(nullable),
 
675
  m_chs(chs)
 
676
{
 
677
  // fix long varchar
 
678
  if (type == Varchar && m_bytelength > 255) {
 
679
    m_type = Longvarchar;
 
680
    m_headsize += 1;
 
681
    m_bytesize += 1;
 
682
  }
 
683
}
 
684
 
 
685
Col::~Col()
 
686
{
 
687
  delete [] m_name;
 
688
}
 
689
 
 
690
bool
 
691
Col::equal(const Col& col2) const
 
692
{
 
693
  return m_type == col2.m_type && m_length == col2.m_length && m_chs == col2.m_chs;
 
694
}
 
695
 
 
696
void
 
697
Col::wellformed(const void* addr) const
 
698
{
 
699
  switch (m_type) {
 
700
  case Col::Unsigned:
 
701
    break;
 
702
  case Col::Char:
 
703
    {
 
704
      CHARSET_INFO* cs = m_chs->m_cs;
 
705
      const char* src = (const char*)addr;
 
706
      uint len = m_bytelength;
 
707
      int not_used;
 
708
      assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff, &not_used) == len);
 
709
    }
 
710
    break;
 
711
  case Col::Varchar:
 
712
    {
 
713
      CHARSET_INFO* cs = m_chs->m_cs;
 
714
      const uchar* src = (const uchar*)addr;
 
715
      const char* ssrc = (const char*)src;
 
716
      uint len = src[0];
 
717
      int not_used;
 
718
      assert(len <= m_bytelength);
 
719
      assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff, &not_used) == len);
 
720
    }
 
721
    break;
 
722
  case Col::Longvarchar:
 
723
    {
 
724
      CHARSET_INFO* cs = m_chs->m_cs;
 
725
      const uchar* src = (const uchar*)addr;
 
726
      const char* ssrc = (const char*)src;
 
727
      uint len = src[0] + (src[1] << 8);
 
728
      int not_used;
 
729
      assert(len <= m_bytelength);
 
730
      assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff, &not_used) == len);
 
731
    }
 
732
    break;
 
733
  default:
 
734
    assert(false);
 
735
    break;
 
736
  }
 
737
}
 
738
 
 
739
static NdbOut&
 
740
operator<<(NdbOut& out, const Col& col)
 
741
{
 
742
  out << "col[" << col.m_num << "] " << col.m_name;
 
743
  switch (col.m_type) {
 
744
  case Col::Unsigned:
 
745
    out << " uint";
 
746
    break;
 
747
  case Col::Char:
 
748
    {
 
749
      CHARSET_INFO* cs = col.m_chs->m_cs;
 
750
      out << " char(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
 
751
    }
 
752
    break;
 
753
  case Col::Varchar:
 
754
    {
 
755
      CHARSET_INFO* cs = col.m_chs->m_cs;
 
756
      out << " varchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
 
757
    }
 
758
    break;
 
759
  case Col::Longvarchar:
 
760
    {
 
761
      CHARSET_INFO* cs = col.m_chs->m_cs;
 
762
      out << " longvarchar(" << col.m_length << "*" << cs->mbmaxlen << ";" << cs->name << ")";
 
763
    }
 
764
    break;
 
765
  default:
 
766
    out << "type" << (int)col.m_type;
 
767
    assert(false);
 
768
    break;
 
769
  }
 
770
  out << (col.m_pk ? " pk" : "");
 
771
  out << (col.m_nullable ? " nullable" : "");
 
772
  return out;
 
773
}
 
774
 
 
775
// ICol - index column
 
776
 
 
777
struct ICol {
 
778
  const class ITab& m_itab;
 
779
  uint m_num;
 
780
  const Col& m_col;
 
781
  ICol(const class ITab& itab, uint num, const Col& col);
 
782
  ~ICol();
 
783
};
 
784
 
 
785
ICol::ICol(const class ITab& itab, uint num, const Col& col) :
 
786
  m_itab(itab),
 
787
  m_num(num),
 
788
  m_col(col)
 
789
{
 
790
}
 
791
 
 
792
ICol::~ICol()
 
793
{
 
794
}
 
795
 
 
796
static NdbOut&
 
797
operator<<(NdbOut& out, const ICol& icol)
 
798
{
 
799
  out << "icol[" << icol.m_num << "] " << icol.m_col;
 
800
  return out;
 
801
}
 
802
 
 
803
// ITab - index
 
804
 
 
805
struct ITab {
 
806
  enum Type {
 
807
    OrderedIndex = NdbDictionary::Index::OrderedIndex,
 
808
    UniqueHashIndex = NdbDictionary::Index::UniqueHashIndex
 
809
  };
 
810
  const class Tab& m_tab;
 
811
  const char* m_name;
 
812
  Type m_type;
 
813
  uint m_icols;
 
814
  const ICol** m_icol;
 
815
  uint m_keymask;
 
816
  ITab(const class Tab& tab, const char* name, Type type, uint icols);
 
817
  ~ITab();
 
818
  void icoladd(uint k, const ICol* icolptr);
 
819
};
 
820
 
 
821
ITab::ITab(const class Tab& tab, const char* name, Type type, uint icols) :
 
822
  m_tab(tab),
 
823
  m_name(strcpy(new char [strlen(name) + 1], name)),
 
824
  m_type(type),
 
825
  m_icols(icols),
 
826
  m_icol(new const ICol* [icols + 1]),
 
827
  m_keymask(0)
 
828
{
 
829
  for (uint k = 0; k <= m_icols; k++)
 
830
    m_icol[k] = 0;
 
831
}
 
832
 
 
833
ITab::~ITab()
 
834
{
 
835
  delete [] m_name;
 
836
  for (uint i = 0; i < m_icols; i++)
 
837
    delete m_icol[i];
 
838
  delete [] m_icol;
 
839
}
 
840
 
 
841
void
 
842
ITab::icoladd(uint k, const ICol* icolptr)
 
843
{
 
844
  assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
 
845
  m_icol[k] = icolptr;
 
846
  m_keymask |= (1 << icolptr->m_col.m_num);
 
847
}
 
848
 
 
849
static NdbOut&
 
850
operator<<(NdbOut& out, const ITab& itab)
 
851
{
 
852
  out << "itab " << itab.m_name << " icols=" << itab.m_icols;
 
853
  for (uint k = 0; k < itab.m_icols; k++) {
 
854
    const ICol& icol = *itab.m_icol[k];
 
855
    out << endl << icol;
 
856
  }
 
857
  return out;
 
858
}
 
859
 
 
860
// Tab - table
 
861
 
 
862
struct Tab {
 
863
  const char* m_name;
 
864
  uint m_cols;
 
865
  const Col** m_col;
 
866
  uint m_pkmask;
 
867
  uint m_itabs;
 
868
  const ITab** m_itab;
 
869
  uint m_orderedindexes;
 
870
  uint m_hashindexes;
 
871
  // pk must contain an Unsigned column
 
872
  uint m_keycol;
 
873
  void coladd(uint k, Col* colptr);
 
874
  void itabadd(uint j, ITab* itab);
 
875
  Tab(const char* name, uint cols, uint itabs, uint keycol);
 
876
  ~Tab();
 
877
};
 
878
 
 
879
Tab::Tab(const char* name, uint cols, uint itabs, uint keycol) :
 
880
  m_name(strcpy(new char [strlen(name) + 1], name)),
 
881
  m_cols(cols),
 
882
  m_col(new const Col* [cols + 1]),
 
883
  m_pkmask(0),
 
884
  m_itabs(itabs),
 
885
  m_itab(new const ITab* [itabs + 1]),
 
886
  m_orderedindexes(0),
 
887
  m_hashindexes(0),
 
888
  m_keycol(keycol)
 
889
{
 
890
  for (uint k = 0; k <= cols; k++)
 
891
    m_col[k] = 0;
 
892
  for (uint j = 0; j <= itabs; j++)
 
893
    m_itab[j] = 0;
 
894
}
 
895
 
 
896
Tab::~Tab()
 
897
{
 
898
  delete [] m_name;
 
899
  for (uint i = 0; i < m_cols; i++)
 
900
    delete m_col[i];
 
901
  delete [] m_col;
 
902
  for (uint i = 0; i < m_itabs; i++)
 
903
    delete m_itab[i];
 
904
  delete [] m_itab;
 
905
}
 
906
 
 
907
void
 
908
Tab::coladd(uint k, Col* colptr)
 
909
{
 
910
  assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
 
911
  m_col[k] = colptr;
 
912
  if (colptr->m_pk)
 
913
    m_pkmask |= (1 << k);
 
914
}
 
915
 
 
916
void
 
917
Tab::itabadd(uint j, ITab* itabptr)
 
918
{
 
919
  assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
 
920
  m_itab[j] = itabptr;
 
921
  if (itabptr->m_type == ITab::OrderedIndex)
 
922
    m_orderedindexes++;
 
923
  else
 
924
    m_hashindexes++;
 
925
}
 
926
 
 
927
static NdbOut&
 
928
operator<<(NdbOut& out, const Tab& tab)
 
929
{
 
930
  out << "tab " << tab.m_name << " cols=" << tab.m_cols;
 
931
  for (uint k = 0; k < tab.m_cols; k++) {
 
932
    const Col& col =  *tab.m_col[k];
 
933
    out << endl << col;
 
934
  }
 
935
  for (uint i = 0; i < tab.m_itabs; i++) {
 
936
    if (tab.m_itab[i] == 0)
 
937
      continue;
 
938
    const ITab& itab = *tab.m_itab[i];
 
939
    out << endl << itab;
 
940
  }
 
941
  return out;
 
942
}
 
943
 
 
944
// make table structs
 
945
 
 
946
static const Tab** tablist = 0;
 
947
static uint tabcount = 0;
 
948
 
 
949
static void
 
950
verifytables()
 
951
{
 
952
  for (uint j = 0; j < tabcount; j++) {
 
953
    const Tab* t = tablist[j];
 
954
    if (t == 0)
 
955
      continue;
 
956
    assert(t->m_cols != 0 && t->m_col != 0);
 
957
    for (uint k = 0; k < t->m_cols; k++) {
 
958
      const Col* c = t->m_col[k];
 
959
      assert(c != 0 && c->m_num == k);
 
960
      assert(!(c->m_pk && c->m_nullable));
 
961
    }
 
962
    assert(t->m_col[t->m_cols] == 0);
 
963
    {
 
964
      assert(t->m_keycol < t->m_cols);
 
965
      const Col* c = t->m_col[t->m_keycol];
 
966
      assert(c->m_pk && c->m_type == Col::Unsigned);
 
967
    }
 
968
    assert(t->m_itabs != 0 && t->m_itab != 0);
 
969
    for (uint i = 0; i < t->m_itabs; i++) {
 
970
      const ITab* x = t->m_itab[i];
 
971
      if (x == 0)
 
972
        continue;
 
973
      assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
 
974
      for (uint k = 0; k < x->m_icols; k++) {
 
975
        const ICol* c = x->m_icol[k];
 
976
        assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
 
977
        if (x->m_type == ITab::UniqueHashIndex) {
 
978
          assert(!c->m_col.m_nullable);
 
979
        }
 
980
      }
 
981
    }
 
982
    assert(t->m_itab[t->m_itabs] == 0);
 
983
  }
 
984
}
 
985
 
 
986
static void
 
987
makebuiltintables(Par par)
 
988
{
 
989
  LL2("makebuiltintables");
 
990
  resetcslist();
 
991
  tabcount = 3;
 
992
  if (tablist == 0) {
 
993
    tablist = new const Tab* [tabcount];
 
994
    for (uint j = 0; j < tabcount; j++) {
 
995
      tablist[j] = 0;
 
996
    }
 
997
  } else {
 
998
    for (uint j = 0; j < tabcount; j++) {
 
999
      delete tablist[j];
 
1000
      tablist[j] = 0;
 
1001
    }
 
1002
  }
 
1003
  // ti0 - basic
 
1004
  if (usetable(par, 0)) {
 
1005
    Tab* t = new Tab("ti0", 5, 7, 0);
 
1006
    // name - pk - type - length - nullable - cs
 
1007
    t->coladd(0, new Col(*t, 0, "a", 1, Col::Unsigned, 1, 0, 0));
 
1008
    t->coladd(1, new Col(*t, 1, "b", 0, Col::Unsigned, 1, 1, 0));
 
1009
    t->coladd(2, new Col(*t, 2, "c", 0, Col::Unsigned, 1, 0, 0));
 
1010
    t->coladd(3, new Col(*t, 3, "d", 0, Col::Unsigned, 1, 1, 0));
 
1011
    t->coladd(4, new Col(*t, 4, "e", 0, Col::Unsigned, 1, 0, 0));
 
1012
    if (useindex(par, 0)) {
 
1013
      // a
 
1014
      ITab* x = new ITab(*t, "ti0x0", ITab::OrderedIndex, 1);
 
1015
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1016
      t->itabadd(0, x);
 
1017
    }
 
1018
    if (useindex(par, 1)) {
 
1019
      // b
 
1020
      ITab* x = new ITab(*t, "ti0x1", ITab::OrderedIndex, 1);
 
1021
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
 
1022
      t->itabadd(1, x);
 
1023
    }
 
1024
    if (useindex(par, 2)) {
 
1025
      // b, c
 
1026
      ITab* x = new ITab(*t, "ti0x2", ITab::OrderedIndex, 2);
 
1027
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
 
1028
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
 
1029
      t->itabadd(2, x);
 
1030
    }
 
1031
    if (useindex(par, 3)) {
 
1032
      // b, e, c, d
 
1033
      ITab* x = new ITab(*t, "ti0x3", ITab::OrderedIndex, 4);
 
1034
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
 
1035
      x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
 
1036
      x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
 
1037
      x->icoladd(3, new ICol(*x, 3, *t->m_col[3]));
 
1038
      t->itabadd(3, x);
 
1039
    }
 
1040
    if (useindex(par, 4)) {
 
1041
      // a, c
 
1042
      ITab* x = new ITab(*t, "ti0z4", ITab::UniqueHashIndex, 2);
 
1043
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1044
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
 
1045
      t->itabadd(4, x);
 
1046
    }
 
1047
    if (useindex(par, 5)) {
 
1048
      // a, e
 
1049
      ITab* x = new ITab(*t, "ti0z5", ITab::UniqueHashIndex, 2);
 
1050
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1051
      x->icoladd(1, new ICol(*x, 1, *t->m_col[4]));
 
1052
      t->itabadd(5, x);
 
1053
    }
 
1054
    tablist[0] = t;
 
1055
  }
 
1056
  // ti1 - simple char fields
 
1057
  if (usetable(par, 1)) {
 
1058
    Tab* t = new Tab("ti1", 5, 7, 1);
 
1059
    // name - pk - type - length - nullable - cs
 
1060
    t->coladd(0, new Col(*t, 0, "a", 0, Col::Unsigned, 1, 0, 0));
 
1061
    t->coladd(1, new Col(*t, 1, "b", 1, Col::Unsigned, 1, 0, 0));
 
1062
    t->coladd(2, new Col(*t, 2, "c", 0, Col::Varchar, 20, 0, getcs(par)));
 
1063
    t->coladd(3, new Col(*t, 3, "d", 0, Col::Char, 5, 0, getcs(par)));
 
1064
    t->coladd(4, new Col(*t, 4, "e", 0, Col::Longvarchar, 5, 1, getcs(par)));
 
1065
    if (useindex(par, 0)) {
 
1066
      // b
 
1067
      ITab* x = new ITab(*t, "ti1x0", ITab::OrderedIndex, 1);
 
1068
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
 
1069
      t->itabadd(0, x);
 
1070
    }
 
1071
    if (useindex(par, 1)) {
 
1072
      // c, a
 
1073
      ITab* x = new ITab(*t, "ti1x1", ITab::OrderedIndex, 2);
 
1074
      x->icoladd(0, new ICol(*x, 0, *t->m_col[2]));
 
1075
      x->icoladd(1, new ICol(*x, 1, *t->m_col[0]));
 
1076
      t->itabadd(1, x);
 
1077
    }
 
1078
    if (useindex(par, 2)) {
 
1079
      // d
 
1080
      ITab* x = new ITab(*t, "ti1x2", ITab::OrderedIndex, 1);
 
1081
      x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
 
1082
      t->itabadd(2, x);
 
1083
    }
 
1084
    if (useindex(par, 3)) {
 
1085
      // e, d, c, b
 
1086
      ITab* x = new ITab(*t, "ti1x3", ITab::OrderedIndex, 4);
 
1087
      x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
 
1088
      x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
 
1089
      x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
 
1090
      x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
 
1091
      t->itabadd(3, x);
 
1092
    }
 
1093
    if (useindex(par, 4)) {
 
1094
      // a, b
 
1095
      ITab* x = new ITab(*t, "ti1z4", ITab::UniqueHashIndex, 2);
 
1096
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1097
      x->icoladd(1, new ICol(*x, 1, *t->m_col[1]));
 
1098
      t->itabadd(4, x);
 
1099
    }
 
1100
    if (useindex(par, 5)) {
 
1101
      // b, c, d
 
1102
      ITab* x = new ITab(*t, "ti1z5", ITab::UniqueHashIndex, 3);
 
1103
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
 
1104
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
 
1105
      x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
 
1106
      t->itabadd(5, x);
 
1107
    }
 
1108
    tablist[1] = t;
 
1109
  }
 
1110
  // ti2 - complex char fields
 
1111
  if (usetable(par, 2)) {
 
1112
    Tab* t = new Tab("ti2", 5, 7, 2);
 
1113
    // name - pk - type - length - nullable - cs
 
1114
    t->coladd(0, new Col(*t, 0, "a", 1, Col::Char, 31, 0, getcs(par)));
 
1115
    t->coladd(1, new Col(*t, 1, "b", 0, Col::Char, 4, 1, getcs(par)));
 
1116
    t->coladd(2, new Col(*t, 2, "c", 1, Col::Unsigned, 1, 0, 0));
 
1117
    t->coladd(3, new Col(*t, 3, "d", 1, Col::Varchar, 128, 0, getcs(par)));
 
1118
    t->coladd(4, new Col(*t, 4, "e", 0, Col::Varchar, 7, 0, getcs(par)));
 
1119
    if (useindex(par, 0)) {
 
1120
      // a, c, d
 
1121
      ITab* x = new ITab(*t, "ti2x0", ITab::OrderedIndex, 3);
 
1122
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1123
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
 
1124
      x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
 
1125
      t->itabadd(0, x);
 
1126
    }
 
1127
    if (useindex(par, 1)) {
 
1128
      // e, d, c, b, a
 
1129
      ITab* x = new ITab(*t, "ti2x1", ITab::OrderedIndex, 5);
 
1130
      x->icoladd(0, new ICol(*x, 0, *t->m_col[4]));
 
1131
      x->icoladd(1, new ICol(*x, 1, *t->m_col[3]));
 
1132
      x->icoladd(2, new ICol(*x, 2, *t->m_col[2]));
 
1133
      x->icoladd(3, new ICol(*x, 3, *t->m_col[1]));
 
1134
      x->icoladd(4, new ICol(*x, 4, *t->m_col[0]));
 
1135
      t->itabadd(1, x);
 
1136
    }
 
1137
    if (useindex(par, 2)) {
 
1138
      // d
 
1139
      ITab* x = new ITab(*t, "ti2x2", ITab::OrderedIndex, 1);
 
1140
      x->icoladd(0, new ICol(*x, 0, *t->m_col[3]));
 
1141
      t->itabadd(2, x);
 
1142
    }
 
1143
    if (useindex(par, 3)) {
 
1144
      // b
 
1145
      ITab* x = new ITab(*t, "ti2x3", ITab::OrderedIndex, 1);
 
1146
      x->icoladd(0, new ICol(*x, 0, *t->m_col[1]));
 
1147
      t->itabadd(3, x);
 
1148
    }
 
1149
    if (useindex(par, 4)) {
 
1150
      // a, c
 
1151
      ITab* x = new ITab(*t, "ti2z4", ITab::UniqueHashIndex, 2);
 
1152
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1153
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
 
1154
      t->itabadd(4, x);
 
1155
    }
 
1156
    if (useindex(par, 5)) {
 
1157
      // a, c, d, e
 
1158
      ITab* x = new ITab(*t, "ti2z5", ITab::UniqueHashIndex, 4);
 
1159
      x->icoladd(0, new ICol(*x, 0, *t->m_col[0]));
 
1160
      x->icoladd(1, new ICol(*x, 1, *t->m_col[2]));
 
1161
      x->icoladd(2, new ICol(*x, 2, *t->m_col[3]));
 
1162
      x->icoladd(3, new ICol(*x, 3, *t->m_col[4]));
 
1163
      t->itabadd(5, x);
 
1164
    }
 
1165
    tablist[2] = t;
 
1166
  }
 
1167
  verifytables();
 
1168
}
 
1169
 
 
1170
// connections
 
1171
 
 
1172
static Ndb_cluster_connection* g_ncc = 0;
 
1173
 
 
1174
struct Con {
 
1175
  Ndb* m_ndb;
 
1176
  NdbDictionary::Dictionary* m_dic;
 
1177
  NdbTransaction* m_tx;
 
1178
  Uint64 m_txid;
 
1179
  NdbOperation* m_op;
 
1180
  NdbIndexOperation* m_indexop;
 
1181
  NdbScanOperation* m_scanop;
 
1182
  NdbIndexScanOperation* m_indexscanop;
 
1183
  NdbScanFilter* m_scanfilter;
 
1184
  enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
 
1185
  ScanMode m_scanmode;
 
1186
  enum ErrType {
 
1187
    ErrNone = 0,
 
1188
    ErrDeadlock = 1,
 
1189
    ErrNospace = 2,
 
1190
    ErrOther = 4
 
1191
  };
 
1192
  ErrType m_errtype;
 
1193
  char m_errname[100];
 
1194
  Con() :
 
1195
    m_ndb(0), m_dic(0), m_tx(0), m_txid(0), m_op(0), m_indexop(0),
 
1196
    m_scanop(0), m_indexscanop(0), m_scanfilter(0),
 
1197
    m_scanmode(ScanNo), m_errtype(ErrNone) {}
 
1198
  ~Con() {
 
1199
    if (m_tx != 0)
 
1200
      closeTransaction();
 
1201
  }
 
1202
  int connect();
 
1203
  void connect(const Con& con);
 
1204
  void disconnect();
 
1205
  int startTransaction();
 
1206
  int getNdbOperation(const Tab& tab);
 
1207
  int getNdbIndexOperation1(const ITab& itab, const Tab& tab);
 
1208
  int getNdbIndexOperation(const ITab& itab, const Tab& tab);
 
1209
  int getNdbScanOperation(const Tab& tab);
 
1210
  int getNdbIndexScanOperation1(const ITab& itab, const Tab& tab);
 
1211
  int getNdbIndexScanOperation(const ITab& itab, const Tab& tab);
 
1212
  int getNdbScanFilter();
 
1213
  int equal(int num, const char* addr);
 
1214
  int getValue(int num, NdbRecAttr*& rec);
 
1215
  int setValue(int num, const char* addr);
 
1216
  int setBound(int num, int type, const void* value);
 
1217
  int beginFilter(int group);
 
1218
  int endFilter();
 
1219
  int setFilter(int num, int cond, const void* value, uint len);
 
1220
  int execute(ExecType et);
 
1221
  int execute(ExecType et, uint& err);
 
1222
  int readTuple(Par par);
 
1223
  int readTuples(Par par);
 
1224
  int readIndexTuples(Par par);
 
1225
  int executeScan();
 
1226
  int nextScanResult(bool fetchAllowed);
 
1227
  int nextScanResult(bool fetchAllowed, uint& err);
 
1228
  int updateScanTuple(Con& con2);
 
1229
  int deleteScanTuple(Con& con2);
 
1230
  void closeScan();
 
1231
  void closeTransaction();
 
1232
  const char* errname(uint err);
 
1233
  void printerror(NdbOut& out);
 
1234
};
 
1235
 
 
1236
int
 
1237
Con::connect()
 
1238
{
 
1239
  assert(m_ndb == 0);
 
1240
  m_ndb = new Ndb(g_ncc, "TEST_DB");
 
1241
  CHKCON(m_ndb->init() == 0, *this);
 
1242
  CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
 
1243
  m_tx = 0, m_txid = 0, m_op = 0;
 
1244
  return 0;
 
1245
}
 
1246
 
 
1247
void
 
1248
Con::connect(const Con& con)
 
1249
{
 
1250
  assert(m_ndb == 0);
 
1251
  m_ndb = con.m_ndb;
 
1252
}
 
1253
 
 
1254
void
 
1255
Con::disconnect()
 
1256
{
 
1257
  delete m_ndb;
 
1258
  m_ndb = 0, m_dic = 0, m_tx = 0, m_txid = 0, m_op = 0;
 
1259
}
 
1260
 
 
1261
int
 
1262
Con::startTransaction()
 
1263
{
 
1264
  assert(m_ndb != 0);
 
1265
  if (m_tx != 0)
 
1266
    closeTransaction();
 
1267
  CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
 
1268
  m_txid = m_tx->getTransactionId();
 
1269
  return 0;
 
1270
}
 
1271
 
 
1272
int
 
1273
Con::getNdbOperation(const Tab& tab)
 
1274
{
 
1275
  assert(m_tx != 0);
 
1276
  CHKCON((m_op = m_tx->getNdbOperation(tab.m_name)) != 0, *this);
 
1277
  return 0;
 
1278
}
 
1279
 
 
1280
int
 
1281
Con::getNdbIndexOperation1(const ITab& itab, const Tab& tab)
 
1282
{
 
1283
  assert(m_tx != 0);
 
1284
  CHKCON((m_op = m_indexop = m_tx->getNdbIndexOperation(itab.m_name, tab.m_name)) != 0, *this);
 
1285
  return 0;
 
1286
}
 
1287
 
 
1288
int
 
1289
Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
 
1290
{
 
1291
  assert(m_tx != 0);
 
1292
  uint tries = 0;
 
1293
  while (1) {
 
1294
    if (getNdbIndexOperation1(itab, tab) == 0)
 
1295
      break;
 
1296
    CHK(++tries < 10);
 
1297
    NdbSleep_MilliSleep(100);
 
1298
  }
 
1299
  return 0;
 
1300
}
 
1301
 
 
1302
int
 
1303
Con::getNdbScanOperation(const Tab& tab)
 
1304
{
 
1305
  assert(m_tx != 0);
 
1306
  CHKCON((m_op = m_scanop = m_tx->getNdbScanOperation(tab.m_name)) != 0, *this);
 
1307
  return 0;
 
1308
}
 
1309
 
 
1310
int
 
1311
Con::getNdbIndexScanOperation1(const ITab& itab, const Tab& tab)
 
1312
{
 
1313
  assert(m_tx != 0);
 
1314
  CHKCON((m_op = m_scanop = m_indexscanop = m_tx->getNdbIndexScanOperation(itab.m_name, tab.m_name)) != 0, *this);
 
1315
  return 0;
 
1316
}
 
1317
 
 
1318
int
 
1319
Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
 
1320
{
 
1321
  assert(m_tx != 0);
 
1322
  uint tries = 0;
 
1323
  while (1) {
 
1324
    if (getNdbIndexScanOperation1(itab, tab) == 0)
 
1325
      break;
 
1326
    CHK(++tries < 10);
 
1327
    NdbSleep_MilliSleep(100);
 
1328
  }
 
1329
  return 0;
 
1330
}
 
1331
 
 
1332
int
 
1333
Con::getNdbScanFilter()
 
1334
{
 
1335
  assert(m_tx != 0 && m_scanop != 0);
 
1336
  delete m_scanfilter;
 
1337
  m_scanfilter = new NdbScanFilter(m_scanop);
 
1338
  return 0;
 
1339
}
 
1340
 
 
1341
int
 
1342
Con::equal(int num, const char* addr)
 
1343
{
 
1344
  assert(m_tx != 0 && m_op != 0);
 
1345
  CHKCON(m_op->equal(num, addr) == 0, *this);
 
1346
  return 0;
 
1347
}
 
1348
 
 
1349
int
 
1350
Con::getValue(int num, NdbRecAttr*& rec)
 
1351
{
 
1352
  assert(m_tx != 0 && m_op != 0);
 
1353
  CHKCON((rec = m_op->getValue(num, 0)) != 0, *this);
 
1354
  return 0;
 
1355
}
 
1356
 
 
1357
int
 
1358
Con::setValue(int num, const char* addr)
 
1359
{
 
1360
  assert(m_tx != 0 && m_op != 0);
 
1361
  CHKCON(m_op->setValue(num, addr) == 0, *this);
 
1362
  return 0;
 
1363
}
 
1364
 
 
1365
int
 
1366
Con::setBound(int num, int type, const void* value)
 
1367
{
 
1368
  assert(m_tx != 0 && m_indexscanop != 0);
 
1369
  CHKCON(m_indexscanop->setBound(num, type, value) == 0, *this);
 
1370
  return 0;
 
1371
}
 
1372
 
 
1373
int
 
1374
Con::beginFilter(int group)
 
1375
{
 
1376
  assert(m_tx != 0 && m_scanfilter != 0);
 
1377
  CHKCON(m_scanfilter->begin((NdbScanFilter::Group)group) == 0, *this);
 
1378
  return 0;
 
1379
}
 
1380
 
 
1381
int
 
1382
Con::endFilter()
 
1383
{
 
1384
  assert(m_tx != 0 && m_scanfilter != 0);
 
1385
  CHKCON(m_scanfilter->end() == 0, *this);
 
1386
  return 0;
 
1387
}
 
1388
 
 
1389
int
 
1390
Con::setFilter(int num, int cond, const void* value, uint len)
 
1391
{
 
1392
  assert(m_tx != 0 && m_scanfilter != 0);
 
1393
  CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
 
1394
  return 0;
 
1395
}
 
1396
 
 
1397
int
 
1398
Con::execute(ExecType et)
 
1399
{
 
1400
  assert(m_tx != 0);
 
1401
  CHKCON(m_tx->execute(et) == 0, *this);
 
1402
  return 0;
 
1403
}
 
1404
 
 
1405
int
 
1406
Con::execute(ExecType et, uint& err)
 
1407
{
 
1408
  int ret = execute(et);
 
1409
  // err in: errors to catch, out: error caught
 
1410
  const uint errin = err;
 
1411
  err = 0;
 
1412
  if (ret == -1) {
 
1413
    if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
 
1414
      LL3("caught deadlock");
 
1415
      err = ErrDeadlock;
 
1416
      ret = 0;
 
1417
    }
 
1418
    if (m_errtype == ErrNospace && (errin & ErrNospace)) {
 
1419
      LL3("caught nospace");
 
1420
      err = ErrNospace;
 
1421
      ret = 0;
 
1422
    }
 
1423
  }
 
1424
  CHK(ret == 0);
 
1425
  return 0;
 
1426
}
 
1427
 
 
1428
int
 
1429
Con::readTuple(Par par)
 
1430
{
 
1431
  assert(m_tx != 0 && m_op != 0);
 
1432
  NdbOperation::LockMode lm = par.m_lockmode;
 
1433
  CHKCON(m_op->readTuple(lm) == 0, *this);
 
1434
  return 0;
 
1435
}
 
1436
 
 
1437
int
 
1438
Con::readTuples(Par par)
 
1439
{
 
1440
  assert(m_tx != 0 && m_scanop != 0);
 
1441
  int scan_flags = 0;
 
1442
  if (par.m_tupscan)
 
1443
    scan_flags |= NdbScanOperation::SF_TupScan;
 
1444
  CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
 
1445
  return 0;
 
1446
}
 
1447
 
 
1448
int
 
1449
Con::readIndexTuples(Par par)
 
1450
{
 
1451
  assert(m_tx != 0 && m_indexscanop != 0);
 
1452
  int scan_flags = 0;
 
1453
  if (par.m_ordered)
 
1454
    scan_flags |= NdbScanOperation::SF_OrderBy;
 
1455
  if (par.m_descending)
 
1456
    scan_flags |= NdbScanOperation::SF_Descending;
 
1457
  CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
 
1458
  return 0;
 
1459
}
 
1460
 
 
1461
int
 
1462
Con::executeScan()
 
1463
{
 
1464
  CHKCON(m_tx->execute(NoCommit) == 0, *this);
 
1465
  return 0;
 
1466
}
 
1467
 
 
1468
int
 
1469
Con::nextScanResult(bool fetchAllowed)
 
1470
{
 
1471
  int ret;
 
1472
  assert(m_scanop != 0);
 
1473
  CHKCON((ret = m_scanop->nextResult(fetchAllowed)) != -1, *this);
 
1474
  assert(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
 
1475
  return ret;
 
1476
}
 
1477
 
 
1478
int
 
1479
Con::nextScanResult(bool fetchAllowed, uint& err)
 
1480
{
 
1481
  int ret = nextScanResult(fetchAllowed);
 
1482
  // err in: errors to catch, out: error caught
 
1483
  const uint errin = err;
 
1484
  err = 0;
 
1485
  if (ret == -1) {
 
1486
    if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
 
1487
      LL3("caught deadlock");
 
1488
      err = ErrDeadlock;
 
1489
      ret = 0;
 
1490
    }
 
1491
  }
 
1492
  CHK(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
 
1493
  return ret;
 
1494
}
 
1495
 
 
1496
int
 
1497
Con::updateScanTuple(Con& con2)
 
1498
{
 
1499
  assert(con2.m_tx != 0);
 
1500
  CHKCON((con2.m_op = m_scanop->updateCurrentTuple(con2.m_tx)) != 0, *this);
 
1501
  con2.m_txid = m_txid; // in the kernel
 
1502
  return 0;
 
1503
}
 
1504
 
 
1505
int
 
1506
Con::deleteScanTuple(Con& con2)
 
1507
{
 
1508
  assert(con2.m_tx != 0);
 
1509
  CHKCON(m_scanop->deleteCurrentTuple(con2.m_tx) == 0, *this);
 
1510
  con2.m_txid = m_txid; // in the kernel
 
1511
  return 0;
 
1512
}
 
1513
 
 
1514
void
 
1515
Con::closeScan()
 
1516
{
 
1517
  assert(m_scanop != 0);
 
1518
  m_scanop->close();
 
1519
  m_scanop = 0, m_indexscanop = 0;
 
1520
 
 
1521
}
 
1522
 
 
1523
void
 
1524
Con::closeTransaction()
 
1525
{
 
1526
  assert(m_ndb != 0 && m_tx != 0);
 
1527
  m_ndb->closeTransaction(m_tx);
 
1528
  m_tx = 0, m_txid = 0, m_op = 0;
 
1529
  m_scanop = 0, m_indexscanop = 0;
 
1530
}
 
1531
 
 
1532
const char*
 
1533
Con::errname(uint err)
 
1534
{
 
1535
  sprintf(m_errname, "0x%x", err);
 
1536
  if (err & ErrDeadlock)
 
1537
    strcat(m_errname, ",deadlock");
 
1538
  if (err & ErrNospace)
 
1539
    strcat(m_errname, ",nospace");
 
1540
  return m_errname;
 
1541
}
 
1542
 
 
1543
void
 
1544
Con::printerror(NdbOut& out)
 
1545
{
 
1546
  m_errtype = ErrOther;
 
1547
  uint any = 0;
 
1548
  int code;
 
1549
  int die = 0;
 
1550
  if (m_ndb) {
 
1551
    if ((code = m_ndb->getNdbError().code) != 0) {
 
1552
      LL0(++any << " ndb: error " << m_ndb->getNdbError());
 
1553
      die += (code == g_opt.m_die);
 
1554
    }
 
1555
    if (m_dic && (code = m_dic->getNdbError().code) != 0) {
 
1556
      LL0(++any << " dic: error " << m_dic->getNdbError());
 
1557
      die += (code == g_opt.m_die);
 
1558
    }
 
1559
    if (m_tx) {
 
1560
      if ((code = m_tx->getNdbError().code) != 0) {
 
1561
        LL0(++any << " con: error " << m_tx->getNdbError());
 
1562
        die += (code == g_opt.m_die);
 
1563
        // 631 is new, occurs only on 4 db nodes, needs to be checked out
 
1564
        if (code == 266 || code == 274 || code == 296 || code == 297 || code == 499 || code == 631)
 
1565
          m_errtype = ErrDeadlock;
 
1566
        if (code == 826 || code == 827 || code == 902)
 
1567
          m_errtype = ErrNospace;
 
1568
      }
 
1569
      if (m_op && m_op->getNdbError().code != 0) {
 
1570
        LL0(++any << " op : error " << m_op->getNdbError());
 
1571
        die += (code == g_opt.m_die);
 
1572
      }
 
1573
    }
 
1574
  }
 
1575
  if (!any) {
 
1576
    LL0("failed but no NDB error code");
 
1577
  }
 
1578
  if (die) {
 
1579
    if (g_opt.m_core)
 
1580
      abort();
 
1581
    exit(1);
 
1582
  }
 
1583
}
 
1584
 
 
1585
// dictionary operations
 
1586
 
 
1587
static int
 
1588
invalidateindex(Par par, const ITab& itab)
 
1589
{
 
1590
  Con& con = par.con();
 
1591
  const Tab& tab = par.tab();
 
1592
  con.m_ndb->getDictionary()->invalidateIndex(itab.m_name, tab.m_name);
 
1593
  return 0;
 
1594
}
 
1595
 
 
1596
static int
 
1597
invalidateindex(Par par)
 
1598
{
 
1599
  Con& con = par.con();
 
1600
  const Tab& tab = par.tab();
 
1601
  for (uint i = 0; i < tab.m_itabs; i++) {
 
1602
    if (tab.m_itab[i] == 0)
 
1603
      continue;
 
1604
    const ITab& itab = *tab.m_itab[i];
 
1605
    invalidateindex(par, itab);
 
1606
  }
 
1607
  return 0;
 
1608
}
 
1609
 
 
1610
static int
 
1611
invalidatetable(Par par)
 
1612
{
 
1613
  Con& con = par.con();
 
1614
  const Tab& tab = par.tab();
 
1615
  invalidateindex(par);
 
1616
  con.m_ndb->getDictionary()->invalidateTable(tab.m_name);
 
1617
  return 0;
 
1618
}
 
1619
 
 
1620
static int
 
1621
droptable(Par par)
 
1622
{
 
1623
  Con& con = par.con();
 
1624
  const Tab& tab = par.tab();
 
1625
  con.m_dic = con.m_ndb->getDictionary();
 
1626
  if (con.m_dic->getTable(tab.m_name) == 0) {
 
1627
    // how to check for error
 
1628
    LL4("no table " << tab.m_name);
 
1629
  } else {
 
1630
    LL3("drop table " << tab.m_name);
 
1631
    CHKCON(con.m_dic->dropTable(tab.m_name) == 0, con);
 
1632
  }
 
1633
  con.m_dic = 0;
 
1634
  return 0;
 
1635
}
 
1636
 
 
1637
static int
 
1638
createtable(Par par)
 
1639
{
 
1640
  Con& con = par.con();
 
1641
  const Tab& tab = par.tab();
 
1642
  LL3("create table " << tab.m_name);
 
1643
  LL4(tab);
 
1644
  NdbDictionary::Table t(tab.m_name);
 
1645
  if (par.m_fragtype != NdbDictionary::Object::FragUndefined) {
 
1646
    t.setFragmentType(par.m_fragtype);
 
1647
  }
 
1648
  if (par.m_nologging) {
 
1649
    t.setLogging(false);
 
1650
  }
 
1651
  for (uint k = 0; k < tab.m_cols; k++) {
 
1652
    const Col& col = *tab.m_col[k];
 
1653
    NdbDictionary::Column c(col.m_name);
 
1654
    c.setType((NdbDictionary::Column::Type)col.m_type);
 
1655
    c.setLength(col.m_bytelength); // for char NDB API uses length in bytes
 
1656
    c.setPrimaryKey(col.m_pk);
 
1657
    c.setNullable(col.m_nullable);
 
1658
    if (col.m_chs != 0)
 
1659
        c.setCharset(col.m_chs->m_cs);
 
1660
    t.addColumn(c);
 
1661
  }
 
1662
  con.m_dic = con.m_ndb->getDictionary();
 
1663
  CHKCON(con.m_dic->createTable(t) == 0, con);
 
1664
  con.m_dic = 0;
 
1665
  return 0;
 
1666
}
 
1667
 
 
1668
static int
 
1669
dropindex(Par par, const ITab& itab)
 
1670
{
 
1671
  Con& con = par.con();
 
1672
  const Tab& tab = par.tab();
 
1673
  con.m_dic = con.m_ndb->getDictionary();
 
1674
  if (con.m_dic->getIndex(itab.m_name, tab.m_name) == 0) {
 
1675
    // how to check for error
 
1676
    LL4("no index " << itab.m_name);
 
1677
  } else {
 
1678
    LL3("drop index " << itab.m_name);
 
1679
    CHKCON(con.m_dic->dropIndex(itab.m_name, tab.m_name) == 0, con);
 
1680
  }
 
1681
  con.m_dic = 0;
 
1682
  return 0;
 
1683
}
 
1684
 
 
1685
static int
 
1686
dropindex(Par par)
 
1687
{
 
1688
  const Tab& tab = par.tab();
 
1689
  for (uint i = 0; i < tab.m_itabs; i++) {
 
1690
    if (tab.m_itab[i] == 0)
 
1691
      continue;
 
1692
    const ITab& itab = *tab.m_itab[i];
 
1693
    CHK(dropindex(par, itab) == 0);
 
1694
  }
 
1695
  return 0;
 
1696
}
 
1697
 
 
1698
static int
 
1699
createindex(Par par, const ITab& itab)
 
1700
{
 
1701
  Con& con = par.con();
 
1702
  const Tab& tab = par.tab();
 
1703
  LL3("create index " << itab.m_name);
 
1704
  LL4(itab);
 
1705
  NdbDictionary::Index x(itab.m_name);
 
1706
  x.setTable(tab.m_name);
 
1707
  x.setType((NdbDictionary::Index::Type)itab.m_type);
 
1708
  if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
 
1709
    x.setLogging(false);
 
1710
  }
 
1711
  for (uint k = 0; k < itab.m_icols; k++) {
 
1712
    const ICol& icol = *itab.m_icol[k];
 
1713
    const Col& col = icol.m_col;
 
1714
    x.addColumnName(col.m_name);
 
1715
  }
 
1716
  con.m_dic = con.m_ndb->getDictionary();
 
1717
  CHKCON(con.m_dic->createIndex(x) == 0, con);
 
1718
  con.m_dic = 0;
 
1719
  return 0;
 
1720
}
 
1721
 
 
1722
static int
 
1723
createindex(Par par)
 
1724
{
 
1725
  const Tab& tab = par.tab();
 
1726
  for (uint i = 0; i < tab.m_itabs; i++) {
 
1727
    if (tab.m_itab[i] == 0)
 
1728
      continue;
 
1729
    const ITab& itab = *tab.m_itab[i];
 
1730
    CHK(createindex(par, itab) == 0);
 
1731
  }
 
1732
  return 0;
 
1733
}
 
1734
 
 
1735
// data sets
 
1736
 
 
1737
// Val - typed column value
 
1738
 
 
1739
struct Val {
 
1740
  const Col& m_col;
 
1741
  union {
 
1742
  Uint32 m_uint32;
 
1743
  uchar* m_char;
 
1744
  uchar* m_varchar;
 
1745
  uchar* m_longvarchar;
 
1746
  };
 
1747
  bool m_null;
 
1748
  // construct
 
1749
  Val(const Col& col);
 
1750
  ~Val();
 
1751
  void copy(const Val& val2);
 
1752
  void copy(const void* addr);
 
1753
  const void* dataaddr() const;
 
1754
  void calc(Par par, uint i);
 
1755
  void calckey(Par par, uint i);
 
1756
  void calckeychars(Par par, uint i, uint& n, uchar* buf);
 
1757
  void calcnokey(Par par);
 
1758
  void calcnokeychars(Par par, uint& n, uchar* buf);
 
1759
  // operations
 
1760
  int setval(Par par) const;
 
1761
  int setval(Par par, const ICol& icol) const;
 
1762
  // compare
 
1763
  int cmp(Par par, const Val& val2) const;
 
1764
  int cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const;
 
1765
  int verify(Par par, const Val& val2) const;
 
1766
private:
 
1767
  Val& operator=(const Val& val2);
 
1768
};
 
1769
 
 
1770
static NdbOut&
 
1771
operator<<(NdbOut& out, const Val& val);
 
1772
 
 
1773
// construct
 
1774
 
 
1775
Val::Val(const Col& col) :
 
1776
  m_col(col)
 
1777
{
 
1778
  switch (col.m_type) {
 
1779
  case Col::Unsigned:
 
1780
    m_uint32 = 0x7e7e7e7e;
 
1781
    break;
 
1782
  case Col::Char:
 
1783
    m_char = new uchar [col.m_bytelength];
 
1784
    memset(m_char, 0x7e, col.m_bytelength);
 
1785
    break;
 
1786
  case Col::Varchar:
 
1787
    m_varchar = new uchar [1 + col.m_bytelength];
 
1788
    memset(m_char, 0x7e, 1 + col.m_bytelength);
 
1789
    break;
 
1790
  case Col::Longvarchar:
 
1791
    m_longvarchar = new uchar [2 + col.m_bytelength];
 
1792
    memset(m_char, 0x7e, 2 + col.m_bytelength);
 
1793
    break;
 
1794
  default:
 
1795
    assert(false);
 
1796
    break;
 
1797
  }
 
1798
}
 
1799
 
 
1800
Val::~Val()
 
1801
{
 
1802
  const Col& col = m_col;
 
1803
  switch (col.m_type) {
 
1804
  case Col::Unsigned:
 
1805
    break;
 
1806
  case Col::Char:
 
1807
    delete [] m_char;
 
1808
    break;
 
1809
  case Col::Varchar:
 
1810
    delete [] m_varchar;
 
1811
    break;
 
1812
  case Col::Longvarchar:
 
1813
    delete [] m_longvarchar;
 
1814
    break;
 
1815
  default:
 
1816
    assert(false);
 
1817
    break;
 
1818
  }
 
1819
}
 
1820
 
 
1821
void
 
1822
Val::copy(const Val& val2)
 
1823
{
 
1824
  const Col& col = m_col;
 
1825
  const Col& col2 = val2.m_col;
 
1826
  assert(col.m_type == col2.m_type && col.m_length == col2.m_length);
 
1827
  if (val2.m_null) {
 
1828
    m_null = true;
 
1829
    return;
 
1830
  }
 
1831
  copy(val2.dataaddr());
 
1832
}
 
1833
 
 
1834
void
 
1835
Val::copy(const void* addr)
 
1836
{
 
1837
  const Col& col = m_col;
 
1838
  switch (col.m_type) {
 
1839
  case Col::Unsigned:
 
1840
    m_uint32 = *(const Uint32*)addr;
 
1841
    break;
 
1842
  case Col::Char:
 
1843
    memcpy(m_char, addr, col.m_bytelength);
 
1844
    break;
 
1845
  case Col::Varchar:
 
1846
    memcpy(m_varchar, addr, 1 + col.m_bytelength);
 
1847
    break;
 
1848
  case Col::Longvarchar:
 
1849
    memcpy(m_longvarchar, addr, 2 + col.m_bytelength);
 
1850
    break;
 
1851
  default:
 
1852
    assert(false);
 
1853
    break;
 
1854
  }
 
1855
  m_null = false;
 
1856
}
 
1857
 
 
1858
const void*
 
1859
Val::dataaddr() const
 
1860
{
 
1861
  const Col& col = m_col;
 
1862
  switch (col.m_type) {
 
1863
  case Col::Unsigned:
 
1864
    return &m_uint32;
 
1865
  case Col::Char:
 
1866
    return m_char;
 
1867
  case Col::Varchar:
 
1868
    return m_varchar;
 
1869
  case Col::Longvarchar:
 
1870
    return m_longvarchar;
 
1871
  default:
 
1872
    break;
 
1873
  }
 
1874
  assert(false);
 
1875
  return 0;
 
1876
}
 
1877
 
 
1878
void
 
1879
Val::calc(Par par, uint i)
 
1880
{
 
1881
  const Col& col = m_col;
 
1882
  col.m_pk ? calckey(par, i) : calcnokey(par);
 
1883
  if (!m_null)
 
1884
    col.wellformed(dataaddr());
 
1885
}
 
1886
 
 
1887
void
 
1888
Val::calckey(Par par, uint i)
 
1889
{
 
1890
  const Col& col = m_col;
 
1891
  m_null = false;
 
1892
  switch (col.m_type) {
 
1893
  case Col::Unsigned:
 
1894
    m_uint32 = i;
 
1895
    break;
 
1896
  case Col::Char:
 
1897
    {
 
1898
      const Chs* chs = col.m_chs;
 
1899
      CHARSET_INFO* cs = chs->m_cs;
 
1900
      uint n = 0;
 
1901
      calckeychars(par, i, n, m_char);
 
1902
      // extend by appropriate space
 
1903
      (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
 
1904
    }
 
1905
    break;
 
1906
  case Col::Varchar:
 
1907
    {
 
1908
      uint n = 0;
 
1909
      calckeychars(par, i, n, m_varchar + 1);
 
1910
      // set length and pad with nulls
 
1911
      m_varchar[0] = n;
 
1912
      memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
 
1913
    }
 
1914
    break;
 
1915
  case Col::Longvarchar:
 
1916
    {
 
1917
      uint n = 0;
 
1918
      calckeychars(par, i, n, m_longvarchar + 2);
 
1919
      // set length and pad with nulls
 
1920
      m_longvarchar[0] = (n & 0xff);
 
1921
      m_longvarchar[1] = (n >> 8);
 
1922
      memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
 
1923
    }
 
1924
    break;
 
1925
  default:
 
1926
    assert(false);
 
1927
    break;
 
1928
  }
 
1929
}
 
1930
 
 
1931
void
 
1932
Val::calckeychars(Par par, uint i, uint& n, uchar* buf)
 
1933
{
 
1934
  const Col& col = m_col;
 
1935
  const Chs* chs = col.m_chs;
 
1936
  CHARSET_INFO* cs = chs->m_cs;
 
1937
  n = 0;
 
1938
  uint len = 0;
 
1939
  while (len < col.m_length) {
 
1940
    if (i % (1 + n) == 0) {
 
1941
      break;
 
1942
    }
 
1943
    const Chr& chr = chs->m_chr[i % maxcharcount];
 
1944
    assert(n + chr.m_size <= col.m_bytelength);
 
1945
    memcpy(buf + n, chr.m_bytes, chr.m_size);
 
1946
    n += chr.m_size;
 
1947
    len++;
 
1948
  }
 
1949
}
 
1950
 
 
1951
void
 
1952
Val::calcnokey(Par par)
 
1953
{
 
1954
  const Col& col = m_col;
 
1955
  m_null = false;
 
1956
  if (col.m_nullable && urandom(100) < par.m_pctnull) {
 
1957
    m_null = true;
 
1958
    return;
 
1959
  }
 
1960
  int r = irandom((par.m_pctrange * par.m_range) / 100);
 
1961
  if (par.m_bdir != 0 && urandom(10) != 0) {
 
1962
    if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
 
1963
      r = -r;
 
1964
  }
 
1965
  uint v = par.m_range + r;
 
1966
  switch (col.m_type) {
 
1967
  case Col::Unsigned:
 
1968
    m_uint32 = v;
 
1969
    break;
 
1970
  case Col::Char:
 
1971
    {
 
1972
      const Chs* chs = col.m_chs;
 
1973
      CHARSET_INFO* cs = chs->m_cs;
 
1974
      uint n = 0;
 
1975
      calcnokeychars(par, n, m_char);
 
1976
      // extend by appropriate space
 
1977
      (*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
 
1978
    }
 
1979
    break;
 
1980
  case Col::Varchar:
 
1981
    {
 
1982
      uint n = 0;
 
1983
      calcnokeychars(par, n, m_varchar + 1);
 
1984
      // set length and pad with nulls
 
1985
      m_varchar[0] = n;
 
1986
      memset(&m_varchar[1 + n], 0, col.m_bytelength - n);
 
1987
    }
 
1988
    break;
 
1989
  case Col::Longvarchar:
 
1990
    {
 
1991
      uint n = 0;
 
1992
      calcnokeychars(par, n, m_longvarchar + 2);
 
1993
      // set length and pad with nulls
 
1994
      m_longvarchar[0] = (n & 0xff);
 
1995
      m_longvarchar[1] = (n >> 8);
 
1996
      memset(&m_longvarchar[2 + n], 0, col.m_bytelength - n);
 
1997
    }
 
1998
    break;
 
1999
  default:
 
2000
    assert(false);
 
2001
    break;
 
2002
  }
 
2003
}
 
2004
 
 
2005
void
 
2006
Val::calcnokeychars(Par par, uint& n, uchar* buf)
 
2007
{
 
2008
  const Col& col = m_col;
 
2009
  const Chs* chs = col.m_chs;
 
2010
  CHARSET_INFO* cs = chs->m_cs;
 
2011
  n = 0;
 
2012
  uint len = 0;
 
2013
  while (len < col.m_length) {
 
2014
    if (urandom(1 + col.m_bytelength) == 0) {
 
2015
      break;
 
2016
    }
 
2017
    uint half = maxcharcount / 2;
 
2018
    int r = irandom((par.m_pctrange * half) / 100);
 
2019
    if (par.m_bdir != 0 && urandom(10) != 0) {
 
2020
      if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
 
2021
        r = -r;
 
2022
    }
 
2023
    uint i = half + r;
 
2024
    assert(i < maxcharcount);
 
2025
    const Chr& chr = chs->m_chr[i];
 
2026
    assert(n + chr.m_size <= col.m_bytelength);
 
2027
    memcpy(buf + n, chr.m_bytes, chr.m_size);
 
2028
    n += chr.m_size;
 
2029
    len++;
 
2030
  }
 
2031
}
 
2032
 
 
2033
// operations
 
2034
 
 
2035
int
 
2036
Val::setval(Par par) const
 
2037
{
 
2038
  Con& con = par.con();
 
2039
  const Col& col = m_col;
 
2040
  if (col.m_pk) {
 
2041
    assert(!m_null);
 
2042
    const char* addr = (const char*)dataaddr();
 
2043
    LL5("setval pk [" << col << "] " << *this);
 
2044
    CHK(con.equal(col.m_num, addr) == 0);
 
2045
  } else {
 
2046
    const char* addr = !m_null ? (const char*)dataaddr() : 0;
 
2047
    LL5("setval non-pk [" << col << "] " << *this);
 
2048
    CHK(con.setValue(col.m_num, addr) == 0);
 
2049
  }
 
2050
  return 0;
 
2051
}
 
2052
 
 
2053
int
 
2054
Val::setval(Par par, const ICol& icol) const
 
2055
{
 
2056
  Con& con = par.con();
 
2057
  assert(!m_null);
 
2058
  const char* addr = (const char*)dataaddr();
 
2059
  LL5("setval key [" << icol << "] " << *this);
 
2060
  CHK(con.equal(icol.m_num, addr) == 0);
 
2061
  return 0;
 
2062
}
 
2063
 
 
2064
// compare
 
2065
 
 
2066
int
 
2067
Val::cmp(Par par, const Val& val2) const
 
2068
{
 
2069
  const Col& col = m_col;
 
2070
  const Col& col2 = val2.m_col;
 
2071
  assert(col.equal(col2));
 
2072
  if (m_null || val2.m_null) {
 
2073
    if (!m_null)
 
2074
      return +1;
 
2075
    if (!val2.m_null)
 
2076
      return -1;
 
2077
    return 0;
 
2078
  }
 
2079
  // verify data formats
 
2080
  col.wellformed(dataaddr());
 
2081
  col.wellformed(val2.dataaddr());
 
2082
  // compare
 
2083
  switch (col.m_type) {
 
2084
  case Col::Unsigned:
 
2085
    {
 
2086
      if (m_uint32 < val2.m_uint32)
 
2087
        return -1;
 
2088
      if (m_uint32 > val2.m_uint32)
 
2089
        return +1;
 
2090
      return 0;
 
2091
    }
 
2092
    break;
 
2093
  case Col::Char:
 
2094
    {
 
2095
      uint len = col.m_bytelength;
 
2096
      return cmpchars(par, m_char, len, val2.m_char, len);
 
2097
    }
 
2098
    break;
 
2099
  case Col::Varchar:
 
2100
    {
 
2101
      uint len1 = m_varchar[0];
 
2102
      uint len2 = val2.m_varchar[0];
 
2103
      return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2);
 
2104
    }
 
2105
    break;
 
2106
  case Col::Longvarchar:
 
2107
    {
 
2108
      uint len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
 
2109
      uint len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
 
2110
      return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2);
 
2111
    }
 
2112
    break;
 
2113
  default:
 
2114
    break;
 
2115
  }
 
2116
  assert(false);
 
2117
  return 0;
 
2118
}
 
2119
 
 
2120
int
 
2121
Val::cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const
 
2122
{
 
2123
  const Col& col = m_col;
 
2124
  const Chs* chs = col.m_chs;
 
2125
  CHARSET_INFO* cs = chs->m_cs;
 
2126
  int k;
 
2127
  if (!par.m_collsp) {
 
2128
    uchar x1[maxxmulsize * 8000];
 
2129
    uchar x2[maxxmulsize * 8000];
 
2130
    // make strxfrm pad both to same length
 
2131
    uint len = maxxmulsize * col.m_bytelength;
 
2132
    int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1);
 
2133
    int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2);
 
2134
    assert(n1 != -1 && n1 == n2);
 
2135
    k = memcmp(x1, x2, n1);
 
2136
  } else {
 
2137
    k = (*cs->coll->strnncollsp)(cs, buf1, len1, buf2, len2, false);
 
2138
  }
 
2139
  return k < 0 ? -1 : k > 0 ? +1 : 0;
 
2140
}
 
2141
 
 
2142
int
 
2143
Val::verify(Par par, const Val& val2) const
 
2144
{
 
2145
  CHK(cmp(par, val2) == 0);
 
2146
  return 0;
 
2147
}
 
2148
 
 
2149
// print
 
2150
 
 
2151
static void
 
2152
printstring(NdbOut& out, const uchar* str, uint len, bool showlen)
 
2153
{
 
2154
  char buf[4 * 8000];
 
2155
  char *p = buf;
 
2156
  *p++ = '[';
 
2157
  if (showlen) {
 
2158
    sprintf(p, "%u:", len);
 
2159
    p += strlen(p);
 
2160
  }
 
2161
  for (uint i = 0; i < len; i++) {
 
2162
    uchar c = str[i];
 
2163
    if (c == '\\') {
 
2164
      *p++ = '\\';
 
2165
      *p++ = c;
 
2166
    } else if (0x20 <= c && c <= 0x7e) {
 
2167
      *p++ = c;
 
2168
    } else {
 
2169
      *p++ = '\\';
 
2170
      *p++ = hexstr[c >> 4];
 
2171
      *p++ = hexstr[c & 15];
 
2172
    }
 
2173
  }
 
2174
  *p++ = ']';
 
2175
  *p = 0;
 
2176
  out << buf;
 
2177
}
 
2178
 
 
2179
static NdbOut&
 
2180
operator<<(NdbOut& out, const Val& val)
 
2181
{
 
2182
  const Col& col = val.m_col;
 
2183
  if (val.m_null) {
 
2184
    out << "NULL";
 
2185
    return out;
 
2186
  }
 
2187
  switch (col.m_type) {
 
2188
  case Col::Unsigned:
 
2189
    out << val.m_uint32;
 
2190
    break;
 
2191
  case Col::Char:
 
2192
    {
 
2193
      uint len = col.m_bytelength;
 
2194
      printstring(out, val.m_char, len, false);
 
2195
    }
 
2196
    break;
 
2197
  case Col::Varchar:
 
2198
    {
 
2199
      uint len = val.m_varchar[0];
 
2200
      printstring(out, val.m_varchar + 1, len, true);
 
2201
    }
 
2202
    break;
 
2203
  case Col::Longvarchar:
 
2204
    {
 
2205
      uint len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
 
2206
      printstring(out, val.m_longvarchar + 2, len, true);
 
2207
    }
 
2208
    break;
 
2209
  default:
 
2210
    out << "type" << col.m_type;
 
2211
    assert(false);
 
2212
    break;
 
2213
  }
 
2214
  return out;
 
2215
}
 
2216
 
 
2217
// Row - table tuple
 
2218
 
 
2219
struct Row {
 
2220
  const Tab& m_tab;
 
2221
  Val** m_val;
 
2222
  enum St {
 
2223
    StUndef = 0,
 
2224
    StDefine = 1,
 
2225
    StPrepare = 2,
 
2226
    StCommit = 3
 
2227
  };
 
2228
  enum Op {
 
2229
    OpNone = 0,
 
2230
    OpIns = 2,
 
2231
    OpUpd = 4,
 
2232
    OpDel = 8,
 
2233
    OpRead = 16,
 
2234
    OpReadEx = 32,
 
2235
    OpReadCom = 64,
 
2236
    OpDML = 2 | 4 | 8,
 
2237
    OpREAD = 16 | 32 | 64
 
2238
  };
 
2239
  St m_st;
 
2240
  Op m_op;
 
2241
  Uint64 m_txid;
 
2242
  Row* m_bi;
 
2243
  // construct
 
2244
  Row(const Tab& tab);
 
2245
  ~Row();
 
2246
  void copy(const Row& row2, bool copy_bi);
 
2247
  void copyval(const Row& row2, uint colmask = ~0);
 
2248
  void calc(Par par, uint i, uint colmask = ~0);
 
2249
  // operations
 
2250
  int setval(Par par, uint colmask = ~0);
 
2251
  int setval(Par par, const ITab& itab);
 
2252
  int insrow(Par par);
 
2253
  int updrow(Par par);
 
2254
  int updrow(Par par, const ITab& itab);
 
2255
  int delrow(Par par);
 
2256
  int delrow(Par par, const ITab& itab);
 
2257
  int selrow(Par par);
 
2258
  int selrow(Par par, const ITab& itab);
 
2259
  int setrow(Par par);
 
2260
  // compare
 
2261
  int cmp(Par par, const Row& row2) const;
 
2262
  int cmp(Par par, const Row& row2, const ITab& itab) const;
 
2263
  int verify(Par par, const Row& row2, bool pkonly) const;
 
2264
private:
 
2265
  Row& operator=(const Row& row2);
 
2266
};
 
2267
 
 
2268
static NdbOut&
 
2269
operator<<(NdbOut& out, const Row* rowp);
 
2270
 
 
2271
static NdbOut&
 
2272
operator<<(NdbOut& out, const Row& row);
 
2273
 
 
2274
// construct
 
2275
 
 
2276
Row::Row(const Tab& tab) :
 
2277
  m_tab(tab)
 
2278
{
 
2279
  m_val = new Val* [tab.m_cols];
 
2280
  for (uint k = 0; k < tab.m_cols; k++) {
 
2281
    const Col& col = *tab.m_col[k];
 
2282
    m_val[k] = new Val(col);
 
2283
  }
 
2284
  m_st = StUndef;
 
2285
  m_op = OpNone;
 
2286
  m_txid = 0;
 
2287
  m_bi = 0;
 
2288
}
 
2289
 
 
2290
Row::~Row()
 
2291
{
 
2292
  const Tab& tab = m_tab;
 
2293
  for (uint k = 0; k < tab.m_cols; k++) {
 
2294
    delete m_val[k];
 
2295
  }
 
2296
  delete [] m_val;
 
2297
  delete m_bi;
 
2298
}
 
2299
 
 
2300
void
 
2301
Row::copy(const Row& row2, bool copy_bi)
 
2302
{
 
2303
  const Tab& tab = m_tab;
 
2304
  copyval(row2);
 
2305
  m_st = row2.m_st;
 
2306
  m_op = row2.m_op;
 
2307
  m_txid = row2.m_txid;
 
2308
  assert(m_bi == 0);
 
2309
  if (copy_bi && row2.m_bi != 0) {
 
2310
    m_bi = new Row(tab);
 
2311
    m_bi->copy(*row2.m_bi, copy_bi);
 
2312
  }
 
2313
}
 
2314
 
 
2315
void
 
2316
Row::copyval(const Row& row2, uint colmask)
 
2317
{
 
2318
  const Tab& tab = m_tab;
 
2319
  assert(&tab == &row2.m_tab);
 
2320
  for (uint k = 0; k < tab.m_cols; k++) {
 
2321
    Val& val = *m_val[k];
 
2322
    const Val& val2 = *row2.m_val[k];
 
2323
    if ((1 << k) & colmask)
 
2324
      val.copy(val2);
 
2325
  }
 
2326
}
 
2327
 
 
2328
void
 
2329
Row::calc(Par par, uint i, uint colmask)
 
2330
{
 
2331
  const Tab& tab = m_tab;
 
2332
  for (uint k = 0; k < tab.m_cols; k++) {
 
2333
    if ((1 << k) & colmask) {
 
2334
      Val& val = *m_val[k];
 
2335
      val.calc(par, i);
 
2336
    }
 
2337
  }
 
2338
}
 
2339
 
 
2340
// operations
 
2341
 
 
2342
int
 
2343
Row::setval(Par par, uint colmask)
 
2344
{
 
2345
  const Tab& tab = m_tab;
 
2346
  Rsq rsq(tab.m_cols);
 
2347
  for (uint k = 0; k < tab.m_cols; k++) {
 
2348
    uint k2 = rsq.next();
 
2349
    if ((1 << k2) & colmask) {
 
2350
      const Val& val = *m_val[k2];
 
2351
      CHK(val.setval(par) == 0);
 
2352
    }
 
2353
  }
 
2354
  return 0;
 
2355
}
 
2356
 
 
2357
int
 
2358
Row::setval(Par par, const ITab& itab)
 
2359
{
 
2360
  Con& con = par.con();
 
2361
  Rsq rsq(itab.m_icols);
 
2362
  for (uint k = 0; k < itab.m_icols; k++) {
 
2363
    uint k2 = rsq.next();
 
2364
    const ICol& icol = *itab.m_icol[k2];
 
2365
    const Col& col = icol.m_col;
 
2366
    uint m = col.m_num;
 
2367
    const Val& val = *m_val[m];
 
2368
    CHK(val.setval(par, icol) == 0);
 
2369
  }
 
2370
  return 0;
 
2371
}
 
2372
 
 
2373
int
 
2374
Row::insrow(Par par)
 
2375
{
 
2376
  Con& con = par.con();
 
2377
  const Tab& tab = m_tab;
 
2378
  CHK(con.getNdbOperation(tab) == 0);
 
2379
  CHKCON(con.m_op->insertTuple() == 0, con);
 
2380
  CHK(setval(par, tab.m_pkmask) == 0);
 
2381
  CHK(setval(par, ~tab.m_pkmask) == 0);
 
2382
  assert(m_st == StUndef);
 
2383
  m_st = StDefine;
 
2384
  m_op = OpIns;
 
2385
  m_txid = con.m_txid;
 
2386
  return 0;
 
2387
}
 
2388
 
 
2389
int
 
2390
Row::updrow(Par par)
 
2391
{
 
2392
  Con& con = par.con();
 
2393
  const Tab& tab = m_tab;
 
2394
  CHK(con.getNdbOperation(tab) == 0);
 
2395
  CHKCON(con.m_op->updateTuple() == 0, con);
 
2396
  CHK(setval(par, tab.m_pkmask) == 0);
 
2397
  CHK(setval(par, ~tab.m_pkmask) == 0);
 
2398
  assert(m_st == StUndef);
 
2399
  m_st = StDefine;
 
2400
  m_op = OpUpd;
 
2401
  m_txid = con.m_txid;
 
2402
  return 0;
 
2403
}
 
2404
 
 
2405
int
 
2406
Row::updrow(Par par, const ITab& itab)
 
2407
{
 
2408
  Con& con = par.con();
 
2409
  const Tab& tab = m_tab;
 
2410
  assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
 
2411
  CHK(con.getNdbIndexOperation(itab, tab) == 0);
 
2412
  CHKCON(con.m_op->updateTuple() == 0, con);
 
2413
  CHK(setval(par, itab) == 0);
 
2414
  CHK(setval(par, ~tab.m_pkmask) == 0);
 
2415
  assert(m_st == StUndef);
 
2416
  m_st = StDefine;
 
2417
  m_op = OpUpd;
 
2418
  m_txid = con.m_txid;
 
2419
  return 0;
 
2420
}
 
2421
 
 
2422
int
 
2423
Row::delrow(Par par)
 
2424
{
 
2425
  Con& con = par.con();
 
2426
  const Tab& tab = m_tab;
 
2427
  CHK(con.getNdbOperation(m_tab) == 0);
 
2428
  CHKCON(con.m_op->deleteTuple() == 0, con);
 
2429
  CHK(setval(par, tab.m_pkmask) == 0);
 
2430
  assert(m_st == StUndef);
 
2431
  m_st = StDefine;
 
2432
  m_op = OpDel;
 
2433
  m_txid = con.m_txid;
 
2434
  return 0;
 
2435
}
 
2436
 
 
2437
int
 
2438
Row::delrow(Par par, const ITab& itab)
 
2439
{
 
2440
  Con& con = par.con();
 
2441
  const Tab& tab = m_tab;
 
2442
  assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
 
2443
  CHK(con.getNdbIndexOperation(itab, tab) == 0);
 
2444
  CHKCON(con.m_op->deleteTuple() == 0, con);
 
2445
  CHK(setval(par, itab) == 0);
 
2446
  assert(m_st == StUndef);
 
2447
  m_st = StDefine;
 
2448
  m_op = OpDel;
 
2449
  m_txid = con.m_txid;
 
2450
  return 0;
 
2451
}
 
2452
 
 
2453
int
 
2454
Row::selrow(Par par)
 
2455
{
 
2456
  Con& con = par.con();
 
2457
  const Tab& tab = m_tab;
 
2458
  CHK(con.getNdbOperation(m_tab) == 0);
 
2459
  CHKCON(con.readTuple(par) == 0, con);
 
2460
  CHK(setval(par, tab.m_pkmask) == 0);
 
2461
  // TODO state
 
2462
  return 0;
 
2463
}
 
2464
 
 
2465
int
 
2466
Row::selrow(Par par, const ITab& itab)
 
2467
{
 
2468
  Con& con = par.con();
 
2469
  const Tab& tab = m_tab;
 
2470
  assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
 
2471
  CHK(con.getNdbIndexOperation(itab, tab) == 0);
 
2472
  CHKCON(con.readTuple(par) == 0, con);
 
2473
  CHK(setval(par, itab) == 0);
 
2474
  // TODO state
 
2475
  return 0;
 
2476
}
 
2477
 
 
2478
int
 
2479
Row::setrow(Par par)
 
2480
{
 
2481
  Con& con = par.con();
 
2482
  const Tab& tab = m_tab;
 
2483
  CHK(setval(par, ~tab.m_pkmask) == 0);
 
2484
  assert(m_st == StUndef);
 
2485
  m_st = StDefine;
 
2486
  m_op = OpUpd;
 
2487
  m_txid = con.m_txid;
 
2488
  return 0;
 
2489
}
 
2490
 
 
2491
// compare
 
2492
 
 
2493
int
 
2494
Row::cmp(Par par, const Row& row2) const
 
2495
{
 
2496
  const Tab& tab = m_tab;
 
2497
  assert(&tab == &row2.m_tab);
 
2498
  int c = 0;
 
2499
  for (uint k = 0; k < tab.m_cols; k++) {
 
2500
    const Val& val = *m_val[k];
 
2501
    const Val& val2 = *row2.m_val[k];
 
2502
    if ((c = val.cmp(par, val2)) != 0)
 
2503
      break;
 
2504
  }
 
2505
  return c;
 
2506
}
 
2507
 
 
2508
int
 
2509
Row::cmp(Par par, const Row& row2, const ITab& itab) const
 
2510
{
 
2511
  const Tab& tab = m_tab;
 
2512
  int c = 0;
 
2513
  for (uint i = 0; i < itab.m_icols; i++) {
 
2514
    const ICol& icol = *itab.m_icol[i];
 
2515
    const Col& col = icol.m_col;
 
2516
    uint k = col.m_num;
 
2517
    assert(k < tab.m_cols);
 
2518
    const Val& val = *m_val[k];
 
2519
    const Val& val2 = *row2.m_val[k];
 
2520
    if ((c = val.cmp(par, val2)) != 0)
 
2521
      break;
 
2522
  }
 
2523
  return c;
 
2524
}
 
2525
 
 
2526
int
 
2527
Row::verify(Par par, const Row& row2, bool pkonly) const
 
2528
{
 
2529
  const Tab& tab = m_tab;
 
2530
  const Row& row1 = *this;
 
2531
  assert(&row1.m_tab == &row2.m_tab);
 
2532
  for (uint k = 0; k < tab.m_cols; k++) {
 
2533
    const Col& col = row1.m_val[k]->m_col;
 
2534
    if (!pkonly || col.m_pk) {
 
2535
      const Val& val1 = *row1.m_val[k];
 
2536
      const Val& val2 = *row2.m_val[k];
 
2537
      CHK(val1.verify(par, val2) == 0);
 
2538
    }
 
2539
  }
 
2540
  return 0;
 
2541
}
 
2542
 
 
2543
// print
 
2544
 
 
2545
static NdbOut&
 
2546
operator<<(NdbOut& out, const Row::St st)
 
2547
{
 
2548
  if (st == Row::StUndef)
 
2549
    out << "StUndef";
 
2550
  else if (st == Row::StDefine)
 
2551
    out << "StDefine";
 
2552
  else if (st == Row::StPrepare)
 
2553
    out << "StPrepare";
 
2554
  else if (st == Row::StCommit)
 
2555
    out << "StCommit";
 
2556
  else
 
2557
    out << "st=" << st;
 
2558
  return out;
 
2559
}
 
2560
 
 
2561
static NdbOut&
 
2562
operator<<(NdbOut& out, const Row::Op op)
 
2563
{
 
2564
  if (op == Row::OpNone)
 
2565
    out << "OpNone";
 
2566
  else if (op == Row::OpIns)
 
2567
    out << "OpIns";
 
2568
  else if (op == Row::OpUpd)
 
2569
    out << "OpUpd";
 
2570
  else if (op == Row::OpDel)
 
2571
    out << "OpDel";
 
2572
  else if (op == Row::OpRead)
 
2573
    out << "OpRead";
 
2574
  else if (op == Row::OpReadEx)
 
2575
    out << "OpReadEx";
 
2576
  else if (op == Row::OpReadCom)
 
2577
    out << "OpReadCom";
 
2578
  else
 
2579
    out << "op=" << op;
 
2580
  return out;
 
2581
}
 
2582
 
 
2583
static NdbOut&
 
2584
operator<<(NdbOut& out, const Row* rowp)
 
2585
{
 
2586
  if (rowp == 0)
 
2587
    out << "[null]";
 
2588
  else
 
2589
    out << *rowp;
 
2590
  return out;
 
2591
}
 
2592
 
 
2593
static NdbOut&
 
2594
operator<<(NdbOut& out, const Row& row)
 
2595
{
 
2596
  const Tab& tab = row.m_tab;
 
2597
  out << "[";
 
2598
  for (uint i = 0; i < tab.m_cols; i++) {
 
2599
    if (i > 0)
 
2600
      out << " ";
 
2601
    out << *row.m_val[i];
 
2602
  }
 
2603
  out << " " << row.m_st;
 
2604
  out << " " << row.m_op;
 
2605
  out << " " << HEX(row.m_txid);
 
2606
  if (row.m_bi != 0)
 
2607
    out << " " << row.m_bi;
 
2608
  out << "]";
 
2609
  return out;
 
2610
}
 
2611
 
 
2612
// Set - set of table tuples
 
2613
 
 
2614
struct Set {
 
2615
  const Tab& m_tab;
 
2616
  uint m_rows;
 
2617
  Row** m_row;
 
2618
  uint* m_rowkey; // maps row number (from 0) in scan to tuple key
 
2619
  Row* m_keyrow;
 
2620
  NdbRecAttr** m_rec;
 
2621
  // construct
 
2622
  Set(const Tab& tab, uint rows);
 
2623
  ~Set();
 
2624
  void reset();
 
2625
  bool compat(Par par, uint i, const Row::Op op) const;
 
2626
  void push(uint i);
 
2627
  void copyval(uint i, uint colmask = ~0); // from bi
 
2628
  void calc(Par par, uint i, uint colmask = ~0);
 
2629
  uint count() const;
 
2630
  const Row* getrow(uint i, bool dirty = false) const;
 
2631
  // transaction
 
2632
  void post(Par par, ExecType et);
 
2633
  // operations
 
2634
  int insrow(Par par, uint i);
 
2635
  int updrow(Par par, uint i);
 
2636
  int updrow(Par par, const ITab& itab, uint i);
 
2637
  int delrow(Par par, uint i);
 
2638
  int delrow(Par par, const ITab& itab, uint i);
 
2639
  int selrow(Par par, const Row& keyrow);
 
2640
  int selrow(Par par, const ITab& itab, const Row& keyrow);
 
2641
  int setrow(Par par, uint i);
 
2642
  int getval(Par par);
 
2643
  int getkey(Par par, uint* i);
 
2644
  int putval(uint i, bool force, uint n = ~0);
 
2645
  // compare
 
2646
  void sort(Par par, const ITab& itab);
 
2647
  int verify(Par par, const Set& set2, bool pkonly, bool dirty = false) const;
 
2648
  int verifyorder(Par par, const ITab& itab, bool descending) const;
 
2649
  // protect structure
 
2650
  NdbMutex* m_mutex;
 
2651
  void lock() const {
 
2652
    NdbMutex_Lock(m_mutex);
 
2653
  }
 
2654
  void unlock() const {
 
2655
    NdbMutex_Unlock(m_mutex);
 
2656
  }
 
2657
private:
 
2658
  void sort(Par par, const ITab& itab, uint lo, uint hi);
 
2659
  Set& operator=(const Set& set2);
 
2660
};
 
2661
 
 
2662
// construct
 
2663
 
 
2664
Set::Set(const Tab& tab, uint rows) :
 
2665
  m_tab(tab)
 
2666
{
 
2667
  m_rows = rows;
 
2668
  m_row = new Row* [m_rows];
 
2669
  for (uint i = 0; i < m_rows; i++) {
 
2670
    m_row[i] = 0;
 
2671
  }
 
2672
  m_rowkey = new uint [m_rows];
 
2673
  for (uint n = 0; n < m_rows; n++) {
 
2674
    m_rowkey[n] = ~0;
 
2675
  }
 
2676
  m_keyrow = new Row(tab);
 
2677
  m_rec = new NdbRecAttr* [tab.m_cols];
 
2678
  for (uint k = 0; k < tab.m_cols; k++) {
 
2679
    m_rec[k] = 0;
 
2680
  }
 
2681
  m_mutex = NdbMutex_Create();
 
2682
  assert(m_mutex != 0);
 
2683
}
 
2684
 
 
2685
Set::~Set()
 
2686
{
 
2687
  for (uint i = 0; i < m_rows; i++) {
 
2688
    delete m_row[i];
 
2689
  }
 
2690
  delete [] m_row;
 
2691
  delete [] m_rowkey;
 
2692
  delete m_keyrow;
 
2693
  delete [] m_rec;
 
2694
  NdbMutex_Destroy(m_mutex);
 
2695
}
 
2696
 
 
2697
void
 
2698
Set::reset()
 
2699
{
 
2700
  for (uint i = 0; i < m_rows; i++) {
 
2701
    m_row[i] = 0;
 
2702
  }
 
2703
}
 
2704
 
 
2705
// this sucks
 
2706
bool
 
2707
Set::compat(Par par, uint i, const Row::Op op) const
 
2708
{
 
2709
  Con& con = par.con();
 
2710
  int ret = -1;
 
2711
  int place = 0;
 
2712
  do {
 
2713
    const Row* rowp = getrow(i);
 
2714
    if (rowp == 0) {
 
2715
      ret = op == Row::OpIns;
 
2716
      place = 1;
 
2717
      break;
 
2718
    }
 
2719
    const Row& row = *rowp;
 
2720
    if (!(op & Row::OpREAD)) {
 
2721
      if (row.m_st == Row::StDefine || row.m_st == Row::StPrepare) {
 
2722
        assert(row.m_op & Row::OpDML);
 
2723
        assert(row.m_txid != 0);
 
2724
        if (con.m_txid != row.m_txid) {
 
2725
          ret = false;
 
2726
          place = 2;
 
2727
          break;
 
2728
        }
 
2729
        if (row.m_op != Row::OpDel) {
 
2730
          ret = op == Row::OpUpd || op == Row::OpDel;
 
2731
          place = 3;
 
2732
          break;
 
2733
        }
 
2734
        ret = op == Row::OpIns;
 
2735
        place = 4;
 
2736
        break;
 
2737
      }
 
2738
      if (row.m_st == Row::StCommit) {
 
2739
        assert(row.m_op == Row::OpNone);
 
2740
        assert(row.m_txid == 0);
 
2741
        ret = op == Row::OpUpd || op == Row::OpDel;
 
2742
        place = 5;
 
2743
        break;
 
2744
      }
 
2745
    }
 
2746
    if (op & Row::OpREAD) {
 
2747
      bool dirty =
 
2748
        con.m_txid != row.m_txid &&
 
2749
        par.m_lockmode == NdbOperation::LM_CommittedRead;
 
2750
      const Row* rowp2 = getrow(i, dirty);
 
2751
      if (rowp2 == 0 || rowp2->m_op == Row::OpDel) {
 
2752
        ret = false;
 
2753
        place = 6;
 
2754
        break;
 
2755
      }
 
2756
      ret = true;
 
2757
      place = 7;
 
2758
      break;
 
2759
    }
 
2760
  } while (0);
 
2761
  LL4("compat ret=" << ret << " place=" << place);
 
2762
  assert(ret == false || ret == true);
 
2763
  return ret;
 
2764
}
 
2765
 
 
2766
void
 
2767
Set::push(uint i)
 
2768
{
 
2769
  const Tab& tab = m_tab;
 
2770
  assert(i < m_rows);
 
2771
  Row* bi = m_row[i];
 
2772
  m_row[i] = new Row(tab);
 
2773
  Row& row = *m_row[i];
 
2774
  row.m_bi = bi;
 
2775
  if (bi != 0)
 
2776
    row.copyval(*bi);
 
2777
}
 
2778
 
 
2779
void
 
2780
Set::copyval(uint i, uint colmask)
 
2781
{
 
2782
  assert(m_row[i] != 0);
 
2783
  Row& row = *m_row[i];
 
2784
  assert(row.m_bi != 0);
 
2785
  row.copyval(*row.m_bi, colmask);
 
2786
}
 
2787
 
 
2788
void
 
2789
Set::calc(Par par, uint i, uint colmask)
 
2790
{
 
2791
  assert(m_row[i] != 0);
 
2792
  Row& row = *m_row[i];
 
2793
  row.calc(par, i, colmask);
 
2794
}
 
2795
 
 
2796
uint
 
2797
Set::count() const
 
2798
{
 
2799
  uint count = 0;
 
2800
  for (uint i = 0; i < m_rows; i++) {
 
2801
    if (m_row[i] != 0)
 
2802
      count++;
 
2803
  }
 
2804
  return count;
 
2805
}
 
2806
 
 
2807
const Row*
 
2808
Set::getrow(uint i, bool dirty) const
 
2809
{
 
2810
  assert(i < m_rows);
 
2811
  const Row* rowp = m_row[i];
 
2812
  if (dirty) {
 
2813
    while (rowp != 0) {
 
2814
      bool b1 = rowp->m_op == Row::OpNone;
 
2815
      bool b2 = rowp->m_st == Row::StCommit;
 
2816
      assert(b1 == b2);
 
2817
      if (b1) {
 
2818
        assert(rowp->m_bi == 0);
 
2819
        break;
 
2820
      }
 
2821
      rowp = rowp->m_bi;
 
2822
    }
 
2823
  }
 
2824
  return rowp;
 
2825
}
 
2826
 
 
2827
// transaction
 
2828
 
 
2829
void
 
2830
Set::post(Par par, ExecType et)
 
2831
{
 
2832
  LL4("post");
 
2833
  Con& con = par.con();
 
2834
  assert(con.m_txid != 0);
 
2835
  uint i;
 
2836
  for (i = 0; i < m_rows; i++) {
 
2837
    Row* rowp = m_row[i];
 
2838
    if (rowp == 0) {
 
2839
      LL5("skip " << i << " " << rowp);
 
2840
      continue;
 
2841
    }
 
2842
    if (rowp->m_st == Row::StCommit) {
 
2843
      assert(rowp->m_op == Row::OpNone);
 
2844
      assert(rowp->m_txid == 0);
 
2845
      assert(rowp->m_bi == 0);
 
2846
      LL5("skip committed " << i << " " << rowp);
 
2847
      continue;
 
2848
    }
 
2849
    assert(rowp->m_st == Row::StDefine || rowp->m_st == Row::StPrepare);
 
2850
    assert(rowp->m_txid != 0);
 
2851
    if (con.m_txid != rowp->m_txid) {
 
2852
      LL5("skip txid " << i << " " << HEX(con.m_txid) << " " << rowp);
 
2853
      continue;
 
2854
    }
 
2855
    // TODO read ops
 
2856
    assert(rowp->m_op & Row::OpDML);
 
2857
    LL4("post BEFORE " << rowp);
 
2858
    if (et == NoCommit) {
 
2859
      if (rowp->m_st == Row::StDefine) {
 
2860
        rowp->m_st = Row::StPrepare;
 
2861
        Row* bi = rowp->m_bi;
 
2862
        while (bi != 0 && bi->m_st == Row::StDefine) {
 
2863
          bi->m_st = Row::StPrepare;
 
2864
          bi = bi->m_bi;
 
2865
        }
 
2866
      }
 
2867
    } else if (et == Commit) {
 
2868
      if (rowp->m_op != Row::OpDel) {
 
2869
        rowp->m_st = Row::StCommit;
 
2870
        rowp->m_op = Row::OpNone;
 
2871
        rowp->m_txid = 0;
 
2872
        delete rowp->m_bi;
 
2873
        rowp->m_bi = 0;
 
2874
      } else {
 
2875
        delete rowp;
 
2876
        rowp = 0;
 
2877
      }
 
2878
    } else if (et == Rollback) {
 
2879
      while (rowp != 0 && rowp->m_st != Row::StCommit) {
 
2880
        Row* tmp = rowp;
 
2881
        rowp = rowp->m_bi;
 
2882
        tmp->m_bi = 0;
 
2883
        delete tmp;
 
2884
      }
 
2885
    } else {
 
2886
      assert(false);
 
2887
    }
 
2888
    m_row[i] = rowp;
 
2889
    LL4("post AFTER " << rowp);
 
2890
  }
 
2891
}
 
2892
 
 
2893
// operations
 
2894
 
 
2895
int
 
2896
Set::insrow(Par par, uint i)
 
2897
{
 
2898
  assert(m_row[i] != 0);
 
2899
  Row& row = *m_row[i];
 
2900
  CHK(row.insrow(par) == 0);
 
2901
  return 0;
 
2902
}
 
2903
 
 
2904
int
 
2905
Set::updrow(Par par, uint i)
 
2906
{
 
2907
  assert(m_row[i] != 0);
 
2908
  Row& row = *m_row[i];
 
2909
  CHK(row.updrow(par) == 0);
 
2910
  return 0;
 
2911
}
 
2912
 
 
2913
int
 
2914
Set::updrow(Par par, const ITab& itab, uint i)
 
2915
{
 
2916
  assert(m_row[i] != 0);
 
2917
  Row& row = *m_row[i];
 
2918
  CHK(row.updrow(par, itab) == 0);
 
2919
  return 0;
 
2920
}
 
2921
 
 
2922
int
 
2923
Set::delrow(Par par, uint i)
 
2924
{
 
2925
  assert(m_row[i] != 0);
 
2926
  Row& row = *m_row[i];
 
2927
  CHK(row.delrow(par) == 0);
 
2928
  return 0;
 
2929
}
 
2930
 
 
2931
int
 
2932
Set::delrow(Par par, const ITab& itab, uint i)
 
2933
{
 
2934
  assert(m_row[i] != 0);
 
2935
  Row& row = *m_row[i];
 
2936
  CHK(row.delrow(par, itab) == 0);
 
2937
  return 0;
 
2938
}
 
2939
 
 
2940
int
 
2941
Set::selrow(Par par, const Row& keyrow)
 
2942
{
 
2943
  Con& con = par.con();
 
2944
  const Tab& tab = par.tab();
 
2945
  LL5("selrow " << tab.m_name << " keyrow " << keyrow);
 
2946
  m_keyrow->copyval(keyrow, tab.m_pkmask);
 
2947
  CHK(m_keyrow->selrow(par) == 0);
 
2948
  CHK(getval(par) == 0);
 
2949
  return 0;
 
2950
}
 
2951
 
 
2952
int
 
2953
Set::selrow(Par par, const ITab& itab, const Row& keyrow)
 
2954
{
 
2955
  Con& con = par.con();
 
2956
  LL5("selrow " << itab.m_name << " keyrow " << keyrow);
 
2957
  m_keyrow->copyval(keyrow, itab.m_keymask);
 
2958
  CHK(m_keyrow->selrow(par, itab) == 0);
 
2959
  CHK(getval(par) == 0);
 
2960
  return 0;
 
2961
}
 
2962
 
 
2963
int
 
2964
Set::setrow(Par par, uint i)
 
2965
{
 
2966
  Con& con = par.con();
 
2967
  assert(m_row[i] != 0);
 
2968
  CHK(m_row[i]->setrow(par) == 0);
 
2969
  return 0;
 
2970
}
 
2971
 
 
2972
int
 
2973
Set::getval(Par par)
 
2974
{
 
2975
  Con& con = par.con();
 
2976
  const Tab& tab = m_tab;
 
2977
  Rsq rsq1(tab.m_cols);
 
2978
  for (uint k = 0; k < tab.m_cols; k++) {
 
2979
    uint k2 = rsq1.next();
 
2980
    CHK(con.getValue(k2, m_rec[k2]) == 0);
 
2981
  }
 
2982
  return 0;
 
2983
}
 
2984
 
 
2985
int
 
2986
Set::getkey(Par par, uint* i)
 
2987
{
 
2988
  const Tab& tab = m_tab;
 
2989
  uint k = tab.m_keycol;
 
2990
  assert(m_rec[k] != 0);
 
2991
  const char* aRef = m_rec[k]->aRef();
 
2992
  Uint32 key = *(const Uint32*)aRef;
 
2993
  LL5("getkey: " << key);
 
2994
  CHK(key < m_rows);
 
2995
  *i = key;
 
2996
  return 0;
 
2997
}
 
2998
 
 
2999
int
 
3000
Set::putval(uint i, bool force, uint n)
 
3001
{
 
3002
  const Tab& tab = m_tab;
 
3003
  LL4("putval key=" << i << " row=" << n << " old=" << m_row[i]);
 
3004
  if (m_row[i] != 0) {
 
3005
    assert(force);
 
3006
    delete m_row[i];
 
3007
    m_row[i] = 0;
 
3008
  }
 
3009
  m_row[i] = new Row(tab);
 
3010
  Row& row = *m_row[i];
 
3011
  for (uint k = 0; k < tab.m_cols; k++) {
 
3012
    Val& val = *row.m_val[k];
 
3013
    NdbRecAttr* rec = m_rec[k];
 
3014
    assert(rec != 0);
 
3015
    if (rec->isNULL()) {
 
3016
      val.m_null = true;
 
3017
      continue;
 
3018
    }
 
3019
    const char* aRef = m_rec[k]->aRef();
 
3020
    val.copy(aRef);
 
3021
    val.m_null = false;
 
3022
  }
 
3023
  if (n != ~0)
 
3024
    m_rowkey[n] = i;
 
3025
  return 0;
 
3026
}
 
3027
 
 
3028
// compare
 
3029
 
 
3030
void
 
3031
Set::sort(Par par, const ITab& itab)
 
3032
{
 
3033
  if (m_rows != 0)
 
3034
    sort(par, itab, 0, m_rows - 1);
 
3035
}
 
3036
 
 
3037
void
 
3038
Set::sort(Par par, const ITab& itab, uint lo, uint hi)
 
3039
{
 
3040
  assert(lo < m_rows && hi < m_rows && lo <= hi);
 
3041
  Row* const p = m_row[lo];
 
3042
  uint i = lo;
 
3043
  uint j = hi;
 
3044
  while (i < j) {
 
3045
    while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
 
3046
      j--;
 
3047
    if (i < j) {
 
3048
      m_row[i] = m_row[j];
 
3049
      i++;
 
3050
    }
 
3051
    while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
 
3052
      i++;
 
3053
    if (i < j) {
 
3054
      m_row[j] = m_row[i];
 
3055
      j--;
 
3056
    }
 
3057
  }
 
3058
  m_row[i] = p;
 
3059
  if (lo < i)
 
3060
    sort(par, itab, lo, i - 1);
 
3061
  if (hi > i)
 
3062
    sort(par, itab, i + 1, hi);
 
3063
}
 
3064
 
 
3065
/*
 
3066
 * set1 (self) is from dml and can contain un-committed operations.
 
3067
 * set2 is from read and contains no operations.  "dirty" applies
 
3068
 * to set1: false = use latest row, true = use committed row.
 
3069
 */
 
3070
int
 
3071
Set::verify(Par par, const Set& set2, bool pkonly, bool dirty) const
 
3072
{
 
3073
  const Set& set1 = *this;
 
3074
  assert(&set1.m_tab == &set2.m_tab && set1.m_rows == set2.m_rows);
 
3075
  LL3("verify dirty:" << dirty);
 
3076
  for (uint i = 0; i < set1.m_rows; i++) {
 
3077
    // the row versions we actually compare
 
3078
    const Row* row1p = set1.getrow(i, dirty);
 
3079
    const Row* row2p = set2.getrow(i);
 
3080
    bool ok = true;
 
3081
    int place = 0;
 
3082
    if (row1p == 0) {
 
3083
      if (row2p != 0) {
 
3084
        ok = false;
 
3085
        place = 1;
 
3086
      }
 
3087
    } else {
 
3088
      Row::Op op1 = row1p->m_op;
 
3089
      if (op1 != Row::OpDel) {
 
3090
        if (row2p == 0) {
 
3091
          ok = false;
 
3092
          place = 2;
 
3093
        } else if (row1p->verify(par, *row2p, pkonly) == -1) {
 
3094
          ok = false;
 
3095
          place = 3;
 
3096
        }
 
3097
      } else if (row2p != 0) {
 
3098
        ok = false;
 
3099
        place = 4;
 
3100
      }
 
3101
    }
 
3102
    if (!ok) {
 
3103
      LL1("verify " << i << " failed at " << place);
 
3104
      LL1("row1 " << row1p);
 
3105
      LL1("row2 " << row2p);
 
3106
      CHK(false);
 
3107
    }
 
3108
  }
 
3109
  return 0;
 
3110
}
 
3111
 
 
3112
int
 
3113
Set::verifyorder(Par par, const ITab& itab, bool descending) const
 
3114
{
 
3115
  const Tab& tab = m_tab;
 
3116
  for (uint n = 0; n < m_rows; n++) {
 
3117
    uint i2 = m_rowkey[n];
 
3118
    if (i2 == ~0)
 
3119
      break;
 
3120
    if (n == 0)
 
3121
      continue;
 
3122
    uint i1 = m_rowkey[n - 1];
 
3123
    assert(m_row[i1] != 0 && m_row[i2] != 0);
 
3124
    const Row& row1 = *m_row[i1];
 
3125
    const Row& row2 = *m_row[i2];
 
3126
    if (!descending)
 
3127
      CHK(row1.cmp(par, row2, itab) <= 0);
 
3128
    else
 
3129
      CHK(row1.cmp(par, row2, itab) >= 0);
 
3130
  }
 
3131
  return 0;
 
3132
}
 
3133
 
 
3134
// print
 
3135
 
 
3136
static NdbOut&
 
3137
operator<<(NdbOut& out, const Set& set)
 
3138
{
 
3139
  for (uint i = 0; i < set.m_rows; i++) {
 
3140
    const Row& row = *set.m_row[i];
 
3141
    if (i > 0)
 
3142
      out << endl;
 
3143
    out << row;
 
3144
  }
 
3145
  return out;
 
3146
}
 
3147
 
 
3148
// BVal - range scan bound
 
3149
 
 
3150
struct BVal : public Val {
 
3151
  const ICol& m_icol;
 
3152
  int m_type;
 
3153
  BVal(const ICol& icol);
 
3154
  int setbnd(Par par) const;
 
3155
  int setflt(Par par) const;
 
3156
};
 
3157
 
 
3158
BVal::BVal(const ICol& icol) :
 
3159
  Val(icol.m_col),
 
3160
  m_icol(icol)
 
3161
{
 
3162
}
 
3163
 
 
3164
int
 
3165
BVal::setbnd(Par par) const
 
3166
{
 
3167
  Con& con = par.con();
 
3168
  assert(g_compare_null || !m_null);
 
3169
  const char* addr = !m_null ? (const char*)dataaddr() : 0;
 
3170
  const ICol& icol = m_icol;
 
3171
  CHK(con.setBound(icol.m_num, m_type, addr) == 0);
 
3172
  return 0;
 
3173
}
 
3174
 
 
3175
int
 
3176
BVal::setflt(Par par) const
 
3177
{
 
3178
  static uint index_bound_to_filter_bound[5] = {
 
3179
    NdbScanFilter::COND_GE,
 
3180
    NdbScanFilter::COND_GT,
 
3181
    NdbScanFilter::COND_LE,
 
3182
    NdbScanFilter::COND_LT,
 
3183
    NdbScanFilter::COND_EQ
 
3184
  };
 
3185
  Con& con = par.con();
 
3186
  assert(g_compare_null || !m_null);
 
3187
  const char* addr = !m_null ? (const char*)dataaddr() : 0;
 
3188
  const ICol& icol = m_icol;
 
3189
  const Col& col = icol.m_col;
 
3190
  uint length = col.m_bytesize;
 
3191
  uint cond = index_bound_to_filter_bound[m_type];
 
3192
  CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
 
3193
  return 0;
 
3194
}
 
3195
 
 
3196
static NdbOut&
 
3197
operator<<(NdbOut& out, const BVal& bval)
 
3198
{
 
3199
  const ICol& icol = bval.m_icol;
 
3200
  const Col& col = icol.m_col;
 
3201
  const Val& val = bval;
 
3202
  out << "type=" << bval.m_type;
 
3203
  out << " icol=" << icol.m_num;
 
3204
  out << " col=" << col.m_num << "," << col.m_name;
 
3205
  out << " value=" << val;
 
3206
  return out;
 
3207
}
 
3208
 
 
3209
// BSet - set of bounds
 
3210
 
 
3211
struct BSet {
 
3212
  const Tab& m_tab;
 
3213
  const ITab& m_itab;
 
3214
  uint m_alloc;
 
3215
  uint m_bvals;
 
3216
  BVal** m_bval;
 
3217
  BSet(const Tab& tab, const ITab& itab);
 
3218
  ~BSet();
 
3219
  void reset();
 
3220
  void calc(Par par);
 
3221
  void calcpk(Par par, uint i);
 
3222
  int setbnd(Par par) const;
 
3223
  int setflt(Par par) const;
 
3224
  void filter(Par par, const Set& set, Set& set2) const;
 
3225
};
 
3226
 
 
3227
BSet::BSet(const Tab& tab, const ITab& itab) :
 
3228
  m_tab(tab),
 
3229
  m_itab(itab),
 
3230
  m_alloc(2 * itab.m_icols),
 
3231
  m_bvals(0)
 
3232
{
 
3233
  m_bval = new BVal* [m_alloc];
 
3234
  for (uint i = 0; i < m_alloc; i++) {
 
3235
    m_bval[i] = 0;
 
3236
  }
 
3237
}
 
3238
 
 
3239
BSet::~BSet()
 
3240
{
 
3241
  delete [] m_bval;
 
3242
}
 
3243
 
 
3244
void
 
3245
BSet::reset()
 
3246
{
 
3247
  while (m_bvals > 0) {
 
3248
    uint i = --m_bvals;
 
3249
    delete m_bval[i];
 
3250
    m_bval[i] = 0;
 
3251
  }
 
3252
}
 
3253
 
 
3254
void
 
3255
BSet::calc(Par par)
 
3256
{
 
3257
  const ITab& itab = m_itab;
 
3258
  par.m_pctrange = par.m_pctbrange;
 
3259
  reset();
 
3260
  for (uint k = 0; k < itab.m_icols; k++) {
 
3261
    const ICol& icol = *itab.m_icol[k];
 
3262
    const Col& col = icol.m_col;
 
3263
    for (uint i = 0; i <= 1; i++) {
 
3264
      if (m_bvals == 0 && urandom(100) == 0)
 
3265
        return;
 
3266
      if (m_bvals != 0 && urandom(3) == 0)
 
3267
        return;
 
3268
      assert(m_bvals < m_alloc);
 
3269
      BVal& bval = *new BVal(icol);
 
3270
      m_bval[m_bvals++] = &bval;
 
3271
      bval.m_null = false;
 
3272
      uint sel;
 
3273
      do {
 
3274
        // equality bound only on i==0
 
3275
        sel = urandom(5 - i);
 
3276
      } while (strchr(par.m_bound, '0' + sel) == 0);
 
3277
      if (sel < 2)
 
3278
        bval.m_type = 0 | (1 << i);
 
3279
      else if (sel < 4)
 
3280
        bval.m_type = 1 | (1 << i);
 
3281
      else
 
3282
        bval.m_type = 4;
 
3283
      if (k + 1 < itab.m_icols)
 
3284
        bval.m_type = 4;
 
3285
      if (!g_compare_null)
 
3286
        par.m_pctnull = 0;
 
3287
      if (bval.m_type == 0 || bval.m_type == 1)
 
3288
        par.m_bdir = -1;
 
3289
      if (bval.m_type == 2 || bval.m_type == 3)
 
3290
        par.m_bdir = +1;
 
3291
      do {
 
3292
        bval.calcnokey(par);
 
3293
        if (i == 1) {
 
3294
          assert(m_bvals >= 2);
 
3295
          const BVal& bv1 = *m_bval[m_bvals - 2];
 
3296
          const BVal& bv2 = *m_bval[m_bvals - 1];
 
3297
          if (bv1.cmp(par, bv2) > 0 && urandom(100) != 0)
 
3298
            continue;
 
3299
        }
 
3300
      } while (0);
 
3301
      // equality bound only once
 
3302
      if (bval.m_type == 4)
 
3303
        break;
 
3304
    }
 
3305
  }
 
3306
}
 
3307
 
 
3308
void
 
3309
BSet::calcpk(Par par, uint i)
 
3310
{
 
3311
  const ITab& itab = m_itab;
 
3312
  reset();
 
3313
  for (uint k = 0; k < itab.m_icols; k++) {
 
3314
    const ICol& icol = *itab.m_icol[k];
 
3315
    const Col& col = icol.m_col;
 
3316
    assert(col.m_pk);
 
3317
    assert(m_bvals < m_alloc);
 
3318
    BVal& bval = *new BVal(icol);
 
3319
    m_bval[m_bvals++] = &bval;
 
3320
    bval.m_type = 4;
 
3321
    bval.calc(par, i);
 
3322
  }
 
3323
}
 
3324
 
 
3325
int
 
3326
BSet::setbnd(Par par) const
 
3327
{
 
3328
  if (m_bvals != 0) {
 
3329
    Rsq rsq1(m_bvals);
 
3330
    for (uint j = 0; j < m_bvals; j++) {
 
3331
      uint j2 = rsq1.next();
 
3332
      const BVal& bval = *m_bval[j2];
 
3333
      CHK(bval.setbnd(par) == 0);
 
3334
    }
 
3335
    // duplicate
 
3336
    if (urandom(5) == 0) {
 
3337
      uint j3 = urandom(m_bvals);
 
3338
      const BVal& bval = *m_bval[j3];
 
3339
      CHK(bval.setbnd(par) == 0);
 
3340
    }
 
3341
  }
 
3342
  return 0;
 
3343
}
 
3344
 
 
3345
int
 
3346
BSet::setflt(Par par) const
 
3347
{
 
3348
  Con& con = par.con();
 
3349
  CHK(con.getNdbScanFilter() == 0);
 
3350
  CHK(con.beginFilter(NdbScanFilter::AND) == 0);
 
3351
  if (m_bvals != 0) {
 
3352
    Rsq rsq1(m_bvals);
 
3353
    for (uint j = 0; j < m_bvals; j++) {
 
3354
      uint j2 = rsq1.next();
 
3355
      const BVal& bval = *m_bval[j2];
 
3356
      CHK(bval.setflt(par) == 0);
 
3357
    }
 
3358
    // duplicate
 
3359
    if (urandom(5) == 0) {
 
3360
      uint j3 = urandom(m_bvals);
 
3361
      const BVal& bval = *m_bval[j3];
 
3362
      CHK(bval.setflt(par) == 0);
 
3363
    }
 
3364
  }
 
3365
  CHK(con.endFilter() == 0);
 
3366
  return 0;
 
3367
}
 
3368
 
 
3369
void
 
3370
BSet::filter(Par par, const Set& set, Set& set2) const
 
3371
{
 
3372
  const Tab& tab = m_tab;
 
3373
  const ITab& itab = m_itab;
 
3374
  assert(&tab == &set2.m_tab && set.m_rows == set2.m_rows);
 
3375
  assert(set2.count() == 0);
 
3376
  for (uint i = 0; i < set.m_rows; i++) {
 
3377
    set.lock();
 
3378
    do {
 
3379
      if (set.m_row[i] == 0) {
 
3380
        break;
 
3381
      }
 
3382
      const Row& row = *set.m_row[i];
 
3383
      if (!g_store_null_key) {
 
3384
        bool ok1 = false;
 
3385
        for (uint k = 0; k < itab.m_icols; k++) {
 
3386
          const ICol& icol = *itab.m_icol[k];
 
3387
          const Col& col = icol.m_col;
 
3388
          const Val& val = *row.m_val[col.m_num];
 
3389
          if (!val.m_null) {
 
3390
            ok1 = true;
 
3391
            break;
 
3392
          }
 
3393
        }
 
3394
        if (!ok1)
 
3395
          break;
 
3396
      }
 
3397
      bool ok2 = true;
 
3398
      for (uint j = 0; j < m_bvals; j++) {
 
3399
        const BVal& bval = *m_bval[j];
 
3400
        const ICol& icol = bval.m_icol;
 
3401
        const Col& col = icol.m_col;
 
3402
        const Val& val = *row.m_val[col.m_num];
 
3403
        int ret = bval.cmp(par, val);
 
3404
        LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
 
3405
        if (bval.m_type == 0)
 
3406
          ok2 = (ret <= 0);
 
3407
        else if (bval.m_type == 1)
 
3408
          ok2 = (ret < 0);
 
3409
        else if (bval.m_type == 2)
 
3410
          ok2 = (ret >= 0);
 
3411
        else if (bval.m_type == 3)
 
3412
          ok2 = (ret > 0);
 
3413
        else if (bval.m_type == 4)
 
3414
          ok2 = (ret == 0);
 
3415
        else {
 
3416
          assert(false);
 
3417
        }
 
3418
        if (!ok2)
 
3419
          break;
 
3420
      }
 
3421
      if (!ok2)
 
3422
        break;
 
3423
      assert(set2.m_row[i] == 0);
 
3424
      set2.m_row[i] = new Row(tab);
 
3425
      Row& row2 = *set2.m_row[i];
 
3426
      row2.copy(row, true);
 
3427
    } while (0);
 
3428
    set.unlock();
 
3429
  }
 
3430
}
 
3431
 
 
3432
static NdbOut&
 
3433
operator<<(NdbOut& out, const BSet& bset)
 
3434
{
 
3435
  out << "bounds=" << bset.m_bvals;
 
3436
  for (uint j = 0; j < bset.m_bvals; j++) {
 
3437
    const BVal& bval = *bset.m_bval[j];
 
3438
    out << " [bound " << j << ": " << bval << "]";
 
3439
  }
 
3440
  return out;
 
3441
}
 
3442
 
 
3443
// pk operations
 
3444
 
 
3445
static int
 
3446
pkinsert(Par par)
 
3447
{
 
3448
  Con& con = par.con();
 
3449
  const Tab& tab = par.tab();
 
3450
  Set& set = par.set();
 
3451
  LL3("pkinsert " << tab.m_name);
 
3452
  CHK(con.startTransaction() == 0);
 
3453
  uint batch = 0;
 
3454
  for (uint j = 0; j < par.m_rows; j++) {
 
3455
    uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
 
3456
    uint i = thrrow(par, j2);
 
3457
    set.lock();
 
3458
    if (!set.compat(par, i, Row::OpIns)) {
 
3459
      LL3("pkinsert SKIP " << i << " " << set.getrow(i));
 
3460
      set.unlock();
 
3461
    } else {
 
3462
      set.push(i);
 
3463
      set.calc(par, i);
 
3464
      CHK(set.insrow(par, i) == 0);
 
3465
      set.unlock();
 
3466
      LL4("pkinsert key=" << i << " " << set.getrow(i));
 
3467
      batch++;
 
3468
    }
 
3469
    bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
 
3470
    if (batch == par.m_batch || lastbatch) {
 
3471
      uint err = par.m_catcherr;
 
3472
      ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
 
3473
      CHK(con.execute(et, err) == 0);
 
3474
      set.lock();
 
3475
      set.post(par, !err ? et : Rollback);
 
3476
      set.unlock();
 
3477
      if (err) {
 
3478
        LL1("pkinsert key=" << i << " stop on " << con.errname(err));
 
3479
        break;
 
3480
      }
 
3481
      batch = 0;
 
3482
      if (!lastbatch) {
 
3483
        con.closeTransaction();
 
3484
        CHK(con.startTransaction() == 0);
 
3485
      }
 
3486
    }
 
3487
  }
 
3488
  con.closeTransaction();
 
3489
  return 0;
 
3490
}
 
3491
 
 
3492
static int
 
3493
pkupdate(Par par)
 
3494
{
 
3495
  Con& con = par.con();
 
3496
  const Tab& tab = par.tab();
 
3497
  Set& set = par.set();
 
3498
  LL3("pkupdate " << tab.m_name);
 
3499
  CHK(con.startTransaction() == 0);
 
3500
  uint batch = 0;
 
3501
  for (uint j = 0; j < par.m_rows; j++) {
 
3502
    uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
 
3503
    uint i = thrrow(par, j2);
 
3504
    set.lock();
 
3505
    if (!set.compat(par, i, Row::OpUpd)) {
 
3506
      LL3("pkupdate SKIP " << i << " " << set.getrow(i));
 
3507
      set.unlock();
 
3508
    } else {
 
3509
      set.push(i);
 
3510
      set.copyval(i, tab.m_pkmask);
 
3511
      set.calc(par, i, ~tab.m_pkmask);
 
3512
      CHK(set.updrow(par, i) == 0);
 
3513
      set.unlock();
 
3514
      LL4("pkupdate key=" << i << " " << set.getrow(i));
 
3515
      batch++;
 
3516
    }
 
3517
    bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
 
3518
    if (batch == par.m_batch || lastbatch) {
 
3519
      uint err = par.m_catcherr;
 
3520
      ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
 
3521
      CHK(con.execute(et, err) == 0);
 
3522
      set.lock();
 
3523
      set.post(par, !err ? et : Rollback);
 
3524
      set.unlock();
 
3525
      if (err) {
 
3526
        LL1("pkupdate key=" << i << ": stop on " << con.errname(err));
 
3527
        break;
 
3528
      }
 
3529
      batch = 0;
 
3530
      if (!lastbatch) {
 
3531
        con.closeTransaction();
 
3532
        CHK(con.startTransaction() == 0);
 
3533
      }
 
3534
    }
 
3535
  }
 
3536
  con.closeTransaction();
 
3537
  return 0;
 
3538
}
 
3539
 
 
3540
static int
 
3541
pkdelete(Par par)
 
3542
{
 
3543
  Con& con = par.con();
 
3544
  const Tab& tab = par.tab();
 
3545
  Set& set = par.set();
 
3546
  LL3("pkdelete " << tab.m_name);
 
3547
  CHK(con.startTransaction() == 0);
 
3548
  uint batch = 0;
 
3549
  for (uint j = 0; j < par.m_rows; j++) {
 
3550
    uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
 
3551
    uint i = thrrow(par, j2);
 
3552
    set.lock();
 
3553
    if (!set.compat(par, i, Row::OpDel)) {
 
3554
      LL3("pkdelete SKIP " << i << " " << set.getrow(i));
 
3555
      set.unlock();
 
3556
    } else {
 
3557
      set.push(i);
 
3558
      set.copyval(i, tab.m_pkmask);
 
3559
      CHK(set.delrow(par, i) == 0);
 
3560
      set.unlock();
 
3561
      LL4("pkdelete key=" << i << " " << set.getrow(i));
 
3562
      batch++;
 
3563
    }
 
3564
    bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
 
3565
    if (batch == par.m_batch || lastbatch) {
 
3566
      uint err = par.m_catcherr;
 
3567
      ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
 
3568
      CHK(con.execute(et, err) == 0);
 
3569
      set.lock();
 
3570
      set.post(par, !err ? et : Rollback);
 
3571
      set.unlock();
 
3572
      if (err) {
 
3573
        LL1("pkdelete key=" << i << " stop on " << con.errname(err));
 
3574
        break;
 
3575
      }
 
3576
      batch = 0;
 
3577
      if (!lastbatch) {
 
3578
        con.closeTransaction();
 
3579
        CHK(con.startTransaction() == 0);
 
3580
      }
 
3581
    }
 
3582
  }
 
3583
  con.closeTransaction();
 
3584
  return 0;
 
3585
}
 
3586
 
 
3587
static int
 
3588
pkread(Par par)
 
3589
{
 
3590
  Con& con = par.con();
 
3591
  const Tab& tab = par.tab();
 
3592
  Set& set = par.set();
 
3593
  LL3("pkread " << tab.m_name << " verify=" << par.m_verify);
 
3594
  // expected
 
3595
  const Set& set1 = set;
 
3596
  Set set2(tab, set.m_rows);
 
3597
  for (uint i = 0; i < set.m_rows; i++) {
 
3598
    set.lock();
 
3599
    // TODO lock mode
 
3600
    if (!set.compat(par, i, Row::OpREAD)) {
 
3601
      LL3("pkread SKIP " << i << " " << set.getrow(i));
 
3602
      set.unlock();
 
3603
      continue;
 
3604
    }
 
3605
    set.unlock();
 
3606
    CHK(con.startTransaction() == 0);
 
3607
    CHK(set2.selrow(par, *set1.m_row[i]) == 0);
 
3608
    CHK(con.execute(Commit) == 0);
 
3609
    uint i2 = (uint)-1;
 
3610
    CHK(set2.getkey(par, &i2) == 0 && i == i2);
 
3611
    CHK(set2.putval(i, false) == 0);
 
3612
    LL4("row " << set2.count() << " " << set2.getrow(i));
 
3613
    con.closeTransaction();
 
3614
  }
 
3615
  if (par.m_verify)
 
3616
    CHK(set1.verify(par, set2, false) == 0);
 
3617
  return 0;
 
3618
}
 
3619
 
 
3620
static int
 
3621
pkreadfast(Par par, uint count)
 
3622
{
 
3623
  Con& con = par.con();
 
3624
  const Tab& tab = par.tab();
 
3625
  const Set& set = par.set();
 
3626
  LL3("pkfast " << tab.m_name);
 
3627
  Row keyrow(tab);
 
3628
  // not batched on purpose
 
3629
  for (uint j = 0; j < count; j++) {
 
3630
    uint i = urandom(set.m_rows);
 
3631
    assert(set.compat(par, i, Row::OpREAD));
 
3632
    CHK(con.startTransaction() == 0);
 
3633
    // define key
 
3634
    keyrow.calc(par, i);
 
3635
    CHK(keyrow.selrow(par) == 0);
 
3636
    NdbRecAttr* rec;
 
3637
    // get 1st column
 
3638
    CHK(con.getValue((Uint32)0, rec) == 0);
 
3639
    CHK(con.execute(Commit) == 0);
 
3640
    con.closeTransaction();
 
3641
  }
 
3642
  return 0;
 
3643
}
 
3644
 
 
3645
// hash index operations
 
3646
 
 
3647
static int
 
3648
hashindexupdate(Par par, const ITab& itab)
 
3649
{
 
3650
  Con& con = par.con();
 
3651
  const Tab& tab = par.tab();
 
3652
  Set& set = par.set();
 
3653
  LL3("hashindexupdate " << itab.m_name);
 
3654
  CHK(con.startTransaction() == 0);
 
3655
  uint batch = 0;
 
3656
  for (uint j = 0; j < par.m_rows; j++) {
 
3657
    uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
 
3658
    uint i = thrrow(par, j2);
 
3659
    set.lock();
 
3660
    if (!set.compat(par, i, Row::OpUpd)) {
 
3661
      LL3("hashindexupdate SKIP " << i << " " << set.getrow(i));
 
3662
      set.unlock();
 
3663
    } else {
 
3664
      // table pk and index key are not updated
 
3665
      set.push(i);
 
3666
      uint keymask = tab.m_pkmask | itab.m_keymask;
 
3667
      set.copyval(i, keymask);
 
3668
      set.calc(par, i, ~keymask);
 
3669
      CHK(set.updrow(par, itab, i) == 0);
 
3670
      set.unlock();
 
3671
      LL4("hashindexupdate " << i << " " << set.getrow(i));
 
3672
      batch++;
 
3673
    }
 
3674
    bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
 
3675
    if (batch == par.m_batch || lastbatch) {
 
3676
      uint err = par.m_catcherr;
 
3677
      ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
 
3678
      CHK(con.execute(et, err) == 0);
 
3679
      set.lock();
 
3680
      set.post(par, !err ? et : Rollback);
 
3681
      set.unlock();
 
3682
      if (err) {
 
3683
        LL1("hashindexupdate " << i << " stop on " << con.errname(err));
 
3684
        break;
 
3685
      }
 
3686
      batch = 0;
 
3687
      if (!lastbatch) {
 
3688
        con.closeTransaction();
 
3689
        CHK(con.startTransaction() == 0);
 
3690
      }
 
3691
    }
 
3692
  }
 
3693
  con.closeTransaction();
 
3694
  return 0;
 
3695
}
 
3696
 
 
3697
static int
 
3698
hashindexdelete(Par par, const ITab& itab)
 
3699
{
 
3700
  Con& con = par.con();
 
3701
  Set& set = par.set();
 
3702
  LL3("hashindexdelete " << itab.m_name);
 
3703
  CHK(con.startTransaction() == 0);
 
3704
  uint batch = 0;
 
3705
  for (uint j = 0; j < par.m_rows; j++) {
 
3706
    uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
 
3707
    uint i = thrrow(par, j2);
 
3708
    set.lock();
 
3709
    if (!set.compat(par, i, Row::OpDel)) {
 
3710
      LL3("hashindexdelete SKIP " << i << " " << set.getrow(i));
 
3711
      set.unlock();
 
3712
    } else {
 
3713
      set.push(i);
 
3714
      set.copyval(i, itab.m_keymask);
 
3715
      CHK(set.delrow(par, itab, i) == 0);
 
3716
      set.unlock();
 
3717
      LL4("hashindexdelete " << i << " " << set.getrow(i));
 
3718
      batch++;
 
3719
    }
 
3720
    bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
 
3721
    if (batch == par.m_batch || lastbatch) {
 
3722
      uint err = par.m_catcherr;
 
3723
      ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
 
3724
      CHK(con.execute(et, err) == 0);
 
3725
      set.lock();
 
3726
      set.post(par, !err ? et : Rollback);
 
3727
      set.unlock();
 
3728
      if (err) {
 
3729
        LL1("hashindexdelete " << i << " stop on " << con.errname(err));
 
3730
        break;
 
3731
      }
 
3732
      batch = 0;
 
3733
      if (!lastbatch) {
 
3734
        con.closeTransaction();
 
3735
        CHK(con.startTransaction() == 0);
 
3736
      }
 
3737
    }
 
3738
  }
 
3739
  con.closeTransaction();
 
3740
  return 0;
 
3741
}
 
3742
 
 
3743
static int
 
3744
hashindexread(Par par, const ITab& itab)
 
3745
{
 
3746
  Con& con = par.con();
 
3747
  const Tab& tab = par.tab();
 
3748
  Set& set = par.set();
 
3749
  LL3("hashindexread " << itab.m_name << " verify=" << par.m_verify);
 
3750
  // expected
 
3751
  const Set& set1 = set;
 
3752
  Set set2(tab, set.m_rows);
 
3753
  for (uint i = 0; i < set.m_rows; i++) {
 
3754
    set.lock();
 
3755
    // TODO lock mode
 
3756
    if (!set.compat(par, i, Row::OpREAD)) {
 
3757
      LL3("hashindexread SKIP " << i << " " << set.getrow(i));
 
3758
      set.unlock();
 
3759
      continue;
 
3760
    }
 
3761
    set.unlock();
 
3762
    CHK(con.startTransaction() == 0);
 
3763
    CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
 
3764
    CHK(con.execute(Commit) == 0);
 
3765
    uint i2 = (uint)-1;
 
3766
    CHK(set2.getkey(par, &i2) == 0 && i == i2);
 
3767
    CHK(set2.putval(i, false) == 0);
 
3768
    LL4("row " << set2.count() << " " << *set2.m_row[i]);
 
3769
    con.closeTransaction();
 
3770
  }
 
3771
  if (par.m_verify)
 
3772
    CHK(set1.verify(par, set2, false) == 0);
 
3773
  return 0;
 
3774
}
 
3775
 
 
3776
// scan read
 
3777
 
 
3778
static int
 
3779
scanreadtable(Par par)
 
3780
{
 
3781
  Con& con = par.con();
 
3782
  const Tab& tab = par.tab();
 
3783
  const Set& set = par.set();
 
3784
  // expected
 
3785
  const Set& set1 = set;
 
3786
  LL3("scanreadtable " << tab.m_name << " lockmode=" << par.m_lockmode << " tupscan=" << par.m_tupscan << " expect=" << set1.count() << " verify=" << par.m_verify);
 
3787
  Set set2(tab, set.m_rows);
 
3788
  CHK(con.startTransaction() == 0);
 
3789
  CHK(con.getNdbScanOperation(tab) == 0);
 
3790
  CHK(con.readTuples(par) == 0);
 
3791
  set2.getval(par);
 
3792
  CHK(con.executeScan() == 0);
 
3793
  uint n = 0;
 
3794
  while (1) {
 
3795
    int ret;
 
3796
    uint err = par.m_catcherr;
 
3797
    CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
 
3798
    if (ret == 1)
 
3799
      break;
 
3800
    if (err) {
 
3801
      LL1("scanreadtable stop on " << con.errname(err));
 
3802
      break;
 
3803
    }
 
3804
    uint i = (uint)-1;
 
3805
    CHK(set2.getkey(par, &i) == 0);
 
3806
    CHK(set2.putval(i, false, n) == 0);
 
3807
    LL4("row " << n << " " << *set2.m_row[i]);
 
3808
    n++;
 
3809
  }
 
3810
  con.closeTransaction();
 
3811
  if (par.m_verify)
 
3812
    CHK(set1.verify(par, set2, false) == 0);
 
3813
  LL3("scanreadtable " << tab.m_name << " done rows=" << n);
 
3814
  return 0;
 
3815
}
 
3816
 
 
3817
static int
 
3818
scanreadtablefast(Par par, uint countcheck)
 
3819
{
 
3820
  Con& con = par.con();
 
3821
  const Tab& tab = par.tab();
 
3822
  const Set& set = par.set();
 
3823
  LL3("scanfast " << tab.m_name);
 
3824
  CHK(con.startTransaction() == 0);
 
3825
  CHK(con.getNdbScanOperation(tab) == 0);
 
3826
  CHK(con.readTuples(par) == 0);
 
3827
  // get 1st column
 
3828
  NdbRecAttr* rec;
 
3829
  CHK(con.getValue((Uint32)0, rec) == 0);
 
3830
  CHK(con.executeScan() == 0);
 
3831
  uint count = 0;
 
3832
  while (1) {
 
3833
    int ret;
 
3834
    CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
 
3835
    if (ret == 1)
 
3836
      break;
 
3837
    count++;
 
3838
  }
 
3839
  con.closeTransaction();
 
3840
  CHK(count == countcheck);
 
3841
  return 0;
 
3842
}
 
3843
 
 
3844
// try to get interesting bounds
 
3845
static void
 
3846
calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
 
3847
{
 
3848
  while (true) {
 
3849
    bset.calc(par);
 
3850
    bset.filter(par, set, set1);
 
3851
    uint n = set1.count();
 
3852
    // prefer proper subset
 
3853
    if (0 < n && n < set.m_rows)
 
3854
      break;
 
3855
    if (urandom(5) == 0)
 
3856
      break;
 
3857
    set1.reset();
 
3858
  }
 
3859
}
 
3860
 
 
3861
static int
 
3862
scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
 
3863
{
 
3864
  Con& con = par.con();
 
3865
  const Tab& tab = par.tab();
 
3866
  const Set& set = par.set();
 
3867
  Set set1(tab, set.m_rows);
 
3868
  if (calc) {
 
3869
    calcscanbounds(par, itab, bset, set, set1);
 
3870
  } else {
 
3871
    bset.filter(par, set, set1);
 
3872
  }
 
3873
  LL3("scanreadindex " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
 
3874
  Set set2(tab, set.m_rows);
 
3875
  CHK(con.startTransaction() == 0);
 
3876
  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
 
3877
  CHK(con.readIndexTuples(par) == 0);
 
3878
  CHK(bset.setbnd(par) == 0);
 
3879
  set2.getval(par);
 
3880
  CHK(con.executeScan() == 0);
 
3881
  uint n = 0;
 
3882
  while (1) {
 
3883
    int ret;
 
3884
    uint err = par.m_catcherr;
 
3885
    CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
 
3886
    if (ret == 1)
 
3887
      break;
 
3888
    if (err) {
 
3889
      LL1("scanreadindex stop on " << con.errname(err));
 
3890
      break;
 
3891
    }
 
3892
    uint i = (uint)-1;
 
3893
    CHK(set2.getkey(par, &i) == 0);
 
3894
    CHK(set2.putval(i, par.m_dups, n) == 0);
 
3895
    LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
 
3896
    n++;
 
3897
  }
 
3898
  con.closeTransaction();
 
3899
  if (par.m_verify) {
 
3900
    CHK(set1.verify(par, set2, false) == 0);
 
3901
    if (par.m_ordered)
 
3902
      CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
 
3903
  }
 
3904
  LL3("scanreadindex " << itab.m_name << " done rows=" << n);
 
3905
  return 0;
 
3906
}
 
3907
 
 
3908
static int
 
3909
scanreadindexfast(Par par, const ITab& itab, const BSet& bset, uint countcheck)
 
3910
{
 
3911
  Con& con = par.con();
 
3912
  const Tab& tab = par.tab();
 
3913
  const Set& set = par.set();
 
3914
  LL3("scanfast " << itab.m_name << " " << bset);
 
3915
  LL4(bset);
 
3916
  CHK(con.startTransaction() == 0);
 
3917
  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
 
3918
  CHK(con.readIndexTuples(par) == 0);
 
3919
  CHK(bset.setbnd(par) == 0);
 
3920
  // get 1st column
 
3921
  NdbRecAttr* rec;
 
3922
  CHK(con.getValue((Uint32)0, rec) == 0);
 
3923
  CHK(con.executeScan() == 0);
 
3924
  uint count = 0;
 
3925
  while (1) {
 
3926
    int ret;
 
3927
    CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
 
3928
    if (ret == 1)
 
3929
      break;
 
3930
    count++;
 
3931
  }
 
3932
  con.closeTransaction();
 
3933
  CHK(count == countcheck);
 
3934
  return 0;
 
3935
}
 
3936
 
 
3937
static int
 
3938
scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
 
3939
{
 
3940
  Con& con = par.con();
 
3941
  const Tab& tab = par.tab();
 
3942
  const Set& set = par.set();
 
3943
  Set set1(tab, set.m_rows);
 
3944
  if (calc) {
 
3945
    calcscanbounds(par, itab, bset, set, set1);
 
3946
  } else {
 
3947
    bset.filter(par, set, set1);
 
3948
  }
 
3949
  LL3("scanfilter " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify);
 
3950
  Set set2(tab, set.m_rows);
 
3951
  CHK(con.startTransaction() == 0);
 
3952
  CHK(con.getNdbScanOperation(tab) == 0);
 
3953
  CHK(con.readTuples(par) == 0);
 
3954
  CHK(bset.setflt(par) == 0);
 
3955
  set2.getval(par);
 
3956
  CHK(con.executeScan() == 0);
 
3957
  uint n = 0;
 
3958
  while (1) {
 
3959
    int ret;
 
3960
    uint err = par.m_catcherr;
 
3961
    CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
 
3962
    if (ret == 1)
 
3963
      break;
 
3964
    if (err) {
 
3965
      LL1("scanfilter stop on " << con.errname(err));
 
3966
      break;
 
3967
    }
 
3968
    uint i = (uint)-1;
 
3969
    CHK(set2.getkey(par, &i) == 0);
 
3970
    CHK(set2.putval(i, par.m_dups, n) == 0);
 
3971
    LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
 
3972
    n++;
 
3973
  }
 
3974
  con.closeTransaction();
 
3975
  if (par.m_verify) {
 
3976
    CHK(set1.verify(par, set2, false) == 0);
 
3977
  }
 
3978
  LL3("scanfilter " << itab.m_name << " done rows=" << n);
 
3979
  return 0;
 
3980
}
 
3981
 
 
3982
static int
 
3983
scanreadindex(Par par, const ITab& itab)
 
3984
{
 
3985
  const Tab& tab = par.tab();
 
3986
  for (uint i = 0; i < par.m_ssloop; i++) {
 
3987
    if (itab.m_type == ITab::OrderedIndex) {
 
3988
      BSet bset(tab, itab);
 
3989
      CHK(scanreadfilter(par, itab, bset, true) == 0);
 
3990
      CHK(scanreadindex(par, itab, bset, true) == 0);
 
3991
    }
 
3992
  }
 
3993
  return 0;
 
3994
}
 
3995
 
 
3996
static int
 
3997
scanreadindex(Par par)
 
3998
{
 
3999
  const Tab& tab = par.tab();
 
4000
  for (uint i = 0; i < tab.m_itabs; i++) {
 
4001
    if (tab.m_itab[i] == 0)
 
4002
      continue;
 
4003
    const ITab& itab = *tab.m_itab[i];
 
4004
    if (itab.m_type == ITab::OrderedIndex) {
 
4005
      CHK(scanreadindex(par, itab) == 0);
 
4006
    } else {
 
4007
      CHK(hashindexread(par, itab) == 0);
 
4008
    }
 
4009
  }
 
4010
  return 0;
 
4011
}
 
4012
 
 
4013
static int
 
4014
scanreadall(Par par)
 
4015
{
 
4016
  CHK(scanreadtable(par) == 0);
 
4017
  CHK(scanreadindex(par) == 0);
 
4018
  return 0;
 
4019
}
 
4020
 
 
4021
// timing scans
 
4022
 
 
4023
static int
 
4024
timescantable(Par par)
 
4025
{
 
4026
  par.tmr().on();
 
4027
  CHK(scanreadtablefast(par, par.m_totrows) == 0);
 
4028
  par.tmr().off(par.set().m_rows);
 
4029
  return 0;
 
4030
}
 
4031
 
 
4032
static int
 
4033
timescanpkindex(Par par)
 
4034
{
 
4035
  const Tab& tab = par.tab();
 
4036
  const ITab& itab = *tab.m_itab[0];    // 1st index is on PK
 
4037
  BSet bset(tab, itab);
 
4038
  par.tmr().on();
 
4039
  CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
 
4040
  par.tmr().off(par.set().m_rows);
 
4041
  return 0;
 
4042
}
 
4043
 
 
4044
static int
 
4045
timepkreadtable(Par par)
 
4046
{
 
4047
  par.tmr().on();
 
4048
  uint count = par.m_samples;
 
4049
  if (count == 0)
 
4050
    count = par.m_totrows;
 
4051
  CHK(pkreadfast(par, count) == 0);
 
4052
  par.tmr().off(count);
 
4053
  return 0;
 
4054
}
 
4055
 
 
4056
static int
 
4057
timepkreadindex(Par par)
 
4058
{
 
4059
  const Tab& tab = par.tab();
 
4060
  const ITab& itab = *tab.m_itab[0];    // 1st index is on PK
 
4061
  BSet bset(tab, itab);
 
4062
  uint count = par.m_samples;
 
4063
  if (count == 0)
 
4064
    count = par.m_totrows;
 
4065
  par.tmr().on();
 
4066
  for (uint j = 0; j < count; j++) {
 
4067
    uint i = urandom(par.m_totrows);
 
4068
    bset.calcpk(par, i);
 
4069
    CHK(scanreadindexfast(par, itab, bset, 1) == 0);
 
4070
  }
 
4071
  par.tmr().off(count);
 
4072
  return 0;
 
4073
}
 
4074
 
 
4075
// scan update
 
4076
 
 
4077
static int
 
4078
scanupdatetable(Par par)
 
4079
{
 
4080
  Con& con = par.con();
 
4081
  const Tab& tab = par.tab();
 
4082
  Set& set = par.set();
 
4083
  LL3("scanupdatetable " << tab.m_name);
 
4084
  Set set2(tab, set.m_rows);
 
4085
  par.m_lockmode = NdbOperation::LM_Exclusive;
 
4086
  CHK(con.startTransaction() == 0);
 
4087
  CHK(con.getNdbScanOperation(tab) == 0);
 
4088
  CHK(con.readTuples(par) == 0);
 
4089
  set2.getval(par);
 
4090
  CHK(con.executeScan() == 0);
 
4091
  uint count = 0;
 
4092
  // updating trans
 
4093
  Con con2;
 
4094
  con2.connect(con);
 
4095
  CHK(con2.startTransaction() == 0);
 
4096
  uint batch = 0;
 
4097
  while (1) {
 
4098
    int ret;
 
4099
    uint32 err = par.m_catcherr;
 
4100
    CHK((ret = con.nextScanResult(true, err)) != -1);
 
4101
    if (ret != 0)
 
4102
      break;
 
4103
    if (err) {
 
4104
      LL1("scanupdatetable [scan] stop on " << con.errname(err));
 
4105
      break;
 
4106
    }
 
4107
    if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
 
4108
      con.closeScan();
 
4109
      break;
 
4110
    }
 
4111
    while (1) {
 
4112
      uint i = (uint)-1;
 
4113
      CHK(set2.getkey(par, &i) == 0);
 
4114
      set.lock();
 
4115
      if (!set.compat(par, i, Row::OpUpd)) {
 
4116
        LL3("scanupdatetable SKIP " << i << " " << set.getrow(i));
 
4117
      } else {
 
4118
        CHKTRY(set2.putval(i, false) == 0, set.unlock());
 
4119
        CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
 
4120
        Par par2 = par;
 
4121
        par2.m_con = &con2;
 
4122
        set.push(i);
 
4123
        set.calc(par, i, ~tab.m_pkmask);
 
4124
        CHKTRY(set.setrow(par2, i) == 0, set.unlock());
 
4125
        LL4("scanupdatetable " << i << " " << set.getrow(i));
 
4126
        batch++;
 
4127
      }
 
4128
      set.unlock();
 
4129
      CHK((ret = con.nextScanResult(false)) != -1);
 
4130
      bool lastbatch = (batch != 0 && ret != 0);
 
4131
      if (batch == par.m_batch || lastbatch) {
 
4132
        uint err = par.m_catcherr;
 
4133
        ExecType et = Commit;
 
4134
        CHK(con2.execute(et, err) == 0);
 
4135
        set.lock();
 
4136
        set.post(par, !err ? et : Rollback);
 
4137
        set.unlock();
 
4138
        if (err) {
 
4139
          LL1("scanupdatetable [update] stop on " << con2.errname(err));
 
4140
          goto out;
 
4141
        }
 
4142
        LL4("scanupdatetable committed batch");
 
4143
        count += batch;
 
4144
        batch = 0;
 
4145
        con2.closeTransaction();
 
4146
        CHK(con2.startTransaction() == 0);
 
4147
      }
 
4148
      if (ret != 0)
 
4149
        break;
 
4150
    }
 
4151
  }
 
4152
out:
 
4153
  con2.closeTransaction();
 
4154
  LL3("scanupdatetable " << tab.m_name << " rows updated=" << count);
 
4155
  con.closeTransaction();
 
4156
  return 0;
 
4157
}
 
4158
 
 
4159
static int
 
4160
scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
 
4161
{
 
4162
  Con& con = par.con();
 
4163
  const Tab& tab = par.tab();
 
4164
  Set& set = par.set();
 
4165
  // expected
 
4166
  Set set1(tab, set.m_rows);
 
4167
  if (calc) {
 
4168
    calcscanbounds(par, itab, bset, set, set1);
 
4169
  } else {
 
4170
    bset.filter(par, set, set1);
 
4171
  }
 
4172
  LL3("scanupdateindex " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
 
4173
  Set set2(tab, set.m_rows);
 
4174
  par.m_lockmode = NdbOperation::LM_Exclusive;
 
4175
  CHK(con.startTransaction() == 0);
 
4176
  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
 
4177
  CHK(con.readTuples(par) == 0);
 
4178
  CHK(bset.setbnd(par) == 0);
 
4179
  set2.getval(par);
 
4180
  CHK(con.executeScan() == 0);
 
4181
  uint count = 0;
 
4182
  // updating trans
 
4183
  Con con2;
 
4184
  con2.connect(con);
 
4185
  CHK(con2.startTransaction() == 0);
 
4186
  uint batch = 0;
 
4187
  while (1) {
 
4188
    int ret;
 
4189
    uint err = par.m_catcherr;
 
4190
    CHK((ret = con.nextScanResult(true, err)) != -1);
 
4191
    if (ret != 0)
 
4192
      break;
 
4193
    if (err) {
 
4194
      LL1("scanupdateindex [scan] stop on " << con.errname(err));
 
4195
      break;
 
4196
    }
 
4197
    if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
 
4198
      con.closeScan();
 
4199
      break;
 
4200
    }
 
4201
    while (1) {
 
4202
      uint i = (uint)-1;
 
4203
      CHK(set2.getkey(par, &i) == 0);
 
4204
      set.lock();
 
4205
      if (!set.compat(par, i, Row::OpUpd)) {
 
4206
        LL4("scanupdateindex SKIP " << set.getrow(i));
 
4207
      } else {
 
4208
        CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
 
4209
        CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
 
4210
        Par par2 = par;
 
4211
        par2.m_con = &con2;
 
4212
        set.push(i);
 
4213
        uint colmask = !par.m_noindexkeyupdate ? ~0 : ~itab.m_keymask;
 
4214
        set.calc(par, i, colmask);
 
4215
        CHKTRY(set.setrow(par2, i) == 0, set.unlock());
 
4216
        LL4("scanupdateindex " << i << " " << set.getrow(i));
 
4217
        batch++;
 
4218
      }
 
4219
      set.unlock();
 
4220
      CHK((ret = con.nextScanResult(false)) != -1);
 
4221
      bool lastbatch = (batch != 0 && ret != 0);
 
4222
      if (batch == par.m_batch || lastbatch) {
 
4223
        uint err = par.m_catcherr;
 
4224
        ExecType et = Commit;
 
4225
        CHK(con2.execute(et, err) == 0);
 
4226
        set.lock();
 
4227
        set.post(par, !err ? et : Rollback);
 
4228
        set.unlock();
 
4229
        if (err) {
 
4230
          LL1("scanupdateindex [update] stop on " << con2.errname(err));
 
4231
          goto out;
 
4232
        }
 
4233
        LL4("scanupdateindex committed batch");
 
4234
        count += batch;
 
4235
        batch = 0;
 
4236
        con2.closeTransaction();
 
4237
        CHK(con2.startTransaction() == 0);
 
4238
      }
 
4239
      if (ret != 0)
 
4240
        break;
 
4241
    }
 
4242
  }
 
4243
out:
 
4244
  con2.closeTransaction();
 
4245
  if (par.m_verify) {
 
4246
    CHK(set1.verify(par, set2, true) == 0);
 
4247
    if (par.m_ordered)
 
4248
      CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
 
4249
  }
 
4250
  LL3("scanupdateindex " << itab.m_name << " rows updated=" << count);
 
4251
  con.closeTransaction();
 
4252
  return 0;
 
4253
}
 
4254
 
 
4255
static int
 
4256
scanupdateindex(Par par, const ITab& itab)
 
4257
{
 
4258
  const Tab& tab = par.tab();
 
4259
  for (uint i = 0; i < par.m_ssloop; i++) {
 
4260
    if (itab.m_type == ITab::OrderedIndex) {
 
4261
      BSet bset(tab, itab);
 
4262
      CHK(scanupdateindex(par, itab, bset, true) == 0);
 
4263
    } else {
 
4264
      CHK(hashindexupdate(par, itab) == 0);
 
4265
    }
 
4266
  }
 
4267
  return 0;
 
4268
}
 
4269
 
 
4270
static int
 
4271
scanupdateindex(Par par)
 
4272
{
 
4273
  const Tab& tab = par.tab();
 
4274
  for (uint i = 0; i < tab.m_itabs; i++) {
 
4275
    if (tab.m_itab[i] == 0)
 
4276
      continue;
 
4277
    const ITab& itab = *tab.m_itab[i];
 
4278
    CHK(scanupdateindex(par, itab) == 0);
 
4279
  }
 
4280
  return 0;
 
4281
}
 
4282
 
 
4283
static int
 
4284
scanupdateall(Par par)
 
4285
{
 
4286
  CHK(scanupdatetable(par) == 0);
 
4287
  CHK(scanupdateindex(par) == 0);
 
4288
  return 0;
 
4289
}
 
4290
 
 
4291
// medium level routines
 
4292
 
 
4293
static int
 
4294
readverifyfull(Par par)
 
4295
{
 
4296
  if (par.m_noverify)
 
4297
    return 0;
 
4298
  par.m_verify = true;
 
4299
  if (par.m_abortpct != 0) {
 
4300
    LL2("skip verify in this version"); // implement in 5.0 version
 
4301
    par.m_verify = false;
 
4302
  }
 
4303
  par.m_lockmode = NdbOperation::LM_CommittedRead;
 
4304
  const Tab& tab = par.tab();
 
4305
  if (par.m_no == 0) {
 
4306
    // thread 0 scans table
 
4307
    CHK(scanreadtable(par) == 0);
 
4308
    // once more via tup scan
 
4309
    par.m_tupscan = true;
 
4310
    CHK(scanreadtable(par) == 0);
 
4311
  }
 
4312
  // each thread scans different indexes
 
4313
  for (uint i = 0; i < tab.m_itabs; i++) {
 
4314
    if (i % par.m_usedthreads != par.m_no)
 
4315
      continue;
 
4316
    if (tab.m_itab[i] == 0)
 
4317
      continue;
 
4318
    const ITab& itab = *tab.m_itab[i];
 
4319
    if (itab.m_type == ITab::OrderedIndex) {
 
4320
      BSet bset(tab, itab);
 
4321
      CHK(scanreadindex(par, itab, bset, false) == 0);
 
4322
    } else {
 
4323
      CHK(hashindexread(par, itab) == 0);
 
4324
    }
 
4325
  }
 
4326
  return 0;
 
4327
}
 
4328
 
 
4329
static int
 
4330
readverifyindex(Par par)
 
4331
{
 
4332
  if (par.m_noverify)
 
4333
    return 0;
 
4334
  par.m_verify = true;
 
4335
  par.m_lockmode = NdbOperation::LM_CommittedRead;
 
4336
  uint sel = urandom(10);
 
4337
  if (sel < 9) {
 
4338
    par.m_ordered = true;
 
4339
    par.m_descending = (sel < 5);
 
4340
  }
 
4341
  CHK(scanreadindex(par) == 0);
 
4342
  return 0;
 
4343
}
 
4344
 
 
4345
static int
 
4346
pkops(Par par)
 
4347
{
 
4348
  const Tab& tab = par.tab();
 
4349
  par.m_randomkey = true;
 
4350
  for (uint i = 0; i < par.m_ssloop; i++) {
 
4351
    uint j = 0;
 
4352
    while (j < tab.m_itabs) {
 
4353
      if (tab.m_itab[j] != 0) {
 
4354
        const ITab& itab = *tab.m_itab[j];
 
4355
        if (itab.m_type == ITab::UniqueHashIndex && urandom(5) == 0)
 
4356
          break;
 
4357
      }
 
4358
      j++;
 
4359
    }
 
4360
    uint sel = urandom(10);
 
4361
    if (par.m_slno % 2 == 0) {
 
4362
      // favor insert
 
4363
      if (sel < 8) {
 
4364
        CHK(pkinsert(par) == 0);
 
4365
      } else if (sel < 9) {
 
4366
        if (j == tab.m_itabs)
 
4367
          CHK(pkupdate(par) == 0);
 
4368
        else {
 
4369
          const ITab& itab = *tab.m_itab[j];
 
4370
          CHK(hashindexupdate(par, itab) == 0);
 
4371
        }
 
4372
      } else {
 
4373
        if (j == tab.m_itabs)
 
4374
          CHK(pkdelete(par) == 0);
 
4375
        else {
 
4376
          const ITab& itab = *tab.m_itab[j];
 
4377
          CHK(hashindexdelete(par, itab) == 0);
 
4378
        }
 
4379
      }
 
4380
    } else {
 
4381
      // favor delete
 
4382
      if (sel < 1) {
 
4383
        CHK(pkinsert(par) == 0);
 
4384
      } else if (sel < 2) {
 
4385
        if (j == tab.m_itabs)
 
4386
          CHK(pkupdate(par) == 0);
 
4387
        else {
 
4388
          const ITab& itab = *tab.m_itab[j];
 
4389
          CHK(hashindexupdate(par, itab) == 0);
 
4390
        }
 
4391
      } else {
 
4392
        if (j == tab.m_itabs)
 
4393
          CHK(pkdelete(par) == 0);
 
4394
        else {
 
4395
          const ITab& itab = *tab.m_itab[j];
 
4396
          CHK(hashindexdelete(par, itab) == 0);
 
4397
        }
 
4398
      }
 
4399
    }
 
4400
  }
 
4401
  return 0;
 
4402
}
 
4403
 
 
4404
static int
 
4405
pkupdatescanread(Par par)
 
4406
{
 
4407
  par.m_dups = true;
 
4408
  par.m_catcherr |= Con::ErrDeadlock;
 
4409
  uint sel = urandom(10);
 
4410
  if (sel < 5) {
 
4411
    CHK(pkupdate(par) == 0);
 
4412
  } else if (sel < 6) {
 
4413
    par.m_verify = false;
 
4414
    CHK(scanreadtable(par) == 0);
 
4415
  } else {
 
4416
    par.m_verify = false;
 
4417
    if (sel < 8) {
 
4418
      par.m_ordered = true;
 
4419
      par.m_descending = (sel < 7);
 
4420
    }
 
4421
    CHK(scanreadindex(par) == 0);
 
4422
  }
 
4423
  return 0;
 
4424
}
 
4425
 
 
4426
static int
 
4427
mixedoperations(Par par)
 
4428
{
 
4429
  par.m_dups = true;
 
4430
  par.m_catcherr |= Con::ErrDeadlock;
 
4431
  par.m_scanstop = par.m_totrows;       // randomly close scans
 
4432
  uint sel = urandom(10);
 
4433
  if (sel < 2) {
 
4434
    CHK(pkdelete(par) == 0);
 
4435
  } else if (sel < 4) {
 
4436
    CHK(pkupdate(par) == 0);
 
4437
  } else if (sel < 6) {
 
4438
    CHK(scanupdatetable(par) == 0);
 
4439
  } else {
 
4440
    if (sel < 8) {
 
4441
      par.m_ordered = true;
 
4442
      par.m_descending = (sel < 7);
 
4443
    }
 
4444
    CHK(scanupdateindex(par) == 0);
 
4445
  }
 
4446
  return 0;
 
4447
}
 
4448
 
 
4449
static int
 
4450
parallelorderedupdate(Par par)
 
4451
{
 
4452
  const Tab& tab = par.tab();
 
4453
  uint k = 0;
 
4454
  for (uint i = 0; i < tab.m_itabs; i++) {
 
4455
    if (tab.m_itab[i] == 0)
 
4456
      continue;
 
4457
    const ITab& itab = *tab.m_itab[i];
 
4458
    if (itab.m_type != ITab::OrderedIndex)
 
4459
      continue;
 
4460
    // cannot sync threads yet except via subloop
 
4461
    if (k++ == par.m_slno % tab.m_orderedindexes) {
 
4462
      LL3("parallelorderedupdate: " << itab.m_name);
 
4463
      par.m_noindexkeyupdate = true;
 
4464
      par.m_ordered = true;
 
4465
      par.m_descending = (par.m_slno != 0);
 
4466
      par.m_dups = false;
 
4467
      par.m_verify = true;
 
4468
      BSet bset(tab, itab); // empty bounds
 
4469
      // prefer empty bounds
 
4470
      uint sel = urandom(10);
 
4471
      CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
 
4472
    }
 
4473
  }
 
4474
  return 0;
 
4475
}
 
4476
 
 
4477
static int
 
4478
pkupdateindexbuild(Par par)
 
4479
{
 
4480
  if (par.m_no == 0) {
 
4481
    CHK(createindex(par) == 0);
 
4482
  } else {
 
4483
    par.m_randomkey = true;
 
4484
    CHK(pkupdate(par) == 0);
 
4485
  }
 
4486
  return 0;
 
4487
}
 
4488
 
 
4489
// savepoint tests (single thread for now)
 
4490
 
 
4491
struct Spt {
 
4492
  enum Res { Committed, Latest, Deadlock };
 
4493
  bool m_same; // same transaction
 
4494
  NdbOperation::LockMode m_lm;
 
4495
  Res m_res;
 
4496
};
 
4497
 
 
4498
static Spt sptlist[] = {
 
4499
  { 1, NdbOperation::LM_Read, Spt::Latest },
 
4500
  { 1, NdbOperation::LM_Exclusive, Spt::Latest },
 
4501
  { 1, NdbOperation::LM_CommittedRead, Spt::Latest },
 
4502
  { 0, NdbOperation::LM_Read, Spt::Deadlock },
 
4503
  { 0, NdbOperation::LM_Exclusive, Spt::Deadlock },
 
4504
  { 0, NdbOperation::LM_CommittedRead, Spt::Committed }
 
4505
};
 
4506
static uint sptcount = sizeof(sptlist)/sizeof(sptlist[0]);
 
4507
 
 
4508
static int
 
4509
savepointreadpk(Par par, Spt spt)
 
4510
{
 
4511
  LL3("savepointreadpk");
 
4512
  Con& con = par.con();
 
4513
  const Tab& tab = par.tab();
 
4514
  Set& set = par.set();
 
4515
  const Set& set1 = set;
 
4516
  Set set2(tab, set.m_rows);
 
4517
  uint n = 0;
 
4518
  for (uint i = 0; i < set.m_rows; i++) {
 
4519
    set.lock();
 
4520
    if (!set.compat(par, i, Row::OpREAD)) {
 
4521
      LL4("savepointreadpk SKIP " << i << " " << set.getrow(i));
 
4522
      set.unlock();
 
4523
      continue;
 
4524
    }
 
4525
    set.unlock();
 
4526
    CHK(set2.selrow(par, *set1.m_row[i]) == 0);
 
4527
    uint err = par.m_catcherr | Con::ErrDeadlock;
 
4528
    ExecType et = NoCommit;
 
4529
    CHK(con.execute(et, err) == 0);
 
4530
    if (err) {
 
4531
      if (err & Con::ErrDeadlock) {
 
4532
        CHK(spt.m_res == Spt::Deadlock);
 
4533
        // all rows have same behaviour
 
4534
        CHK(n == 0);
 
4535
      }
 
4536
      LL1("savepointreadpk stop on " << con.errname(err));
 
4537
      break;
 
4538
    }
 
4539
    uint i2 = (uint)-1;
 
4540
    CHK(set2.getkey(par, &i2) == 0 && i == i2);
 
4541
    CHK(set2.putval(i, false) == 0);
 
4542
    LL4("row " << set2.count() << " " << set2.getrow(i));
 
4543
    n++;
 
4544
  }
 
4545
  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
 
4546
  if (spt.m_res != Spt::Deadlock)
 
4547
    CHK(set1.verify(par, set2, false, dirty) == 0);
 
4548
  return 0;
 
4549
}
 
4550
 
 
4551
static int
 
4552
savepointreadhashindex(Par par, Spt spt)
 
4553
{
 
4554
  if (spt.m_lm == NdbOperation::LM_CommittedRead && !spt.m_same) {
 
4555
    LL1("skip hash index dirty read");
 
4556
    return 0;
 
4557
  }
 
4558
  LL3("savepointreadhashindex");
 
4559
  Con& con = par.con();
 
4560
  const Tab& tab = par.tab();
 
4561
  const ITab& itab = par.itab();
 
4562
  Set& set = par.set();
 
4563
  const Set& set1 = set;
 
4564
  Set set2(tab, set.m_rows);
 
4565
  uint n = 0;
 
4566
  for (uint i = 0; i < set.m_rows; i++) {
 
4567
    set.lock();
 
4568
    if (!set.compat(par, i, Row::OpREAD)) {
 
4569
      LL3("savepointreadhashindex SKIP " << i << " " << set.getrow(i));
 
4570
      set.unlock();
 
4571
      continue;
 
4572
    }
 
4573
    set.unlock();
 
4574
    CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
 
4575
    uint err = par.m_catcherr | Con::ErrDeadlock;
 
4576
    ExecType et = NoCommit;
 
4577
    CHK(con.execute(et, err) == 0);
 
4578
    if (err) {
 
4579
      if (err & Con::ErrDeadlock) {
 
4580
        CHK(spt.m_res == Spt::Deadlock);
 
4581
        // all rows have same behaviour
 
4582
        CHK(n == 0);
 
4583
      }
 
4584
      LL1("savepointreadhashindex stop on " << con.errname(err));
 
4585
      break;
 
4586
    }
 
4587
    uint i2 = (uint)-1;
 
4588
    CHK(set2.getkey(par, &i2) == 0 && i == i2);
 
4589
    CHK(set2.putval(i, false) == 0);
 
4590
    LL4("row " << set2.count() << " " << *set2.m_row[i]);
 
4591
    n++;
 
4592
  }
 
4593
  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
 
4594
  if (spt.m_res != Spt::Deadlock)
 
4595
    CHK(set1.verify(par, set2, false, dirty) == 0);
 
4596
  return 0;
 
4597
}
 
4598
 
 
4599
static int
 
4600
savepointscantable(Par par, Spt spt)
 
4601
{
 
4602
  LL3("savepointscantable");
 
4603
  Con& con = par.con();
 
4604
  const Tab& tab = par.tab();
 
4605
  const Set& set = par.set();
 
4606
  const Set& set1 = set; // not modifying current set
 
4607
  Set set2(tab, set.m_rows); // scan result
 
4608
  CHK(con.getNdbScanOperation(tab) == 0);
 
4609
  CHK(con.readTuples(par) == 0);
 
4610
  set2.getval(par); // getValue all columns
 
4611
  CHK(con.executeScan() == 0);
 
4612
  bool deadlock = false;
 
4613
  uint n = 0;
 
4614
  while (1) {
 
4615
    int ret;
 
4616
    uint err = par.m_catcherr | Con::ErrDeadlock;
 
4617
    CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
 
4618
    if (ret == 1)
 
4619
      break;
 
4620
    if (err) {
 
4621
      if (err & Con::ErrDeadlock) {
 
4622
        CHK(spt.m_res == Spt::Deadlock);
 
4623
        // all rows have same behaviour
 
4624
        CHK(n == 0);
 
4625
        deadlock = true;
 
4626
      }
 
4627
      LL1("savepointscantable stop on " << con.errname(err));
 
4628
      break;
 
4629
    }
 
4630
    CHK(spt.m_res != Spt::Deadlock);
 
4631
    uint i = (uint)-1;
 
4632
    CHK(set2.getkey(par, &i) == 0);
 
4633
    CHK(set2.putval(i, false, n) == 0);
 
4634
    LL4("row " << n << " key " << i << " " << set2.getrow(i));
 
4635
    n++;
 
4636
  }
 
4637
  if (set1.m_rows > 0) {
 
4638
    if (!deadlock)
 
4639
      CHK(spt.m_res != Spt::Deadlock);
 
4640
    else
 
4641
      CHK(spt.m_res == Spt::Deadlock);
 
4642
  }
 
4643
  LL2("savepointscantable " << n << " rows");
 
4644
  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
 
4645
  if (spt.m_res != Spt::Deadlock)
 
4646
    CHK(set1.verify(par, set2, false, dirty) == 0);
 
4647
  return 0;
 
4648
}
 
4649
 
 
4650
static int
 
4651
savepointscanindex(Par par, Spt spt)
 
4652
{
 
4653
  LL3("savepointscanindex");
 
4654
  Con& con = par.con();
 
4655
  const Tab& tab = par.tab();
 
4656
  const ITab& itab = par.itab();
 
4657
  const Set& set = par.set();
 
4658
  const Set& set1 = set;
 
4659
  Set set2(tab, set.m_rows);
 
4660
  CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
 
4661
  CHK(con.readIndexTuples(par) == 0);
 
4662
  set2.getval(par);
 
4663
  CHK(con.executeScan() == 0);
 
4664
  bool deadlock = false;
 
4665
  uint n = 0;
 
4666
  while (1) {
 
4667
    int ret;
 
4668
    uint err = par.m_catcherr | Con::ErrDeadlock;
 
4669
    CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
 
4670
    if (ret == 1)
 
4671
      break;
 
4672
    if (err) {
 
4673
      if (err & Con::ErrDeadlock) {
 
4674
        CHK(spt.m_res == Spt::Deadlock);
 
4675
        // all rows have same behaviour
 
4676
        CHK(n == 0);
 
4677
        deadlock = true;
 
4678
      }
 
4679
      LL1("savepointscanindex stop on " << con.errname(err));
 
4680
      break;
 
4681
    }
 
4682
    CHK(spt.m_res != Spt::Deadlock);
 
4683
    uint i = (uint)-1;
 
4684
    CHK(set2.getkey(par, &i) == 0);
 
4685
    CHK(set2.putval(i, par.m_dups, n) == 0);
 
4686
    LL4("row " << n << " key " << i << " " << set2.getrow(i));
 
4687
    n++;
 
4688
  }
 
4689
  if (set1.m_rows > 0) {
 
4690
    if (!deadlock)
 
4691
      CHK(spt.m_res != Spt::Deadlock);
 
4692
    else
 
4693
      CHK(spt.m_res == Spt::Deadlock);
 
4694
  }
 
4695
  LL2("savepointscanindex " << n << " rows");
 
4696
  bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
 
4697
  if (spt.m_res != Spt::Deadlock)
 
4698
    CHK(set1.verify(par, set2, false, dirty) == 0);
 
4699
  return 0;
 
4700
}
 
4701
 
 
4702
typedef int (*SptFun)(Par, Spt);
 
4703
 
 
4704
static int
 
4705
savepointtest(Par par, Spt spt, SptFun fun)
 
4706
{
 
4707
  Con& con = par.con();
 
4708
  Par par2 = par;
 
4709
  Con con2;
 
4710
  if (!spt.m_same) {
 
4711
    con2.connect(con); // copy ndb reference
 
4712
    par2.m_con = &con2;
 
4713
    CHK(con2.startTransaction() == 0);
 
4714
  }
 
4715
  par2.m_lockmode = spt.m_lm;
 
4716
  CHK((*fun)(par2, spt) == 0);
 
4717
  if (!spt.m_same) {
 
4718
    con2.closeTransaction();
 
4719
  }
 
4720
  return 0;
 
4721
}
 
4722
 
 
4723
static int
 
4724
savepointtest(Par par, const char* op)
 
4725
{
 
4726
  Con& con = par.con();
 
4727
  const Tab& tab = par.tab();
 
4728
  Set& set = par.set();
 
4729
  LL2("savepointtest op=\"" << op << "\"");
 
4730
  CHK(con.startTransaction() == 0);
 
4731
  const char* p = op;
 
4732
  char c;
 
4733
  while ((c = *p++) != 0) {
 
4734
    uint j;
 
4735
    for (j = 0; j < par.m_rows; j++) {
 
4736
      uint i = thrrow(par, j);
 
4737
      if (c == 'c') {
 
4738
        ExecType et = Commit;
 
4739
        CHK(con.execute(et) == 0);
 
4740
        set.lock();
 
4741
        set.post(par, et);
 
4742
        set.unlock();
 
4743
        CHK(con.startTransaction() == 0);
 
4744
      } else {
 
4745
        set.lock();
 
4746
        set.push(i);
 
4747
        if (c == 'i') {
 
4748
          set.calc(par, i);
 
4749
          CHK(set.insrow(par, i) == 0);
 
4750
        } else if (c == 'u') {
 
4751
          set.copyval(i, tab.m_pkmask);
 
4752
          set.calc(par, i, ~tab.m_pkmask);
 
4753
          CHK(set.updrow(par, i) == 0);
 
4754
        } else if (c == 'd') {
 
4755
          set.copyval(i, tab.m_pkmask);
 
4756
          CHK(set.delrow(par, i) == 0);
 
4757
        } else {
 
4758
          assert(false);
 
4759
        }
 
4760
        set.unlock();
 
4761
      }
 
4762
    }
 
4763
  }
 
4764
  {
 
4765
    ExecType et = NoCommit;
 
4766
    CHK(con.execute(et) == 0);
 
4767
    set.lock();
 
4768
    set.post(par, et);
 
4769
    set.unlock();
 
4770
  }
 
4771
  for (uint k = 0; k < sptcount; k++) {
 
4772
    Spt spt = sptlist[k];
 
4773
    LL2("spt lm=" << spt.m_lm << " same=" << spt.m_same);
 
4774
    CHK(savepointtest(par, spt, &savepointreadpk) == 0);
 
4775
    CHK(savepointtest(par, spt, &savepointscantable) == 0);
 
4776
    for (uint i = 0; i < tab.m_itabs; i++) {
 
4777
      if (tab.m_itab[i] == 0)
 
4778
        continue;
 
4779
      const ITab& itab = *tab.m_itab[i];
 
4780
      par.m_itab = &itab;
 
4781
      if (itab.m_type == ITab::OrderedIndex)
 
4782
        CHK(savepointtest(par, spt, &savepointscanindex) == 0);
 
4783
      else
 
4784
        CHK(savepointtest(par, spt, &savepointreadhashindex) == 0);
 
4785
      par.m_itab = 0;
 
4786
    }
 
4787
  }
 
4788
  {
 
4789
    ExecType et = Rollback;
 
4790
    CHK(con.execute(et) == 0);
 
4791
    set.lock();
 
4792
    set.post(par, et);
 
4793
    set.unlock();
 
4794
  }
 
4795
  con.closeTransaction();
 
4796
  return 0;
 
4797
}
 
4798
 
 
4799
static int
 
4800
savepointtest(Par par)
 
4801
{
 
4802
  assert(par.m_usedthreads == 1);
 
4803
  const char* oplist[] = {
 
4804
    // each based on previous and "c" not last
 
4805
    "i",
 
4806
    "icu",
 
4807
    "uuuuu",
 
4808
    "d",
 
4809
    "dciuuuuud",
 
4810
    0
 
4811
  };
 
4812
  int i;
 
4813
  for (i = 0; oplist[i] != 0; i++) {
 
4814
    CHK(savepointtest(par, oplist[i]) == 0);
 
4815
  }
 
4816
  return 0;
 
4817
}
 
4818
 
 
4819
static int
 
4820
halloweentest(Par par, const ITab& itab)
 
4821
{
 
4822
  LL2("halloweentest " << itab.m_name);
 
4823
  Con& con = par.con();
 
4824
  const Tab& tab = par.tab();
 
4825
  Set& set = par.set();
 
4826
  CHK(con.startTransaction() == 0);
 
4827
  // insert 1 row
 
4828
  uint i = 0;
 
4829
  set.push(i);
 
4830
  set.calc(par, i);
 
4831
  CHK(set.insrow(par, i) == 0);
 
4832
  CHK(con.execute(NoCommit) == 0);
 
4833
  // scan via index until Set m_rows reached
 
4834
  uint scancount = 0;
 
4835
  bool stop = false;
 
4836
  while (!stop) {
 
4837
    par.m_lockmode = // makes no difference
 
4838
      scancount % 2 == 0 ? NdbOperation::LM_CommittedRead :
 
4839
                           NdbOperation::LM_Read;
 
4840
    Set set1(tab, set.m_rows); // expected scan result
 
4841
    Set set2(tab, set.m_rows); // actual scan result
 
4842
    BSet bset(tab, itab);
 
4843
    calcscanbounds(par, itab, bset, set, set1);
 
4844
    CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
 
4845
    CHK(con.readIndexTuples(par) == 0);
 
4846
    CHK(bset.setbnd(par) == 0);
 
4847
    set2.getval(par);
 
4848
    CHK(con.executeScan() == 0);
 
4849
    const uint savepoint = i;
 
4850
    LL3("scancount=" << scancount << " savepoint=" << savepoint);
 
4851
    uint n = 0;
 
4852
    while (1) {
 
4853
      int ret;
 
4854
      CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
 
4855
      if (ret == 1)
 
4856
        break;
 
4857
      uint k = (uint)-1;
 
4858
      CHK(set2.getkey(par, &k) == 0);
 
4859
      CHK(set2.putval(k, false, n) == 0);
 
4860
      LL3("row=" << n << " key=" << k);
 
4861
      CHK(k <= savepoint);
 
4862
      if (++i == set.m_rows) {
 
4863
        stop = true;
 
4864
        break;
 
4865
      }
 
4866
      set.push(i);
 
4867
      set.calc(par, i);
 
4868
      CHK(set.insrow(par, i) == 0);
 
4869
      CHK(con.execute(NoCommit) == 0);
 
4870
      n++;
 
4871
    }
 
4872
    con.closeScan();
 
4873
    LL3("scanrows=" << n);
 
4874
    if (!stop) {
 
4875
      CHK(set1.verify(par, set2, false) == 0);
 
4876
    }
 
4877
    scancount++;
 
4878
  }
 
4879
  CHK(con.execute(Commit) == 0);
 
4880
  set.post(par, Commit);
 
4881
  assert(set.count() == set.m_rows);
 
4882
  CHK(pkdelete(par) == 0);
 
4883
  return 0;
 
4884
}
 
4885
 
 
4886
static int
 
4887
halloweentest(Par par)
 
4888
{
 
4889
  assert(par.m_usedthreads == 1);
 
4890
  const Tab& tab = par.tab();
 
4891
  for (uint i = 0; i < tab.m_itabs; i++) {
 
4892
    if (tab.m_itab[i] == 0)
 
4893
      continue;
 
4894
    const ITab& itab = *tab.m_itab[i];
 
4895
    if (itab.m_type == ITab::OrderedIndex)
 
4896
      CHK(halloweentest(par, itab) == 0);
 
4897
  }
 
4898
  return 0;
 
4899
}
 
4900
 
 
4901
// threads
 
4902
 
 
4903
typedef int (*TFunc)(Par par);
 
4904
enum TMode { ST = 1, MT = 2 };
 
4905
 
 
4906
extern "C" { static void* runthread(void* arg); }
 
4907
 
 
4908
struct Thr {
 
4909
  enum State { Wait, Start, Stop, Exit };
 
4910
  State m_state;
 
4911
  Par m_par;
 
4912
  pthread_t m_id;
 
4913
  NdbThread* m_thread;
 
4914
  NdbMutex* m_mutex;
 
4915
  NdbCondition* m_cond;
 
4916
  TFunc m_func;
 
4917
  int m_ret;
 
4918
  void* m_status;
 
4919
  char m_tmp[20]; // used for debug msg prefix
 
4920
  Thr(Par par, uint n);
 
4921
  ~Thr();
 
4922
  int run();
 
4923
  void start();
 
4924
  void stop();
 
4925
  void exit();
 
4926
  //
 
4927
  void lock() {
 
4928
    NdbMutex_Lock(m_mutex);
 
4929
  }
 
4930
  void unlock() {
 
4931
    NdbMutex_Unlock(m_mutex);
 
4932
  }
 
4933
  void wait() {
 
4934
    NdbCondition_Wait(m_cond, m_mutex);
 
4935
  }
 
4936
  void signal() {
 
4937
    NdbCondition_Signal(m_cond);
 
4938
  }
 
4939
  void join() {
 
4940
    NdbThread_WaitFor(m_thread, &m_status);
 
4941
    m_thread = 0;
 
4942
  }
 
4943
};
 
4944
 
 
4945
Thr::Thr(Par par, uint n) :
 
4946
  m_state(Wait),
 
4947
  m_par(par),
 
4948
  m_thread(0),
 
4949
  m_mutex(0),
 
4950
  m_cond(0),
 
4951
  m_func(0),
 
4952
  m_ret(0),
 
4953
  m_status(0)
 
4954
{
 
4955
  m_par.m_no = n;
 
4956
  char buf[10];
 
4957
  sprintf(buf, "thr%03u", par.m_no);
 
4958
  const char* name = strcpy(new char[10], buf);
 
4959
  // mutex
 
4960
  m_mutex = NdbMutex_Create();
 
4961
  m_cond = NdbCondition_Create();
 
4962
  assert(m_mutex != 0 && m_cond != 0);
 
4963
  // run
 
4964
  const uint stacksize = 256 * 1024;
 
4965
  const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
 
4966
  m_thread = NdbThread_Create(runthread, (void**)this, stacksize, name, prio);
 
4967
}
 
4968
 
 
4969
Thr::~Thr()
 
4970
{
 
4971
  if (m_thread != 0) {
 
4972
    NdbThread_Destroy(&m_thread);
 
4973
    m_thread = 0;
 
4974
  }
 
4975
  if (m_cond != 0) {
 
4976
    NdbCondition_Destroy(m_cond);
 
4977
    m_cond = 0;
 
4978
  }
 
4979
  if (m_mutex != 0) {
 
4980
    NdbMutex_Destroy(m_mutex);
 
4981
    m_mutex = 0;
 
4982
  }
 
4983
}
 
4984
 
 
4985
static void*
 
4986
runthread(void* arg)
 
4987
{
 
4988
  Thr& thr = *(Thr*)arg;
 
4989
  thr.m_id = pthread_self();
 
4990
  if (thr.run() < 0) {
 
4991
    LL1("exit on error");
 
4992
  } else {
 
4993
    LL4("exit ok");
 
4994
  }
 
4995
  return 0;
 
4996
}
 
4997
 
 
4998
int
 
4999
Thr::run()
 
5000
{
 
5001
  LL4("run");
 
5002
  Con con;
 
5003
  CHK(con.connect() == 0);
 
5004
  m_par.m_con = &con;
 
5005
  LL4("connected");
 
5006
  while (1) {
 
5007
    lock();
 
5008
    while (m_state != Start && m_state != Exit) {
 
5009
      LL4("wait");
 
5010
      wait();
 
5011
    }
 
5012
    if (m_state == Exit) {
 
5013
      LL4("exit");
 
5014
      unlock();
 
5015
      break;
 
5016
    }
 
5017
    LL4("start");
 
5018
    assert(m_state == Start);
 
5019
    m_ret = (*m_func)(m_par);
 
5020
    m_state = Stop;
 
5021
    LL4("stop");
 
5022
    signal();
 
5023
    unlock();
 
5024
    if (m_ret == -1) {
 
5025
      if (m_par.m_cont)
 
5026
        LL1("continue running due to -cont");
 
5027
      else
 
5028
        return -1;
 
5029
    }
 
5030
  }
 
5031
  con.disconnect();
 
5032
  return 0;
 
5033
}
 
5034
 
 
5035
void
 
5036
Thr::start()
 
5037
{
 
5038
  lock();
 
5039
  m_state = Start;
 
5040
  signal();
 
5041
  unlock();
 
5042
}
 
5043
 
 
5044
void
 
5045
Thr::stop()
 
5046
{
 
5047
  lock();
 
5048
  while (m_state != Stop)
 
5049
    wait();
 
5050
  m_state = Wait;
 
5051
  unlock();
 
5052
}
 
5053
 
 
5054
void
 
5055
Thr::exit()
 
5056
{
 
5057
  lock();
 
5058
  m_state = Exit;
 
5059
  signal();
 
5060
  unlock();
 
5061
}
 
5062
 
 
5063
// test run
 
5064
 
 
5065
static Thr** g_thrlist = 0;
 
5066
 
 
5067
static Thr*
 
5068
getthr()
 
5069
{
 
5070
  if (g_thrlist != 0) {
 
5071
    pthread_t id = pthread_self();
 
5072
    for (uint n = 0; n < g_opt.m_threads; n++) {
 
5073
      if (g_thrlist[n] != 0) {
 
5074
        Thr& thr = *g_thrlist[n];
 
5075
        if (pthread_equal(thr.m_id, id))
 
5076
          return &thr;
 
5077
      }
 
5078
    }
 
5079
  }
 
5080
  return 0;
 
5081
}
 
5082
 
 
5083
// for debug messages (par.m_no not available)
 
5084
static const char*
 
5085
getthrprefix()
 
5086
{
 
5087
  Thr* thrp = getthr();
 
5088
  if (thrp != 0) {
 
5089
    Thr& thr = *thrp;
 
5090
    uint n = thr.m_par.m_no;
 
5091
    uint m =
 
5092
      g_opt.m_threads < 10 ? 1 :
 
5093
      g_opt.m_threads < 100 ? 2 : 3;
 
5094
    sprintf(thr.m_tmp, "[%0*u] ", m, n);
 
5095
    return thr.m_tmp;
 
5096
  }
 
5097
  return "";
 
5098
}
 
5099
 
 
5100
static int
 
5101
runstep(Par par, const char* fname, TFunc func, uint mode)
 
5102
{
 
5103
  LL2("step: " << fname);
 
5104
  const int threads = (mode & ST ? 1 : par.m_usedthreads);
 
5105
  int n; 
 
5106
  for (n = 0; n < threads; n++) {
 
5107
    LL4("start " << n);
 
5108
    Thr& thr = *g_thrlist[n];
 
5109
    Par oldpar = thr.m_par;
 
5110
    // update parameters
 
5111
    thr.m_par = par;
 
5112
    thr.m_par.m_no = oldpar.m_no;
 
5113
    thr.m_par.m_con = oldpar.m_con;
 
5114
    thr.m_func = func;
 
5115
    thr.start();
 
5116
  }
 
5117
  uint errs = 0;
 
5118
  for (n = threads - 1; n >= 0; n--) {
 
5119
    LL4("stop " << n);
 
5120
    Thr& thr = *g_thrlist[n];
 
5121
    thr.stop();
 
5122
    if (thr.m_ret != 0)
 
5123
      errs++;
 
5124
  }
 
5125
  CHK(errs == 0);
 
5126
  return 0;
 
5127
}
 
5128
 
 
5129
#define RUNSTEP(par, func, mode) \
 
5130
  CHK(runstep(par, #func, func, mode) == 0)
 
5131
 
 
5132
#define SUBLOOP(par) \
 
5133
  "sloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
 
5134
  par.m_tab->m_name << "/" << par.m_slno
 
5135
 
 
5136
static int
 
5137
tbuild(Par par)
 
5138
{
 
5139
  RUNSTEP(par, droptable, ST);
 
5140
  RUNSTEP(par, createtable, ST);
 
5141
  RUNSTEP(par, invalidatetable, MT);
 
5142
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5143
    LL1(SUBLOOP(par));
 
5144
    if (par.m_slno % 3 == 0) {
 
5145
      RUNSTEP(par, createindex, ST);
 
5146
      RUNSTEP(par, invalidateindex, MT);
 
5147
      RUNSTEP(par, pkinsert, MT);
 
5148
      RUNSTEP(par, pkupdate, MT);
 
5149
    } else if (par.m_slno % 3 == 1) {
 
5150
      RUNSTEP(par, pkinsert, MT);
 
5151
      RUNSTEP(par, createindex, ST);
 
5152
      RUNSTEP(par, invalidateindex, MT);
 
5153
      RUNSTEP(par, pkupdate, MT);
 
5154
    } else {
 
5155
      RUNSTEP(par, pkinsert, MT);
 
5156
      RUNSTEP(par, pkupdate, MT);
 
5157
      RUNSTEP(par, createindex, ST);
 
5158
      RUNSTEP(par, invalidateindex, MT);
 
5159
    }
 
5160
    RUNSTEP(par, readverifyfull, MT);
 
5161
    // leave last one
 
5162
    if (par.m_slno + 1 < par.m_sloop) {
 
5163
      RUNSTEP(par, pkdelete, MT);
 
5164
      RUNSTEP(par, readverifyfull, MT);
 
5165
      RUNSTEP(par, dropindex, ST);
 
5166
    }
 
5167
  }
 
5168
  return 0;
 
5169
}
 
5170
 
 
5171
static int
 
5172
tindexscan(Par par)
 
5173
{
 
5174
  RUNSTEP(par, droptable, ST);
 
5175
  RUNSTEP(par, createtable, ST);
 
5176
  RUNSTEP(par, invalidatetable, MT);
 
5177
  RUNSTEP(par, createindex, ST);
 
5178
  RUNSTEP(par, invalidateindex, MT);
 
5179
  RUNSTEP(par, pkinsert, MT);
 
5180
  RUNSTEP(par, readverifyfull, MT);
 
5181
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5182
    LL1(SUBLOOP(par));
 
5183
    RUNSTEP(par, readverifyindex, MT);
 
5184
  }
 
5185
  return 0;
 
5186
}
 
5187
 
 
5188
 
 
5189
static int
 
5190
tpkops(Par par)
 
5191
{
 
5192
  RUNSTEP(par, droptable, ST);
 
5193
  RUNSTEP(par, createtable, ST);
 
5194
  RUNSTEP(par, invalidatetable, MT);
 
5195
  RUNSTEP(par, createindex, ST);
 
5196
  RUNSTEP(par, invalidateindex, MT);
 
5197
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5198
    LL1(SUBLOOP(par));
 
5199
    RUNSTEP(par, pkops, MT);
 
5200
    LL2("rows=" << par.set().count());
 
5201
    RUNSTEP(par, readverifyfull, MT);
 
5202
  }
 
5203
  return 0;
 
5204
}
 
5205
 
 
5206
static int
 
5207
tpkopsread(Par par)
 
5208
{
 
5209
  RUNSTEP(par, droptable, ST);
 
5210
  RUNSTEP(par, createtable, ST);
 
5211
  RUNSTEP(par, invalidatetable, MT);
 
5212
  RUNSTEP(par, pkinsert, MT);
 
5213
  RUNSTEP(par, createindex, ST);
 
5214
  RUNSTEP(par, invalidateindex, MT);
 
5215
  RUNSTEP(par, readverifyfull, MT);
 
5216
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5217
    LL1(SUBLOOP(par));
 
5218
    RUNSTEP(par, pkupdatescanread, MT);
 
5219
    RUNSTEP(par, readverifyfull, MT);
 
5220
  }
 
5221
  RUNSTEP(par, pkdelete, MT);
 
5222
  RUNSTEP(par, readverifyfull, MT);
 
5223
  return 0;
 
5224
}
 
5225
 
 
5226
static int
 
5227
tmixedops(Par par)
 
5228
{
 
5229
  RUNSTEP(par, droptable, ST);
 
5230
  RUNSTEP(par, createtable, ST);
 
5231
  RUNSTEP(par, invalidatetable, MT);
 
5232
  RUNSTEP(par, pkinsert, MT);
 
5233
  RUNSTEP(par, createindex, ST);
 
5234
  RUNSTEP(par, invalidateindex, MT);
 
5235
  RUNSTEP(par, readverifyfull, MT);
 
5236
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5237
    LL1(SUBLOOP(par));
 
5238
    RUNSTEP(par, mixedoperations, MT);
 
5239
    RUNSTEP(par, readverifyfull, MT);
 
5240
  }
 
5241
  return 0;
 
5242
}
 
5243
 
 
5244
static int
 
5245
tbusybuild(Par par)
 
5246
{
 
5247
  RUNSTEP(par, droptable, ST);
 
5248
  RUNSTEP(par, createtable, ST);
 
5249
  RUNSTEP(par, invalidatetable, MT);
 
5250
  RUNSTEP(par, pkinsert, MT);
 
5251
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5252
    LL1(SUBLOOP(par));
 
5253
    RUNSTEP(par, pkupdateindexbuild, MT);
 
5254
    RUNSTEP(par, invalidateindex, MT);
 
5255
    RUNSTEP(par, readverifyfull, MT);
 
5256
    RUNSTEP(par, dropindex, ST);
 
5257
  }
 
5258
  return 0;
 
5259
}
 
5260
 
 
5261
static int
 
5262
trollback(Par par)
 
5263
{
 
5264
  par.m_abortpct = 50;
 
5265
  RUNSTEP(par, droptable, ST);
 
5266
  RUNSTEP(par, createtable, ST);
 
5267
  RUNSTEP(par, invalidatetable, MT);
 
5268
  RUNSTEP(par, pkinsert, MT);
 
5269
  RUNSTEP(par, createindex, ST);
 
5270
  RUNSTEP(par, invalidateindex, MT);
 
5271
  RUNSTEP(par, readverifyfull, MT);
 
5272
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5273
    LL1(SUBLOOP(par));
 
5274
    RUNSTEP(par, mixedoperations, MT);
 
5275
    RUNSTEP(par, readverifyfull, MT);
 
5276
  }
 
5277
  return 0;
 
5278
}
 
5279
 
 
5280
static int
 
5281
tparupdate(Par par)
 
5282
{
 
5283
  RUNSTEP(par, droptable, ST);
 
5284
  RUNSTEP(par, createtable, ST);
 
5285
  RUNSTEP(par, invalidatetable, MT);
 
5286
  RUNSTEP(par, pkinsert, MT);
 
5287
  RUNSTEP(par, createindex, ST);
 
5288
  RUNSTEP(par, invalidateindex, MT);
 
5289
  RUNSTEP(par, readverifyfull, MT);
 
5290
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5291
    LL1(SUBLOOP(par));
 
5292
    RUNSTEP(par, parallelorderedupdate, MT);
 
5293
    RUNSTEP(par, readverifyfull, MT);
 
5294
  }
 
5295
  return 0;
 
5296
}
 
5297
 
 
5298
static int
 
5299
tsavepoint(Par par)
 
5300
{
 
5301
  RUNSTEP(par, droptable, ST);
 
5302
  RUNSTEP(par, createtable, ST);
 
5303
  RUNSTEP(par, invalidatetable, MT);
 
5304
  RUNSTEP(par, createindex, ST);
 
5305
  RUNSTEP(par, invalidateindex, MT);
 
5306
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5307
    LL1(SUBLOOP(par));
 
5308
    RUNSTEP(par, savepointtest, MT);
 
5309
    RUNSTEP(par, readverifyfull, MT);
 
5310
  }
 
5311
  return 0;
 
5312
}
 
5313
 
 
5314
static int
 
5315
thalloween(Par par)
 
5316
{
 
5317
  RUNSTEP(par, droptable, ST);
 
5318
  RUNSTEP(par, createtable, ST);
 
5319
  RUNSTEP(par, invalidatetable, MT);
 
5320
  RUNSTEP(par, createindex, ST);
 
5321
  RUNSTEP(par, invalidateindex, MT);
 
5322
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5323
    LL1(SUBLOOP(par));
 
5324
    RUNSTEP(par, halloweentest, MT);
 
5325
  }
 
5326
  return 0;
 
5327
}
 
5328
 
 
5329
static int
 
5330
ttimebuild(Par par)
 
5331
{
 
5332
  Tmr t1;
 
5333
  RUNSTEP(par, droptable, ST);
 
5334
  RUNSTEP(par, createtable, ST);
 
5335
  RUNSTEP(par, invalidatetable, MT);
 
5336
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5337
    LL1(SUBLOOP(par));
 
5338
    RUNSTEP(par, pkinsert, MT);
 
5339
    t1.on();
 
5340
    RUNSTEP(par, createindex, ST);
 
5341
    t1.off(par.m_totrows);
 
5342
    RUNSTEP(par, invalidateindex, MT);
 
5343
    RUNSTEP(par, dropindex, ST);
 
5344
  }
 
5345
  LL1("build index - " << t1.time());
 
5346
  return 0;
 
5347
}
 
5348
 
 
5349
static int
 
5350
ttimemaint(Par par)
 
5351
{
 
5352
  Tmr t1, t2;
 
5353
  RUNSTEP(par, droptable, ST);
 
5354
  RUNSTEP(par, createtable, ST);
 
5355
  RUNSTEP(par, invalidatetable, MT);
 
5356
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5357
    LL1(SUBLOOP(par));
 
5358
    RUNSTEP(par, pkinsert, MT);
 
5359
    t1.on();
 
5360
    RUNSTEP(par, pkupdate, MT);
 
5361
    t1.off(par.m_totrows);
 
5362
    RUNSTEP(par, createindex, ST);
 
5363
    RUNSTEP(par, invalidateindex, MT);
 
5364
    t2.on();
 
5365
    RUNSTEP(par, pkupdate, MT);
 
5366
    t2.off(par.m_totrows);
 
5367
    RUNSTEP(par, dropindex, ST);
 
5368
  }
 
5369
  LL1("update - " << t1.time());
 
5370
  LL1("update indexed - " << t2.time());
 
5371
  LL1("overhead - " << t2.over(t1));
 
5372
  return 0;
 
5373
}
 
5374
 
 
5375
static int
 
5376
ttimescan(Par par)
 
5377
{
 
5378
  if (par.tab().m_itab[0] == 0) {
 
5379
    LL1("ttimescan - no index 0, skipped");
 
5380
    return 0;
 
5381
  }
 
5382
  Tmr t1, t2;
 
5383
  RUNSTEP(par, droptable, ST);
 
5384
  RUNSTEP(par, createtable, ST);
 
5385
  RUNSTEP(par, invalidatetable, MT);
 
5386
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5387
    LL1(SUBLOOP(par));
 
5388
    RUNSTEP(par, pkinsert, MT);
 
5389
    RUNSTEP(par, createindex, ST);
 
5390
    par.m_tmr = &t1;
 
5391
    RUNSTEP(par, timescantable, ST);
 
5392
    par.m_tmr = &t2;
 
5393
    RUNSTEP(par, timescanpkindex, ST);
 
5394
    RUNSTEP(par, dropindex, ST);
 
5395
  }
 
5396
  LL1("full scan table - " << t1.time());
 
5397
  LL1("full scan PK index - " << t2.time());
 
5398
  LL1("overhead - " << t2.over(t1));
 
5399
  return 0;
 
5400
}
 
5401
 
 
5402
static int
 
5403
ttimepkread(Par par)
 
5404
{
 
5405
  if (par.tab().m_itab[0] == 0) {
 
5406
    LL1("ttimescan - no index 0, skipped");
 
5407
    return 0;
 
5408
  }
 
5409
  Tmr t1, t2;
 
5410
  RUNSTEP(par, droptable, ST);
 
5411
  RUNSTEP(par, createtable, ST);
 
5412
  RUNSTEP(par, invalidatetable, MT);
 
5413
  for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
 
5414
    LL1(SUBLOOP(par));
 
5415
    RUNSTEP(par, pkinsert, MT);
 
5416
    RUNSTEP(par, createindex, ST);
 
5417
    par.m_tmr = &t1;
 
5418
    RUNSTEP(par, timepkreadtable, ST);
 
5419
    par.m_tmr = &t2;
 
5420
    RUNSTEP(par, timepkreadindex, ST);
 
5421
    RUNSTEP(par, dropindex, ST);
 
5422
  }
 
5423
  LL1("pk read table - " << t1.time());
 
5424
  LL1("pk read PK index - " << t2.time());
 
5425
  LL1("overhead - " << t2.over(t1));
 
5426
  return 0;
 
5427
}
 
5428
 
 
5429
static int
 
5430
tdrop(Par par)
 
5431
{
 
5432
  RUNSTEP(par, droptable, ST);
 
5433
  return 0;
 
5434
}
 
5435
 
 
5436
struct TCase {
 
5437
  const char* m_name;
 
5438
  TFunc m_func;
 
5439
  const char* m_desc;
 
5440
  TCase(const char* name, TFunc func, const char* desc) :
 
5441
    m_name(name),
 
5442
    m_func(func),
 
5443
    m_desc(desc) {
 
5444
  }
 
5445
};
 
5446
 
 
5447
static const TCase
 
5448
tcaselist[] = {
 
5449
  TCase("a", tbuild, "index build"),
 
5450
  TCase("b", tindexscan, "index scans"),
 
5451
  TCase("c", tpkops, "pk operations"),
 
5452
  TCase("d", tpkopsread, "pk operations and scan reads"),
 
5453
  TCase("e", tmixedops, "pk operations and scan operations"),
 
5454
  TCase("f", tbusybuild, "pk operations and index build"),
 
5455
  TCase("g", trollback, "operations with random rollbacks"),
 
5456
  TCase("h", tparupdate, "parallel ordered update bug#20446"),
 
5457
  TCase("i", tsavepoint, "savepoint test locking bug#31477"),
 
5458
  TCase("j", thalloween, "savepoint test halloween problem"),
 
5459
  TCase("t", ttimebuild, "time index build"),
 
5460
  TCase("u", ttimemaint, "time index maintenance"),
 
5461
  TCase("v", ttimescan, "time full scan table vs index on pk"),
 
5462
  TCase("w", ttimepkread, "time pk read table vs index on pk"),
 
5463
  TCase("z", tdrop, "drop test tables")
 
5464
};
 
5465
 
 
5466
static const uint
 
5467
tcasecount = sizeof(tcaselist) / sizeof(tcaselist[0]);
 
5468
 
 
5469
static void
 
5470
printcases()
 
5471
{
 
5472
  ndbout << "test cases:" << endl;
 
5473
  for (uint i = 0; i < tcasecount; i++) {
 
5474
    const TCase& tcase = tcaselist[i];
 
5475
    ndbout << "  " << tcase.m_name << " - " << tcase.m_desc << endl;
 
5476
  }
 
5477
}
 
5478
 
 
5479
static void
 
5480
printtables()
 
5481
{
 
5482
  Par par(g_opt);
 
5483
  makebuiltintables(par);
 
5484
  ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl;
 
5485
  for (uint j = 0; j < tabcount; j++) {
 
5486
    if (tablist[j] == 0)
 
5487
      continue;
 
5488
    const Tab& tab = *tablist[j];
 
5489
    const char* tname = tab.m_name;
 
5490
    ndbout << "  " << tname;
 
5491
    for (uint i = 0; i < tab.m_itabs; i++) {
 
5492
      if (tab.m_itab[i] == 0)
 
5493
        continue;
 
5494
      const ITab& itab = *tab.m_itab[i];
 
5495
      const char* iname = itab.m_name;
 
5496
      if (strncmp(tname, iname, strlen(tname)) == 0)
 
5497
        iname += strlen(tname);
 
5498
      ndbout << " " << iname;
 
5499
      ndbout << "(";
 
5500
      for (uint k = 0; k < itab.m_icols; k++) {
 
5501
        if (k != 0)
 
5502
          ndbout << ",";
 
5503
        const ICol& icol = *itab.m_icol[k];
 
5504
        const Col& col = icol.m_col;
 
5505
        ndbout << col.m_name;
 
5506
      }
 
5507
      ndbout << ")";
 
5508
    }
 
5509
    ndbout << endl;
 
5510
  }
 
5511
}
 
5512
 
 
5513
static bool
 
5514
setcasepar(Par& par)
 
5515
{
 
5516
  Opt d;
 
5517
  const char* c = par.m_currcase;
 
5518
  switch (c[0]) {
 
5519
  case 'i':
 
5520
    {
 
5521
      if (par.m_usedthreads > 1) {
 
5522
        par.m_usedthreads = 1;
 
5523
        LL1("case " << c << " reduce threads to " << par.m_usedthreads);
 
5524
      }
 
5525
      const uint rows = 100;
 
5526
      if (par.m_rows > rows) {
 
5527
        par.m_rows = rows;
 
5528
        LL1("case " << c << " reduce rows to " << rows);
 
5529
      }
 
5530
    }
 
5531
    break;
 
5532
  case 'j':
 
5533
    {
 
5534
      if (par.m_usedthreads > 1) {
 
5535
        par.m_usedthreads = 1;
 
5536
        LL1("case " << c << " reduce threads to " << par.m_usedthreads);
 
5537
      }
 
5538
    }
 
5539
    break;
 
5540
  default:
 
5541
    break;
 
5542
  }
 
5543
  return true;
 
5544
}
 
5545
 
 
5546
static int
 
5547
runtest(Par par)
 
5548
{
 
5549
  int totret = 0;
 
5550
  if (par.m_seed == -1) {
 
5551
    // good enough for daily run
 
5552
    ushort seed = (ushort)getpid();
 
5553
    LL0("random seed: " << seed);
 
5554
    srandom((uint)seed);
 
5555
  } else if (par.m_seed != 0) {
 
5556
    LL0("random seed: " << par.m_seed);
 
5557
    srandom(par.m_seed);
 
5558
  } else {
 
5559
    LL0("random seed: loop number");
 
5560
  }
 
5561
  // cs
 
5562
  assert(par.m_csname != 0);
 
5563
  if (strcmp(par.m_csname, "random") != 0) {
 
5564
    CHARSET_INFO* cs;
 
5565
    CHK((cs = get_charset_by_name(par.m_csname, MYF(0))) != 0 || (cs = get_charset_by_csname(par.m_csname, MY_CS_PRIMARY, MYF(0))) != 0);
 
5566
    par.m_cs = cs;
 
5567
  }
 
5568
  // con
 
5569
  Con con;
 
5570
  CHK(con.connect() == 0);
 
5571
  par.m_con = &con;
 
5572
  par.m_catcherr |= Con::ErrNospace;
 
5573
  // threads
 
5574
  g_thrlist = new Thr* [par.m_threads];
 
5575
  uint n;
 
5576
  for (n = 0; n < par.m_threads; n++) {
 
5577
    g_thrlist[n] = 0;
 
5578
  }
 
5579
  for (n = 0; n < par.m_threads; n++) {
 
5580
    g_thrlist[n] = new Thr(par, n);
 
5581
    Thr& thr = *g_thrlist[n];
 
5582
    assert(thr.m_thread != 0);
 
5583
  }
 
5584
  for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
 
5585
    LL1("loop: " << par.m_lno);
 
5586
    if (par.m_seed == 0) {
 
5587
      LL1("random seed: " << par.m_lno);
 
5588
      srandom(par.m_lno);
 
5589
    }
 
5590
    for (uint i = 0; i < tcasecount; i++) {
 
5591
      const TCase& tcase = tcaselist[i];
 
5592
      if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0 ||
 
5593
          par.m_skip != 0 && strchr(par.m_skip, tcase.m_name[0]) != 0) {
 
5594
        continue;
 
5595
      }
 
5596
      sprintf(par.m_currcase, "%c", tcase.m_name[0]);
 
5597
      par.m_usedthreads = par.m_threads;
 
5598
      if (!setcasepar(par)) {
 
5599
        LL1("case " << tcase.m_name << " cannot run with given options");
 
5600
        continue;
 
5601
      }
 
5602
      par.m_totrows = par.m_usedthreads * par.m_rows;
 
5603
      makebuiltintables(par);
 
5604
      LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
 
5605
      for (uint j = 0; j < tabcount; j++) {
 
5606
        if (tablist[j] == 0)
 
5607
          continue;
 
5608
        const Tab& tab = *tablist[j];
 
5609
        par.m_tab = &tab;
 
5610
        par.m_set = new Set(tab, par.m_totrows);
 
5611
        LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
 
5612
        int ret = tcase.m_func(par);
 
5613
        delete par.m_set;
 
5614
        par.m_set = 0;
 
5615
        if (ret == -1) {
 
5616
          if (!par.m_cont)
 
5617
            return -1;
 
5618
          totret = -1;
 
5619
          LL1("continue to next case due to -cont");
 
5620
          break;
 
5621
        }
 
5622
      }
 
5623
    }
 
5624
  }
 
5625
  for (n = 0; n < par.m_threads; n++) {
 
5626
    Thr& thr = *g_thrlist[n];
 
5627
    thr.exit();
 
5628
  }
 
5629
  for (n = 0; n < par.m_threads; n++) {
 
5630
    Thr& thr = *g_thrlist[n];
 
5631
    thr.join();
 
5632
    delete &thr;
 
5633
  }
 
5634
  delete [] g_thrlist;
 
5635
  g_thrlist = 0;
 
5636
  con.disconnect();
 
5637
  return totret;
 
5638
}
 
5639
 
 
5640
static const char* g_progname = "testOIBasic";
 
5641
 
 
5642
int
 
5643
main(int argc,  char** argv)
 
5644
{
 
5645
  ndb_init();
 
5646
  uint i;
 
5647
  ndbout << g_progname;
 
5648
  for (i = 1; i < argc; i++)
 
5649
    ndbout << " " << argv[i];
 
5650
  ndbout << endl;
 
5651
  ndbout_mutex = NdbMutex_Create();
 
5652
  while (++argv, --argc > 0) {
 
5653
    const char* arg = argv[0];
 
5654
    if (*arg != '-') {
 
5655
      ndbout << "testOIBasic: unknown argument " << arg;
 
5656
      goto usage;
 
5657
    }
 
5658
    if (strcmp(arg, "-batch") == 0) {
 
5659
      if (++argv, --argc > 0) {
 
5660
        g_opt.m_batch = atoi(argv[0]);
 
5661
        continue;
 
5662
      }
 
5663
    }
 
5664
    if (strcmp(arg, "-bound") == 0) {
 
5665
      if (++argv, --argc > 0) {
 
5666
        const char* p = argv[0];
 
5667
        if (strlen(p) != 0 && strlen(p) == strspn(p, "01234")) {
 
5668
          g_opt.m_bound = strdup(p);
 
5669
          continue;
 
5670
        }
 
5671
      }
 
5672
    }
 
5673
    if (strcmp(arg, "-case") == 0) {
 
5674
      if (++argv, --argc > 0) {
 
5675
        g_opt.m_case = strdup(argv[0]);
 
5676
        continue;
 
5677
      }
 
5678
    }
 
5679
    if (strcmp(arg, "-collsp") == 0) {
 
5680
      g_opt.m_collsp = true;
 
5681
      continue;
 
5682
    }
 
5683
    if (strcmp(arg, "-cont") == 0) {
 
5684
      g_opt.m_cont = true;
 
5685
      continue;
 
5686
    }
 
5687
    if (strcmp(arg, "-core") == 0) {
 
5688
      g_opt.m_core = true;
 
5689
      continue;
 
5690
    }
 
5691
    if (strcmp(arg, "-csname") == 0) {
 
5692
      if (++argv, --argc > 0) {
 
5693
        g_opt.m_csname = strdup(argv[0]);
 
5694
        continue;
 
5695
      }
 
5696
    }
 
5697
    if (strcmp(arg, "-die") == 0) {
 
5698
      if (++argv, --argc > 0) {
 
5699
        g_opt.m_die = atoi(argv[0]);
 
5700
        continue;
 
5701
      }
 
5702
    }
 
5703
    if (strcmp(arg, "-dups") == 0) {
 
5704
      g_opt.m_dups = true;
 
5705
      continue;
 
5706
    }
 
5707
    if (strcmp(arg, "-fragtype") == 0) {
 
5708
      if (++argv, --argc > 0) {
 
5709
        if (strcmp(argv[0], "single") == 0) {
 
5710
          g_opt.m_fragtype = NdbDictionary::Object::FragSingle;
 
5711
          continue;
 
5712
        }
 
5713
        if (strcmp(argv[0], "small") == 0) {
 
5714
          g_opt.m_fragtype = NdbDictionary::Object::FragAllSmall;
 
5715
          continue;
 
5716
        }
 
5717
        if (strcmp(argv[0], "medium") == 0) {
 
5718
          g_opt.m_fragtype = NdbDictionary::Object::FragAllMedium;
 
5719
          continue;
 
5720
        }
 
5721
        if (strcmp(argv[0], "large") == 0) {
 
5722
          g_opt.m_fragtype = NdbDictionary::Object::FragAllLarge;
 
5723
          continue;
 
5724
        }
 
5725
      }
 
5726
    }
 
5727
    if (strcmp(arg, "-index") == 0) {
 
5728
      if (++argv, --argc > 0) {
 
5729
        g_opt.m_index = strdup(argv[0]);
 
5730
        continue;
 
5731
      }
 
5732
    }
 
5733
    if (strcmp(arg, "-loop") == 0) {
 
5734
      if (++argv, --argc > 0) {
 
5735
        g_opt.m_loop = atoi(argv[0]);
 
5736
        continue;
 
5737
      }
 
5738
    }
 
5739
    if (strcmp(arg, "-nologging") == 0) {
 
5740
      g_opt.m_nologging = true;
 
5741
      continue;
 
5742
    }
 
5743
    if (strcmp(arg, "-noverify") == 0) {
 
5744
      g_opt.m_noverify = true;
 
5745
      continue;
 
5746
    }
 
5747
    if (strcmp(arg, "-pctnull") == 0) {
 
5748
      if (++argv, --argc > 0) {
 
5749
        g_opt.m_pctnull = atoi(argv[0]);
 
5750
        continue;
 
5751
      }
 
5752
    }
 
5753
    if (strcmp(arg, "-rows") == 0) {
 
5754
      if (++argv, --argc > 0) {
 
5755
        g_opt.m_rows = atoi(argv[0]);
 
5756
        continue;
 
5757
      }
 
5758
    }
 
5759
    if (strcmp(arg, "-samples") == 0) {
 
5760
      if (++argv, --argc > 0) {
 
5761
        g_opt.m_samples = atoi(argv[0]);
 
5762
        continue;
 
5763
      }
 
5764
    }
 
5765
    if (strcmp(arg, "-scanbatch") == 0) {
 
5766
      if (++argv, --argc > 0) {
 
5767
        g_opt.m_scanbatch = atoi(argv[0]);
 
5768
        continue;
 
5769
      }
 
5770
    }
 
5771
    if (strcmp(arg, "-scanpar") == 0) {
 
5772
      if (++argv, --argc > 0) {
 
5773
        g_opt.m_scanpar = atoi(argv[0]);
 
5774
        continue;
 
5775
      }
 
5776
    }
 
5777
    if (strcmp(arg, "-seed") == 0) {
 
5778
      if (++argv, --argc > 0) {
 
5779
        g_opt.m_seed = atoi(argv[0]);
 
5780
        continue;
 
5781
      }
 
5782
    }
 
5783
    if (strcmp(arg, "-skip") == 0) {
 
5784
      if (++argv, --argc > 0) {
 
5785
        g_opt.m_skip = strdup(argv[0]);
 
5786
        continue;
 
5787
      }
 
5788
    }
 
5789
    if (strcmp(arg, "-sloop") == 0) {
 
5790
      if (++argv, --argc > 0) {
 
5791
        g_opt.m_sloop = atoi(argv[0]);
 
5792
        continue;
 
5793
      }
 
5794
    }
 
5795
    if (strcmp(arg, "-ssloop") == 0) {
 
5796
      if (++argv, --argc > 0) {
 
5797
        g_opt.m_ssloop = atoi(argv[0]);
 
5798
        continue;
 
5799
      }
 
5800
    }
 
5801
    if (strcmp(arg, "-table") == 0) {
 
5802
      if (++argv, --argc > 0) {
 
5803
        g_opt.m_table = strdup(argv[0]);
 
5804
        continue;
 
5805
      }
 
5806
    }
 
5807
    if (strcmp(arg, "-threads") == 0) {
 
5808
      if (++argv, --argc > 0) {
 
5809
        g_opt.m_threads = atoi(argv[0]);
 
5810
        if (1 <= g_opt.m_threads)
 
5811
          continue;
 
5812
      }
 
5813
    }
 
5814
    if (strcmp(arg, "-v") == 0) {
 
5815
      if (++argv, --argc > 0) {
 
5816
        g_opt.m_v = atoi(argv[0]);
 
5817
        continue;
 
5818
      }
 
5819
    }
 
5820
    if (strncmp(arg, "-v", 2) == 0 && isdigit(arg[2])) {
 
5821
      g_opt.m_v = atoi(&arg[2]);
 
5822
      continue;
 
5823
    }
 
5824
    if (strcmp(arg, "-h") == 0 || strcmp(arg, "-help") == 0) {
 
5825
      printhelp();
 
5826
      goto wrongargs;
 
5827
    }
 
5828
    ndbout << "testOIBasic: bad or unknown option " << arg;
 
5829
    goto usage;
 
5830
  }
 
5831
  {
 
5832
    Par par(g_opt);
 
5833
    g_ncc = new Ndb_cluster_connection();
 
5834
    if (g_ncc->connect(30) != 0 || runtest(par) < 0)
 
5835
      goto failed;
 
5836
    delete g_ncc;
 
5837
    g_ncc = 0;
 
5838
  }
 
5839
ok:
 
5840
  return NDBT_ProgramExit(NDBT_OK);
 
5841
failed:
 
5842
  return NDBT_ProgramExit(NDBT_FAILED);
 
5843
usage:
 
5844
  ndbout << " (use -h for help)" << endl;
 
5845
wrongargs:
 
5846
  return NDBT_ProgramExit(NDBT_WRONGARGS);
 
5847
}
 
5848
 
 
5849
// vim: set sw=2 et: