~vlad-lesin/percona-server/mysql-5.0.33-original

« back to all changes in this revision

Viewing changes to ndb/tools/restore/consumer_restore.cpp

  • Committer: Vlad Lesin
  • Date: 2012-07-31 09:21:34 UTC
  • Revision ID: vladislav.lesin@percona.com-20120731092134-zfodx022b7992wsi
VirginĀ 5.0.33

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; either version 2 of the License, or
 
6
   (at your option) any later version.
 
7
 
 
8
   This program is distributed in the hope that it will be useful,
 
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
   GNU General Public License for more details.
 
12
 
 
13
   You should have received a copy of the GNU General Public License
 
14
   along with this program; if not, write to the Free Software
 
15
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
16
 
 
17
#include <NDBT_ReturnCodes.h>
 
18
#include "consumer_restore.hpp"
 
19
#include <NdbSleep.h>
 
20
 
 
21
extern my_bool opt_core;
 
22
 
 
23
extern FilteredNdbOut err;
 
24
extern FilteredNdbOut info;
 
25
extern FilteredNdbOut debug;
 
26
 
 
27
static void callback(int, NdbTransaction*, void*);
 
28
 
 
29
extern const char * g_connect_string;
 
30
extern BaseString g_options;
 
31
 
 
32
bool
 
33
BackupRestore::init()
 
34
{
 
35
  release();
 
36
 
 
37
  if (!m_restore && !m_restore_meta)
 
38
    return true;
 
39
 
 
40
  m_cluster_connection = new Ndb_cluster_connection(g_connect_string);
 
41
  m_cluster_connection->set_name(g_options.c_str());
 
42
  if(m_cluster_connection->connect(12, 5, 1) != 0)
 
43
  {
 
44
    return false;
 
45
  }
 
46
 
 
47
  m_ndb = new Ndb(m_cluster_connection);
 
48
 
 
49
  if (m_ndb == NULL)
 
50
    return false;
 
51
  
 
52
  m_ndb->init(1024);
 
53
  if (m_ndb->waitUntilReady(30) != 0)
 
54
  {
 
55
    err << "Failed to connect to ndb!!" << endl;
 
56
    return false;
 
57
  }
 
58
  info << "Connected to ndb!!" << endl;
 
59
 
 
60
  m_callback = new restore_callback_t[m_parallelism];
 
61
 
 
62
  if (m_callback == 0)
 
63
  {
 
64
    err << "Failed to allocate callback structs" << endl;
 
65
    return false;
 
66
  }
 
67
 
 
68
  m_free_callback= m_callback;
 
69
  for (Uint32 i= 0; i < m_parallelism; i++) {
 
70
    m_callback[i].restore= this;
 
71
    m_callback[i].connection= 0;
 
72
    if (i > 0)
 
73
      m_callback[i-1].next= &(m_callback[i]);
 
74
  }
 
75
  m_callback[m_parallelism-1].next = 0;
 
76
 
 
77
  return true;
 
78
}
 
79
 
 
80
void BackupRestore::release()
 
81
{
 
82
  if (m_ndb)
 
83
  {
 
84
    delete m_ndb;
 
85
    m_ndb= 0;
 
86
  }
 
87
 
 
88
  if (m_callback)
 
89
  {
 
90
    delete [] m_callback;
 
91
    m_callback= 0;
 
92
  }
 
93
 
 
94
  if (m_cluster_connection)
 
95
  {
 
96
    delete m_cluster_connection;
 
97
    m_cluster_connection= 0;
 
98
  }
 
99
}
 
100
 
 
101
BackupRestore::~BackupRestore()
 
102
{
 
103
  release();
 
104
}
 
105
 
 
106
static
 
107
int 
 
108
match_blob(const char * name){
 
109
  int cnt, id1, id2;
 
110
  char buf[256];
 
111
  if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
 
112
    return id1;
 
113
  }
 
114
  
 
115
  return -1;
 
116
}
 
117
 
 
118
const NdbDictionary::Table*
 
119
BackupRestore::get_table(const NdbDictionary::Table* tab){
 
120
  if(m_cache.m_old_table == tab)
 
121
    return m_cache.m_new_table;
 
122
  m_cache.m_old_table = tab;
 
123
 
 
124
  int cnt, id1, id2;
 
125
  char db[256], schema[256];
 
126
  if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", 
 
127
                   db, schema, &id1, &id2)) == 4){
 
128
    m_ndb->setDatabaseName(db);
 
129
    m_ndb->setSchemaName(schema);
 
130
    
 
131
    BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d", 
 
132
                         m_new_tables[id1]->getTableId(), id2);
 
