~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
/* Copyright (C) 2003 MySQL AB
 
3
 
 
4
   This program is free software; you can redistribute it and/or modify
 
5
   it under the terms of the GNU General Public License as published by
 
6
   the Free Software Foundation; version 2 of the License.
 
7
 
 
8
   This program is distributed in the hope that it will be useful,
 
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
   GNU General Public License for more details.
 
12
 
 
13
   You should have received a copy of the GNU General Public License
 
14
   along with this program; if not, write to the Free Software
 
15
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
16
 
 
17
 
 
18
/*
 
19
 * ndbapi_scan.cpp: 
 
20
 * Illustrates how to use the scan api in the NDBAPI.
 
21
 * The example shows how to do scan, scan for update and scan for delete
 
22
 * using NdbScanFilter and NdbScanOperation
 
23
 *
 
24
 * Classes and methods used in this example:
 
25
 *
 
26
 *  Ndb_cluster_connection
 
27
 *       connect()
 
28
 *       wait_until_ready()
 
29
 *
 
30
 *  Ndb
 
31
 *       init()
 
32
 *       getDictionary()
 
33
 *       startTransaction()
 
34
 *       closeTransaction()
 
35
 *
 
36
 *  NdbTransaction
 
37
 *       getNdbScanOperation()
 
38
 *       execute()
 
39
 *
 
40
 *  NdbScanOperation
 
41
 *       getValue() 
 
42
 *       readTuples()
 
43
 *       nextResult()
 
44
 *       deleteCurrentTuple()
 
45
 *       updateCurrentTuple()
 
46
 *
 
47
 *  const NdbDictionary::Dictionary
 
48
 *       getTable()
 
49
 *
 
50
 *  const NdbDictionary::Table
 
51
 *       getColumn()
 
52
 *
 
53
 *  const NdbDictionary::Column
 
54
 *       getLength()
 
55
 *
 
56
 *  NdbOperation
 
57
 *       insertTuple()
 
58
 *       equal()
 
59
 *       setValue()
 
60
 *
 
61
 *  NdbScanFilter
 
62
 *       begin()
 
63
 *       eq()
 
64
 *       end()
 
65
 *
 
66
 */
 
67
 
 
68
 
 
69
#include <mysql.h>
 
70
#include <mysqld_error.h>
 
71
#include <NdbApi.hpp>
 
72
// Used for cout
 
73
#include <iostream>
 
74
#include <stdio.h>
 
75
 
 
76
/**
 
77
 * Helper sleep function
 
78
 */
 
79
static void
 
80
milliSleep(int milliseconds){
 
81
  struct timeval sleeptime;
 
82
  sleeptime.tv_sec = milliseconds / 1000;
 
83
  sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
 
84
  select(0, 0, 0, 0, &sleeptime);
 
85
}
 
86
 
 
87
 
 
88
/**
 
89
 * Helper sleep function
 
90
 */
 
91
#define PRINT_ERROR(code,msg) \
 
92
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
 
93
            << ", code: " << code \
 
94
            << ", msg: " << msg << "." << std::endl
 
95
#define MYSQLERROR(mysql) { \
 
96
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
 
97
  exit(-1); }
 
98
#define APIERROR(error) { \
 
99
  PRINT_ERROR(error.code,error.message); \
 
100
  exit(-1); }
 
101
 
 
102
struct Car 
 
103
{
 
104
  /**
 
105
   * Note memset, so that entire char-fields are cleared
 
106
   *   as all 20 bytes are significant (as type is char)
 
107
   */
 
108
  Car() { memset(this, 0, sizeof(* this)); }
 
109
  
 
110
  unsigned int reg_no;
 
111
  char brand[20];
 
112
  char color[20];
 
113
};
 
114
 
 
115
/**
 
116
 * Function to drop table
 
117
 */
 
118
void drop_table(MYSQL &mysql)
 
119
{
 
120
  if (mysql_query(&mysql, "DROP TABLE GARAGE"))
 
121
    MYSQLERROR(mysql);
 
122
}
 
123
 
 
124
 
 
125
/**
 
126
 * Function to create table
 
127
 */
 
128
void create_table(MYSQL &mysql) 
 
