2
/* Copyright (C) 2003 MySQL AB
4
This program is free software; you can redistribute it and/or modify
5
it under the terms of the GNU General Public License as published by
6
the Free Software Foundation; version 2 of the License.
8
This program is distributed in the hope that it will be useful,
9
but WITHOUT ANY WARRANTY; without even the implied warranty of
10
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
GNU General Public License for more details.
13
You should have received a copy of the GNU General Public License
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
20
* Illustrates how to use the scan api in the NDBAPI.
21
* The example shows how to do scan, scan for update and scan for delete
22
* using NdbScanFilter and NdbScanOperation
24
* Classes and methods used in this example:
26
* Ndb_cluster_connection
37
* getNdbScanOperation()
44
* deleteCurrentTuple()
45
* updateCurrentTuple()
47
* const NdbDictionary::Dictionary
50
* const NdbDictionary::Table
53
* const NdbDictionary::Column
70
#include <mysqld_error.h>
77
* Helper sleep function
80
milliSleep(int milliseconds){
81
struct timeval sleeptime;
82
sleeptime.tv_sec = milliseconds / 1000;
83
// Sub-second remainder converted from milliseconds to microseconds
// (1 ms == 1000 us); tv_usec must stay within [0, 999999] for select().
sleeptime.tv_usec = (milliseconds % 1000) * 1000;
84
select(0, 0, 0, 0, &sleeptime);
89
* Helper macros for printing MySQL and NDB API errors
91
#define PRINT_ERROR(code,msg) \
92
std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
93
<< ", code: " << code \
94
<< ", msg: " << msg << "." << std::endl
95
#define MYSQLERROR(mysql) { \
96
PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
98
#define APIERROR(error) { \
99
PRINT_ERROR(error.code,error.message); \
105
* Note memset, so that entire char-fields are cleared
106
* as all 20 bytes are significant (as type is char)
108
Car() { memset(this, 0, sizeof(* this)); }
116
* Function to drop table
118
void drop_table(MYSQL &mysql)
120
if (mysql_query(&mysql, "DROP TABLE GARAGE"))
126
* Function to create table
128
void create_table(MYSQL &mysql)
130
while (mysql_query(&mysql,
133
" (REG_NO INT UNSIGNED NOT NULL,"
134
" BRAND CHAR(20) NOT NULL,"
135
" COLOR CHAR(20) NOT NULL,"
136
" PRIMARY KEY USING HASH (REG_NO))"
139
if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
141
std::cout << "MySQL Cluster already has example table: GARAGE. "
142
<< "Dropping it..." << std::endl;
151
int populate(Ndb * myNdb)
156
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
157
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
160
APIERROR(myDict->getNdbError());
165
for (i = 0; i < 5; i++)
168
sprintf(cars[i].brand, "Mercedes");
169
sprintf(cars[i].color, "Blue");
175
for (i = 5; i < 10; i++)
178
sprintf(cars[i].brand, "BMW");
179
sprintf(cars[i].color, "Black");
185
for (i = 10; i < 15; i++)
188
sprintf(cars[i].brand, "Toyota");
189
sprintf(cars[i].color, "Pink");
192
NdbTransaction* myTrans = myNdb->startTransaction();
194
APIERROR(myNdb->getNdbError());
196
for (i = 0; i < 15; i++)
198
NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
199
if (myNdbOperation == NULL)
200
APIERROR(myTrans->getNdbError());
201
myNdbOperation->insertTuple();
202
myNdbOperation->equal("REG_NO", cars[i].reg_no);
203
myNdbOperation->setValue("BRAND", cars[i].brand);
204
myNdbOperation->setValue("COLOR", cars[i].color);
207
int check = myTrans->execute(NdbTransaction::Commit);
214
int scan_delete(Ndb* myNdb,
220
// Scan all records exclusive and delete
222
int retryAttempt = 0;
223
const int retryMax = 10;
227
NdbTransaction *myTrans;
228
NdbScanOperation *myScanOp;
230
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
231
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
234
APIERROR(myDict->getNdbError());
238
* retryMax not reached
239
* failed operations due to TEMPORARY errors
243
* Permanent error (return -1)
247
if (retryAttempt >= retryMax)
249
std::cout << "ERROR: has retried this operation " << retryAttempt
250
<< " times, failing!" << std::endl;
254
myTrans = myNdb->startTransaction();
257
const NdbError err = myNdb->getNdbError();
259
if (err.status == NdbError::TemporaryError)
265
std::cout << err.message << std::endl;
270
* Get a scan operation.
272
myScanOp = myTrans->getNdbScanOperation(myTable);
273
if (myScanOp == NULL)
275
std::cout << myTrans->getNdbError().message << std::endl;
276
myNdb->closeTransaction(myTrans);
281
* Define a result set for the scan.
283
if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
285
std::cout << myTrans->getNdbError().message << std::endl;
286
myNdb->closeTransaction(myTrans);
291
* Use NdbScanFilter to define the search criteria
293
NdbScanFilter filter(myScanOp) ;
294
if(filter.begin(NdbScanFilter::AND) < 0 ||
295
filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 ||
298
std::cout << myTrans->getNdbError().message << std::endl;
299
myNdb->closeTransaction(myTrans);
304
* Start scan (NoCommit since we are only reading at this stage);
306
if(myTrans->execute(NdbTransaction::NoCommit) != 0){
307
err = myTrans->getNdbError();
308
if(err.status == NdbError::TemporaryError){
309
std::cout << myTrans->getNdbError().message << std::endl;
310
myNdb->closeTransaction(myTrans);
314
std::cout << err.code << std::endl;
315
std::cout << myTrans->getNdbError().code << std::endl;
316
myNdb->closeTransaction(myTrans);
322
* start of loop: nextResult(true) means that "parallelism" number of
323
* rows are fetched from NDB and cached in NDBAPI
325
while((check = myScanOp->nextResult(true)) == 0){
328
if (myScanOp->deleteCurrentTuple() != 0)
330
std::cout << myTrans->getNdbError().message << std::endl;
331
myNdb->closeTransaction(myTrans);
337
* nextResult(false) means that the records
338
* cached in the NDBAPI are modified before
339
* fetching more rows from NDB.
341
} while((check = myScanOp->nextResult(false)) == 0);
344
* Commit when all cached tuples have been marked for deletion
348
check = myTrans->execute(NdbTransaction::Commit);
354
* Create a new transaction, while keeping scan open
356
check = myTrans->restart();
362
err = myTrans->getNdbError();
365
if(err.status == NdbError::TemporaryError)
367
std::cout << myTrans->getNdbError().message << std::endl;
368
myNdb->closeTransaction(myTrans);
377
std::cout << myTrans->getNdbError().message << std::endl;
378
myNdb->closeTransaction(myTrans);
384
std::cout << myTrans->getNdbError().message << std::endl;
385
myNdb->closeTransaction(myTrans);
391
int scan_update(Ndb* myNdb,
393
const char * before_color,
394
const char * after_color)
398
// Scan all records exclusive and update
400
int retryAttempt = 0;
401
const int retryMax = 10;
405
NdbTransaction *myTrans;
406
NdbScanOperation *myScanOp;
408
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
409
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
412
APIERROR(myDict->getNdbError());
416
* retryMax not reached
417
* failed operations due to TEMPORARY errors
421
* Permanent error (return -1)
426
if (retryAttempt >= retryMax)
428
std::cout << "ERROR: has retried this operation " << retryAttempt
429
<< " times, failing!" << std::endl;
433
myTrans = myNdb->startTransaction();
436
const NdbError err = myNdb->getNdbError();
438
if (err.status == NdbError::TemporaryError)
444
std::cout << err.message << std::endl;
449
* Get a scan operation.
451
myScanOp = myTrans->getNdbScanOperation(myTable);
452
if (myScanOp == NULL)
454
std::cout << myTrans->getNdbError().message << std::endl;
455
myNdb->closeTransaction(myTrans);
460
* Define a result set for the scan.
462
if( myScanOp->readTuples(NdbOperation::LM_Exclusive) )
464
std::cout << myTrans->getNdbError().message << std::endl;
465
myNdb->closeTransaction(myTrans);
470
* Use NdbScanFilter to define the search criteria
472
NdbScanFilter filter(myScanOp) ;
473
if(filter.begin(NdbScanFilter::AND) < 0 ||
474
filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0||
477
std::cout << myTrans->getNdbError().message << std::endl;
478
myNdb->closeTransaction(myTrans);
483
* Start scan (NoCommit since we are only reading at this stage);
485
if(myTrans->execute(NdbTransaction::NoCommit) != 0)
487
err = myTrans->getNdbError();
488
if(err.status == NdbError::TemporaryError){
489
std::cout << myTrans->getNdbError().message << std::endl;
490
myNdb->closeTransaction(myTrans);
494
std::cout << myTrans->getNdbError().code << std::endl;
495
myNdb->closeTransaction(myTrans);
500
* start of loop: nextResult(true) means that "parallelism" number of
501
* rows are fetched from NDB and cached in NDBAPI
503
while((check = myScanOp->nextResult(true)) == 0){
506
* Get update operation
508
NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
511
std::cout << myTrans->getNdbError().message << std::endl;
512
myNdb->closeTransaction(myTrans);
520
myUpdateOp->setValue(update_column, after_color);
522
* nextResult(false) means that the records
523
* cached in the NDBAPI are modified before
524
* fetching more rows from NDB.
526
} while((check = myScanOp->nextResult(false)) == 0);
529
* NoCommit when all cached tuples have been updated
533
check = myTrans->execute(NdbTransaction::NoCommit);
539
err = myTrans->getNdbError();
542
if(err.status == NdbError::TemporaryError){
543
std::cout << myTrans->getNdbError().message << std::endl;
544
myNdb->closeTransaction(myTrans);
555
* Commit all prepared operations
557
if(myTrans->execute(NdbTransaction::Commit) == -1)
559
// Read the status from the transaction itself: no refresh of the local
// `err` is visible after the failed commit, so testing it here could
// classify this failure using an earlier, unrelated error.
if(myTrans->getNdbError().status == NdbError::TemporaryError){
560
std::cout << myTrans->getNdbError().message << std::endl;
561
myNdb->closeTransaction(myTrans);
567
std::cout << myTrans->getNdbError().message << std::endl;
568
myNdb->closeTransaction(myTrans);
575
std::cout << myTrans->getNdbError().message << std::endl;
576
myNdb->closeTransaction(myTrans);
583
int scan_print(Ndb * myNdb)
585
// Scan all records using committed read (no locks) and print them
587
int retryAttempt = 0;
588
const int retryMax = 10;
592
NdbTransaction *myTrans;
593
NdbScanOperation *myScanOp;
594
/* Result of reading attribute value, three columns:
595
REG_NO, BRAND, and COLOR
597
NdbRecAttr * myRecAttr[3];
599
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
600
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
603
APIERROR(myDict->getNdbError());
607
* retryMax not reached
608
* failed operations due to TEMPORARY errors
612
* Permanent error (return -1)
617
if (retryAttempt >= retryMax)
619
std::cout << "ERROR: has retried this operation " << retryAttempt
620
<< " times, failing!" << std::endl;
624
myTrans = myNdb->startTransaction();
627
const NdbError err = myNdb->getNdbError();
629
if (err.status == NdbError::TemporaryError)
635
std::cout << err.message << std::endl;
639
* Define a scan operation.
642
myScanOp = myTrans->getNdbScanOperation(myTable);
643
if (myScanOp == NULL)
645
std::cout << myTrans->getNdbError().message << std::endl;
646
myNdb->closeTransaction(myTrans);
651
* Read without locks, without being placed in lock queue
653
if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
655
std::cout << myTrans->getNdbError().message << std::endl;
656
myNdb->closeTransaction(myTrans);
661
* Define storage for fetched attributes.
662
* E.g., the resulting attributes of executing
663
* myScanOp->getValue("REG_NO") is placed in myRecAttr[0].
664
* No data exists in myRecAttr until the scan has been executed and nextResult() has fetched a row!
666
myRecAttr[0] = myScanOp->getValue("REG_NO");
667
myRecAttr[1] = myScanOp->getValue("BRAND");
668
myRecAttr[2] = myScanOp->getValue("COLOR");
669
if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL)
671
std::cout << myTrans->getNdbError().message << std::endl;
672
myNdb->closeTransaction(myTrans);
676
* Start scan (NoCommit since we are only reading at this stage);
678
if(myTrans->execute(NdbTransaction::NoCommit) != 0){
679
err = myTrans->getNdbError();
680
if(err.status == NdbError::TemporaryError){
681
std::cout << myTrans->getNdbError().message << std::endl;
682
myNdb->closeTransaction(myTrans);
686
std::cout << err.code << std::endl;
687
std::cout << myTrans->getNdbError().code << std::endl;
688
myNdb->closeTransaction(myTrans);
693
* start of loop: nextResult(true) means that "parallelism" number of
694
* rows are fetched from NDB and cached in NDBAPI
696
while((check = myScanOp->nextResult(true)) == 0){
701
* print REG_NO unsigned int
703
std::cout << myRecAttr[0]->u_32_value() << "\t";
706
* print BRAND character string
708
std::cout << myRecAttr[1]->aRef() << "\t";
711
* print COLOR character string
713
std::cout << myRecAttr[2]->aRef() << std::endl;
716
* nextResult(false) means that the records
717
* cached in the NDBAPI are modified before
718
* fetching more rows from NDB.
720
} while((check = myScanOp->nextResult(false)) == 0);
723
myNdb->closeTransaction(myTrans);
731
int main(int argc, char** argv)
735
std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
738
char * mysqld_sock = argv[1];
739
const char *connectstring = argv[2];
743
/**************************************************************
744
* Connect to mysql server and create table *
745
**************************************************************/
747
if ( !mysql_init(&mysql) ) {
748
std::cout << "mysql_init failed\n";
751
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
755
mysql_query(&mysql, "CREATE DATABASE TEST_DB");
756
if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);
761
/**************************************************************
762
* Connect to ndb cluster *
763
**************************************************************/
765
Ndb_cluster_connection cluster_connection(connectstring);
766
if (cluster_connection.connect(4, 5, 1))
768
std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
771
// Optionally connect and wait for the storage nodes (ndbd's)
772
if (cluster_connection.wait_until_ready(30,0) < 0)
774
std::cout << "Cluster was not ready within 30 secs.\n";
778
Ndb myNdb(&cluster_connection,"TEST_DB");
779
if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions
780
APIERROR(myNdb.getNdbError());
784
/*******************************************
785
* Check table definition *
786
*******************************************/
789
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
790
const NdbDictionary::Table *t= myDict->getTable("GARAGE");
793
if (t->getColumn("COLOR")->getLength() != sizeof(car.color) ||
794
t->getColumn("BRAND")->getLength() != sizeof(car.brand))
796
std::cout << "Wrong table definition" << std::endl;
799
column_color= t->getColumn("COLOR")->getColumnNo();
802
if(populate(&myNdb) > 0)
803
std::cout << "populate: Success!" << std::endl;
805
if(scan_print(&myNdb) > 0)
806
std::cout << "scan_print: Success!" << std::endl << std::endl;
808
std::cout << "Going to delete all pink cars!" << std::endl;
812
* Note! color needs to be exactly the same size as the column definition
815
sprintf(tmp.color, "Pink");
816
if(scan_delete(&myNdb, column_color, tmp.color) > 0)
817
std::cout << "scan_delete: Success!" << std::endl << std::endl;
820
if(scan_print(&myNdb) > 0)
821
std::cout << "scan_print: Success!" << std::endl << std::endl;
825
* Note! the before and after colors need to be exactly the same size as the column definition
828
sprintf(tmp1.color, "Blue");
829
sprintf(tmp2.color, "Black");
830
std::cout << "Going to update all " << tmp1.color
831
<< " cars to " << tmp2.color << " cars!" << std::endl;
832
if(scan_update(&myNdb, column_color, tmp1.color, tmp2.color) > 0)
833
std::cout << "scan_update: Success!" << std::endl << std::endl;
835
if(scan_print(&myNdb) > 0)
836
std::cout << "scan_print: Success!" << std::endl << std::endl;