133
    
 
134
    m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
 
135
    
 
136
  } else {
 
137
    m_cache.m_new_table = m_new_tables[tab->getTableId()];
 
138
  }
 
139
  assert(m_cache.m_new_table);
 
140
  return m_cache.m_new_table;
 
141
}
 
142
 
 
143
bool
 
144
BackupRestore::finalize_table(const TableS & table){
 
145
  bool ret= true;
 
146
  if (!m_restore && !m_restore_meta)
 
147
    return ret;
 
148
  if (!table.have_auto_inc())
 
149
    return ret;
 
150
 
 
151
  Uint64 max_val= table.get_max_auto_val();
 
152
  do
 
153
  {
 
154
    Uint64 auto_val = ~(Uint64)0;
 
155
    int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val);
 
156
    if (r == -1 && m_ndb->getNdbError().status == NdbError::TemporaryError)
 
157
    {
 
158
      NdbSleep_MilliSleep(50);
 
159
      continue; // retry
 
160
    }
 
161
    else if (r == -1 && m_ndb->getNdbError().code != 626)
 
162
    {
 
163
      ret= false;
 
164
    }
 
165
    else if ((r == -1 && m_ndb->getNdbError().code == 626) ||
 
166
             max_val+1 > auto_val || auto_val == ~(Uint64)0)
 
167
    {
 
168
      r= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable),
 
169
                                      max_val+1, false);
 
170
      if (r == -1 &&
 
171
            m_ndb->getNdbError().status == NdbError::TemporaryError)
 
172
      {
 
173
        NdbSleep_MilliSleep(50);
 
174
        continue; // retry
 
175
      }
 
176
      ret = (r == 0);
 
177
    }
 
178
    return (ret);
 
179
  } while (1);
 
180
}
 
181
 
 
182
bool
 
183
BackupRestore::has_temp_error(){
 
184
  return m_temp_error;
 
185
}
 
186
 
 
187
bool
 
188
BackupRestore::table(const TableS & table){
 
189
  if (!m_restore && !m_restore_meta)
 
190
    return true;
 
191
 
 
192
  const char * name = table.getTableName();
 
193
  
 
194
  /**
 
195
   * Ignore blob tables
 
196
   */
 
197
  if(match_blob(name) >= 0)
 
198
    return true;
 
199
  
 
200
  const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
 
201
  if(tmptab.m_indexType != NdbDictionary::Index::Undefined){
 
202
    m_indexes.push_back(table.m_dictTable);
 
203
    return true;
 
204
  }
 
205
  
 
206
  BaseString tmp(name);
 
207
  Vector<BaseString> split;
 
208
  if(tmp.split(split, "/") != 3){
 
209
    err << "Invalid table name format " << name << endl;
 
210
    return false;
 
211
  }
 
212
 
 
213
  m_ndb->setDatabaseName(split[0].c_str());
 
214
  m_ndb->setSchemaName(split[1].c_str());
 
215
  
 
216
  NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
 
217
  if(m_restore_meta){
 
218
    NdbDictionary::Table copy(*table.m_dictTable);
 
219
 
 
220
    copy.setName(split[2].c_str());
 
221
 
 
222
    /*
 
223
      update min and max rows to reflect the table, this to
 
224
      ensure that memory is allocated properly in the ndb kernel
 
225
    */
 
226
    copy.setMinRows(table.getNoOfRecords());
 
227
    if (table.getNoOfRecords() > copy.getMaxRows())
 
228
    {
 
229
      copy.setMaxRows(table.getNoOfRecords());
 
230
    }
 
231
 
 
232
    if (dict->createTable(copy) == -1) 
 
233
    {
 
234
      err << "Create table " << table.getTableName() << " failed: "
 
235
          << dict->getNdbError() << endl;
 
236
      return false;
 
237
    }
 
238
    info << "Successfully restored table " << table.getTableName()<< endl ;
 
239
  }  
 
240
  
 
241
  const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
 
242
  if(tab == 0){
 
243
    err << "Unable to find table: " << split[2].c_str() << endl;
 
244
    return false;
 
245
  }
 
246
  const NdbDictionary::Table* null = 0;
 
247
  m_new_tables.fill(table.m_dictTable->getTableId(), null);
 
248
  m_new_tables[table.m_dictTable->getTableId()] = tab;
 
249
  return true;
 
250
}
 