129
{
 
130
  while (mysql_query(&mysql, 
 
131
                  "CREATE TABLE"
 
132
                  "  GARAGE"
 
133
                  "    (REG_NO INT UNSIGNED NOT NULL,"
 
134
                  "     BRAND CHAR(20) NOT NULL,"
 
135
                  "     COLOR CHAR(20) NOT NULL,"
 
136
                  "     PRIMARY KEY USING HASH (REG_NO))"
 
137
                  "  ENGINE=NDB"))
 
138
  {
 
139
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
 
140
      MYSQLERROR(mysql);
 
141
    std::cout << "MySQL Cluster already has example table: GARAGE. "
 
142
              << "Dropping it..." << std::endl; 
 
143
    /******************
 
144
     * Recreate table *
 
145
     ******************/
 
146
    drop_table(mysql);
 
147
    create_table(mysql);
 
148
  }
 
149
}
 
150
 
 
151
int populate(Ndb * myNdb)
 
152
{
 
153
  int i;
 
154
  Car cars[15];
 
155
 
 
156
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
 
157
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
 
158
 
 
159
  if (myTable == NULL) 
 
160
    APIERROR(myDict->getNdbError());
 
161
 
 
162
  /**
 
163
   * Five blue mercedes
 
164
   */
 
165
  for (i = 0; i < 5; i++)
 
166
  {
 
167
    cars[i].reg_no = i;
 
168
    sprintf(cars[i].brand, "Mercedes");
 
169
    sprintf(cars[i].color, "Blue");
 
170
  }
 
171
 
 
172
  /**
 
173
   * Five black bmw
 
174
   */
 
175
  for (i = 5; i < 10; i++)
 
176
  {
 
177
    cars[i].reg_no = i;
 
178
    sprintf(cars[i].brand, "BMW");
 
179
    sprintf(cars[i].color, "Black");
 
180
  }
 
181
 
 
182
  /**
 
183
   * Five pink toyotas
 
184
   */
 
185
  for (i = 10; i < 15; i++)
 
186
  {
 
187
    cars[i].reg_no = i;
 
188
    sprintf(cars[i].brand, "Toyota");
 
189
    sprintf(cars[i].color, "Pink");
 
190
  }
 
191
  
 
192
  NdbTransaction* myTrans = myNdb->startTransaction();
 
193
  if (myTrans == NULL)
 
194
    APIERROR(myNdb->getNdbError());
 
195
 
 
196
  for (i = 0; i < 15; i++) 
 
197
  {
 
198
    NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
 
199
    if (myNdbOperation == NULL) 
 
200
      APIERROR(myTrans->getNdbError());
 
201
    myNdbOperation->insertTuple();
 
202
    myNdbOperation->equal("REG_NO", cars[i].reg_no);
 
203
    myNdbOperation->setValue("BRAND", cars[i].brand);
 
204
    myNdbOperation->setValue("COLOR", cars[i].color);
 
205
  }
 
206
 
 
207
  int check = myTrans->execute(NdbTransaction::Commit);
 
208
 
 
209
  myTrans->close();
 
210
 
 
211
  return check != -1;
 
212
}
 
213
 
 
214
int scan_delete(Ndb* myNdb, 
 
215
                int column,
 
216
                const char * color)
 
