1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <NDBT_ReturnCodes.h>
17
#include "consumer_restore.hpp"
21
// File-scope declarations: symbols defined in other translation units plus
// forward declarations of static helpers that are defined later in this file.
// NOTE(review): the bare numeric lines are original line numbers left behind by
// the listing; gaps between them mean lines are missing from this view.
extern my_bool opt_core;
23
// Output streams used throughout this file for error / info / debug messages.
extern FilteredNdbOut err;
24
extern FilteredNdbOut info;
25
extern FilteredNdbOut debug;
27
// Completion callback passed to the asynchronous transaction API (defined
// near the end of the file).
static void callback(int, NdbTransaction*, void*);
28
// Forward declaration of get_part_id; its continuation line (second
// parameter) is not visible in this view.
static Uint32 get_part_id(const NdbDictionary::Table *table,
31
// Connect string and option string supplied by the program's main module.
extern const char * g_connect_string;
32
extern BaseString g_options;
39
// Body of a BackupRestore member whose signature line is not visible in this
// view (presumably the connection/initialisation routine — confirm).
// Visible steps: connect to the cluster, create and wait for the Ndb object,
// then allocate m_parallelism callback structs and chain them into a free list.
// NOTE(review): bare numeric lines are original line numbers; lines between
// the gaps (braces, returns, short statements) are missing from this listing.
//
// Guard: none of the three restore modes is enabled (the guarded statement is
// not visible here).
if (!m_restore && !m_restore_meta && !m_restore_epoch)
42
// Create the cluster connection from the global connect string and label it
// with the global option string.
m_cluster_connection = new Ndb_cluster_connection(g_connect_string);
43
m_cluster_connection->set_name(g_options.c_str());
44
// A non-zero return from connect() is the failure case; for the meaning of
// the three arguments see the Ndb_cluster_connection::connect documentation.
if(m_cluster_connection->connect(12, 5, 1) != 0)
49
m_ndb = new Ndb(m_cluster_connection);
55
// Wait (argument 30) for the Ndb object to become ready; non-zero is failure.
if (m_ndb->waitUntilReady(30) != 0)
57
err << "Failed to connect to ndb!!" << endl;
60
info << "Connected to ndb!!" << endl;
62
// One callback struct per concurrently outstanding transaction.
m_callback = new restore_callback_t[m_parallelism];
66
err << "Failed to allocate callback structs" << endl;
70
// Build a singly linked free list through the array: the head is element 0,
// each element points at its successor and the last element terminates it.
m_free_callback= m_callback;
71
for (Uint32 i= 0; i < m_parallelism; i++) {
72
m_callback[i].restore= this;
73
m_callback[i].connection= 0;
75
// NOTE(review): i is unsigned, so i-1 wraps when i == 0; presumably the
// missing line just above guards this with a check on i — confirm.
m_callback[i-1].next= &(m_callback[i]);
77
// NOTE(review): indexes element m_parallelism-1; assumes m_parallelism > 0.
m_callback[m_parallelism-1].next = 0;
82
// Releases resources held by this consumer. Only the cluster-connection part
// is visible in this view: the connection is deleted and the pointer reset so
// that a repeated call is harmless. Lines 83-95 of the original are missing
// from this listing and may release further members.
void BackupRestore::release()
96
if (m_cluster_connection)
98
delete m_cluster_connection;
99
m_cluster_connection= 0;
103
// Destructor. Its body is not visible in this view (presumably it delegates
// to release() — TODO confirm against the full file).
BackupRestore::~BackupRestore()
110
// Tests whether `name` has the form "<db>/<schema>/NDB$BLOB_<id1>_<id2>",
// i.e. the internal name pattern matched for blob part tables.
// All four conversions must succeed for a match. Both %[^/] conversions write
// into the same scratch buffer `buf`, so only the two numeric ids are kept.
// The return statements are not visible here; callers in this file treat a
// result >= 0 as "is a blob table".
match_blob(const char * name){
113
if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
120
// Maps a table object from the backup (`tab`) to the corresponding table in
// the cluster being restored into. The last lookup is remembered in m_cache,
// so consecutive calls for the same table return immediately.
// NOTE(review): bare numeric lines are original line numbers; some lines
// (declarations, braces, else) are missing from this listing.
const NdbDictionary::Table*
121
BackupRestore::get_table(const NdbDictionary::Table* tab){
122
// Fast path: same backup table as the previous call.
if(m_cache.m_old_table == tab)
123
return m_cache.m_new_table;
124
m_cache.m_old_table = tab;
127
char db[256], schema[256];
128
// Blob part tables are named "<db>/<schema>/NDB$BLOB_<tableid>_<colno>".
// The embedded table id is the id from the backup, so the name has to be
// rebuilt with the id of the newly created main table before the lookup.
if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d",
129
db, schema, &id1, &id2)) == 4){
130
m_ndb->setDatabaseName(db);
131
m_ndb->setSchemaName(schema);
133
// `db` is reused as the buffer for the rebuilt blob table name.
BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d",
134
m_new_tables[id1]->getTableId(), id2);
136
m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
139
// Ordinary table: index m_new_tables by the table id from the backup.
m_cache.m_new_table = m_new_tables[tab->getTableId()];
141
// A missing mapping is treated as a programming error, not a runtime error.
assert(m_cache.m_new_table);
142
return m_cache.m_new_table;
146
// Post-processing for one restored table: makes sure the table's stored
// auto-increment value is ahead of the largest value seen while restoring.
// Nothing is done when neither data nor metadata is being restored, or when
// the table has no auto-increment column (the early returns themselves are
// not visible in this listing). The surrounding retry loop is also not
// visible; temporary errors are followed by a 50 ms sleep.
BackupRestore::finalize_table(const TableS & table){
148
if (!m_restore && !m_restore_meta)
150
if (!table.have_auto_inc())
153
Uint64 max_val= table.get_max_auto_val();
156
// ~0 acts as a "nothing read" marker for the value fetched below.
Uint64 auto_val = ~(Uint64)0;
157
int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val);
158
// Temporary error on read: back off briefly (retry logic not visible here).
if (r == -1 && m_ndb->getNdbError().status == NdbError::TemporaryError)
160
NdbSleep_MilliSleep(50);
163
// Any failure other than error code 626 takes this branch (body not visible).
else if (r == -1 && m_ndb->getNdbError().code != 626)
167
// Set a new value when the read failed with 626, when the stored value is not
// beyond max_val, or when nothing was read.
else if ((r == -1 && m_ndb->getNdbError().code == 626) ||
168
max_val+1 > auto_val || auto_val == ~(Uint64)0)
170
r= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable),
173
m_ndb->getNdbError().status == NdbError::TemporaryError)
175
NdbSleep_MilliSleep(50);
186
// Inspects a table's fragment data, interpreted as an array of 16-bit node
// group ids (hence the byte length is halved to get the entry count).
// The visible checks test whether entry 0 is 0 and whether every later entry
// is UNDEF_NODEGROUP; the return statements are not visible in this listing.
// NOTE(review): node_groups[0] is read without a visible check that
// no_parts > 0 — confirm against the full source.
static bool default_nodegroups(NdbDictionary::Table *table)
188
Uint16 *node_groups = (Uint16*)table->getFragmentData();
189
Uint32 no_parts = table->getFragmentDataLen() >> 1;
192
if (node_groups[0] != 0)
194
for (i = 1; i < no_parts; i++)
196
if (node_groups[i] != UNDEF_NODEGROUP)
204
// Computes how many fragments to create for a table restored with the
// "default number of partitions" setting.
//   max_rows  - the table's configured maximum row count
//   no_nodes  - number of data nodes in the target cluster
// Returns a multiple of no_nodes. The estimate assumes 27 bytes per row and
// 512 MB per fragment (the two constants below). The result is grown in steps
// of no_nodes while it is below the estimate, bounded by the loop counter `i`
// (declaration not visible here) and by MAX_NDB_PARTITIONS.
static Uint32 get_no_fragments(Uint64 max_rows, Uint32 no_nodes)
207
Uint32 acc_row_size = 27;
208
Uint32 acc_fragment_size = 512*1024*1024;
209
// Fragments needed to hold max_rows rows, rounded up by the trailing +1.
// The 64-bit quotient is narrowed to Uint32 on assignment.
Uint32 no_parts= (max_rows*acc_row_size)/acc_fragment_size + 1;
210
Uint32 reported_parts = no_nodes;
211
while (reported_parts < no_parts && ++i < 4 &&
212
(reported_parts + no_parts) < MAX_NDB_PARTITIONS)
213
reported_parts+= no_nodes;
214
// Could not reach the estimate: warn, but still return what was reached.
if (reported_parts < no_parts)
216
err << "Table will be restored but will not be able to handle the maximum";
217
err << " amount of rows as requested" << endl;
219
return reported_parts;
223
// Replaces the table's fragment data with a fresh node group array holding
// one 16-bit entry per fragment. Entries 1..n-1 are set to UNDEF_NODEGROUP;
// the assignment of entry 0 is not visible in this listing.
// NOTE(review): the stack array holds MAX_NDB_PARTITIONS entries; no check
// that getFragmentCount() stays within that bound is visible here.
static void set_default_nodegroups(NdbDictionary::Table *table)
225
Uint32 no_parts = table->getFragmentCount();
226
Uint16 node_group[MAX_NDB_PARTITIONS];
230
for (i = 1; i < no_parts; i++)
232
node_group[i] = UNDEF_NODEGROUP;
234
// Length is in bytes: two bytes per node group entry.
table->setFragmentData((const void*)node_group, 2 * no_parts);
237
// Translates one node group id from the backup into a node group id for the
// target cluster, using the mapping table m_nodegroup_map. Each source node
// group has an array of targets (map_array) and a cursor (curr_index) that is
// advanced on every call, so successive calls step through the targets.
// Return statements and some branch bodies are not visible in this listing.
Uint32 BackupRestore::map_ng(Uint32 ng)
239
NODE_GROUP_MAP *ng_map = m_nodegroup_map;
241
// No mapping applies when the id is undefined or the first map entry is
// undefined (the action taken is not visible here).
// NOTE(review): ng_map[ng] is indexed here before the range assert on ng
// further down has run.
if (ng == UNDEF_NODEGROUP ||
242
ng_map[ng].map_array[0] == UNDEF_NODEGROUP)
249
Uint32 curr_inx = ng_map[ng].curr_index;
250
Uint32 new_curr_inx = curr_inx + 1;
252
assert(ng < MAX_NDB_PARTITIONS);
253
assert(curr_inx < MAX_MAPS_PER_NODE_GROUP);
254
assert(new_curr_inx < MAX_MAPS_PER_NODE_GROUP);
256
// NOTE(review): with asserts enabled, the preceding assert already excludes
// this condition; the branch only matters in builds without asserts.
if (new_curr_inx >= MAX_MAPS_PER_NODE_GROUP)
258
else if (ng_map[ng].map_array[new_curr_inx] == UNDEF_NODEGROUP)
260
// Use the target at the current cursor position, then store the new cursor.
new_ng = ng_map[ng].map_array[curr_inx];
261
ng_map[ng].curr_index = new_curr_inx;
267
// Runs every entry of a node group array through map_ng().
//   ng_array - array of no_parts 16-bit node group ids
//   no_parts - number of entries; asserted to be below MAX_NDB_PARTITIONS
// The code that reacts to a changed entry (and the return value) is not
// visible in this listing; the caller in table() treats a true result as
// "something was remapped".
bool BackupRestore::map_nodegroups(Uint16 *ng_array, Uint32 no_parts)
271
DBUG_ENTER("map_nodegroups");
273
assert(no_parts < MAX_NDB_PARTITIONS);
274
for (i = 0; i < no_parts; i++)
277
ng = map_ng((Uint32)ng_array[i]);
278
// The mapping produced a different node group for this entry.
if (ng != ng_array[i])
286
// Helper used by the frm rewriting code below. Its body is not visible in
// this listing; from the call sites it takes the source cursor, the
// destination cursor and the running output length, all by pointer
// (presumably copying one byte and advancing all three — confirm).
static void copy_byte(const char **data, char **new_data, uint *len)
295
// Scans the text between *data and end_data for `search_str` and, once it is
// found, rewrites the number that follows using map_ng(). Text inside quotes
// ('...' or "...") is tracked through in_delimiters and is not matched.
// Output is written through *new_data; *new_data_len is the running length.
// NOTE(review): bare numeric lines are original line numbers; many lines
// (braces, conditions, returns) are missing from this listing.
bool BackupRestore::search_replace(char *search_str, char **new_data,
296
const char **data, const char *end_data,
299
uint search_str_len = strlen(search_str);
301
bool in_delimiters = FALSE;
302
bool escape_char = FALSE;
303
char start_delimiter = 0;
304
DBUG_ENTER("search_replace");
309
copy_byte(data, new_data, new_data_len);
314
// Inside a quoted section: only the matching closing quote ends it.
else if (in_delimiters)
316
if (c == start_delimiter)
317
in_delimiters = FALSE;
319
// An opening quote starts a quoted section.
else if (c == '\'' || c == '\"')
321
in_delimiters = TRUE;
328
// Current character matches the next expected character of search_str.
else if (c == search_str[inx])
331
// Whole search string matched: what follows is parsed as the number.
if (inx == search_str_len)
335
while (*data != end_data)
340
// NOTE(review): **data is added as the raw character value; no subtraction
// of '0' is visible on this line — confirm against the full source.
number = (10 * number) + (**data);
341
if (number > MAX_NDB_NODES)
347
After long and tedious preparations we have actually found
348
a node group identifier to convert. We'll use the mapping
349
table created for node groups and then insert the new number
350
instead of the old number.
352
uint temp = map_ng(number);
357
// NOTE(review): stores the numeric value 0..9, not the character '0'..'9';
// no conversion is visible where it is written out below — confirm.
digits[no_digits] = temp % 10;
361
// Digits were collected least-significant first, so emit them in reverse.
for (no_digits--; no_digits >= 0; no_digits--)
363
**new_data = digits[no_digits];
377
} while (*data < end_data);
381
// Copies an unpacked frm image from `data` to `new_data`, rewriting the
// " NODEGROUP = <n>" clauses inside its partition-info section.
//   new_data      - output buffer (sized by the caller)
//   data/data_len - input image and its length
//   new_data_len  - out: running length of the output
// Visible layout handling: a key-definition area located through header
// fields, followed by two 2-byte-length-prefixed sections, then a 4-byte
// partition info length and the partition info text itself.
// Any bounds failure jumps to the `error` label (not visible in this listing).
bool BackupRestore::map_in_frm(char *new_data, const char *data,
382
uint data_len, uint *new_data_len)
384
const char *end_data= data + data_len;
385
const char *end_part_data;
386
const char *part_data;
388
// NOTE(review): these header fields (offsets 6 and 47) are read before the
// minimum-length check on data_len below.
uint start_key_definition_len = uint2korr(data + 6);
389
uint key_definition_len = uint4korr(data + 47);
391
DBUG_ENTER("map_in_frm");
393
if (data_len < 4096) goto error;
394
// Step past the key definition area, then past two length-prefixed sections,
// checking before each read that the length field lies inside the buffer.
extra_ptr = (char*)data + start_key_definition_len + key_definition_len;
395
if ((int)data_len < ((extra_ptr - data) + 2)) goto error;
396
extra_ptr = extra_ptr + 2 + uint2korr(extra_ptr);
397
if ((int)data_len < ((extra_ptr - data) + 2)) goto error;
398
extra_ptr = extra_ptr + 2 + uint2korr(extra_ptr);
399
if ((int)data_len < ((extra_ptr - data) + 4)) goto error;
400
part_info_len = uint4korr(extra_ptr);
401
part_data = extra_ptr + 4;
402
if ((int)data_len < ((part_data + part_info_len) - data)) goto error;
406
// Phase 1: copy everything before the partition info unchanged.
copy_byte(&data, &new_data, new_data_len);
407
} while (data < part_data);
408
end_part_data = part_data + part_info_len;
411
// Phase 2: within the partition info, rewrite each node group reference.
if (search_replace((char*)" NODEGROUP = ", &new_data, &data,
412
end_part_data, new_data_len))
414
} while (data != end_part_data);
417
// Phase 3: copy the remainder of the image unchanged.
copy_byte(&data, &new_data, new_data_len);
418
} while (data < end_data);
425
// Rewrites the packed frm blob stored in `table` so that the node group
// numbers in it reflect the node group mapping: unpack, rewrite with
// map_in_frm(), pack again and store the result on the table.
// Return values are not visible in this listing; the caller in table()
// treats a true result as failure.
bool BackupRestore::translate_frm(NdbDictionary::Table *table)
427
uchar *pack_data, *data, *new_pack_data;
430
size_t data_len, new_pack_len;
431
uint no_parts, extra_growth;
432
DBUG_ENTER("translate_frm");
434
pack_data = (uchar*) table->getFrmData();
435
no_parts = table->getFragmentCount();
437
Add max 4 characters per partition to handle worst case
438
of mapping from single digit to 5-digit number.
439
Fairly future-proof, ok up to 99999 node groups.
441
extra_growth = no_parts * 4;
442
if (unpackfrm(&data, &data_len, pack_data))
446
// Output buffer: unpacked size plus the worst-case growth computed above.
if ((new_data = (char*) my_malloc(data_len + extra_growth, MYF(0))))
450
if (map_in_frm(new_data, (const char*)data, data_len, &new_data_len))
452
// Rewrite failed: release the output buffer.
my_free(new_data, MYF(0));
455
if (packfrm((uchar*) new_data, new_data_len,
456
&new_pack_data, &new_pack_len))
458
// Packing failed: release the output buffer.
my_free(new_data, MYF(0));
461
table->setFrm(new_pack_data, new_pack_len);
465
#include <signaldata/DictTabInfo.hpp>
468
// Restores one disk-data dictionary object from the backup.
//   type - a DictTabInfo object type (Tablespace, LogfileGroup, Datafile or
//          Undofile are handled in the visible cases)
//   ptr  - the object as read from the backup; cast according to `type`
// Tablespaces and logfile groups are created unless m_no_restore_disk is set,
// then fetched back and stored in m_tablespaces / m_logfilegroups under their
// id from the backup, so later objects can be connected to the new instance.
// NOTE(review): bare numeric lines are original line numbers; braces, the
// switch statement, breaks and returns are missing from this listing.
BackupRestore::object(Uint32 type, const void * ptr)
473
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
475
case DictTabInfo::Tablespace:
477
// Work on a copy so the backup's object is left untouched.
NdbDictionary::Tablespace old(*(NdbDictionary::Tablespace*)ptr);
479
Uint32 id = old.getObjectId();
481
if (!m_no_restore_disk)
483
// Point the tablespace at the new instance of its default logfile group.
// NOTE(review): no null check on `lg` is visible before the dereference.
NdbDictionary::LogfileGroup * lg = m_logfilegroups[old.getDefaultLogfileGroupId()];
484
old.setDefaultLogfileGroup(* lg);
485
info << "Creating tablespace: " << old.getName() << "..." << flush;
486
int ret = dict->createTablespace(old);
489
NdbError errobj= dict->getNdbError();
490
info << "FAILED" << endl;
491
err << "Create tablespace failed: " << old.getName() << ": " << errobj << endl;
494
info << "done" << endl;
497
// Fetch the tablespace by name and remember it under its old id.
NdbDictionary::Tablespace curr = dict->getTablespace(old.getName());
498
NdbError errobj = dict->getNdbError();
499
if ((int) errobj.classification == (int) ndberror_cl_none)
501
NdbDictionary::Tablespace* currptr = new NdbDictionary::Tablespace(curr);
502
NdbDictionary::Tablespace * null = 0;
503
m_tablespaces.set(currptr, id, null);
504
debug << "Retreived tablespace: " << currptr->getName()
505
<< " oldid: " << id << " newid: " << currptr->getObjectId()
506
<< " " << (void*)currptr << endl;
510
err << "Failed to retrieve tablespace \"" << old.getName() << "\": "
516
case DictTabInfo::LogfileGroup:
518
NdbDictionary::LogfileGroup old(*(NdbDictionary::LogfileGroup*)ptr);
520
Uint32 id = old.getObjectId();
522
if (!m_no_restore_disk)
524
info << "Creating logfile group: " << old.getName() << "..." << flush;
525
int ret = dict->createLogfileGroup(old);
528
NdbError errobj= dict->getNdbError();
529
info << "FAILED" << endl;
530
err << "Create logfile group failed: " << old.getName() << ": " << errobj << endl;
533
info << "done" << endl;
536
// Fetch the logfile group by name and remember it under its old id.
NdbDictionary::LogfileGroup curr = dict->getLogfileGroup(old.getName());
537
NdbError errobj = dict->getNdbError();
538
if ((int) errobj.classification == (int) ndberror_cl_none)
540
NdbDictionary::LogfileGroup* currptr =
541
new NdbDictionary::LogfileGroup(curr);
542
NdbDictionary::LogfileGroup * null = 0;
543
m_logfilegroups.set(currptr, id, null);
544
debug << "Retreived logfile group: " << currptr->getName()
545
<< " oldid: " << id << " newid: " << currptr->getObjectId()
546
<< " " << (void*)currptr << endl;
550
err << "Failed to retrieve logfile group \"" << old.getName() << "\": "
556
case DictTabInfo::Datafile:
558
if (!m_no_restore_disk)
560
NdbDictionary::Datafile old(*(NdbDictionary::Datafile*)ptr);
561
// Look up the new tablespace by the old tablespace id stored in the datafile.
// NOTE(review): no null check on `ts` is visible before the dereference.
NdbDictionary::ObjectId objid;
562
old.getTablespaceId(&objid);
563
NdbDictionary::Tablespace * ts = m_tablespaces[objid.getObjectId()];
564
debug << "Connecting datafile " << old.getPath()
565
<< " to tablespace: oldid: " << objid.getObjectId()
566
<< " newid: " << ts->getObjectId() << endl;
567
old.setTablespace(* ts);
568
info << "Creating datafile \"" << old.getPath() << "\"..." << flush;
569
// createDatafile() returning non-zero is the failure case.
if (dict->createDatafile(old))
571
NdbError errobj= dict->getNdbError();
572
info << "FAILED" << endl;
573
err << "Create datafile failed: " << old.getPath() << ": " << errobj << endl;
576
info << "done" << endl;
581
case DictTabInfo::Undofile:
583
if (!m_no_restore_disk)
585
NdbDictionary::Undofile old(*(NdbDictionary::Undofile*)ptr);
586
// Look up the new logfile group by the old group id stored in the undofile.
// NOTE(review): no null check on `lg` is visible before the dereference.
NdbDictionary::ObjectId objid;
587
old.getLogfileGroupId(&objid);
588
NdbDictionary::LogfileGroup * lg = m_logfilegroups[objid.getObjectId()];
589
debug << "Connecting undofile " << old.getPath()
590
<< " to logfile group: oldid: " << objid.getObjectId()
591
<< " newid: " << lg->getObjectId()
592
<< " " << (void*)lg << endl;
593
old.setLogfileGroup(* lg);
594
info << "Creating undofile \"" << old.getPath() << "\"..." << flush;
595
// createUndofile() returning non-zero is the failure case.
if (dict->createUndofile(old))
597
NdbError errobj= dict->getNdbError();
598
info << "FAILED" << endl;
599
err << "Create undofile failed: " << old.getPath() << ": " << errobj << endl;
602
info << "done" << endl;
612
// Accessor whose body is not visible in this listing (presumably it reports
// the m_temp_error flag that errorHandler() sets on temporary errors — confirm).
BackupRestore::has_temp_error(){
617
// Writes the backup's stop GCP as the epoch into the replication apply-status
// table (Ndb_apply_table in database NDB_REP_DB, schema "def").
// Only does work when m_restore_epoch is set. Two table layouts are
// recognised from the column types: a 2-column layout (format 1) and a layout
// with three further columns (format 2); anything else is rejected.
// NOTE(review): bare numeric lines are original line numbers; null checks,
// braces and returns are missing from this listing.
BackupRestore::update_apply_status(const RestoreMetaData &metaData)
619
if (!m_restore_epoch)
623
// 0 = layout not recognised.
unsigned apply_table_format= 0;
625
m_ndb->setDatabaseName(NDB_REP_DB);
626
m_ndb->setSchemaName("def");
628
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
629
const NdbDictionary::Table *ndbtab= dict->getTable(Ndb_apply_table);
632
err << Ndb_apply_table << ": "
633
<< dict->getNdbError() << endl;
637
// Columns 0 and 1 must be Unsigned and Bigunsigned.
(ndbtab->getColumn(0)->getType() == NdbDictionary::Column::Unsigned &&
638
ndbtab->getColumn(1)->getType() == NdbDictionary::Column::Bigunsigned)
640
if (ndbtab->getNoOfColumns() == 2)
642
apply_table_format= 1;
645
// Extended layout: columns 2..4 are Varchar, Bigunsigned, Bigunsigned.
(ndbtab->getColumn(2)->getType() == NdbDictionary::Column::Varchar &&
646
ndbtab->getColumn(3)->getType() == NdbDictionary::Column::Bigunsigned &&
647
ndbtab->getColumn(4)->getType() == NdbDictionary::Column::Bigunsigned)
649
apply_table_format= 2;
652
if (apply_table_format == 0)
654
err << Ndb_apply_table << " has wrong format\n";
659
Uint64 epoch= metaData.getStopGCP();
661
// One-byte buffer used as the value for the Varchar column in format 2
// (its initialisation is not visible in this listing).
char empty_string[1];
663
NdbTransaction * trans= m_ndb->startTransaction();
666
err << Ndb_apply_table << ": "
667
<< m_ndb->getNdbError() << endl;
670
NdbOperation * op= trans->getNdbOperation(ndbtab);
673
err << Ndb_apply_table << ": "
674
<< trans->getNdbError() << endl;
677
// Write (insert or overwrite) the row keyed by server_id with the epoch.
if (op->writeTuple() ||
678
op->equal(0u, (const char *)&server_id, sizeof(server_id)) ||
679
op->setValue(1u, (const char *)&epoch, sizeof(epoch)))
681
err << Ndb_apply_table << ": "
682
<< op->getNdbError() << endl;
685
// Format 2 only: fill the three extra columns with an empty string and zeros.
if ((apply_table_format == 2) &&
686
(op->setValue(2u, (const char *)&empty_string, 1) ||
687
op->setValue(3u, (const char *)&zero, sizeof(zero)) ||
688
op->setValue(4u, (const char *)&zero, sizeof(zero))))
690
err << Ndb_apply_table << ": "
691
<< op->getNdbError() << endl;
694
if (trans->execute(NdbTransaction::Commit))
696
err << Ndb_apply_table << ": "
697
<< trans->getNdbError() << endl;
702
m_ndb->closeTransaction(trans);
707
// Compares a table definition from the backup with the table of the same
// name that already exists in the target cluster: the column counts must
// match and every column must compare equal. Blob part tables and index
// tables get separate early handling whose outcome is not visible here.
// NOTE(review): bare numeric lines are original line numbers; returns and
// braces are missing from this listing.
BackupRestore::table_equal(const TableS &tableS)
712
const char *tablename = tableS.getTableName();
714
if(tableS.m_dictTable == NULL){
715
// NOTE(review): "%s" is printed literally here — this is stream insertion,
// not printf-style formatting; the name is appended after the text.
ndbout<<"Table %s has no m_dictTable " << tablename << endl;
721
if(match_blob(tablename) >= 0)
724
// A defined index type means this "table" is really an index.
const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* tableS.m_dictTable);
725
if ((int) tmptab.m_indexType != (int) NdbDictionary::Index::Undefined){
729
// Internal names have the form "<database>/<schema>/<table>".
BaseString tmp(tablename);
730
Vector<BaseString> split;
731
if(tmp.split(split, "/") != 3){
732
err << "Invalid table name format " << tablename << endl;
736
m_ndb->setDatabaseName(split[0].c_str());
737
m_ndb->setSchemaName(split[1].c_str());
739
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
740
const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
742
err << "Unable to find table: " << split[2].c_str() << endl;
746
if(tab->getNoOfColumns() != tableS.m_dictTable->getNoOfColumns())
748
ndbout_c("m_columns.size %d != %d",tab->getNoOfColumns(),
749
tableS.m_dictTable->getNoOfColumns());
753
// Column-by-column comparison, in column order.
for(int i = 0; i<tab->getNoOfColumns(); i++)
755
if(!tab->getColumn(i)->equal(*(tableS.m_dictTable->getColumn(i))))
757
ndbout_c("m_columns %s != %s",tab->getColumn(i)->getName(),
758
tableS.m_dictTable->getColumn(i)->getName());
767
// Handles the two replication system tables (NDB_APPLY_TABLE and
// NDB_SCHEMA_TABLE in NDB_REP_DB, schema "def"). Other table names are
// filtered out by the strcmp test, and the table is looked up in the
// dictionary before the call falls through to table(). The early returns are
// not visible in this listing.
BackupRestore::createSystable(const TableS & tables){
768
if (!m_restore && !m_restore_meta && !m_restore_epoch)
770
const char *tablename = tables.getTableName();
772
// True when the name is neither of the two replication system tables.
if( strcmp(tablename, NDB_REP_DB "/def/" NDB_APPLY_TABLE) != 0 &&
773
strcmp(tablename, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE) != 0 )
778
// Internal names have the form "<database>/<schema>/<table>".
BaseString tmp(tablename);
779
Vector<BaseString> split;
780
if(tmp.split(split, "/") != 3){
781
err << "Invalid table name format " << tablename << endl;
785
m_ndb->setDatabaseName(split[0].c_str());
786
m_ndb->setSchemaName(split[1].c_str());
788
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
789
// Table already present in the target cluster (action not visible here).
if( dict->getTable(split[2].c_str()) != NULL ){
792
return table(tables);
796
// Restores the definition of one table from the backup.
// Visible steps: skip blob part tables, queue index tables in m_indexes for
// endOfTables(), copy the table definition, connect it to its new tablespace,
// settle partitioning / node groups, apply upgrade adjustments, create the
// table, create the replication event for tables that carry frm data, and
// record the new table in m_new_tables under its table id from the backup.
// Fix in this block: two calls passed the character "©" where the address of
// the local `copy` was meant ("&copy" had been mis-decoded as an HTML
// entity); both now pass &copy.
// NOTE(review): bare numeric lines are original line numbers; braces, else
// lines and returns are missing from this listing.
BackupRestore::table(const TableS & table){
797
if (!m_restore && !m_restore_meta)
800
const char * name = table.getTableName();
805
if(match_blob(name) >= 0)
808
// A defined index type means this is an index; it is remembered here and
// created later, once the base tables exist.
const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
809
if ((int) tmptab.m_indexType != (int) NdbDictionary::Index::Undefined){
810
m_indexes.push_back(table.m_dictTable);
814
// Internal names have the form "<database>/<schema>/<table>".
BaseString tmp(name);
815
Vector<BaseString> split;
816
if(tmp.split(split, "/") != 3){
817
err << "Invalid table name format `" << name << "`" << endl;
821
m_ndb->setDatabaseName(split[0].c_str());
822
m_ndb->setSchemaName(split[1].c_str());
824
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
827
// All adjustments are made on a copy of the backup's table definition.
NdbDictionary::Table copy(*table.m_dictTable);
829
copy.setName(split[2].c_str());
831
// Connect the table to the new instance of its tablespace, if it has one.
// NOTE(review): no null check on `ts` is visible before the dereference.
if (copy.getTablespace(&id))
833
debug << "Connecting " << name << " to tablespace oldid: " << id << flush;
834
NdbDictionary::Tablespace* ts = m_tablespaces[id];
835
debug << " newid: " << ts->getObjectId() << endl;
836
copy.setTablespace(* ts);
839
if (copy.getDefaultNoPartitionsFlag())
842
Table was defined with default number of partitions. We can restore
843
it with whatever is the default in this cluster.
844
We use the max_rows parameter in calculating the default number.
846
Uint32 no_nodes = m_cluster_connection->no_db_nodes();
847
copy.setFragmentCount(get_no_fragments(copy.getMaxRows(),
849
set_default_nodegroups(&copy);
854
Table was defined with specific number of partitions. It should be
855
restored with the same number of partitions. It will either be
856
restored in the same node groups as when backup was taken or by
857
using a node group map supplied to the ndb_restore program.
859
Uint16 *ng_array = (Uint16*)copy.getFragmentData();
860
Uint16 no_parts = copy.getFragmentCount();
861
// When the mapping changed something, the frm blob must be rewritten too.
if (map_nodegroups(ng_array, no_parts))
863
if (translate_frm(&copy))
865
err << "Create table " << table.getTableName() << " failed: ";
866
err << "Translate frm error" << endl;
870
// Length is in bytes: two bytes per node group entry.
copy.setFragmentData((const void *)ng_array, no_parts << 1);
874
* Force of varpart was introduced in 5.1.18, telco 6.1.7 and 6.2.1
875
* Since default from mysqld is to add force of varpart (disable with
876
* ROW_FORMAT=FIXED) we force varpart onto tables when they are restored
877
* from backups taken with older versions. This will be wrong if
878
* ROW_FORMAT=FIXED was used on original table, however the likelyhood of
879
* this is low, since ROW_FORMAT= was a NOOP in older versions.
882
if (table.getBackupVersion() < MAKE_VERSION(5,1,18))
883
copy.setForceVarPart(true);
884
else if (getMajor(table.getBackupVersion()) == 6 &&
885
(table.getBackupVersion() < MAKE_VERSION(6,1,7) ||
886
table.getBackupVersion() == MAKE_VERSION(6,2,0)))
887
copy.setForceVarPart(true);
890
update min and max rows to reflect the table, this to
891
ensure that memory is allocated properly in the ndb kernel
893
copy.setMinRows(table.getNoOfRecords());
894
if (table.getNoOfRecords() > copy.getMaxRows())
896
copy.setMaxRows(table.getNoOfRecords());
899
// Backups older than 5.1.0 (unless upgrading is disabled): set the array
// type of variable-length columns explicitly — short-var for
// Varchar/Varbinary, medium-var for Longvarchar/Longvarbinary.
NdbTableImpl &tableImpl = NdbTableImpl::getImpl(copy);
900
if (table.getBackupVersion() < MAKE_VERSION(5,1,0) && !m_no_upgrade){
901
for(int i= 0; i < copy.getNoOfColumns(); i++)
903
NdbDictionary::Column::Type t = copy.getColumn(i)->getType();
905
if (t == NdbDictionary::Column::Varchar ||
906
t == NdbDictionary::Column::Varbinary)
907
tableImpl.getColumn(i)->setArrayType(NdbDictionary::Column::ArrayTypeShortVar);
908
if (t == NdbDictionary::Column::Longvarchar ||
909
t == NdbDictionary::Column::Longvarbinary)
910
tableImpl.getColumn(i)->setArrayType(NdbDictionary::Column::ArrayTypeMediumVar);
914
if (dict->createTable(copy) == -1)
916
err << "Create table `" << table.getTableName() << "` failed: "
917
<< dict->getNdbError() << endl;
918
// Error 771 gets an extra hint about the node group map option.
if (dict->getNdbError().code == 771)
921
The user on the cluster where the backup was created had specified
922
specific node groups for partitions. Some of these node groups
923
didn't exist on this cluster. We will warn the user of this and
924
inform him of his option.
926
err << "The node groups defined in the table didn't exist in this";
927
err << " cluster." << endl << "There is an option to use the";
928
err << " the parameter ndb-nodegroup-map to define a mapping from";
929
err << endl << "the old nodegroups to new nodegroups" << endl;
933
info << "Successfully restored table `"
934
<< table.getTableName() << "`" << endl;
937
// Fetch the table as it now exists in the target cluster.
const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
939
err << "Unable to find table: `" << split[2].c_str() << "`" << endl;
944
if (tab->getFrmData())
946
// a MySQL Server table is restored, thus an event should be created
// Event name: "REPL$<database>/<table>".
947
BaseString event_name("REPL$");
948
event_name.append(split[0].c_str());
949
event_name.append("/");
950
event_name.append(split[2].c_str());
952
NdbDictionary::Event my_event(event_name.c_str());
953
my_event.setTable(*tab);
954
my_event.addTableEvent(NdbDictionary::Event::TE_ALL);
956
// add all columns to the event
957
bool has_blobs = false;
958
for(int a= 0; a < tab->getNoOfColumns(); a++)
960
my_event.addEventColumn(a);
961
NdbDictionary::Column::Type t = tab->getColumn(a)->getType();
962
if (t == NdbDictionary::Column::Blob ||
963
t == NdbDictionary::Column::Text)
967
my_event.mergeEvents(true);
969
// Retry loop: when the event already exists it is dropped and creation is
// attempted again; other failures report the error and drop the table.
while ( dict->createEvent(my_event) ) // Add event to database
971
if (dict->getNdbError().classification == NdbError::SchemaObjectExists)
973
info << "Event for table " << table.getTableName()
974
<< " already exists, removing.\n";
975
if (!dict->dropEvent(my_event.getName()))
978
err << "Create table event for " << table.getTableName() << " failed: "
979
<< dict->getNdbError() << endl;
980
dict->dropTable(split[2].c_str());
983
info << "Successfully restored table event " << event_name << endl ;
986
// Grow m_new_tables as needed, then store the new table under its old id.
const NdbDictionary::Table* null = 0;
987
m_new_tables.fill(table.m_dictTable->getTableId(), null);
988
m_new_tables[table.m_dictTable->getTableId()] = tab;
993
// Creates the indexes that table() queued in m_indexes, now that the base
// tables exist. For each queued index table: resolve its primary table,
// build an index object from the index table and the base table, give it the
// short index name and create it in the dictionary.
// NOTE(review): bare numeric lines are original line numbers; null checks,
// braces and returns are missing from this listing.
BackupRestore::endOfTables(){
997
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
998
for(size_t i = 0; i<m_indexes.size(); i++){
999
NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
1001
Vector<BaseString> split;
1003
// Primary table name has the form "<database>/<schema>/<table>".
BaseString tmp(indtab.m_primaryTable.c_str());
1004
if (tmp.split(split, "/") != 3)
1006
err << "Invalid table name format `" << indtab.m_primaryTable.c_str()
1012
m_ndb->setDatabaseName(split[0].c_str());
1013
m_ndb->setSchemaName(split[1].c_str());
1015
const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
1017
err << "Unable to find base table `" << split[2].c_str()
1019
<< indtab.getName() << "`" << endl;
1022
NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
1024
Vector<BaseString> split_idx;
1026
// Index names have four "/"-separated parts; the last one is the index name.
BaseString tmp(indtab.getName());
1027
if (tmp.split(split_idx, "/") != 4)
1029
err << "Invalid index name format `" << indtab.getName() << "`" << endl;
1033
// A non-zero result is the failure case here.
if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
1035
err << "Failed to create index `" << split_idx[3]
1036
<< "` on " << split[2].c_str() << endl;
1039
idx->setName(split_idx[3].c_str());
1040
if(dict->createIndex(* idx) != 0)
1043
err << "Failed to create index `" << split_idx[3].c_str()
1044
<< "` on `" << split[2].c_str() << "`" << endl
1045
<< dict->getNdbError() << endl;
1050
info << "Successfully created index `" << split_idx[3].c_str()
1051
<< "` on `" << split[2].c_str() << "`" << endl;
1056
// Entry point for restoring one row. Takes a callback struct from the free
// list — polling outstanding transactions until one becomes available —
// records the fragment id and a copy of the row in it. The hand-off to the
// asynchronous writer is not visible in this listing.
//   tup        - the row to restore
//   fragmentId - fragment the row came from in the backup
void BackupRestore::tuple(const TupleS & tup, Uint32 fragmentId)
1061
// No free callback struct: every slot has a transaction in flight.
while (m_free_callback == 0)
1063
assert(m_transactions == m_parallelism);
1064
// send-poll all transactions
1065
// close transaction is done in callback
1066
m_ndb->sendPollNdb(3000, 1);
1069
// Pop the head of the free list.
restore_callback_t * cb = m_free_callback;
1074
m_free_callback = cb->next;
1076
cb->fragId = fragmentId;
1077
cb->tup = tup; // must do copy!
1082
// Defines and prepares the asynchronous write of the row held in `cb`.
// Loops while cb->retries is below 10: start a transaction, get a write
// operation on the target table, set the partition id for user-defined
// partitioning, bind key columns and then the remaining columns, and finally
// prepare the transaction for asynchronous commit. Failures go through
// errorHandler(); when the retry budget is used up the final error message
// is printed.
// NOTE(review): bare numeric lines are original line numbers; braces,
// continue/return statements and short declarations are missing here.
void BackupRestore::tuple_a(restore_callback_t *cb)
1084
Uint32 partition_id = cb->fragId;
1085
while (cb->retries < 10)
1088
* start transactions
1090
cb->connection = m_ndb->startTransaction();
1091
if (cb->connection == NULL)
1093
// Recoverable: poll so in-flight transactions can complete (retry path).
if (errorHandler(cb))
1095
m_ndb->sendPollNdb(3000, 1);
1098
err << "Cannot start transaction" << endl;
1102
const TupleS &tup = cb->tup;
1103
// Translate the backup's table object into the table of the target cluster.
const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
1105
NdbOperation * op = cb->connection->getNdbOperation(table);
1109
if (errorHandler(cb))
1111
err << "Cannot get operation: " << cb->connection->getNdbError() << endl;
1115
if (op->writeTuple() == -1)
1117
if (errorHandler(cb))
1119
err << "Error defining op: " << cb->connection->getNdbError() << endl;
1123
if (table->getFragmentType() == NdbDictionary::Object::UserDefined)
1125
if (table->getDefaultNoPartitionsFlag())
1128
This can only happen for HASH partitioning with
1129
user defined hash function where user hasn't
1130
specified the number of partitions and we
1131
have to calculate it. We use the hash value
1132
stored in the record to calculate the partition
1135
// The hash value is read from the row's last attribute.
int i = tup.getNoOfAttributes() - 1;
1136
const AttributeData *attr_data = tup.getData(i);
1137
Uint32 hash_value = *attr_data->u_int32_value;
1138
op->setPartitionId(get_part_id(table, hash_value));
1143
Either RANGE or LIST (with or without subparts)
1144
OR HASH partitioning with user defined hash
1145
function but with fixed set of partitions.
1147
op->setPartitionId(partition_id);
1151
// Two passes over the attributes: pass 0 binds primary key columns with
// equal(), pass 1 sets all other columns with setValue().
for (int j = 0; j < 2; j++)
1153
for (int i = 0; i < tup.getNoOfAttributes(); i++)
1155
const AttributeDesc * attr_desc = tup.getDesc(i);
1156
const AttributeData * attr_data = tup.getData(i);
1157
int size = attr_desc->size;
1158
int arraySize = attr_desc->arraySize;
1159
char * dataPtr = attr_data->string_value;
1162
if (!attr_data->null)
1164
// Work out the byte length to pass on: variable-length types carry a
// length prefix in the data itself.
const unsigned char * src = (const unsigned char *)dataPtr;
1165
switch(attr_desc->m_column->getType()){
1166
case NdbDictionary::Column::Varchar:
1167
case NdbDictionary::Column::Varbinary:
// One length byte, plus that byte itself.
1168
length = src[0] + 1;
1170
case NdbDictionary::Column::Longvarchar:
1171
case NdbDictionary::Column::Longvarbinary:
// Two length bytes, low byte first, plus those two bytes.
1172
length = src[0] + (src[1] << 8) + 2;
1175
length = attr_data->size;
1179
// Track the largest auto-increment value seen (first pass only, so each
// value is counted once).
if (j == 0 && tup.getTable()->have_auto_inc(i))
1180
tup.getTable()->update_max_auto_val(dataPtr,size*arraySize);
1182
if (attr_desc->m_column->getPrimaryKey())
1184
if (j == 1) continue;
1185
ret = op->equal(i, dataPtr, length);
1189
if (j == 0) continue;
1190
if (attr_data->null)
1191
ret = op->setValue(i, NULL, 0);
1193
ret = op->setValue(i, dataPtr, length);
1196
ndbout_c("Column: %d type %d %d %d %d",i,
1197
attr_desc->m_column->getType(),
1198
size, arraySize, length);
1207
if (errorHandler(cb))
1209
err << "Error defining op: " << cb->connection->getNdbError() << endl;
1213
// Prepare transaction (the transaction is NOT yet sent to NDB)
1214
cb->connection->executeAsynchPrepare(NdbTransaction::Commit,
1219
// Reached when the retry budget is exhausted.
err << "Retried transaction " << cb->retries << " times.\nLast error"
1220
<< m_ndb->getNdbError(cb->error_code) << endl
1221
<< "...Unable to recover from errors. Exiting..." << endl;
1225
// Completion handler for one asynchronous row write.
//   result - outcome reported by the NDB API for the transaction
//   cb     - the callback struct that owns the transaction
// On failure the error is classified by errorHandler(): recoverable errors
// re-run tuple_a() for the same row, others print the fatal message. On
// success the transaction is closed and `cb` goes back on the free list.
// The conditions selecting the two paths are not visible in this listing.
void BackupRestore::cback(int result, restore_callback_t *cb)
1232
* Error. temporary or permanent?
1234
if (errorHandler(cb))
1235
tuple_a(cb); // retry
1238
err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
1245
* OK! close transaction
1247
m_ndb->closeTransaction(cb->connection);
1249
// Push the callback struct back onto the head of the free list.
cb->next= m_free_callback;
1250
m_free_callback= cb;
1256
// Classifies the most recent error for `cb`. The error is taken from the
// transaction when there is one (which is then closed), otherwise from the
// Ndb object; its code is saved in cb->error_code. Temporary errors set
// m_temp_error and sleep for 100 ms plus 300 ms per retry already made.
// The return statements of the individual cases are not visible in this
// listing.
* returns true if is recoverable,
1257
* Error handling based on hugo
1258
* false if it is an error that generates an abort.
1260
bool BackupRestore::errorHandler(restore_callback_t *cb)
1265
error= cb->connection->getNdbError();
1266
m_ndb->closeTransaction(cb->connection);
1271
error= m_ndb->getNdbError();
1274
// Back-off grows linearly with the number of retries (milliseconds).
Uint32 sleepTime = 100 + cb->retries * 300;
1277
cb->error_code = error.code;
1279
switch(error.status)
1281
case NdbError::Success:
1282
err << "Success error: " << error << endl;
1286
case NdbError::TemporaryError:
1287
err << "Temporary error: " << error << endl;
1288
m_temp_error = true;
1289
NdbSleep_MilliSleep(sleepTime);
1293
case NdbError::UnknownResult:
1294
err << "Unknown: " << error << endl;
1299
case NdbError::PermanentError:
1301
err << "Permanent: " << error << endl;
1304
err << "No error status" << endl;
1308
// Terminates the program with the NDBT_FAILED status. Lines between the
// signature and this call are missing from the listing and may do cleanup
// first.
void BackupRestore::exitHandler()
1311
NDBT_ProgramExit(NDBT_FAILED);
1320
// Waits for all outstanding asynchronous row writes to finish by polling
// until m_transactions reaches zero. Any code before the loop is not visible
// in this listing.
BackupRestore::tuple_free()
1325
// Poll all transactions
1326
while (m_transactions)
1328
m_ndb->sendPollNdb(3000);
1333
// Called when all rows have been handed over. Its body is not visible in
// this listing (presumably it drains outstanding transactions via
// tuple_free() — TODO confirm).
BackupRestore::endOfTuples()
1339
// Predicate on a table's partitioning. The visible condition is true for a
// user-defined fragment type combined with the default-number-of-partitions
// flag; which value is returned in each case is not visible in this listing.
static bool use_part_id(const NdbDictionary::Table *table)
1341
if (table->getDefaultNoPartitionsFlag() &&
1342
(table->getFragmentType() == NdbDictionary::Object::UserDefined))
1349
// Computes the partition id for a row from its stored hash value (the second
// parameter; its declaration line is not visible in this listing).
// Tables with the linear flag use a bit-mask scheme with a fall-back to the
// next smaller mask; all other tables use hash_value modulo the number of
// fragments.
static Uint32 get_part_id(const NdbDictionary::Table *table,
1352
Uint32 no_frags = table->getFragmentCount();
1354
if (table->getLinearFlag())
1358
// Grow mask until it is at least no_frags.
// NOTE(review): original line 1359 is missing between this loop and the use
// of `mask` below; as shown, mask would still be a power of two at that
// point — confirm against the full source.
while (no_frags > mask) mask <<= 1;
1360
part_id = hash_value & mask;
1361
// Result beyond the last fragment: fold it back with the half-size mask.
if (part_id >= no_frags)
1362
part_id = hash_value & (mask >> 1);
1366
return (hash_value % no_frags);
1370
// Applies one change-log entry from the backup with its own synchronous
// transaction: insert, update or delete depending on the entry type.
// Key attributes are bound with equal() (each key attribute id only once,
// tracked in `keys`), other attributes with setValue(). After the commit,
// certain failures are examined per entry type (constraint violation for
// inserts, no-data-found for updates and deletes); what is done with them is
// not visible in this listing.
// NOTE(review): bare numeric lines are original line numbers; the switch
// statements, braces, breaks and returns are missing from this listing.
BackupRestore::logEntry(const LogEntry & tup)
1375
NdbTransaction * trans = m_ndb->startTransaction();
1378
// TODO: handle the error
1379
err << "Cannot start transaction" << endl;
1383
// Translate the backup's table object into the table of the target cluster.
const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
1384
NdbOperation * op = trans->getNdbOperation(table);
1387
err << "Cannot get operation: " << trans->getNdbError() << endl;
1394
case LogEntry::LE_INSERT:
1395
check = op->insertTuple();
1397
case LogEntry::LE_UPDATE:
1398
check = op->updateTuple();
1400
case LogEntry::LE_DELETE:
1401
check = op->deleteTuple();
1404
err << "Log entry has wrong operation type."
1411
err << "Error defining op: " << trans->getNdbError() << endl;
1415
// User-defined partitioning: the partition must be given explicitly.
if (table->getFragmentType() == NdbDictionary::Object::UserDefined)
1417
if (table->getDefaultNoPartitionsFlag())
1419
// The hash value is read from the entry's last attribute.
// NOTE(review): assumes tup.size() > 0 and that the attribute data holds at
// least four bytes — confirm.
const AttributeS * attr = tup[tup.size()-1];
1420
Uint32 hash_value = *(Uint32*)attr->Data.string_value;
1421
op->setPartitionId(get_part_id(table, hash_value));
1424
op->setPartitionId(tup.m_frag_id);
1428
for (Uint32 i= 0; i < tup.size(); i++)
1430
const AttributeS * attr = tup[i];
1431
int size = attr->Desc->size;
1432
int arraySize = attr->Desc->arraySize;
1433
const char * dataPtr = attr->Data.string_value;
1435
// Track the largest auto-increment value seen.
if (tup.m_table->have_auto_inc(attr->Desc->attrId))
1436
tup.m_table->update_max_auto_val(dataPtr,size*arraySize);
1438
// Byte length; presumably `size` is in bits, hence the division by 8 — confirm.
const Uint32 length = (size / 8) * arraySize;
1439
if (attr->Desc->m_column->getPrimaryKey())
1441
// Bind each key attribute only the first time it is seen.
if(!keys.get(attr->Desc->attrId))
1443
keys.set(attr->Desc->attrId);
1444
check= op->equal(attr->Desc->attrId, dataPtr, length);
1448
check= op->setValue(attr->Desc->attrId, dataPtr, length);
1452
err << "Error defining op: " << trans->getNdbError() << endl;
1457
const int ret = trans->execute(NdbTransaction::Commit);
1460
// Both insert update and delete can fail during log running
1462
// TODO: check that the error is either tuple exists or tuple does not exist?
1464
NdbError errobj= trans->getNdbError();
1467
case LogEntry::LE_INSERT:
1468
if(errobj.status == NdbError::PermanentError &&
1469
errobj.classification == NdbError::ConstraintViolation)
1472
case LogEntry::LE_UPDATE:
1473
case LogEntry::LE_DELETE:
1474
if(errobj.status == NdbError::PermanentError &&
1475
errobj.classification == NdbError::NoDataFound)
1481
err << "execute failed: " << errobj << endl;
1486
m_ndb->closeTransaction(trans);
1491
// Called after the last log entry: prints a summary with the number of rows
// (m_dataCount) and log entries (m_logCount) restored. Any guard or return
// around the message is not visible in this listing.
BackupRestore::endOfLogEntrys()
1496
info << "Restored " << m_dataCount << " tuples and "
1497
<< m_logCount << " log entries" << endl;
1501
// Trampoline from the C-style completion callback to the member function:
// aObject is the restore_callback_t registered with the transaction, and its
// `restore` member is the BackupRestore instance whose cback() is invoked.
// The `trans` argument is not used in the visible lines.
* callback : This is called when the transaction is polled
1503
* (This function must have three arguments:
1504
* - The result of the transaction,
1505
* - The NdbTransaction object, and
1506
* - A pointer to an arbitrary object.)
1510
callback(int result, NdbTransaction* trans, void* aObject)
1512
restore_callback_t *cb = (restore_callback_t *)aObject;
1513
(cb->restore)->cback(result, cb);
1516
// Disabled code: everything from this #if 0 up to its matching #endif (not
// visible in this listing) is excluded from compilation. It is an earlier,
// synchronous version of tuple() that writes one row per transaction: key
// columns are bound in a first loop, the remaining columns in a second loop,
// followed by an immediate commit.
#if 0 // old tuple impl
1518
BackupRestore::tuple(const TupleS & tup)
1524
NdbTransaction * trans = m_ndb->startTransaction();
1527
// TODO: handle the error
1528
ndbout << "Cannot start transaction" << endl;
1532
const TableS * table = tup.getTable();
1533
NdbOperation * op = trans->getNdbOperation(table->getTableName());
1536
ndbout << "Cannot get operation: ";
1537
ndbout << trans->getNdbError() << endl;
1541
// TODO: check return value and handle error
1542
if (op->writeTuple() == -1)
1544
ndbout << "writeTuple call failed: ";
1545
ndbout << trans->getNdbError() << endl;
1549
// First loop: primary key columns.
for (int i = 0; i < tup.getNoOfAttributes(); i++)
1551
const AttributeS * attr = tup[i];
1552
int size = attr->Desc->size;
1553
int arraySize = attr->Desc->arraySize;
1554
const char * dataPtr = attr->Data.string_value;
1556
const Uint32 length = (size * arraySize) / 8;
1557
if (attr->Desc->m_column->getPrimaryKey())
1558
op->equal(i, dataPtr, length);
1561
// Second loop: all non-key columns, with NULL handled separately.
for (int i = 0; i < tup.getNoOfAttributes(); i++)
1563
const AttributeS * attr = tup[i];
1564
int size = attr->Desc->size;
1565
int arraySize = attr->Desc->arraySize;
1566
const char * dataPtr = attr->Data.string_value;
1568
const Uint32 length = (size * arraySize) / 8;
1569
if (!attr->Desc->m_column->getPrimaryKey())
1570
if (attr->Data.null)
1571
op->setValue(i, NULL, 0);
1573
op->setValue(i, dataPtr, length);
1575
int ret = trans->execute(NdbTransaction::Commit);
1578
ndbout << "execute failed: ";
1579
ndbout << trans->getNdbError() << endl;
1582
m_ndb->closeTransaction(trans);
1590
// Explicit instantiations of the project's Vector template for the pointer
// element types listed here, so that their member definitions are emitted in
// this translation unit.
template class Vector<NdbDictionary::Table*>;
1591
template class Vector<const NdbDictionary::Table*>;
1592
template class Vector<NdbDictionary::Tablespace*>;
1593
template class Vector<NdbDictionary::LogfileGroup*>;