251
 
 
252
bool
 
253
BackupRestore::endOfTables(){
 
254
  if(!m_restore_meta)
 
255
    return true;
 
256
 
 
257
  NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
 
258
  for(size_t i = 0; i<m_indexes.size(); i++){
 
259
    NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
 
260
 
 
261
    BaseString tmp(indtab.m_primaryTable.c_str());
 
262
    Vector<BaseString> split;
 
263
    if(tmp.split(split, "/") != 3){
 
264
      err << "Invalid table name format " << indtab.m_primaryTable.c_str()
 
265
          << endl;
 
266
      return false;
 
267
    }
 
268
    
 
269
    m_ndb->setDatabaseName(split[0].c_str());
 
270
    m_ndb->setSchemaName(split[1].c_str());
 
271
    
 
272
    const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
 
273
    if(prim == 0){
 
274
      err << "Unable to find base table \"" << split[2].c_str() 
 
275
          << "\" for index "
 
276
          << indtab.getName() << endl;
 
277
      return false;
 
278
    }
 
279
    NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
 
280
    NdbIndexImpl* idx;
 
281
    int id;
 
282
    char idxName[255], buf[255];
 
283
    if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",
 
284
              buf, buf, &id, idxName) != 4){
 
285
      err << "Invalid index name format " << indtab.getName() << endl;
 
286
      return false;
 
287
    }
 
288
    if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
 
289
    {
 
290
      err << "Failed to create index " << idxName
 
291
          << " on " << split[2].c_str() << endl;
 
292
        return false;
 
293
    }
 
294
    idx->setName(idxName);
 
295
    if(dict->createIndex(* idx) != 0)
 
296
    {
 
297
      delete idx;
 
298
      err << "Failed to create index " << idxName
 
299
          << " on " << split[2].c_str() << endl
 
300
          << dict->getNdbError() << endl;
 
301
 
 
302
      return false;
 
303
    }
 
304
    delete idx;
 
305
    info << "Successfully created index " << idxName
 
306
         << " on " << split[2].c_str() << endl;
 
307
  }
 
308
  return true;
 
309
}
 
310
 
 
311
void BackupRestore::tuple(const TupleS & tup)
 
312
{
 
313
  if (!m_restore) 
 
314
    return;
 
315
 
 
316
  while (m_free_callback == 0)
 
317
  {
 
318
    assert(m_transactions == m_parallelism);
 
319
    // send-poll all transactions
 
320
    // close transaction is done in callback
 
321
    m_ndb->sendPollNdb(3000, 1);
 
322
  }
 
323
  
 
324
  restore_callback_t * cb = m_free_callback;
 
325
  
 
326
  if (cb == 0)
 
327
    assert(false);
 
328
  
 
329
  m_free_callback = cb->next;
 
330
  cb->retries = 0;
 
331
  cb->tup = tup; // must do copy!
 
332
  tuple_a(cb);
 
333
 
 
334
}
 
335
 
 
336
void BackupRestore::tuple_a(restore_callback_t *cb)
 