217
  
 
218
{
 
219
  
 
220
  // Scan all records exclusive and delete 
 
221
  // them one by one
 
222
  int                  retryAttempt = 0;
 
223
  const int            retryMax = 10;
 
224
  int deletedRows = 0;
 
225
  int check;
 
226
  NdbError              err;
 
227
  NdbTransaction        *myTrans;
 
228
  NdbScanOperation      *myScanOp;
 
229
 
 
230
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
 
231
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
 
232
 
 
233
  if (myTable == NULL) 
 
234
    APIERROR(myDict->getNdbError());
 
235
 
 
236
  /**
 
237
   * Loop as long as :
 
238
   *  retryMax not reached
 
239
   *  failed operations due to TEMPORARY erros
 
240
   *
 
241
   * Exit loop;
 
242
   *  retyrMax reached
 
243
   *  Permanent error (return -1)
 
244
   */
 
245
  while (true)
 
246
  {
 
247
    if (retryAttempt >= retryMax)
 
248
    {
 
249
      std::cout << "ERROR: has retried this operation " << retryAttempt 
 
250
                << " times, failing!" << std::endl;
 
251
      return -1;
 
252
    }
 
253
 
 
254
    myTrans = myNdb->startTransaction();
 
255
    if (myTrans == NULL) 
 
256
    {
 
257
      const NdbError err = myNdb->getNdbError();
 
258
 
 
259
      if (err.status == NdbError::TemporaryError)
 
260
      {
 
261
        milliSleep(50);
 
262
        retryAttempt++;
 
263
        continue;
 
264
      }
 
265
      std::cout <<  err.message << std::endl;
 
266
      return -1;
 
267
    }
 
268
 
 
269
   /**
 
270
    * Get a scan operation.
 
271
    */
 
272
    myScanOp = myTrans->getNdbScanOperation(myTable);   
 
273
    if (myScanOp == NULL) 
 
274
    {
 
275
      std::cout << myTrans->getNdbError().message << std::endl;
 
276
      myNdb->closeTransaction(myTrans);
 
277
      return -1;
 
278
    }
 
279
 
 
280
    /**
 
281
     * Define a result set for the scan.
 
282
     */ 
 
283
    if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
 
284
    {
 
285
      std::cout << myTrans->getNdbError().message << std::endl;
 
286
      myNdb->closeTransaction(myTrans);
 
287
      return -1;
 
288
    } 
 
289
    
 
290
    /**
 
291
     * Use NdbScanFilter to define a search critera
 
292
     */ 
 
293
    NdbScanFilter filter(myScanOp) ;   
 
294
    if(filter.begin(NdbScanFilter::AND) < 0  || 
 
295
       filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 ||
 
296
       filter.end() < 0)
 
297
    {
 
298
      std::cout <<  myTrans->getNdbError().message << std::endl;
 
299
      myNdb->closeTransaction(myTrans);
 
300
      return -1;
 
301
    }    
 
302
    
 
303
    /**
 
304
     * Start scan    (NoCommit since we are only reading at this stage);
 
305
     */     
 
306
    if(myTrans->execute(NdbTransaction::NoCommit) != 0){      
 
307
      err = myTrans->getNdbError();    
 
308
      if(err.status == NdbError::TemporaryError){
 
309
        std::cout << myTrans->getNdbError().message << std::endl;
 
310
        myNdb->closeTransaction(myTrans);
 
311
        milliSleep(50);
 
312
        continue;
 
313
      }
 
314
      std::cout << err.code << std::endl;
 
315
      std::cout << myTrans->getNdbError().code << std::endl;
 
316
      myNdb->closeTransaction(myTrans);
 
317
      return -1;
 
318
    }
 
319
 
 
320
 
 
321
   /**
 
322
    * start of loop: nextResult(true) means that "parallelism" number of
 
323
    * rows are fetched from NDB and cached in NDBAPI
 
324
    */    
 
325
    while((check = myScanOp->nextResult(true)) == 0){
 
326
      do 
 
327
      {
 
328
        if (myScanOp->deleteCurrentTuple() != 0)
 
329
        {
 
330
          std::cout << myTrans->getNdbError().message << std::endl;
 
331
          myNdb->closeTransaction(myTrans);
 
332
          return -1;
 
333
        }
 
334
        deletedRows++;
 
335
        
 
336
        /**
 
337
         * nextResult(false) means that the records 
 
338
         * cached in the NDBAPI are modified before
 
339
         * fetching more rows from NDB.
 
340
         */    
 
341
      } while((check = myScanOp->nextResult(false)) == 0);
 
342
      
 
343
      /**
 
344
       * Commit when all cached tuple have been marked for deletion
 
345
       */    
 
346
      if(check != -1)
 
347
      {
 
348
        check = myTrans->execute(NdbTransaction::Commit);   
 
349
      }
 
350
 
 
351
      if(check == -1)
 
352
      {
 
353
        /**
 
354
         * Create a new transaction, while keeping scan open
 
355
         */
 
356
        check = myTrans->restart();
 
357
      }
 
358
 
 
359
      /**
 
360
       * Check for errors
 
361
       */
 
362
      err = myTrans->getNdbError();    
 
363
      if(check == -1)
 
364
      {
 
365
        if(err.status == NdbError::TemporaryError)
 
366
        {
 
367
          std::cout << myTrans->getNdbError().message << std::endl;
 
368
          myNdb->closeTransaction(myTrans);
 
369
          milliSleep(50);
 
370
          continue;
 
371
        }       
 
372
      }
 
373
      /**
 
374
       * End of loop 
 
375
       */
 
376
    }
 
377
    std::cout << myTrans->getNdbError().message << std::endl;
 
378
    myNdb->closeTransaction(myTrans);
 
379
    return 0;
 
380
  }
 
381
  
 
382
  if(myTrans!=0) 
 
383
  {
 
384
    std::cout << myTrans->getNdbError().message << std::endl;
 
385
    myNdb->closeTransaction(myTrans);
 
386
  }
 
387
  return -1;
 
388
}
 
