/*
   Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
20
#include <NdbRestarter.hpp>
21
#include <HugoOperations.hpp>
22
#include <HugoTransactions.hpp>
23
#include <UtilTransactions.hpp>
24
#include <signaldata/DumpStateOrd.hpp>
27
#include <InputStream.hpp>
40
// Catalogue of the per-row operation sequences the test can exercise.
// Each entry names up to three steps ("INS", "UPD", "DEL"; 0 = no step).
// Bit i of g_use_ops enables entry i (the hex value in each row comment is
// that bit), and on the command line entry i is the letter 'a' + i (see the
// usage text printed by parse_args).
// NOTE(review): the trailing letters in the row comments (a, d, g, b, ...)
// do not follow index order, so they are a separate labelling and not the
// command-line letters.
// NOTE(review): CASE is declared outside this listing. Judging by the fields
// used elsewhere in this file the columns are presumably
// { start_row, end_row, curr_row, op1, op2, op3, val } - confirm against the
// struct definition.
static CASE g_op_types[] =
42
{ false, true, false, "INS", 0, 0, 0 }, // 0x001 a
43
{ true, true, false, "UPD", 0, 0, 0 }, // 0x002 d
44
{ true, false, false, "DEL", 0, 0, 0 }, // 0x004 g
46
{ false, true, false, "INS", "UPD", 0, 0 }, // 0x008 b
47
{ false, false, false, "INS", "DEL", 0, 0 }, // 0x010 c
48
{ true, true, false, "UPD", "UPD", 0, 0 }, // 0x020 e
49
{ true, false, false, "UPD", "DEL", 0, 0 }, // 0x040 f
50
{ true, true, false, "DEL", "INS", 0, 0 }, // 0x080 h
52
{ false, true, false, "INS", "DEL", "INS", 0 }, // 0x100 i
53
{ true, false, false, "DEL", "INS", "DEL", 0 } // 0x200 j
55
// Number of entries in g_op_types.
const size_t OP_COUNT = (sizeof(g_op_types)/sizeof(g_op_types[0]));
57
// File-scope test state shared by all helpers below.

// NDB API handle; created in connect_ndb.
static Ndb* g_ndb = 0;
59
// Cluster connection; created in connect_ndb, deleted in disconnect_ndb.
static Ndb_cluster_connection *g_cluster_connection= 0;
60
// Long-lived operation helper whose transaction do_op adds its steps to.
static HugoOperations* g_hugo_ops;
61
// Bitmask of enabled g_op_types entries (bit i <-> g_op_types[i]);
// the default enables the first three entries.
static int g_use_ops = 1 | 2 | 4;
62
// Bitmask of test cases run by main (bit 0 = first case); default: first only.
static int g_cases = 0x1;
63
// Number of times main repeats each selected test case.
static int g_case_loop = 2;
64
// Number of rows used by the test ("records" option).
static int g_rows = 10;
65
// Set by -t / cleared by -u in parse_args ("Create table" / "Dont create table").
static int g_setup_tables = 1;
66
// Set by -1 / cleared by -0: run main's loop once per enabled operation.
static int g_one_op_at_a_time = 0;
67
// Name of the table the test creates and operates on.
static const char * g_tablename = "T1";
68
// Dictionary object for g_tablename; looked up in main after table creation.
static const NdbDictionary::Table* g_table = 0;
69
// Used to insert errors, send dump-state commands and restart the cluster.
static NdbRestarter g_restarter;
71
// Forward declarations of the helpers defined later in this file.
// Callers wrap them as require(!helper()), i.e. a zero return means success.
static int init_ndb(int argc, char** argv);
72
static int parse_args(int argc, char** argv);
73
static int connect_ndb();
74
static int drop_all_tables();
75
static int load_table();
76
// error: the error-insert code to arm before the LCP is started.
static int pause_lcp(int error);
77
// row: index into g_ops of the row whose operation sequence is applied.
static int do_op(int row);
78
// error: the LCP id to wait for in the event stream (default 0).
static int continue_lcp(int error = 0);
81
static int validate();
84
// Test driver entry point.
// Calls init_ndb, parse_args and connect_ndb, (re)creates the test table,
// allocates one CASE slot per row, and then runs the test cases selected by
// the g_cases bitmask, each repeated g_case_loop times.
// NOTE(review): several lines of this function (closing braces, some branch
// bodies and calls) are not visible in this listing; the comments below
// describe only the statements that are.
main(int argc, char ** argv){
86
require(!init_ndb(argc, argv));
87
if(parse_args(argc, argv))
89
require(!connect_ndb());
92
// Start from a clean schema, then create the test table.
require(!drop_all_tables());
94
if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
99
// Fetch the dictionary object for the table; the message below reports a
// failed lookup.
g_table = g_ndb->getDictionary()->getTable(g_tablename);
101
g_err << "Failed to retreive table: " << g_tablename << endl;
104
// Long-lived operation helper and its transaction, used by do_op.
require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
105
require(!g_hugo_ops->startTransaction(g_ndb));
107
// One CASE record per row: the operation sequence assigned to the row plus
// its expected state.
g_ops= new CASE[g_rows];
109
// Remember the user's selection; g_use_ops is overwritten below when
// running one operation at a time.
const int use_ops = g_use_ops;
110
for(size_t i = 0; i<OP_COUNT; i++)
112
if(g_one_op_at_a_time){
113
// Skip forward to the next operation enabled in the original selection.
while(i < OP_COUNT && (use_ops & (1 << i)) == 0) i++;
116
ndbout_c("-- loop\noperation: %c use_ops: %x", int('a'+i), use_ops);
117
// Restrict this pass to that single operation.
g_use_ops = (1 << i);
122
// Each (1 << test_case++) test below consumes the next bit of g_cases.
size_t test_case = 0;
123
// Case bit 0: error 5900 is used for every pause/continue step.
if((1 << test_case++) & g_cases)
125
for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
126
g_info << "Performing all ops wo/ inteference of LCP" << endl;
128
g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
129
g_info << " where ZLCP_OP_WRITE_RT_BREAK is "
130
" finished before SAVE_PAGES" << endl;
131
require(!load_table());
132
require(!pause_lcp(5900));
133
for(size_t j = 0; j<(size_t)g_rows; j++){
136
require(!continue_lcp(5900));
138
require(!pause_lcp(5900));
140
require(!validate());
144
// Case bit 1: the first LCP is started with error 5901 instead of 5900.
if((1 << test_case++) & g_cases)
146
for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
147
g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
148
g_info << " where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
150
require(!load_table());
151
require(!pause_lcp(5901));
152
for(size_t j = 0; j<(size_t)g_rows; j++){
155
require(!continue_lcp(5901));
157
require(!pause_lcp(5900));
159
require(!validate());
163
// Case bit 2: starts with error 5902 and continues twice (5902, then 5903).
if((1 << test_case++) & g_cases)
165
for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
166
g_info << "Testing pre LCP operations, undo-ed at commit" << endl;
167
require(!load_table());
168
require(!pause_lcp(5902));
169
for(size_t j = 0; j<(size_t)g_rows; j++){
172
require(!continue_lcp(5902));
174
require(!continue_lcp(5903));
175
require(!pause_lcp(5900));
177
require(!validate());
181
// Case bit 3: starts with error 5904.
if((1 << test_case++) & g_cases)
183
for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
184
g_info << "Testing prepared during LCP and committed after" << endl;
185
require(!load_table());
186
require(!pause_lcp(5904)); // Start LCP, but don't save pages
187
for(size_t j = 0; j<(size_t)g_rows; j++){
190
require(!continue_lcp(5904)); // Start ACC save pages
191
require(!pause_lcp(5900)); // Next LCP
194
require(!validate());
200
// First helper called by main, with the program's argc/argv; main requires
// a zero return.
// NOTE(review): the body of this function is not visible in this listing.
static int init_ndb(int argc, char** argv)
206
// Parses the command line into the g_* settings and echoes the resulting
// configuration (table, enabled operations, enabled test cases).
// main treats a non-zero return as failure.
// NOTE(review): parts of this function are not visible in this listing (the
// loops around the two "|=" statements and the return statements).
static int parse_args(int argc, char** argv)
209
// Raw option strings: one letter per operation / test case to enable.
char * ops= 0, *cases=0;
210
struct getargs args[] = {
211
{ "records", 0, arg_integer, &g_rows, "Number of records", "records" },
212
// NOTE(review): help text says [a-h], but g_op_types has ten entries (a-j);
// the usage text printed below derives the upper letter from OP_COUNT.
{ "operations", 'o', arg_string, &ops, "Operations [a-h]", 0 },
213
{ "1", '1', arg_flag, &g_one_op_at_a_time, "One op at a time", 0 },
214
{ "0", '0', arg_negative_flag, &g_one_op_at_a_time, "All ops at once", 0 },
215
// NOTE(review): help text says [a-c], but main tests four case bits.
{ "cases", 'c', arg_string, &cases, "Cases [a-c]", 0 },
216
{ 0, 't', arg_flag, &g_setup_tables, "Create table", 0 },
217
{ 0, 'u', arg_negative_flag, &g_setup_tables, "Dont create table", 0 }
221
const int num_args = sizeof(args)/sizeof(args[0]);
222
// On a parse error print the usage followed by the list of operations.
if(getarg(args, num_args, argc, (const char**)argv, &optind)) {
223
arg_printusage(args, num_args, argv[0], " tabname1\n");
224
ndbout_c("\n -- Operations [a-%c] = ", int('a'+OP_COUNT-1));
225
// Only op1 and op2 are listed; op3 is not printed.
for(i = 0; i<OP_COUNT; i++){
226
ndbout_c("\t%c = %s %s",
227
int('a'+i), g_op_types[i].op1,
228
g_op_types[i].op2 ? g_op_types[i].op2 : "");
237
// Letter -> bit: 'a' enables bit 0, 'b' bit 1, and so on.
// NOTE(review): no range check on the character is visible here; a character
// outside 'a'..'a'+30 would give an invalid shift count - confirm against the
// surrounding lines.
g_use_ops |= (1 << ((* s++) - 'a'));
244
// NOTE(review): cases are parsed as letters ('a' = bit 0) but echoed below as
// digits ('1' + i).
g_cases |= (1 << ((* s++) - 'a'));
247
// Echo the effective configuration.
ndbout_c("table: %s", g_tablename);
248
printf("operations: ");
249
for(i = 0; i<OP_COUNT; i++)
250
if(g_use_ops & (1 << i))
251
printf("%c", int('a'+i));
254
printf("test cases: ");
256
if(g_cases & (1 << i))
257
printf("%c", int('1'+i));
259
printf("-------------\n");
263
// Creates the cluster connection and the Ndb object for database "TEST_DB",
// then waits for the Ndb object to become ready.
// Callers require a zero return.
// NOTE(review): the failure branches and return statements are not visible in
// this listing.
static int connect_ndb()
265
g_cluster_connection = new Ndb_cluster_connection();
266
// presumably (retries, delay in seconds, verbose) - confirm against the
// Ndb_cluster_connection::connect documentation.
if(g_cluster_connection->connect(12, 5, 1) != 0)
271
g_ndb = new Ndb(g_cluster_connection, "TEST_DB");
273
// 0 is treated as "ready"; 30 is presumably a timeout in seconds - confirm.
if(g_ndb->waitUntilReady(30) == 0){
275
// int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
276
// return g_restarter.dumpStateAllNodes(args, 1);
281
// Tears down the cluster connection: deletes it and clears the global pointer
// so it is not reused.
// NOTE(review): other statements of this function (if any) and its return are
// not visible in this listing.
static int disconnect_ndb()
284
delete g_cluster_connection;
287
g_cluster_connection= 0;
291
// Lists every dictionary object and drops those that are tables, then
// restores the database/schema names that were current on entry.
// Callers require a zero return.
// NOTE(review): the switch header, the bodies following the non-table case
// labels and the return statements are not visible in this listing.
static int drop_all_tables()
293
NdbDictionary::Dictionary * dict = g_ndb->getDictionary();
296
// Saved so they can be restored after the per-table name switching below.
BaseString db = g_ndb->getDatabaseName();
297
BaseString schema = g_ndb->getSchemaName();
299
NdbDictionary::Dictionary::List list;
300
// TypeUndefined is passed as the type filter; -1 signals a listing failure.
if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){
301
g_err << "Failed to list tables: " << endl
302
<< dict->getNdbError() << endl;
305
for (unsigned i = 0; i < list.count; i++) {
306
NdbDictionary::Dictionary::List::Element& elt = list.elements[i];
308
// Tables: switch to the element's database/schema before dropping by name.
case NdbDictionary::Object::SystemTable:
309
case NdbDictionary::Object::UserTable:
310
g_ndb->setDatabaseName(elt.database);
311
g_ndb->setSchemaName(elt.schema);
312
if(dict->dropTable(elt.name) != 0){
313
g_err << "Failed to drop table: "
314
<< elt.database << "/" << elt.schema << "/" << elt.name <<endl;
315
g_err << dict->getNdbError() << endl;
319
// Index, trigger and constraint objects share one branch.
case NdbDictionary::Object::UniqueHashIndex:
320
case NdbDictionary::Object::OrderedIndex:
321
case NdbDictionary::Object::HashIndexTrigger:
322
case NdbDictionary::Object::IndexTrigger:
323
case NdbDictionary::Object::SubscriptionTrigger:
324
case NdbDictionary::Object::ReadOnlyConstraint:
330
// Restore the names saved on entry.
g_ndb->setDatabaseName(db.c_str());
331
g_ndb->setSchemaName(schema.c_str());
336
// Resets the table and the per-row bookkeeping for a test case.
// Clears the table, assigns every row the next enabled entry of g_op_types
// (cycling through the enabled entries), and inserts a row with a random
// value wherever the assigned entry has start_row set. Inserts are committed
// in batches once at least 100 are uncommitted, with a final commit at the end.
// Callers require a zero return.
// NOTE(review): the declarations of op and rows, the counter updates and the
// return statement are not visible in this listing.
static int load_table()
338
UtilTransactions clear(* g_table);
339
require(!clear.clearTable(g_ndb));
341
HugoOperations ops(* g_table);
342
require(!ops.startTransaction(g_ndb));
345
size_t uncommitted = 0;
346
//bool prepared = false;
347
for(size_t i = 0; i<(size_t)g_rows; i++){
348
// Advance op (wrapping at OP_COUNT) to the next entry enabled in g_use_ops.
// NOTE(review): this loop never terminates if none of the low OP_COUNT bits
// of g_use_ops is set.
for(op %= OP_COUNT; !((1 << op) & g_use_ops); op = (op + 1) % OP_COUNT);
349
// Copy the template entry into this row's slot and move on to the next one.
g_ops[i] = g_op_types[op++];
350
if(g_ops[i].start_row){
351
// Row must exist before the test: record that and the value written.
g_ops[i].curr_row = true;
352
g_ops[i].val = rand();
353
require(!ops.pkInsertRecord(g_ndb, i, 1, g_ops[i].val));
356
g_ops[i].curr_row = false;
358
// Commit a full batch and restart the transaction for the next one.
if(uncommitted >= 100){
359
require(!ops.execute_Commit(g_ndb));
360
require(!ops.getTransaction()->restart());
366
// Commit whatever is left from the last batch.
require(!ops.execute_Commit(g_ndb));
368
require(!ops.closeTransaction(g_ndb));
370
g_info << "Inserted " << rows << " rows" << endl;
374
// Starts a local checkpoint with the given error insert armed.
// Opens an event listener on the management server, inserts `error` in all
// nodes, sends the DihStartLcpImmediately dump command to all nodes, then
// reads event lines looking for "<prefix>: LCP: <id> " with id == error.
//
// error: error-insert code to arm; also the id matched in the event stream.
// Callers require a zero return.
// NOTE(review): the declarations of buf/tmp/id/count, the start of the do
// loop, the second half of the match condition and the return statements are
// not visible in this listing.
static int pause_lcp(int error)
376
int nodes = g_restarter.getNumDbNodes();
378
// presumably { level, category, 0-terminator } as expected by
// ndb_mgm_listen_event - confirm against the MGM API documentation.
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
380
NDB_SOCKET_TYPE my_fd;
382
// NOTE(review): the two declarations of fd below are presumably alternatives
// selected by preprocessor conditionals that are not visible here.
SOCKET fd= ndb_mgm_listen_event(g_restarter.handle, filter);
385
int fd = ndb_mgm_listen_event(g_restarter.handle, filter);
389
require(my_socket_valid(my_fd));
390
require(!g_restarter.insertErrorInAllNodes(error));
391
int dump[] = { DumpStateOrd::DihStartLcpImmediately };
392
require(!g_restarter.dumpStateAllNodes(dump, 1));
396
// 1000 is presumably the read timeout in milliseconds - confirm.
SocketInputStream in(my_fd, 1000);
399
tmp = in.gets(buf, 1024);
403
// "%*[^:]" skips everything up to the first ':' without storing it.
if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
405
// Match branch: the listener socket is closed here.
my_socket_close(my_fd);
409
// Bounded retry: the loop stops once count++ reaches 30.
} while(count++ < 30);
411
// No match within the retry bound: close the listener socket here instead.
my_socket_close(my_fd);
415
// Applies the operation sequence recorded for one row.
// g_ops[row] names up to three steps (op1, op2, op3). Each step is added to
// the shared g_hugo_ops transaction and sent with execute_NoCommit, so
// nothing is committed here. curr_row tracks whether the row is expected to
// exist and val the last value written; the require() calls check that each
// step is legal for the current state (no INS on an existing row, no UPD/DEL
// on a missing one).
//
// row: index into g_ops, also passed to the Hugo pk* calls as record number.
// NOTE(review): the return statement is not visible in this listing.
static int do_op(int row)
417
HugoOperations & ops = * g_hugo_ops;
// Step 1 (op1 is compared unconditionally, so it must be non-null).
if(strcmp(g_ops[row].op1, "INS") == 0){
419
require(!g_ops[row].curr_row);
420
g_ops[row].curr_row = true;
421
g_ops[row].val = rand();
422
require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
423
} else if(strcmp(g_ops[row].op1, "UPD") == 0){
424
require(g_ops[row].curr_row);
425
g_ops[row].val = rand();
426
require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
427
} else if(strcmp(g_ops[row].op1, "DEL") == 0){
428
require(g_ops[row].curr_row);
429
g_ops[row].curr_row = false;
430
require(!ops.pkDeleteRecord(g_ndb, row, 1));
433
// Send step 1 without committing.
require(!ops.execute_NoCommit(g_ndb));
435
// Step 2 (optional: a null op2 means the sequence has only one step).
if(g_ops[row].op2 == 0){
436
} else if(strcmp(g_ops[row].op2, "INS") == 0){
437
require(!g_ops[row].curr_row);
438
g_ops[row].curr_row = true;
439
g_ops[row].val = rand();
440
require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
441
} else if(strcmp(g_ops[row].op2, "UPD") == 0){
442
require(g_ops[row].curr_row);
443
g_ops[row].val = rand();
444
require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
445
} else if(strcmp(g_ops[row].op2, "DEL") == 0){
446
require(g_ops[row].curr_row);
447
g_ops[row].curr_row = false;
448
require(!ops.pkDeleteRecord(g_ndb, row, 1));
451
// Send step 2 only if there was one.
if(g_ops[row].op2 != 0)
452
require(!ops.execute_NoCommit(g_ndb));
454
// Step 3 (optional, same handling as step 2).
if(g_ops[row].op3 == 0){
455
} else if(strcmp(g_ops[row].op3, "INS") == 0){
456
require(!g_ops[row].curr_row);
457
g_ops[row].curr_row = true;
458
g_ops[row].val = rand();
459
require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
460
} else if(strcmp(g_ops[row].op3, "UPD") == 0){
461
require(g_ops[row].curr_row);
462
g_ops[row].val = rand();
463
require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
464
} else if(strcmp(g_ops[row].op3, "DEL") == 0){
465
require(g_ops[row].curr_row);
466
g_ops[row].curr_row = false;
467
require(!ops.pkDeleteRecord(g_ndb, row, 1));
470
// Send step 3 only if there was one.
if(g_ops[row].op3 != 0)
471
require(!ops.execute_NoCommit(g_ndb));
476
// Lets a held local checkpoint proceed.
// Opens an event listener on the management server, sends the LCPContinue
// dump command to all nodes, then reads event lines looking for
// "<prefix>: LCP: <id> " with id == error.
//
// error: the LCP id to wait for in the event stream (declared with default 0).
// Callers require a zero return.
// NOTE(review): the conditions guarding the listener set-up, the declarations
// of fd/buf/tmp/id/count, the start of the do loop, the second half of the
// match condition and the return statements are not visible in this listing.
static int continue_lcp(int error)
478
// presumably { level, category, 0-terminator } as expected by
// ndb_mgm_listen_event - confirm against the MGM API documentation.
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
479
NDB_SOCKET_TYPE my_fd;
480
// Start from an invalid socket so validity can be checked after set-up.
my_socket_invalidate(&my_fd);
488
fd = ndb_mgm_listen_event(g_restarter.handle, filter);
494
require(my_socket_valid(my_fd));
497
int args[] = { DumpStateOrd::LCPContinue };
498
if(g_restarter.dumpStateAllNodes(args, 1) != 0)
504
// 1000 is presumably the read timeout in milliseconds - confirm.
SocketInputStream in(my_fd, 1000);
506
int nodes = g_restarter.getNumDbNodes();
508
tmp = in.gets(buf, 1024);
512
// "%*[^:]" skips everything up to the first ':' without storing it.
if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
514
// Match branch: the listener socket is closed here.
my_socket_close(my_fd);
518
// Bounded retry: the loop stops once count++ reaches 30.
} while(count++ < 30);
520
// No match within the retry bound: close the listener socket here instead.
my_socket_close(my_fd);
527
// NOTE(review): the header of the function containing these statements is not
// visible in this listing; presumably a commit helper - confirm.
// Commits the operations pending on the shared g_hugo_ops transaction; the
// visible return path restarts that transaction object and returns the
// restart result. How res is checked is not visible here.
HugoOperations & ops = * g_hugo_ops;
528
int res = ops.execute_Commit(g_ndb);
530
return ops.getTransaction()->restart();
537
// NOTE(review): the header of the function containing these statements is not
// visible in this listing; presumably a cluster-restart helper - confirm.
// Closes the shared transaction, restarts all nodes, waits for the cluster to
// start, reconnects, and rebuilds the table handle and the shared
// HugoOperations object with a fresh transaction.
g_info << "Restarting cluster" << endl;
538
g_hugo_ops->closeTransaction(g_ndb);
542
require(!g_restarter.restartAll());
543
// 30 is presumably a timeout - confirm the unit against NdbRestarter.
require(!g_restarter.waitClusterStarted(30));
544
require(!connect_ndb());
546
// The old table pointer came from the previous connection; look it up again.
g_table = g_ndb->getDictionary()->getTable(g_tablename);
548
require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
549
require(!g_hugo_ops->startTransaction(g_ndb));
553
// Checks the table contents against the per-row bookkeeping in g_ops.
// For every row it first requires the tracked state (curr_row) to equal the
// expected final state (end_row), then reads the row in its own transaction;
// a row that should exist must be read successfully and carry the last value
// written (val). Afterwards the table is cleared and reloaded ten times.
// Callers require a zero return.
// NOTE(review): the branch for rows that should not exist and the end of the
// function are not visible in this listing.
static int validate()
555
HugoOperations ops(* g_table);
556
for(size_t i = 0; i<(size_t)g_rows; i++){
557
require(g_ops[i].curr_row == g_ops[i].end_row);
558
require(!ops.startTransaction(g_ndb));
559
// NOTE(review): the result of pkReadRecord is ignored; only the commit result
// is checked below.
ops.pkReadRecord(g_ndb, i, 1);
560
int res = ops.execute_Commit(g_ndb);
561
if(g_ops[i].curr_row){
562
require(res == 0 && ops.verifyUpdatesValue(g_ops[i].val) == 0);
566
ops.closeTransaction(g_ndb);
569
// Ten clear/reload rounds; loadTable is called with 1024 (presumably the
// number of records - confirm against HugoTransactions).
for(size_t j = 0; j<10; j++){
570
UtilTransactions clear(* g_table);
571
require(!clear.clearTable(g_ndb));
573
HugoTransactions trans(* g_table);
574
require(trans.loadTable(g_ndb, 1024) == 0);