337
{
 
338
  while (cb->retries < 10) 
 
339
  {
 
340
    /**
 
341
     * start transactions
 
342
     */
 
343
    cb->connection = m_ndb->startTransaction();
 
344
    if (cb->connection == NULL) 
 
345
    {
 
346
      if (errorHandler(cb)) 
 
347
      {
 
348
        m_ndb->sendPollNdb(3000, 1);
 
349
        continue;
 
350
      }
 
351
      exitHandler();
 
352
    } // if
 
353
    
 
354
    const TupleS &tup = cb->tup;
 
355
    const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
 
356
 
 
357
    NdbOperation * op = cb->connection->getNdbOperation(table);
 
358
    
 
359
    if (op == NULL) 
 
360
    {
 
361
      if (errorHandler(cb)) 
 
362
        continue;
 
363
      exitHandler();
 
364
    } // if
 
365
    
 
366
    if (op->writeTuple() == -1) 
 
367
    {
 
368
      if (errorHandler(cb))
 
369
        continue;
 
370
      exitHandler();
 
371
    } // if
 
372
    
 
373
    int ret = 0;
 
374
    for (int j = 0; j < 2; j++)
 
375
    {
 
376
      for (int i = 0; i < tup.getNoOfAttributes(); i++) 
 
377
      {
 
378
        const AttributeDesc * attr_desc = tup.getDesc(i);
 
379
        const AttributeData * attr_data = tup.getData(i);
 
380
        int size = attr_desc->size;
 
381
        int arraySize = attr_desc->arraySize;
 
382
        char * dataPtr = attr_data->string_value;
 
383
        Uint32 length = (size * arraySize) / 8;
 
384
 
 
385
        if (j == 0 && tup.getTable()->have_auto_inc(i))
 
386
          tup.getTable()->update_max_auto_val(dataPtr,size);
 
387
 
 
388
        if (attr_desc->m_column->getPrimaryKey())
 
389
        {
 
390
          if (j == 1) continue;
 
391
          ret = op->equal(i, dataPtr, length);
 
392
        }
 
393
        else
 
394
        {
 
395
          if (j == 0) continue;
 
396
          if (attr_data->null) 
 
397
            ret = op->setValue(i, NULL, 0);
 
398
          else
 
399
            ret = op->setValue(i, dataPtr, length);
 
400
        }
 
401
        if (ret < 0) {
 
402
          ndbout_c("Column: %d type %d %d %d %d",i,
 
403
                   attr_desc->m_column->getType(),
 
404
                   size, arraySize, attr_data->size);
 
405
          break;
 
406
        }
 
407
      }
 
408
      if (ret < 0)
 
409
        break;
 
410
    }
 
411
    if (ret < 0)
 
412
    {
 
413
      if (errorHandler(cb)) 
 
414
        continue;
 
415
      exitHandler();
 
416
    }
 
417
 
 
418
    // Prepare transaction (the transaction is NOT yet sent to NDB)
 
419
    cb->connection->executeAsynchPrepare(NdbTransaction::Commit,
 
420
                                         &callback, cb);
 
421
    m_transactions++;
 
422
    return;
 
423
  }
 
424
  err << "Retried transaction " << cb->retries << " times.\nLast error"
 
425
      << m_ndb->getNdbError(cb->error_code) << endl
 
426
      << "...Unable to recover from errors. Exiting..." << endl;
 
427
  exitHandler();
 
428
}
 
429
 
 
430
void BackupRestore::cback(int result, restore_callback_t *cb)
 
431
{
 
432
  m_transactions--;
 
433
 
 
434
  if (result < 0)
 
435
  {
 
436
    /**
 
437
     * Error. temporary or permanent?
 
438
     */
 
439
    if (errorHandler(cb))
 
440
      tuple_a(cb); // retry
 
441
    else
 
442
    {
 
443
      err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
 
444
      exitHandler();
 
445
    }
 
446
  }
 
447
  else
 
448
  {
 
449
    /**
 
450
     * OK! close transaction
 
451
     */
 
452
    m_ndb->closeTransaction(cb->connection);
 
453
    cb->connection= 0;
 
454
    cb->next= m_free_callback;
 
455
    m_free_callback= cb;
 
456
    m_dataCount++;
 
457
  }
 
458
}
 
459
 
 
460
/**
 
461
 * returns true if is recoverable,
 
462
 * Error handling based on hugo
 
463
 *  false if it is an  error that generates an abort.
 
464
 */
 
465
bool BackupRestore::errorHandler(restore_callback_t *cb) 
 
466
{
 
467
  NdbError error;
 
468
  if(cb->connection)
 
469
  {
 
470
    error= cb->connection->getNdbError();
 
471
    m_ndb->closeTransaction(cb->connection);
 
472
    cb->connection= 0;
 
473
  }
 
474
  else
 
475
  {
 
476
    error= m_ndb->getNdbError();
 
477
  } 
 
478
 
 
479
  Uint32 sleepTime = 100 + cb->retries * 300;
 
480
  
 
481
  cb->retries++;
 
482
  cb->error_code = error.code;
 
483
 
 
484
  switch(error.status)
 
485
  {
 
486
  case NdbError::Success:
 
487
    return false;
 
488
    // ERROR!
 
489
    break;
 
490
    
 
491
  case NdbError::TemporaryError:
 
492
    err << "Temporary error: " << error << endl;
 
493
    m_temp_error = true;
 
494
    NdbSleep_MilliSleep(sleepTime);
 
495
    return true;
 
496
    // RETRY
 
497
    break;
 
498
    
 
499
  case NdbError::UnknownResult:
 
500
    err << error << endl;
 
501
    return false;
 
502
    // ERROR!
 
503
    break;
 
504
    
 
505
  default:
 
506
  case NdbError::PermanentError:
 
507
    //ERROR
 
508
    err << error << endl;
 
509
    return false;
 
510
    break;
 
511
  }
 
512
  return false;
 
513
}
 