389
 
 
390
 
 
391
int scan_update(Ndb* myNdb, 
 
392
                int update_column,
 
393
                const char * before_color,
 
394
                const char * after_color)
 
395
                
 
396
{
 
397
  
 
398
  // Scan all records exclusive and update
 
399
  // them one by one
 
400
  int                  retryAttempt = 0;
 
401
  const int            retryMax = 10;
 
402
  int updatedRows = 0;
 
403
  int check;
 
404
  NdbError              err;
 
405
  NdbTransaction        *myTrans;
 
406
  NdbScanOperation      *myScanOp;
 
407
 
 
408
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
 
409
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
 
410
 
 
411
  if (myTable == NULL) 
 
412
    APIERROR(myDict->getNdbError());
 
413
 
 
414
  /**
 
415
   * Loop as long as :
 
416
   *  retryMax not reached
 
417
   *  failed operations due to TEMPORARY erros
 
418
   *
 
419
   * Exit loop;
 
420
   *  retyrMax reached
 
421
   *  Permanent error (return -1)
 
422
   */
 
423
  while (true)
 
424
  {
 
425
 
 
426
    if (retryAttempt >= retryMax)
 
427
    {
 
428
      std::cout << "ERROR: has retried this operation " << retryAttempt 
 
429
                << " times, failing!" << std::endl;
 
430
      return -1;
 
431
    }
 
432
 
 
433
    myTrans = myNdb->startTransaction();
 
434
    if (myTrans == NULL) 
 
435
    {
 
436
      const NdbError err = myNdb->getNdbError();
 
437
 
 
438
      if (err.status == NdbError::TemporaryError)
 
439
      {
 
440
        milliSleep(50);
 
441
        retryAttempt++;
 
442
        continue;
 
443
      }
 
444
      std::cout <<  err.message << std::endl;
 
445
      return -1;
 
446
    }
 
447
 
 
448
   /**
 
449
    * Get a scan operation.
 
450
    */
 
451
    myScanOp = myTrans->getNdbScanOperation(myTable);   
 
452
    if (myScanOp == NULL) 
 
453
    {
 
454
      std::cout << myTrans->getNdbError().message << std::endl;
 
455
      myNdb->closeTransaction(myTrans);
 
456
      return -1;
 
457
    }
 
458
 
 
459
    /**
 
460
     * Define a result set for the scan.
 
461
     */ 
 
462
    if( myScanOp->readTuples(NdbOperation::LM_Exclusive) ) 
 
463
    {
 
464
      std::cout << myTrans->getNdbError().message << std::endl;
 
465
      myNdb->closeTransaction(myTrans);
 
466
      return -1;
 
467
    } 
 
468
 
 
469
    /**
 
470
     * Use NdbScanFilter to define a search critera
 
471
     */ 
 
472
    NdbScanFilter filter(myScanOp) ;   
 
473
    if(filter.begin(NdbScanFilter::AND) < 0  || 
 
474
       filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0||
 
475
       filter.end() <0)
 
476
    {
 
477
      std::cout <<  myTrans->getNdbError().message << std::endl;
 
478
      myNdb->closeTransaction(myTrans);
 
479
      return -1;
 
480
    }    
 
481
    
 
482
    /**
 
483
     * Start scan    (NoCommit since we are only reading at this stage);
 
484
     */     
 
485
    if(myTrans->execute(NdbTransaction::NoCommit) != 0)
 
486
    {      
 
487
      err = myTrans->getNdbError();    
 
488
      if(err.status == NdbError::TemporaryError){
 
489
        std::cout << myTrans->getNdbError().message << std::endl;
 
490
        myNdb->closeTransaction(myTrans);
 
491
        milliSleep(50);
 
492
        continue;
 
493
      }
 
494
      std::cout << myTrans->getNdbError().code << std::endl;
 
495
      myNdb->closeTransaction(myTrans);
 
496
      return -1;
 
497
    }
 
498
 
 
499
    /**
 
500
     * start of loop: nextResult(true) means that "parallelism" number of
 
501
     * rows are fetched from NDB and cached in NDBAPI
 
502
     */    
 
503
    while((check = myScanOp->nextResult(true)) == 0){
 
504
      do {
 
505
        /**
 
506
         * Get update operation
 
507
         */    
 
508
        NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
 
509
        if (myUpdateOp == 0)
 
510
        {
 
511
          std::cout << myTrans->getNdbError().message << std::endl;
 
512
          myNdb->closeTransaction(myTrans);
 
513
          return -1;
 
514
        }
 
515
        updatedRows++;
 
516
 
 
517
        /**
 
518
         * do the update
 
519
         */    
 
520
        myUpdateOp->setValue(update_column, after_color);
 
521
        /**
 
522
         * nextResult(false) means that the records 
 
523
         * cached in the NDBAPI are modified before
 
524
         * fetching more rows from NDB.
 
525
         */    
 
526
      } while((check = myScanOp->nextResult(false)) == 0);
 
527
      
 
528
      /**
 
529
       * NoCommit when all cached tuple have been updated
 
530
       */    
 
531
      if(check != -1)
 
532
      {
 
533
        check = myTrans->execute(NdbTransaction::NoCommit);   
 
534
      }
 
535
 
 
536
      /**
 
537
       * Check for errors
 
538
       */
 
539
      err = myTrans->getNdbError();    
 
540
      if(check == -1)
 
541
      {
 
542
        if(err.status == NdbError::TemporaryError){
 
543
          std::cout << myTrans->getNdbError().message << std::endl;
 
544
          myNdb->closeTransaction(myTrans);
 
545
          milliSleep(50);
 
546
          continue;
 
547
        }       
 
548
      }
 
549
      /**
 
550
       * End of loop 
 
551
       */
 
552
    }
 
553
 
 
554
    /**
 
555
     * Commit all prepared operations
 
556
     */
 
557
    if(myTrans->execute(NdbTransaction::Commit) == -1)
 
558
    {
 
559
      if(err.status == NdbError::TemporaryError){
 
560
        std::cout << myTrans->getNdbError().message << std::endl;
 
561
        myNdb->closeTransaction(myTrans);
 
562
        milliSleep(50);
 
563
        continue;
 
564
      } 
 
565
    }
 
566
 
 
567
    std::cout << myTrans->getNdbError().message << std::endl;
 
568
    myNdb->closeTransaction(myTrans);
 
569
    return 0;    
 
570
  }
 
571
 
 
572
 
 
573
  if(myTrans!=0) 
 
574
  {
 
575
    std::cout << myTrans->getNdbError().message << std::endl;
 
576
    myNdb->closeTransaction(myTrans);
 
577
  }
 
578
  return -1;
 
579
}
 
