1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; either version 2 of the License, or
6
(at your option) any later version.
8
This program is distributed in the hope that it will be useful,
9
but WITHOUT ANY WARRANTY; without even the implied warranty of
10
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
GNU General Public License for more details.
13
You should have received a copy of the GNU General Public License
14
along with this program; if not, write to the Free Software
15
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
#include <NDBT_ReturnCodes.h>
18
#include "consumer_restore.hpp"
21
// Global defined in another translation unit.
extern my_bool opt_core;
23
// Filtered output streams defined elsewhere; err and info are written to
// throughout this file.
extern FilteredNdbOut err;
24
extern FilteredNdbOut info;
25
extern FilteredNdbOut debug;
27
// Forward declaration of the file-local completion callback defined near the
// end of this file; it forwards to BackupRestore::cback().
static void callback(int, NdbTransaction*, void*);
29
// Connect string handed to the Ndb_cluster_connection constructor.
extern const char * g_connect_string;
30
// Option string used as the name of the cluster connection (set_name()).
extern BaseString g_options;
37
if (!m_restore && !m_restore_meta)
40
m_cluster_connection = new Ndb_cluster_connection(g_connect_string);
41
m_cluster_connection->set_name(g_options.c_str());
42
if(m_cluster_connection->connect(12, 5, 1) != 0)
47
m_ndb = new Ndb(m_cluster_connection);
53
if (m_ndb->waitUntilReady(30) != 0)
55
err << "Failed to connect to ndb!!" << endl;
58
info << "Connected to ndb!!" << endl;
60
m_callback = new restore_callback_t[m_parallelism];
64
err << "Failed to allocate callback structs" << endl;
68
m_free_callback= m_callback;
69
for (Uint32 i= 0; i < m_parallelism; i++) {
70
m_callback[i].restore= this;
71
m_callback[i].connection= 0;
73
m_callback[i-1].next= &(m_callback[i]);
75
m_callback[m_parallelism-1].next = 0;
80
// Releases resources owned by this object.
// NOTE(review): only the teardown of the cluster connection is visible in
// this extract; other statements of the function are missing.
void BackupRestore::release()
94
if (m_cluster_connection)
96
delete m_cluster_connection;
97
// Clear the pointer so the null check above skips the delete on a later call.
m_cluster_connection= 0;
101
BackupRestore::~BackupRestore()
108
// Checks whether name has the form "<db>/<schema>/NDB$BLOB_<id1>_<id2>",
// i.e. whether all four sscanf conversions succeed. The two leading path
// components are both scanned into the same scratch buffer, so only the two
// integers are retained.
// NOTE(review): the return statements are not visible in this extract; the
// caller in BackupRestore::table() tests the result with ">= 0".
// NOTE(review): "%[^/]" carries no field width and buf's declaration is not
// visible here -- confirm a long name cannot overrun it.
match_blob(const char * name){
111
if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
118
// Translates a table object into the corresponding entry recorded in
// m_new_tables, remembering the most recent translation in m_cache.
//
// tab: the table to translate.
// Returns the translated table; it is asserted to be non-null.
const NdbDictionary::Table*
119
BackupRestore::get_table(const NdbDictionary::Table* tab){
120
// Cache hit: same table as on the previous call.
if(m_cache.m_old_table == tab)
121
return m_cache.m_new_table;
122
m_cache.m_old_table = tab;
125
char db[256], schema[256];
126
// Blob tables are named "<db>/<schema>/NDB$BLOB_<id1>_<id2>".
// NOTE(review): "%[^/]" has no field width, so a path component longer than
// 255 characters would overrun db or schema.
if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d",
127
db, schema, &id1, &id2)) == 4){
128
m_ndb->setDatabaseName(db);
129
m_ndb->setSchemaName(schema);
131
// db is reused as scratch space: rebuild the blob table name with the id of
// m_new_tables[id1] in place of id1, then look that name up.
BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d",
132
m_new_tables[id1]->getTableId(), id2);
134
m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
137
// Presumably the non-blob branch (the "else" line is not visible in this
// extract): index m_new_tables by the table's own id.
m_cache.m_new_table = m_new_tables[tab->getTableId()];
139
assert(m_cache.m_new_table);
140
return m_cache.m_new_table;
144
// Reads the table's auto-increment value and, when it needs updating relative
// to the largest value recorded for the table (table.get_max_auto_val()),
// calls setAutoIncrementValue.
// NOTE(review): the return statements, any retry-loop header and the
// remaining arguments of setAutoIncrementValue are not visible in this
// extract.
BackupRestore::finalize_table(const TableS & table){
146
if (!m_restore && !m_restore_meta)
148
if (!table.have_auto_inc())
151
Uint64 max_val= table.get_max_auto_val();
154
// Start from the all-ones value; the condition further down treats it as a
// reason to set the value.
Uint64 auto_val = ~(Uint64)0;
155
int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val);
156
// Temporary error on read: pause for 50 ms.
if (r == -1 && m_ndb->getNdbError().status == NdbError::TemporaryError)
158
NdbSleep_MilliSleep(50);
161
// Any other read failure except error code 626.
else if (r == -1 && m_ndb->getNdbError().code != 626)
165
// Set the value when the read failed with code 626, when max_val+1 exceeds
// the value read, or when auto_val still holds its all-ones initial value.
else if ((r == -1 && m_ndb->getNdbError().code == 626) ||
166
max_val+1 > auto_val || auto_val == ~(Uint64)0)
168
r= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable),
171
m_ndb->getNdbError().status == NdbError::TemporaryError)
173
NdbSleep_MilliSleep(50);
183
BackupRestore::has_temp_error(){
188
// Processes one table definition. Index tables are queued in m_indexes (they
// are consumed by endOfTables()); for other tables a copy of the definition
// is created through the dictionary and the resulting table object is
// recorded in m_new_tables under the original table id.
// NOTE(review): return statements and some conditions guarding these steps
// are not visible in this extract.
BackupRestore::table(const TableS & table){
189
if (!m_restore && !m_restore_meta)
192
const char * name = table.getTableName();
197
// Blob-table names are recognised here; what is done with them is not
// visible in this extract.
if(match_blob(name) >= 0)
200
const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
201
// A defined index type means this "table" is really an index: queue it.
if(tmptab.m_indexType != NdbDictionary::Index::Undefined){
202
m_indexes.push_back(table.m_dictTable);
206
// The name is expected to split on "/" into exactly three parts, used below
// as database name, schema name and table name.
BaseString tmp(name);
207
Vector<BaseString> split;
208
if(tmp.split(split, "/") != 3){
209
err << "Invalid table name format " << name << endl;
213
m_ndb->setDatabaseName(split[0].c_str());
214
m_ndb->setSchemaName(split[1].c_str());
216
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
218
// Work on a copy of the definition, renamed to the bare table name.
NdbDictionary::Table copy(*table.m_dictTable);
220
copy.setName(split[2].c_str());
223
update min and max rows to reflect the table, this to
224
ensure that memory is allocated properly in the ndb kernel
226
copy.setMinRows(table.getNoOfRecords());
227
// Only ever raise the maximum row count, never lower it.
if (table.getNoOfRecords() > copy.getMaxRows())
229
copy.setMaxRows(table.getNoOfRecords());
232
if (dict->createTable(copy) == -1)
234
err << "Create table " << table.getTableName() << " failed: "
235
<< dict->getNdbError() << endl;
238
info << "Successfully restored table " << table.getTableName()<< endl ;
241
// Fetch the table object from the dictionary by its bare name.
const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
243
err << "Unable to find table: " << split[2].c_str() << endl;
246
// Presumably fill() grows m_new_tables with null entries so that the index
// used on the next line is valid -- confirm against Vector::fill.
const NdbDictionary::Table* null = 0;
247
m_new_tables.fill(table.m_dictTable->getTableId(), null);
248
m_new_tables[table.m_dictTable->getTableId()] = tab;
253
// Creates the indexes that table() queued in m_indexes. For each queued index
// table: resolve its primary table through the dictionary, derive the index
// name from the index table's name, build an index object from the two table
// definitions and create it.
// NOTE(review): return statements and several declarations (id, idx) are on
// lines not visible in this extract.
BackupRestore::endOfTables(){
257
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
258
for(size_t i = 0; i<m_indexes.size(); i++){
259
NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
261
// The primary table name must split on "/" into database, schema and table.
BaseString tmp(indtab.m_primaryTable.c_str());
262
Vector<BaseString> split;
263
if(tmp.split(split, "/") != 3){
264
err << "Invalid table name format " << indtab.m_primaryTable.c_str()
269
m_ndb->setDatabaseName(split[0].c_str());
270
m_ndb->setSchemaName(split[1].c_str());
272
const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
274
err << "Unable to find base table \"" << split[2].c_str()
276
<< indtab.getName() << endl;
279
NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
282
// The index table name is parsed as "<part>/<part>/<number>/<index name>";
// the first two parts share buf and are discarded.
// NOTE(review): "%[^/]" and "%s" have no field width, so a component longer
// than 254 characters would overrun buf or idxName.
char idxName[255], buf[255];
283
if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",
284
buf, buf, &id, idxName) != 4){
285
err << "Invalid index name format " << indtab.getName() << endl;
288
// A non-zero result is reported as a failure.
if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
290
err << "Failed to create index " << idxName
291
<< " on " << split[2].c_str() << endl;
294
idx->setName(idxName);
295
if(dict->createIndex(* idx) != 0)
298
err << "Failed to create index " << idxName
299
<< " on " << split[2].c_str() << endl
300
<< dict->getNdbError() << endl;
305
info << "Successfully created index " << idxName
306
<< " on " << split[2].c_str() << endl;
311
// Takes a callback slot from the free list and copies the tuple into it.
// While no slot is free, outstanding transactions are polled until one is
// handed back (cback() returns slots to m_free_callback).
// NOTE(review): the statements that follow the copy are not visible in this
// extract.
void BackupRestore::tuple(const TupleS & tup)
316
while (m_free_callback == 0)
318
// With no free slot, every slot must be in use by a transaction.
assert(m_transactions == m_parallelism);
319
// send-poll all transactions
320
// close transaction is done in callback
321
m_ndb->sendPollNdb(3000, 1);
324
// Pop the head of the free list.
restore_callback_t * cb = m_free_callback;
329
m_free_callback = cb->next;
331
cb->tup = tup; // must do copy!
336
// Defines and prepares the write of cb->tup: starts a transaction, obtains a
// writeTuple operation on the translated table, supplies the attribute
// values and prepares an asynchronous commit. The attempt loop runs while
// cb->retries is below 10; after it, a failure message is written.
// NOTE(review): the statements that follow each errorHandler() check, the
// retry counter update and the remaining executeAsynchPrepare arguments are
// not visible in this extract.
void BackupRestore::tuple_a(restore_callback_t *cb)
338
while (cb->retries < 10)
343
cb->connection = m_ndb->startTransaction();
344
if (cb->connection == NULL)
346
if (errorHandler(cb))
348
m_ndb->sendPollNdb(3000, 1);
354
const TupleS &tup = cb->tup;
355
const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
357
NdbOperation * op = cb->connection->getNdbOperation(table);
361
if (errorHandler(cb))
366
if (op->writeTuple() == -1)
368
if (errorHandler(cb))
374
// Two passes over the attributes: pass 0 supplies primary-key columns via
// equal(), pass 1 supplies the remaining columns via setValue().
for (int j = 0; j < 2; j++)
376
for (int i = 0; i < tup.getNoOfAttributes(); i++)
378
const AttributeDesc * attr_desc = tup.getDesc(i);
379
const AttributeData * attr_data = tup.getData(i);
380
int size = attr_desc->size;
381
int arraySize = attr_desc->arraySize;
382
char * dataPtr = attr_data->string_value;
383
// size * arraySize is divided by 8 to get the length passed to NDB
// (presumably size is in bits -- confirm against AttributeDesc).
Uint32 length = (size * arraySize) / 8;
385
// Track the largest auto-increment value once per attribute (pass 0 only).
if (j == 0 && tup.getTable()->have_auto_inc(i))
386
tup.getTable()->update_max_auto_val(dataPtr,size);
388
if (attr_desc->m_column->getPrimaryKey())
390
if (j == 1) continue;
391
ret = op->equal(i, dataPtr, length);
395
if (j == 0) continue;
397
// NOTE(review): the condition choosing between the two setValue calls is
// not visible; the NULL form presumably handles null attributes.
ret = op->setValue(i, NULL, 0);
399
ret = op->setValue(i, dataPtr, length);
402
ndbout_c("Column: %d type %d %d %d %d",i,
403
attr_desc->m_column->getType(),
404
size, arraySize, attr_data->size);
413
if (errorHandler(cb))
418
// Prepare transaction (the transaction is NOT yet sent to NDB)
419
cb->connection->executeAsynchPrepare(NdbTransaction::Commit,
424
err << "Retried transaction " << cb->retries << " times.\nLast error"
425
<< m_ndb->getNdbError(cb->error_code) << endl
426
<< "...Unable to recover from errors. Exiting..." << endl;
430
// Completion handler for a transaction prepared in tuple_a(); it is reached
// through the file-local callback() function, which forwards result and the
// callback slot. On error, errorHandler() decides whether the tuple is
// retried through tuple_a() or the failure is reported; on success the
// transaction is closed and the slot is put back on the free list.
// NOTE(review): the test on result and the update of m_free_callback are on
// lines not visible in this extract.
void BackupRestore::cback(int result, restore_callback_t *cb)
437
* Error. temporary or permanent?
439
if (errorHandler(cb))
440
tuple_a(cb); // retry
443
err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
450
* OK! close transaction
452
m_ndb->closeTransaction(cb->connection);
454
// Link the slot in front of the current free list.
cb->next= m_free_callback;
461
* returns true if the error is recoverable,
462
* Error handling based on hugo
463
* false if it is an error that generates an abort.
465
// Fetches the current NDB error for cb, stores its code in cb->error_code
// and dispatches on the error status.
// NOTE(review): the condition choosing between the two error sources, the
// switch header and the per-case return values are not visible in this
// extract.
bool BackupRestore::errorHandler(restore_callback_t *cb)
470
// Error taken from the transaction, which is then closed.
error= cb->connection->getNdbError();
471
m_ndb->closeTransaction(cb->connection);
476
// Error taken from the Ndb object instead.
error= m_ndb->getNdbError();
479
// Back-off grows linearly with the retry count: 100 + 300 per retry,
// passed to NdbSleep_MilliSleep below.
Uint32 sleepTime = 100 + cb->retries * 300;
482
cb->error_code = error.code;
486
case NdbError::Success:
491
case NdbError::TemporaryError:
492
err << "Temporary error: " << error << endl;
494
NdbSleep_MilliSleep(sleepTime);
499
case NdbError::UnknownResult:
500
err << error << endl;
506
case NdbError::PermanentError:
508
err << error << endl;
515
// Calls NDBT_ProgramExit with NDBT_FAILED.
// NOTE(review): other statements of this function may be missing from this
// extract.
void BackupRestore::exitHandler()
518
NDBT_ProgramExit(NDBT_FAILED);
527
// Polls NDB repeatedly for as long as m_transactions is non-zero.
// NOTE(review): statements before and after the loop are not visible in this
// extract.
BackupRestore::tuple_free()
532
// Poll all transactions
533
while (m_transactions)
535
m_ndb->sendPollNdb(3000);
540
BackupRestore::endOfTuples()
546
// Applies one log entry in its own transaction: picks insert, update or
// delete from the entry type, supplies the attribute values, commits, and
// closes the transaction. Some commit failures are distinguished by entry
// type, error status and classification.
// NOTE(review): the switch headers, the statements following each error
// message and the handling inside the error checks are not visible in this
// extract.
BackupRestore::logEntry(const LogEntry & tup)
551
NdbTransaction * trans = m_ndb->startTransaction();
554
// Serious failure. TODO: handle the error
555
err << "Cannot start transaction" << endl;
559
const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
560
NdbOperation * op = trans->getNdbOperation(table);
563
err << "Cannot get operation: " << trans->getNdbError() << endl;
570
// Select the operation kind from the log entry type.
case LogEntry::LE_INSERT:
571
check = op->insertTuple();
573
case LogEntry::LE_UPDATE:
574
check = op->updateTuple();
576
case LogEntry::LE_DELETE:
577
check = op->deleteTuple();
580
err << "Log entry has wrong operation type."
587
err << "Error defining op: " << trans->getNdbError() << endl;
592
for (Uint32 i= 0; i < tup.size(); i++)
594
const AttributeS * attr = tup[i];
595
int size = attr->Desc->size;
596
int arraySize = attr->Desc->arraySize;
597
const char * dataPtr = attr->Data.string_value;
599
// Track the largest auto-increment value seen for this table.
if (tup.m_table->have_auto_inc(attr->Desc->attrId))
600
tup.m_table->update_max_auto_val(dataPtr,size);
602
// NOTE(review): tuple_a() computes (size * arraySize) / 8; the two forms
// differ whenever size is not a multiple of 8 -- confirm which is intended.
const Uint32 length = (size / 8) * arraySize;
603
if (attr->Desc->m_column->getPrimaryKey())
605
// keys records which key attributes were already supplied, so each one is
// passed to equal() only once.
if(!keys.get(attr->Desc->attrId))
607
keys.set(attr->Desc->attrId);
608
check= op->equal(attr->Desc->attrId, dataPtr, length);
612
check= op->setValue(attr->Desc->attrId, dataPtr, length);
616
err << "Error defining op: " << trans->getNdbError() << endl;
621
const int ret = trans->execute(NdbTransaction::Commit);
624
// Both insert update and delete can fail during log running
626
// TODO: check that the error is either tuple exists or tuple does not exist?
628
NdbError errobj= trans->getNdbError();
631
// Insert: a permanent constraint violation is singled out.
case LogEntry::LE_INSERT:
632
if(errobj.status == NdbError::PermanentError &&
633
errobj.classification == NdbError::ConstraintViolation)
636
// Update/delete: a permanent "no data found" error is singled out.
case LogEntry::LE_UPDATE:
637
case LogEntry::LE_DELETE:
638
if(errobj.status == NdbError::PermanentError &&
639
errobj.classification == NdbError::NoDataFound)
645
err << "execute failed: " << errobj << endl;
650
m_ndb->closeTransaction(trans);
655
// Writes the totals held in m_dataCount (tuples) and m_logCount (log
// entries) to the info stream.
// NOTE(review): any statements around this message are not visible in this
// extract.
BackupRestore::endOfLogEntrys()
660
info << "Restored " << m_dataCount << " tuples and "
661
<< m_logCount << " log entries" << endl;
665
* callback : This is called when the transaction is polled
667
* (This function must have three arguments:
668
* - The result of the transaction,
669
* - The NdbTransaction object, and
670
* - A pointer to an arbitrary object.)
674
// Adapter between the three-argument callback signature and
// BackupRestore::cback(): aObject is cast to the restore_callback_t slot and
// the call is forwarded to the BackupRestore instance stored in it. The
// trans argument is not used in the visible code.
callback(int result, NdbTransaction* trans, void* aObject)
676
restore_callback_t *cb = (restore_callback_t *)aObject;
677
(cb->restore)->cback(result, cb);
680
// Disabled (#if 0) earlier implementation of tuple(): it writes each tuple
// in its own transaction and commits it directly with execute(), instead of
// going through the callback slots used by the active tuple()/tuple_a().
// NOTE(review): the matching #endif is not visible in this extract.
#if 0 // old tuple impl
682
BackupRestore::tuple(const TupleS & tup)
688
NdbTransaction * trans = m_ndb->startTransaction();
691
// Serious failure. TODO: handle the error
692
ndbout << "Cannot start transaction" << endl;
696
const TableS * table = tup.getTable();
697
NdbOperation * op = trans->getNdbOperation(table->getTableName());
700
ndbout << "Cannot get operation: ";
701
ndbout << trans->getNdbError() << endl;
705
// TODO: check return value and handle error
706
if (op->writeTuple() == -1)
708
ndbout << "writeTuple call failed: ";
709
ndbout << trans->getNdbError() << endl;
713
// First loop: primary-key attributes are supplied with equal().
for (int i = 0; i < tup.getNoOfAttributes(); i++)
715
const AttributeS * attr = tup[i];
716
int size = attr->Desc->size;
717
int arraySize = attr->Desc->arraySize;
718
const char * dataPtr = attr->Data.string_value;
720
const Uint32 length = (size * arraySize) / 8;
721
if (attr->Desc->m_column->getPrimaryKey())
722
op->equal(i, dataPtr, length);
725
// Second loop: the remaining attributes are supplied with setValue().
for (int i = 0; i < tup.getNoOfAttributes(); i++)
727
const AttributeS * attr = tup[i];
728
int size = attr->Desc->size;
729
int arraySize = attr->Desc->arraySize;
730
const char * dataPtr = attr->Data.string_value;
732
const Uint32 length = (size * arraySize) / 8;
733
if (!attr->Desc->m_column->getPrimaryKey())
735
op->setValue(i, NULL, 0);
737
op->setValue(i, dataPtr, length);
739
int ret = trans->execute(NdbTransaction::Commit);
742
ndbout << "execute failed: ";
743
ndbout << trans->getNdbError() << endl;
746
m_ndb->closeTransaction(trans);
754
// Explicit instantiations of the Vector template for the two table-pointer
// element types.
template class Vector<NdbDictionary::Table*>;
755
template class Vector<const NdbDictionary::Table*>;