514
 
 
515
void BackupRestore::exitHandler() 
 
516
{
 
517
  release();
 
518
  NDBT_ProgramExit(NDBT_FAILED);
 
519
  if (opt_core)
 
520
    abort();
 
521
  else
 
522
    exit(NDBT_FAILED);
 
523
}
 
524
 
 
525
 
 
526
void
 
527
BackupRestore::tuple_free()
 
528
{
 
529
  if (!m_restore)
 
530
    return;
 
531
 
 
532
  // Poll all transactions
 
533
  while (m_transactions)
 
534
  {
 
535
    m_ndb->sendPollNdb(3000);
 
536
  }
 
537
}
 
538
 
 
539
void
 
540
BackupRestore::endOfTuples()
 
541
{
 
542
  tuple_free();
 
543
}
 
544
 
 
545
void
 
546
BackupRestore::logEntry(const LogEntry & tup)
 
547
{
 
548
  if (!m_restore)
 
549
    return;
 
550
 
 
551
  NdbTransaction * trans = m_ndb->startTransaction();
 
552
  if (trans == NULL) 
 
553
  {
 
554
    // Deep shit, TODO: handle the error
 
555
    err << "Cannot start transaction" << endl;
 
556
    exitHandler();
 
557
  } // if
 
558
  
 
559
  const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
 
560
  NdbOperation * op = trans->getNdbOperation(table);
 
561
  if (op == NULL) 
 
562
  {
 
563
    err << "Cannot get operation: " << trans->getNdbError() << endl;
 
564
    exitHandler();
 
565
  } // if
 
566
  
 
567
  int check = 0;
 
568
  switch(tup.m_type)
 
569
  {
 
570
  case LogEntry::LE_INSERT:
 
571
    check = op->insertTuple();
 
572
    break;
 
573
  case LogEntry::LE_UPDATE:
 
574
    check = op->updateTuple();
 
575
    break;
 
576
  case LogEntry::LE_DELETE:
 
577
    check = op->deleteTuple();
 
578
    break;
 
579
  default:
 
580
    err << "Log entry has wrong operation type."
 
581
           << " Exiting...";
 
582
    exitHandler();
 
583
  }
 
584
 
 
585
  if (check != 0) 
 
586
  {
 
587
    err << "Error defining op: " << trans->getNdbError() << endl;
 
588
    exitHandler();
 
589
  } // if
 
590
  
 
591
  Bitmask<4096> keys;
 
592
  for (Uint32 i= 0; i < tup.size(); i++) 
 
593
  {
 
594
    const AttributeS * attr = tup[i];
 
595
    int size = attr->Desc->size;
 
596
    int arraySize = attr->Desc->arraySize;
 
597
    const char * dataPtr = attr->Data.string_value;
 
598
    
 
599
    if (tup.m_table->have_auto_inc(attr->Desc->attrId))
 
600
      tup.m_table->update_max_auto_val(dataPtr,size);
 
601
 
 
602
    const Uint32 length = (size / 8) * arraySize;
 
603
    if (attr->Desc->m_column->getPrimaryKey())
 
604
    {
 
605
      if(!keys.get(attr->Desc->attrId))
 
606
      {
 
607
        keys.set(attr->Desc->attrId);
 
608
        check= op->equal(attr->Desc->attrId, dataPtr, length);
 
609
      }
 
610
    }
 
611
    else
 
612
      check= op->setValue(attr->Desc->attrId, dataPtr, length);
 
613
    
 
614
    if (check != 0) 
 
615
    {
 
616
      err << "Error defining op: " << trans->getNdbError() << endl;
 
617
      exitHandler();
 
618
    } // if
 
619
  }
 
620
  
 
621
  const int ret = trans->execute(NdbTransaction::Commit);
 
622
  if (ret != 0)
 
623
  {
 
624
    // Both insert update and delete can fail during log running
 
625
    // and it's ok
 
626
    // TODO: check that the error is either tuple exists or tuple does not exist?
 
627
    bool ok= false;
 
628
    NdbError errobj= trans->getNdbError();
 
629
    switch(tup.m_type)
 
630
    {
 
631
    case LogEntry::LE_INSERT:
 
632
      if(errobj.status == NdbError::PermanentError &&
 
633
         errobj.classification == NdbError::ConstraintViolation)
 
634
        ok= true;
 
635
      break;
 
636
    case LogEntry::LE_UPDATE:
 
637
    case LogEntry::LE_DELETE:
 
638
      if(errobj.status == NdbError::PermanentError &&
 
639
         errobj.classification == NdbError::NoDataFound)
 
640
        ok= true;
 
641
      break;
 
642
    }
 
643
    if (!ok)
 
644
    {
 
645
      err << "execute failed: " << errobj << endl;
 
646
      exitHandler();
 
647
    }
 
648
  }
 
649
  
 
650
  m_ndb->closeTransaction(trans);
 
651
  m_logCount++;
 
652
}
 