580
 
 
581
 
 
582
 
 
583
int scan_print(Ndb * myNdb)
 
584
{
 
585
// Scan all records exclusive and update
 
586
  // them one by one
 
587
  int                  retryAttempt = 0;
 
588
  const int            retryMax = 10;
 
589
  int fetchedRows = 0;
 
590
  int check;
 
591
  NdbError              err;
 
592
  NdbTransaction        *myTrans;
 
593
  NdbScanOperation      *myScanOp;
 
594
  /* Result of reading attribute value, three columns:
 
595
     REG_NO, BRAND, and COLOR
 
596
   */
 
597
  NdbRecAttr *          myRecAttr[3];   
 
598
 
 
599
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
 
600
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
 
601
 
 
602
  if (myTable == NULL) 
 
603
    APIERROR(myDict->getNdbError());
 
604
 
 
605
  /**
 
606
   * Loop as long as :
 
607
   *  retryMax not reached
 
608
   *  failed operations due to TEMPORARY erros
 
609
   *
 
610
   * Exit loop;
 
611
   *  retyrMax reached
 
612
   *  Permanent error (return -1)
 
613
   */
 
614
  while (true)
 
615
  {
 
616
 
 
617
    if (retryAttempt >= retryMax)
 
618
    {
 
619
      std::cout << "ERROR: has retried this operation " << retryAttempt 
 
620
                << " times, failing!" << std::endl;
 
621
      return -1;
 
622
    }
 
623
 
 
624
    myTrans = myNdb->startTransaction();
 
625
    if (myTrans == NULL) 
 
626
    {
 
627
      const NdbError err = myNdb->getNdbError();
 
628
 
 
629
      if (err.status == NdbError::TemporaryError)
 
630
      {
 
631
        milliSleep(50);
 
632
        retryAttempt++;
 
633
        continue;
 
634
      }
 
635
     std::cout << err.message << std::endl;
 
636
      return -1;
 
637
    }
 
638
    /*
 
639
     * Define a scan operation. 
 
640
     * NDBAPI.
 
641
     */
 
642
    myScanOp = myTrans->getNdbScanOperation(myTable);   
 
643
    if (myScanOp == NULL) 
 
644
    {
 
645
      std::cout << myTrans->getNdbError().message << std::endl;
 
646
      myNdb->closeTransaction(myTrans);
 
647
      return -1;
 
648
    }
 
649
 
 
650
    /**
 
651
     * Read without locks, without being placed in lock queue
 
652
     */
 
653
    if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
 
654
    {
 
655
      std::cout << myTrans->getNdbError().message << std::endl;
 
656
      myNdb->closeTransaction(myTrans);
 
657
      return -1;
 
658
    } 
 
659
 
 
660
    /**
 
661
     * Define storage for fetched attributes.
 
662
     * E.g., the resulting attributes of executing
 
663
     * myOp->getValue("REG_NO") is placed in myRecAttr[0].
 
664
     * No data exists in myRecAttr until transaction has commited!
 
665
     */
 
666
    myRecAttr[0] = myScanOp->getValue("REG_NO");
 
667
    myRecAttr[1] = myScanOp->getValue("BRAND");
 
668
    myRecAttr[2] = myScanOp->getValue("COLOR");
 
669
    if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL) 
 
670
    {
 
671
        std::cout << myTrans->getNdbError().message << std::endl;
 
672
        myNdb->closeTransaction(myTrans);
 
673
        return -1;
 
674
    }
 
675
    /**
 
676
     * Start scan   (NoCommit since we are only reading at this stage);
 
677
     */     
 
678
    if(myTrans->execute(NdbTransaction::NoCommit) != 0){      
 
679
      err = myTrans->getNdbError();    
 
680
      if(err.status == NdbError::TemporaryError){
 
681
        std::cout << myTrans->getNdbError().message << std::endl;
 
682
        myNdb->closeTransaction(myTrans);
 
683
        milliSleep(50);
 
684
        continue;
 
685
      }
 
686
      std::cout << err.code << std::endl;
 
687
      std::cout << myTrans->getNdbError().code << std::endl;
 
688
      myNdb->closeTransaction(myTrans);
 
689
      return -1;
 
690
    }
 
691
    
 
692
    /**
 
693
     * start of loop: nextResult(true) means that "parallelism" number of
 
694
     * rows are fetched from NDB and cached in NDBAPI
 
695
     */    
 
696
    while((check = myScanOp->nextResult(true)) == 0){
 
697
      do {
 
698
        
 
699
        fetchedRows++;
 
700
        /**
 
701
         * print  REG_NO unsigned int
 
702
         */
 
703
        std::cout << myRecAttr[0]->u_32_value() << "\t";
 
704
 
 
705
        /**
 
706
         * print  BRAND character string
 
707
         */
 
708
        std::cout << myRecAttr[1]->aRef() << "\t";
 
709
 
 
710
        /**
 
711
         * print  COLOR character string
 
712
         */
 
713
        std::cout << myRecAttr[2]->aRef() << std::endl;
 
714
 
 
715
        /**
 
716
         * nextResult(false) means that the records 
 
717
         * cached in the NDBAPI are modified before
 
718
         * fetching more rows from NDB.
 
719
         */    
 
720
      } while((check = myScanOp->nextResult(false)) == 0);
 
721
 
 
722
    }    
 
723
    myNdb->closeTransaction(myTrans);
 
724
    return 1;
 
725
  }
 
