1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include "UtilTransactions.hpp"
18
#include <NdbScanFilter.hpp>
22
// Constructs a UtilTransactions from a table reference and an index pointer.
// NOTE(review): only part of this definition is visible here (braces are
// missing and bare numeric lines are interleaved); comments describe the
// visible statements only.
UtilTransactions::UtilTransactions(const NdbDictionary::Table& _tab,
23
const NdbDictionary::Index* _idx):
24
// Initialise tab and idx from the arguments; pTrans starts out as 0.
tab(_tab), idx(_idx), pTrans(0)
26
// The default clear method is set to 3.
m_defaultClearMethod = 3;
29
// Constructs a UtilTransactions by looking the table (and, when given, the
// index) up in the dictionary of `ndb`.
// NOTE(review): only part of this definition is visible here (the remaining
// parameters, braces and other initialisers are missing); comments describe
// the visible statements only.
UtilTransactions::UtilTransactions(Ndb* ndb,
32
// NOTE(review): the getTable(name) result is dereferenced without a check;
// compare() in this file prints "Unable to lookup table" after the same
// kind of lookup, so a failed lookup looks possible - confirm.
tab(* ndb->getDictionary()->getTable(name)),
33
// The index is looked up only when `index` is non-null; otherwise idx is 0.
idx(index ? ndb->getDictionary()->getIndex(index, name) : 0),
36
// The default clear method is set to 3.
m_defaultClearMethod = 3;
39
// Numeric code 99; its use is not visible in this part of the file.
#define RESTART_SCAN 99
41
// Returns from the enclosing function with err.code when it is non-zero,
// otherwise with NDBT_FAILED.
// NOTE(review): `err` is expanded twice and is not parenthesised, so pass a
// plain variable rather than an expression with side effects.
#define RETURN_FAIL(err) return (err.code != 0 ? err.code : NDBT_FAILED)
44
// Deletes rows by scanning with an exclusive lock (LM_Exclusive) and calling
// deleteCurrentTuple() on each row the scan returns, then committing.
// Temporary errors lead to closeTransaction() plus a 50 ms sleep; the visible
// failure exit returns err.code when non-zero, otherwise NDBT_FAILED.
// NOTE(review): only part of this definition is visible here (parameters,
// loop headers, braces and continue/return statements are missing and bare
// numeric lines are interleaved); comments describe the visible statements
// only.
UtilTransactions::clearTable(Ndb* pNdb,
45
NdbScanOperation::ScanFlag flags,
48
// Scan all records exclusive and delete
51
const int retryMax = 10;
54
NdbScanOperation *pOp;
57
int par = parallelism;
60
// Post-increment: the comparison uses the old count, the message below
// prints the already incremented value.
if (retryAttempt++ >= retryMax){
61
g_info << "ERROR: has retried this operation " << retryAttempt
62
<< " times, failing!" << endl;
66
pTrans = pNdb->startTransaction();
68
// Error taken from the Ndb object (presumably because startTransaction()
// failed and there is no transaction to ask - the guarding condition is not
// visible).
err = pNdb->getNdbError();
69
if (err.status == NdbError::TemporaryError){
71
NdbSleep_MilliSleep(50);
77
// getScanOperation() picks an index scan or a table scan depending on idx.
pOp = getScanOperation(pTrans);
79
err = pTrans->getNdbError();
80
if(err.status == NdbError::TemporaryError){
82
closeTransaction(pNdb);
83
NdbSleep_MilliSleep(50);
90
// A non-zero result from readTuples() is treated as failure.
if( pOp->readTuples(NdbOperation::LM_Exclusive, flags, par) ) {
91
err = pTrans->getNdbError();
95
if(pTrans->execute(NoCommit, AbortOnError) != 0){
96
err = pTrans->getNdbError();
97
if(err.status == NdbError::TemporaryError){
99
closeTransaction(pNdb);
100
NdbSleep_MilliSleep(50);
106
// Outer loop calls nextResult(true), the inner do/while nextResult(false).
// Presumably `true` allows fetching a new batch and `false` only walks the
// rows already received - confirm against the NdbScanOperation docs.
while((check = pOp->nextResult(true)) == 0){
108
if (pOp->deleteCurrentTuple() != 0){
112
} while((check = pOp->nextResult(false)) == 0);
115
check = pTrans->execute(Commit, AbortOnError);
119
err = pTrans->getNdbError();
121
if(err.status == NdbError::TemporaryError){
123
closeTransaction(pNdb);
124
NdbSleep_MilliSleep(50);
132
err = pTrans->getNdbError();
133
if(err.status == NdbError::TemporaryError){
135
closeTransaction(pNdb);
136
NdbSleep_MilliSleep(50);
142
closeTransaction(pNdb);
148
// Failure exit: close the transaction only if one is still open.
if(pTrans != 0) closeTransaction(pNdb);
150
return (err.code != 0 ? err.code : NDBT_FAILED);
154
// Convenience overload: forwards to the flag-taking clearTable() with a zero
// ScanFlag value, passing records and parallelism through.
// NOTE(review): the rest of the signature and the braces are not visible here.
UtilTransactions::clearTable(Ndb* pNdb,
158
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
159
records, parallelism);
164
// Forwards to clearTable() with a zero ScanFlag value.
// NOTE(review): the rest of the signature and the remaining call arguments
// are not visible here.
UtilTransactions::clearTable1(Ndb* pNdb,
168
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
173
// Forwards to clearTable() with a zero ScanFlag value, passing records and
// parallelism through; the visible call is the same as in the plain
// clearTable(pNdb, records, parallelism) overload.
// NOTE(review): the rest of the signature and the braces are not visible here.
UtilTransactions::clearTable2(Ndb* pNdb,
177
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
178
records, parallelism);
182
// Forwards to clearTable() with a zero ScanFlag value, passing records and
// parallelism through; the visible call is the same as in the plain
// clearTable(pNdb, records, parallelism) overload.
// NOTE(review): the rest of the signature and the braces are not visible here.
UtilTransactions::clearTable3(Ndb* pNdb,
186
return clearTable(pNdb, (NdbScanOperation::ScanFlag)0,
187
records, parallelism);
191
// Scans this object's table (`tab`) with LM_Read and hands every row to
// addRowToInsert() so that it is inserted into the table named destName,
// using the same transaction for the scan and the inserts.
// NOTE(review): only part of this definition is visible here (return type,
// loop headers, braces and continue/return statements are missing and bare
// numeric lines are interleaved); comments describe the visible statements
// only.
UtilTransactions::copyTableData(Ndb* pNdb,
192
const char* destName){
193
// Scan all records and copy
194
// them to destName table
195
int retryAttempt = 0;
196
const int retryMax = 10;
197
// NOTE(review): no visible line increments insertedRows, although it is
// logged at the end - verify against the full source.
int insertedRows = 0;
198
int parallelism = 240;
200
NdbScanOperation *pOp;
201
// Receives the attribute values of the row the scan is positioned on.
NDBT_ResultRow row(tab);
205
if (retryAttempt >= retryMax){
206
g_info << "ERROR: has retried this operation " << retryAttempt
207
<< " times, failing!" << endl;
212
pTrans = pNdb->startTransaction();
213
if (pTrans == NULL) {
214
const NdbError err = pNdb->getNdbError();
216
if (err.status == NdbError::TemporaryError){
218
NdbSleep_MilliSleep(50);
226
pOp = pTrans->getNdbScanOperation(tab.getName());
228
ERR(pTrans->getNdbError());
229
closeTransaction(pNdb);
233
// NOTE(review): the other readTuples() calls in this file pass
// (lock mode, flags, parallelism); here `parallelism` is the second
// argument, i.e. in the position those calls use for flags - verify.
if( pOp->readTuples(NdbScanOperation::LM_Read, parallelism) ) {
234
ERR(pTrans->getNdbError());
235
closeTransaction(pNdb);
239
check = pOp->interpret_exit_ok();
241
ERR(pTrans->getNdbError());
242
closeTransaction(pNdb);
246
// Read all attributes
247
for (int a = 0; a < tab.getNoOfColumns(); a++){
248
// getValue() returning 0 is treated as failure.
if ((row.attributeStore(a) =
249
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
250
ERR(pTrans->getNdbError());
251
closeTransaction(pNdb);
256
check = pTrans->execute(NoCommit, AbortOnError);
258
ERR(pTrans->getNdbError());
259
closeTransaction(pNdb);
264
// Outer loop calls nextResult(true), the inner do/while nextResult(false);
// presumably batch fetch versus walking already received rows - confirm.
while((eof = pOp->nextResult(true)) == 0){
267
// The insert is defined on the scanning transaction pTrans itself.
if (addRowToInsert(pNdb, pTrans, row, destName) != 0){
268
closeTransaction(pNdb);
271
} while((eof = pOp->nextResult(false)) == 0);
273
check = pTrans->execute(Commit, AbortOnError);
276
const NdbError err = pTrans->getNdbError();
278
closeTransaction(pNdb);
283
const NdbError err = pTrans->getNdbError();
285
if (err.status == NdbError::TemporaryError){
287
closeTransaction(pNdb);
288
NdbSleep_MilliSleep(50);
289
// If error = 488 there should be no limit on number of retry attempts
295
closeTransaction(pNdb);
299
closeTransaction(pNdb);
301
g_info << insertedRows << " rows copied" << endl;
309
// Defines an insert into the table named insertTabName on transaction
// pInsTrans and sets its attributes from `row`. Nothing in the visible lines
// executes the transaction or uses pNdb.
// NOTE(review): only part of this definition is visible here (return type,
// conditions, braces, the remaining setValue() arguments and the return
// statements are missing); comments describe the visible statements only.
UtilTransactions::addRowToInsert(Ndb* pNdb,
310
NdbConnection* pInsTrans,
311
NDBT_ResultRow & row,
312
const char *insertTabName){
315
NdbOperation* pInsOp;
317
pInsOp = pInsTrans->getNdbOperation(insertTabName);
318
if (pInsOp == NULL) {
319
ERR(pInsTrans->getNdbError());
323
check = pInsOp->insertTuple();
325
ERR(pInsTrans->getNdbError());
329
// Set all attributes
330
// Column count and column names come from this object's table `tab`, so the
// destination table presumably has to use the same column names - confirm.
for (int a = 0; a < tab.getNoOfColumns(); a++){
331
NdbRecAttr* r = row.attributeStore(a);
332
int sz = r->get_size_in_bytes();
333
if (pInsOp->setValue(tab.getColumn(a)->getName(),
336
ERR(pInsTrans->getNdbError());
346
// Scans with lock mode `lm`, reading the attributes listed in attrib_list,
// counts the rows returned and, when `records` is non-zero, logs a mismatch
// between `rows` and `records`.
// NOTE(review): only part of this definition is visible here (return type,
// several parameters, loop headers, braces and continue/return statements
// are missing and bare numeric lines are interleaved); comments describe the
// visible statements only.
UtilTransactions::scanReadRecords(Ndb* pNdb,
348
NdbOperation::LockMode lm,
354
int retryAttempt = 0;
355
const int retryMax = 100;
357
NdbScanOperation *pOp;
358
NDBT_ResultRow row(tab);
362
if (retryAttempt >= retryMax){
363
g_info << "ERROR: has retried this operation " << retryAttempt
364
<< " times, failing!" << endl;
368
pTrans = pNdb->startTransaction();
369
if (pTrans == NULL) {
370
const NdbError err = pNdb->getNdbError();
372
if (err.status == NdbError::TemporaryError){
374
NdbSleep_MilliSleep(50);
382
pOp = getScanOperation(pTrans);
384
// NOTE(review): the error is read from pNdb here, whereas clearTable() reads
// pTrans->getNdbError() after the same getScanOperation() call - verify
// which object carries the error.
const NdbError err = pNdb->getNdbError();
385
// err was copied above, so closing the transaction first does not lose it.
closeTransaction(pNdb);
387
if (err.status == NdbError::TemporaryError){
389
NdbSleep_MilliSleep(50);
397
if( pOp->readTuples(lm, 0, parallelism) ) {
398
ERR(pTrans->getNdbError());
399
closeTransaction(pNdb);
403
check = pOp->interpret_exit_ok();
405
ERR(pTrans->getNdbError());
406
closeTransaction(pNdb);
410
// Call getValue for all the attributes supplied in attrib_list
411
// ************************************************
412
for (int a = 0; a < noAttribs; a++){
413
// Entries that are not smaller than the table's column count are skipped by
// this condition (no else branch is visible).
if (attrib_list[a] < tab.getNoOfColumns()){
414
g_info << "getValue(" << attrib_list[a] << ")" << endl;
415
if ((row.attributeStore(attrib_list[a]) =
416
pOp->getValue(tab.getColumn(attrib_list[a])->getName())) == 0) {
417
ERR(pTrans->getNdbError());
418
closeTransaction(pNdb);
423
// *************************************************
425
check = pTrans->execute(NoCommit, AbortOnError);
427
const NdbError err = pTrans->getNdbError();
429
if (err.status == NdbError::TemporaryError){
431
closeTransaction(pNdb);
432
NdbSleep_MilliSleep(50);
437
closeTransaction(pNdb);
445
while((eof = pOp->nextResult()) == 0){
448
// Call callback for each record returned
453
const NdbError err = pTrans->getNdbError();
455
if (err.status == NdbError::TemporaryError){
457
closeTransaction(pNdb);
458
NdbSleep_MilliSleep(50);
463
closeTransaction(pNdb);
467
closeTransaction(pNdb);
468
g_info << rows << " rows have been read" << endl;
469
// records == 0 disables the row-count check.
if (records != 0 && rows != records){
470
g_info << "Check expected number of records failed" << endl
471
<< " expected=" << records <<", " << endl
472
<< " read=" << rows << endl;
482
// Scans with lock mode `lm` and counts rows; a non-NULL count_rows is tested
// at the end (presumably to hand the count back - the assignment is not
// visible).
// NOTE(review): only part of this definition is visible here (return type,
// some parameters, loop headers, braces and continue/return statements are
// missing and bare numeric lines are interleaved); comments describe the
// visible statements only.
UtilTransactions::selectCount(Ndb* pNdb,
485
NdbOperation::LockMode lm,
486
// NOTE(review): this parameter has the same name as the pTrans member that
// the constructor initialises and that closeTransaction(Ndb*) closes. The
// assignment below therefore targets the parameter, while the
// closeTransaction(pNdb) calls act on the member - verify that the
// transaction started here is actually closed.
NdbConnection* pTrans){
488
int retryAttempt = 0;
489
const int retryMax = 100;
491
NdbScanOperation *pOp;
494
pTrans = pNdb->startTransaction();
498
if (retryAttempt >= retryMax){
499
g_info << "ERROR: has retried this operation " << retryAttempt
500
<< " times, failing!" << endl;
503
pOp = getScanOperation(pTrans);
505
ERR(pTrans->getNdbError());
506
closeTransaction(pNdb);
510
if( pOp->readTuples(lm) ) {
511
ERR(pTrans->getNdbError());
512
closeTransaction(pNdb);
517
// Scan filter: OR group with an eq(2, 30) condition.
// NOTE(review): hard-coded values; the enclosing condition is not visible,
// so it is unclear whether this filter is active - confirm.
NdbScanFilter sf(pOp);
518
sf.begin(NdbScanFilter::OR);
519
sf.eq(2, (Uint32)30);
522
check = pOp->interpret_exit_ok();
524
ERR(pTrans->getNdbError());
525
closeTransaction(pNdb);
531
check = pTrans->execute(NoCommit, AbortOnError);
533
ERR(pTrans->getNdbError());
534
closeTransaction(pNdb);
542
while((eof = pOp->nextResult()) == 0){
546
const NdbError err = pTrans->getNdbError();
548
if (err.status == NdbError::TemporaryError){
549
closeTransaction(pNdb);
550
NdbSleep_MilliSleep(50);
555
closeTransaction(pNdb);
559
closeTransaction(pNdb);
561
if (count_rows != NULL){
571
// Looks up index `indexName` on this object's table and dispatches to the
// verification routine that matches the index type.
// NOTE(review): only part of this definition is visible here (return type,
// remaining parameters, the lookup check, braces and the default-case return
// are missing); comments describe the visible statements only.
UtilTransactions::verifyIndex(Ndb* pNdb,
572
const char* indexName,
577
const NdbDictionary::Index* pIndex
578
= pNdb->getDictionary()->getIndex(indexName, tab.getName());
580
ndbout << " Index " << indexName << " does not exist!" << endl;
584
switch (pIndex->getType()){
585
case NdbDictionary::Index::UniqueHashIndex:
586
return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional);
587
case NdbDictionary::Index::OrderedIndex:
588
return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional);
591
// Reached for any index type not handled by the cases above.
ndbout << "Unknown index type" << endl;
599
// Verifies a unique index by delegating to scanAndCompareUniqueIndex() and
// comparing its result with NDBT_OK.
// NOTE(review): only part of this definition is visible here (return type,
// remaining parameters, comment delimiters, some call arguments, braces and
// return statements are missing); comments describe the visible lines only.
UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
600
const NdbDictionary::Index * pIndex,
605
* Scan all rows in TABLE and for each found row make one read in
606
* TABLE and one using INDEX_TABLE. Then compare the two returned
607
* rows. They should be equal!
611
if (scanAndCompareUniqueIndex(pNdb,
614
transactional) != NDBT_OK){
625
// Scans this object's table, reading every column, and calls
// readRowFromTableAndIndex() for the rows returned so that each row can be
// checked against the index.
// NOTE(review): only part of this definition is visible here (return type,
// remaining parameters, loop headers, conditions, braces and continue/return
// statements are missing and bare numeric lines are interleaved); comments
// describe the visible statements only.
UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
626
const NdbDictionary::Index* pIndex,
630
int retryAttempt = 0;
631
const int retryMax = 100;
633
NdbScanOperation *pOp;
634
NDBT_ResultRow row(tab);
640
if (retryAttempt >= retryMax){
641
g_info << "ERROR: has retried this operation " << retryAttempt
642
<< " times, failing!" << endl;
646
pTrans = pNdb->startTransaction();
647
if (pTrans == NULL) {
648
const NdbError err = pNdb->getNdbError();
650
if (err.status == NdbError::TemporaryError){
652
NdbSleep_MilliSleep(50);
660
pOp = pTrans->getNdbScanOperation(tab.getName());
662
// NOTE(review): the error is read from pNdb here, whereas other functions in
// this file read pTrans->getNdbError() after a failed
// getNdbScanOperation() - verify which object carries the error.
const NdbError err = pNdb->getNdbError();
663
closeTransaction(pNdb);
666
if (err.status == NdbError::TemporaryError){
667
NdbSleep_MilliSleep(50);
676
// Two alternative scans: LM_Read or LM_CommittedRead. The selecting
// condition is not visible (presumably the `transactional` argument).
rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
678
rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallelism);
682
ERR(pTrans->getNdbError());
683
closeTransaction(pNdb);
687
check = pOp->interpret_exit_ok();
689
ERR(pTrans->getNdbError());
690
closeTransaction(pNdb);
694
// Read all attributes
695
for (int a = 0; a < tab.getNoOfColumns(); a++){
696
if ((row.attributeStore(a) =
697
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
698
ERR(pTrans->getNdbError());
699
closeTransaction(pNdb);
704
check = pTrans->execute(NoCommit, AbortOnError);
706
const NdbError err = pTrans->getNdbError();
708
if (err.status == NdbError::TemporaryError){
710
closeTransaction(pNdb);
711
NdbSleep_MilliSleep(50);
716
closeTransaction(pNdb);
724
while((eof = pOp->nextResult()) == 0){
727
// ndbout << row.c_str().c_str() << endl;
729
if (readRowFromTableAndIndex(pNdb,
734
// Empty-bodied loop: keeps calling nextResult(false) until it stops
// returning 0.
while((eof= pOp->nextResult(false)) == 0);
736
eof = pOp->nextResult(true); // this should give -1
739
const NdbError err = pTrans->getNdbError();
741
if (err.status == NdbError::TemporaryError){
743
closeTransaction(pNdb);
744
NdbSleep_MilliSleep(50);
749
closeTransaction(pNdb);
754
const NdbError err = pTrans->getNdbError();
756
if (err.status == NdbError::TemporaryError){
758
closeTransaction(pNdb);
759
NdbSleep_MilliSleep(50);
764
closeTransaction(pNdb);
768
closeTransaction(pNdb);
775
// For the scanned row `row`: reads the same record once through the table
// (primary-key read) and once through the index pIndex, both on a separate
// transaction obtained with pNdb->hupp(scanTrans), and compares the two
// results as strings.
// NOTE(review): only part of this definition is visible here (return type,
// loop headers, conditions, comment delimiters, braces and goto/return
// statements are missing and bare numeric lines are interleaved); comments
// describe the visible statements only.
UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
776
NdbConnection* scanTrans,
777
const NdbDictionary::Index* pIndex,
778
NDBT_ResultRow& row ){
781
NdbDictionary::Index::Type indexType= pIndex->getType();
782
int retryAttempt = 0;
783
const int retryMax = 100;
785
NdbConnection *pTrans1=NULL;
788
// Pessimistic default; set to NDBT_OK further down.
int return_code= NDBT_FAILED;
790
// Allocate place to store the result
791
NDBT_ResultRow tabRow(tab);
792
NDBT_ResultRow indexRow(tab);
793
const char * indexName = pIndex->getName();
797
ndbout_c("retryAttempt %d", retryAttempt);
798
if (retryAttempt >= retryMax){
799
g_info << "ERROR: has retried this operation " << retryAttempt
800
<< " times, failing!" << endl;
804
// The separate transaction is obtained with hupp(scanTrans); the commented
// out call shows startTransaction() was the alternative.
pTrans1 = pNdb->hupp(scanTrans); //startTransaction();
805
if (pTrans1 == NULL) {
806
const NdbError err = pNdb->getNdbError();
808
if (err.status == NdbError::TemporaryError){
810
NdbSleep_MilliSleep(50);
816
return_code = NDBT_OK;
824
* Read the record from TABLE
826
pOp = pTrans1->getNdbOperation(tab.getName());
828
ERR(pTrans1->getNdbError());
832
check = pOp->readTuple();
834
ERR(pTrans1->getNdbError());
838
// Define primary keys
842
for(a = 0; a<tab.getNoOfColumns(); a++){
843
const NdbDictionary::Column* attr = tab.getColumn(a);
844
if (attr->getPrimaryKey() == true){
845
if (pOp->equal(attr->getName(), row.attributeStore(a)->aRef()) != 0){
846
ERR(pTrans1->getNdbError());
850
// NOTE(review): "%d" is paired with aRef(), whose result is passed to
// equal() as the key value above - looks like a format/argument mismatch;
// verify (this may sit in a disabled debug section).
printf("%s = %d: ", attr->getName(), row.attributeStore(a)->aRef());
857
// Read all attributes
859
printf("Reading %u attributes: ", tab.getNoOfColumns());
861
for(a = 0; a<tab.getNoOfColumns(); a++){
862
if((tabRow.attributeStore(a) =
863
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
864
ERR(pTrans1->getNdbError());
868
printf("%s ", tab.getColumn(a)->getName());
876
* Read the record from INDEX_TABLE
878
NdbIndexOperation* pIndexOp= NULL;
879
NdbIndexScanOperation *pScanOp= NULL;
880
// pIOp aliases whichever of the two operations above gets created.
NdbOperation *pIOp= 0;
882
bool null_found= false;
883
// Checks the scanned row's index columns for NULL values.
for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
884
const NdbDictionary::Column * col = pIndex->getColumn(a);
886
if (row.attributeStore(col->getName())->isNULL())
893
const char * tabName= tab.getName();
896
// A unique hash index is read with an NdbIndexOperation; the other visible
// path uses an NdbIndexScanOperation.
if (indexType == NdbDictionary::Index::UniqueHashIndex) {
897
pIOp= pIndexOp= pTrans1->getNdbIndexOperation(indexName, tabName);
899
pIOp= pScanOp= pTrans1->getNdbIndexScanOperation(indexName, tabName);
903
ERR(pTrans1->getNdbError());
910
not_ok = pIndexOp->readTuple() == -1;
912
not_ok = pScanOp->readTuples();
916
ERR(pTrans1->getNdbError());
921
// Define primary keys for index
925
for(a = 0; a<(int)pIndex->getNoOfColumns(); a++){
926
const NdbDictionary::Column * col = pIndex->getColumn(a);
929
// Only non-NULL index columns are used as equality conditions.
if ( !row.attributeStore(col->getName())->isNULL() ) {
930
if(pIOp->equal(col->getName(),
931
row.attributeStore(col->getName())->aRef()) != 0){
932
ERR(pTrans1->getNdbError());
937
// NOTE(review): this prints row.attributeStore(a), where `a` counts index
// columns, while the equal() call above addresses the column by name; "%d"
// is also paired with aRef() - verify.
printf("%s = %d: ", col->getName(), row.attributeStore(a)->aRef());
944
// Read all attributes
946
printf("Reading %u attributes: ", tab.getNoOfColumns());
948
for(a = 0; a<tab.getNoOfColumns(); a++){
951
pCheck= indexRow.attributeStore(a)=
952
pIOp->getValue(tab.getColumn(a)->getName());
955
ERR(pTrans1->getNdbError());
959
printf("%s ", tab.getColumn(a)->getName());
966
// refresh() is called on the scan transaction right before the commit of
// pTrans1 (presumably to keep the scan alive - confirm).
scanTrans->refresh();
967
check = pTrans1->execute(Commit, AbortOnError);
969
const NdbError err = pTrans1->getNdbError();
971
if (err.status == NdbError::TemporaryError){
973
pNdb->closeTransaction(pTrans1);
974
NdbSleep_MilliSleep(50);
978
ndbout << "Error when comparing records - normal op" << endl;
980
ndbout << "row: " << row.c_str().c_str() << endl;
985
* Compare the two rows
989
// A non-zero nextResult() here means the index scan produced no row.
if (pScanOp->nextResult() != 0){
990
const NdbError err = pTrans1->getNdbError();
992
ndbout << "Error when comparing records - index op next_result missing" << endl;
993
ndbout << "row: " << row.c_str().c_str() << endl;
997
// The rows are compared through their string representations.
// NOTE(review): the message below contains the typo "comapring"; it is a
// runtime string and is left unchanged here.
if (!(tabRow.c_str() == indexRow.c_str())){
998
ndbout << "Error when comapring records" << endl;
999
ndbout << " tabRow: \n" << tabRow.c_str().c_str() << endl;
1000
ndbout << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1004
// A second row from the index scan is reported as an error.
if (pScanOp->nextResult() == 0){
1005
ndbout << "Error when comparing records - index op next_result to many" << endl;
1006
ndbout << "row: " << row.c_str().c_str() << endl;
1011
return_code= NDBT_OK;
1017
// pTrans1 is closed directly on pNdb; the member transaction is untouched.
pNdb->closeTransaction(pTrans1);
1023
// Verifies an ordered index: scans the table with LM_Read and, for the rows
// returned, reads the row again by primary key (pkRow) and through an index
// scan on pIndex (indexRow), then compares both with the scanned row and
// reports missing or extra index rows.
// NOTE(review): only part of this definition is visible here (return type,
// a parameter, loop headers, conditions, braces and goto/continue/return
// statements are missing and bare numeric lines are interleaved); comments
// describe the visible statements only.
UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
1024
const NdbDictionary::Index* pIndex,
1026
bool transactional){
1028
int retryAttempt = 0;
1029
const int retryMax = 100;
1031
NdbScanOperation *pOp;
1032
// Index scan operation; created on first use and then reused via
// reset_bounds() (see below).
NdbIndexScanOperation * iop = 0;
1034
NDBT_ResultRow scanRow(tab);
1035
NDBT_ResultRow pkRow(tab);
1036
NDBT_ResultRow indexRow(tab);
1037
const char * indexName = pIndex->getName();
1044
if (retryAttempt >= retryMax){
1045
g_info << "ERROR: has retried this operation " << retryAttempt
1046
<< " times, failing!" << endl;
1050
pTrans = pNdb->startTransaction();
1051
if (pTrans == NULL) {
1052
const NdbError err = pNdb->getNdbError();
1054
if (err.status == NdbError::TemporaryError){
1056
NdbSleep_MilliSleep(50);
1064
pOp = pTrans->getNdbScanOperation(tab.getName());
1066
ERR(pTrans->getNdbError());
1067
closeTransaction(pNdb);
1071
if( pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism) ) {
1072
ERR(pTrans->getNdbError());
1073
closeTransaction(pNdb);
1077
check = pOp->interpret_exit_ok();
1079
ERR(pTrans->getNdbError());
1080
closeTransaction(pNdb);
1084
// get_values() requests every column of the table into scanRow; a non-zero
// result is the failure case.
if(get_values(pOp, scanRow))
1089
check = pTrans->execute(NoCommit, AbortOnError);
1091
const NdbError err = pTrans->getNdbError();
1093
if (err.status == NdbError::TemporaryError){
1095
closeTransaction(pNdb);
1096
NdbSleep_MilliSleep(50);
1101
closeTransaction(pNdb);
1107
// The loop stops as soon as an execute() inside it has set check non-zero.
while(check == 0 && (eof = pOp->nextResult()) == 0){
1110
bool null_found= false;
1111
for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){
1112
const NdbDictionary::Column * col = pIndex->getColumn(a);
1113
if (scanRow.attributeStore(col->getName())->isNULL())
1121
// Primary-key read of the scanned row, on the same transaction.
NdbOperation * pk = pTrans->getNdbOperation(tab.getName());
1122
if(!pk || pk->readTuple())
1124
if(equal(&tab, pk, scanRow) || get_values(pk, pkRow))
1129
// First row: create the index scan operation and define it.
if(!iop && (iop= pTrans->getNdbIndexScanOperation(indexName,
1132
if(iop->readTuples(NdbScanOperation::LM_CommittedRead,
1135
iop->interpret_exit_ok();
1136
if(get_values(iop, indexRow))
1139
// Later rows: reuse the existing index scan after reset_bounds().
else if(!iop || iop->reset_bounds())
1144
// Bind the index columns to the values of the scanned row.
if(equal(pIndex, iop, scanRow))
1148
check = pTrans->execute(NoCommit, AbortOnError);
1152
// Rows are compared through their string representations.
// NOTE(review): the messages below contain the typo "comapring"; they are
// runtime strings and are left unchanged here.
if(scanRow.c_str() != pkRow.c_str()){
1153
g_err << "Error when comapring records" << endl;
1154
g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1155
g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl;
1156
closeTransaction(pNdb);
1163
if((res= iop->nextResult()) != 0){
1164
g_err << "Failed to find row using index: " << res << endl;
1165
ERR(pTrans->getNdbError());
1166
closeTransaction(pNdb);
1170
if(scanRow.c_str() != indexRow.c_str()){
1171
g_err << "Error when comapring records" << endl;
1172
g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
1173
g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1174
closeTransaction(pNdb);
1178
// A second row from the index scan is reported as an extra row.
if(iop->nextResult() == 0){
1179
g_err << "Found extra row!!" << endl;
1180
g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
1181
closeTransaction(pNdb);
1187
if (eof == -1 || check == -1) {
1189
const NdbError err = pTrans->getNdbError();
1191
if (err.status == NdbError::TemporaryError){
1194
closeTransaction(pNdb);
1195
NdbSleep_MilliSleep(50);
1201
closeTransaction(pNdb);
1205
closeTransaction(pNdb);
1213
// Requests every column of `tab` from operation `op` (by column number) and
// stores the returned NdbRecAttr pointers in the matching slots of `dst`.
// NOTE(review): the return type, braces and return statements are not
// visible here; callers in this file treat a non-zero result as failure.
UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst)
1215
for (int a = 0; a < tab.getNoOfColumns(); a++){
1216
// Reference to the slot in dst, so the assignment below writes into dst.
NdbRecAttr*& ref= dst.attributeStore(a);
1217
// getValue() returning 0 is the failure case.
if ((ref= op->getValue(a)) == 0)
1226
// Adds an equality condition to `op` for every column of index pIndex, taking
// the value from the column of the same name in `src`.
// NOTE(review): the return type, braces and return statements are not
// visible here; callers in this file treat a non-zero result as failure.
UtilTransactions::equal(const NdbDictionary::Index* pIndex,
1227
NdbOperation* op, const NDBT_ResultRow& src)
1229
for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){
1230
const NdbDictionary::Column * col = pIndex->getColumn(a);
1231
// A non-zero result from op->equal() is the failure case.
if(op->equal(col->getName(),
1232
src.attributeStore(col->getName())->aRef()) != 0){
1240
// Adds an equality condition to `op` for every primary-key column, taking the
// value from the slot with the same column number in `src`.
// NOTE(review): the return type, braces and return statements are not
// visible here; callers in this file treat a non-zero result as failure.
// NOTE(review): pTable is not used in the visible lines - the loop walks the
// member `tab` instead. The only visible caller passes &tab, so the result
// is the same there; verify whether pTable was intended.
UtilTransactions::equal(const NdbDictionary::Table* pTable,
1241
NdbOperation* op, const NDBT_ResultRow& src)
1243
// NOTE(review): the counter is Uint32 while other loops in this file compare
// tab.getNoOfColumns() against an int - possible signed/unsigned comparison.
for(Uint32 a = 0; a<tab.getNoOfColumns(); a++){
1244
const NdbDictionary::Column* attr = tab.getColumn(a);
1245
if (attr->getPrimaryKey() == true){
1246
if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){
1255
// Returns the operation that getOperation() creates for an OpenScanRequest,
// cast to NdbScanOperation*.
// NOTE(review): the return type and braces are not visible here. The C-style
// cast is unchecked; it relies on getOperation() returning a scan operation
// for OpenScanRequest.
UtilTransactions::getScanOperation(NdbConnection* pTrans)
1257
return (NdbScanOperation*)
1258
getOperation(pTrans, NdbOperation::OpenScanRequest);
1262
// Creates an operation on pTrans that suits the request `type`: depending on
// the type and on the type of the index `idx`, either an index operation, an
// index scan operation, a plain table operation or a table scan operation.
// NOTE(review): only part of this definition is visible here (return type,
// the outer switch header, the checks that guard the use of idx, default
// cases, braces and the final return are missing); comments describe the
// visible statements only. The constructors allow idx to be 0, so the
// idx->getType() calls presumably sit behind a null check - confirm.
UtilTransactions::getOperation(NdbConnection* pTrans,
1263
NdbOperation::OperationType type)
1266
// Reads: dispatch on the index type.
case NdbOperation::ReadRequest:
1267
case NdbOperation::ReadExclusive:
1270
switch(idx->getType()){
1271
case NdbDictionary::Index::UniqueHashIndex:
1272
return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1273
case NdbDictionary::Index::OrderedIndex:
1274
return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1277
// Inserts and writes always go to the table itself.
case NdbOperation::InsertRequest:
1278
case NdbOperation::WriteRequest:
1279
return pTrans->getNdbOperation(tab.getName());
1280
// Updates and deletes: the unique hash index when that case matches,
// otherwise the table operation below.
case NdbOperation::UpdateRequest:
1281
case NdbOperation::DeleteRequest:
1284
switch(idx->getType()){
1285
case NdbDictionary::Index::UniqueHashIndex:
1286
return pTrans->getNdbIndexOperation(idx->getName(), tab.getName());
1289
return pTrans->getNdbOperation(tab.getName());
1290
// Scans: the ordered index when that case matches, otherwise a table scan.
case NdbOperation::OpenScanRequest:
1293
switch(idx->getType()){
1294
case NdbDictionary::Index::OrderedIndex:
1295
return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1298
return pTrans->getNdbScanOperation(tab.getName());
1299
// Range scans: only the ordered-index case is visible.
case NdbOperation::OpenRangeScanRequest:
1302
switch(idx->getType()){
1303
case NdbDictionary::Index::OrderedIndex:
1304
return pTrans->getNdbIndexScanOperation(idx->getName(), tab.getName());
1311
#include <HugoOperations.hpp>
1314
// Closes the transaction held in pTrans through pNdb, if there is one.
// NOTE(review): the return type, braces and any further statements (for
// example resetting pTrans) are not visible here.
UtilTransactions::closeTransaction(Ndb* pNdb)
1316
// Nothing is closed when no transaction is open.
if (pTrans != NULL){
1317
pNdb->closeTransaction(pTrans);
1324
// Compares this object's table with the table named tab_name2: scans `tab`,
// reads the matching record of the second table for the scanned rows via a
// HugoOperations primary-key read and compares the two rows; afterwards the
// row count of the second table is fetched with selectCount() and compared
// with the number of rows scanned.
// The visible final return yields return_code when both counts agree and 1
// otherwise.
// NOTE(review): only part of this definition is visible here (return type,
// loop headers, conditions, labels, braces and goto/return statements are
// missing and bare numeric lines are interleaved); comments describe the
// visible statements only. `flags` is not used in the visible lines.
UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){
1328
int return_code= 0, row_count= 0;
1329
int retryAttempt = 0, retryMax = 10;
1331
// calc derives the record id of a scanned row (see getIdValue() below).
HugoCalculator calc(tab);
1332
NDBT_ResultRow row(tab);
1333
const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2);
1336
g_err << "Unable to lookup table: " << tab_name2
1337
<< endl << pNdb->getDictionary()->getNdbError() << endl;
1340
const NdbDictionary::Table& tab2= *tmp;
1342
// cmp reads single records from the second table; count counts its rows.
HugoOperations cmp(tab2);
1343
UtilTransactions count(tab2);
1347
// Post-increment: the comparison uses the old count, the message below
// prints the already incremented value.
if (retryAttempt++ >= retryMax){
1348
g_err << "ERROR: compare has retried this operation " << retryAttempt
1349
<< " times, failing!" << endl;
1353
NdbScanOperation *pOp= 0;
1354
pTrans = pNdb->startTransaction();
1355
if (pTrans == NULL) {
1356
err = pNdb->getNdbError();
1360
pOp= pTrans->getNdbScanOperation(tab.getName());
1362
// The error is stored in err as well as being passed to ERR().
ERR(err= pTrans->getNdbError());
1366
if( pOp->readTuples(NdbScanOperation::LM_Read) ) {
1367
ERR(err= pTrans->getNdbError());
1371
if( pOp->interpret_exit_ok() == -1 ) {
1372
ERR(err= pTrans->getNdbError());
1376
// Read all attributes
1378
for (int a = 0; a < tab.getNoOfColumns(); a++){
1379
if ((row.attributeStore(a) =
1380
pOp->getValue(tab.getColumn(a)->getName())) == 0) {
1381
ERR(err= pTrans->getNdbError());
1387
if( pTrans->execute(NoCommit, AbortOnError) == -1 ) {
1388
ERR(err= pTrans->getNdbError());
1395
// Outer loop calls nextResult(true), the inner do/while nextResult(false);
// presumably batch fetch versus walking already received rows - confirm.
while((eof = pOp->nextResult(true)) == 0)
1399
// Each compared row uses its own transaction inside cmp.
if(cmp.startTransaction(pNdb) != NDBT_OK)
1401
ERR(err= pNdb->getNdbError());
1404
// Record id of the scanned row, used as the key for the read in table 2.
int rowNo= calc.getIdValue(&row);
1405
if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK)
1407
ERR(err= cmp.getTransaction()->getNdbError());
1410
// A failed commit or a non-zero error code on cmp's transaction both count
// as failure.
if(cmp.execute_Commit(pNdb) != NDBT_OK ||
1411
cmp.getTransaction()->getNdbError().code)
1413
ERR(err= cmp.getTransaction()->getNdbError());
1416
if(row != cmp.get_row(0))
1418
g_err << "COMPARE FAILED" << endl;
1419
g_err << row << endl;
1420
g_err << cmp.get_row(0) << endl;
1424
cmp.closeTransaction(pNdb);
1425
} while((eof = pOp->nextResult(false)) == 0);
1429
err = pTrans->getNdbError();
1434
closeTransaction(pNdb);
1436
g_info << row_count << " rows compared" << endl;
1439
if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK)
1441
// NOTE(review): this message and the next one print the literal text
// "tab_name2", not the value of the variable - possibly intended, verify.
g_err << "Failed to count rows in tab_name2" << endl;
1445
g_info << row_count2 << " rows in tab_name2 - failed " << return_code
1447
// A row-count mismatch turns the result into 1 regardless of return_code.
return (row_count == row_count2 ? return_code : 1);
1450
// Error handling: for a temporary error, log it, sleep 50 ms and close both
// transactions (presumably followed by a retry - the jump is not visible).
if(err.status == NdbError::TemporaryError)
1452
g_err << err << endl;
1453
NdbSleep_MilliSleep(50);
1454
closeTransaction(pNdb);
1455
if(cmp.getTransaction())
1456
cmp.closeTransaction(pNdb);
1460
g_err << "ERROR" << endl;
1461
g_err << err << endl;
1467
closeTransaction(pNdb);