653
 
 
654
void
 
655
BackupRestore::endOfLogEntrys()
 
656
{
 
657
  if (!m_restore)
 
658
    return;
 
659
 
 
660
  info << "Restored " << m_dataCount << " tuples and "
 
661
       << m_logCount << " log entries" << endl;
 
662
}
 
663
 
 
664
/*
 
665
 *   callback : This is called when the transaction is polled
 
666
 *              
 
667
 *   (This function must have three arguments: 
 
668
 *   - The result of the transaction, 
 
669
 *   - The NdbTransaction object, and 
 
670
 *   - A pointer to an arbitrary object.)
 
671
 */
 
672
 
 
673
static void
 
674
callback(int result, NdbTransaction* trans, void* aObject)
 
675
{
 
676
  restore_callback_t *cb = (restore_callback_t *)aObject;
 
677
  (cb->restore)->cback(result, cb);
 
678
}
 
679
 
 
680
#if 0 // old tuple impl
 
681
void
 
682
BackupRestore::tuple(const TupleS & tup)
 
683
{
 
684
  if (!m_restore)
 
685
    return;
 
686
  while (1) 
 
687
  {
 
688
    NdbTransaction * trans = m_ndb->startTransaction();
 
689
    if (trans == NULL) 
 
690
    {
 
691
      // Deep shit, TODO: handle the error
 
692
      ndbout << "Cannot start transaction" << endl;
 
693
      exitHandler();
 
694
    } // if
 
695
    
 
696
    const TableS * table = tup.getTable();
 
697
    NdbOperation * op = trans->getNdbOperation(table->getTableName());
 
698
    if (op == NULL) 
 
699
    {
 
700
      ndbout << "Cannot get operation: ";
 
701
      ndbout << trans->getNdbError() << endl;
 
702
      exitHandler();
 
703
    } // if
 
704
    
 
705
    // TODO: check return value and handle error
 
706
    if (op->writeTuple() == -1) 
 
707
    {
 
708
      ndbout << "writeTuple call failed: ";
 
709
      ndbout << trans->getNdbError() << endl;
 
710
      exitHandler();
 
711
    } // if
 
712
    
 
713
    for (int i = 0; i < tup.getNoOfAttributes(); i++) 
 
714
    {
 
715
      const AttributeS * attr = tup[i];
 
716
      int size = attr->Desc->size;
 
717
      int arraySize = attr->Desc->arraySize;
 
718
      const char * dataPtr = attr->Data.string_value;
 
719
      
 
720
      const Uint32 length = (size * arraySize) / 8;
 
721
      if (attr->Desc->m_column->getPrimaryKey()) 
 
722
        op->equal(i, dataPtr, length);
 
723
    }
 
724
    
 
725
    for (int i = 0; i < tup.getNoOfAttributes(); i++) 
 
726
    {
 
727
      const AttributeS * attr = tup[i];
 
728
      int size = attr->Desc->size;
 
729
      int arraySize = attr->Desc->arraySize;
 
730
      const char * dataPtr = attr->Data.string_value;
 
731
      
 
732
      const Uint32 length = (size * arraySize) / 8;
 
733
      if (!attr->Desc->m_column->getPrimaryKey())
 
734
        if (attr->Data.null)
 
735
          op->setValue(i, NULL, 0);
 
736
        else
 
737
          op->setValue(i, dataPtr, length);
 
738
    }
 
739
    int ret = trans->execute(NdbTransaction::Commit);
 
740
    if (ret != 0)
 
741
    {
 
742
      ndbout << "execute failed: ";
 
743
      ndbout << trans->getNdbError() << endl;
 
744
      exitHandler();
 
745
    }
 
746
    m_ndb->closeTransaction(trans);
 
747
    if (ret == 0)
 
748
      break;
 
749
  }
 
750
  m_dataCount++;
 
751
}
 
752
#endif
 
753
 
 
754
template class Vector<NdbDictionary::Table*>;
 
755
template class Vector<const NdbDictionary::Table*>;