726
  return -1;
 
727
 
 
728
}
 
729
 
 
730
 
 
731
int main(int argc, char** argv)
 
732
{
 
733
  if (argc != 3)
 
734
  {
 
735
    std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
 
736
    exit(-1);
 
737
  }
 
738
  char * mysqld_sock  = argv[1];
 
739
  const char *connectstring = argv[2];
 
740
  ndb_init();
 
741
  MYSQL mysql;
 
742
 
 
743
  /**************************************************************
 
744
   * Connect to mysql server and create table                   *
 
745
   **************************************************************/
 
746
  {
 
747
    if ( !mysql_init(&mysql) ) {
 
748
      std::cout << "mysql_init failed\n";
 
749
      exit(-1);
 
750
    }
 
751
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
 
752
                             0, mysqld_sock, 0) )
 
753
      MYSQLERROR(mysql);
 
754
 
 
755
    mysql_query(&mysql, "CREATE DATABASE TEST_DB");
 
756
    if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);
 
757
 
 
758
    create_table(mysql);
 
759
  }
 
760
 
 
761
  /**************************************************************
 
762
   * Connect to ndb cluster                                     *
 
763
   **************************************************************/
 
764
 
 
765
  Ndb_cluster_connection cluster_connection(connectstring);
 
766
  if (cluster_connection.connect(4, 5, 1))
 
767
  {
 
768
    std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
 
769
    exit(-1);
 
770
  }
 
771
  // Optionally connect and wait for the storage nodes (ndbd's)
 
772
  if (cluster_connection.wait_until_ready(30,0) < 0)
 
773
  {
 
774
    std::cout << "Cluster was not ready within 30 secs.\n";
 
775
    exit(-1);
 
776
  }
 
777
 
 
778
  Ndb myNdb(&cluster_connection,"TEST_DB");
 
779
  if (myNdb.init(1024) == -1) {      // Set max 1024  parallel transactions
 
780
    APIERROR(myNdb.getNdbError());
 
781
    exit(-1);
 
782
  }
 
783
 
 
784
  /*******************************************
 
785
   * Check table definition                  *
 
786
   *******************************************/
 
787
  int column_color;
 
788
  {
 
789
    const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
 
790
    const NdbDictionary::Table *t= myDict->getTable("GARAGE");
 
791
 
 
792
    Car car;
 
793
    if (t->getColumn("COLOR")->getLength() != sizeof(car.color) ||
 
794
        t->getColumn("BRAND")->getLength() != sizeof(car.brand))
 
795
    {
 
796
      std::cout << "Wrong table definition" << std::endl;
 
797
      exit(-1);
 
798
    }
 
799
    column_color= t->getColumn("COLOR")->getColumnNo();
 
800
  }
 
801
 
 
802
  if(populate(&myNdb) > 0)
 
803
    std::cout << "populate: Success!" << std::endl;
 
804
  
 
805
  if(scan_print(&myNdb) > 0)
 
806
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
 
807
  
 
808
  std::cout << "Going to delete all pink cars!" << std::endl;
 
809
  
 
810
  {
 
811
    /**
 
812
     * Note! color needs to be of exact the same size as column defined
 
813
     */
 
814
    Car tmp;
 
815
    sprintf(tmp.color, "Pink");
 
816
    if(scan_delete(&myNdb, column_color, tmp.color) > 0)
 
817
      std::cout << "scan_delete: Success!" << std::endl  << std::endl;
 
818
  }
 
819
 
 
820
  if(scan_print(&myNdb) > 0)
 
821
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
 
822
  
 
823
  {
 
824
    /**
 
825
     * Note! color1 & 2 need to be of exact the same size as column defined
 
826
     */
 
827
    Car tmp1, tmp2;
 
828
    sprintf(tmp1.color, "Blue");
 
829
    sprintf(tmp2.color, "Black");
 
830
    std::cout << "Going to update all " << tmp1.color 
 
831
              << " cars to " << tmp2.color << " cars!" << std::endl;
 
832
    if(scan_update(&myNdb, column_color, tmp1.color, tmp2.color) > 0) 
 
833
      std::cout << "scan_update: Success!" << std::endl  << std::endl;
 
834
  }
 
835
  if(scan_print(&myNdb) > 0)
 
836
    std::cout << "scan_print: Success!" << std::endl  << std::endl;
 
837
 
 
838
  /**
 
839
   * Drop table
 
840
   */
 
841
  drop_table(mysql);
 
842
 
 
843
  return 0;
 
844
}