936
bool TransactionServices::shouldConstructMessages()
938
ReplicationServices &replication_services= ReplicationServices::singleton();
939
return replication_services.isActive();
942
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
944
message::Transaction *transaction= in_session->getTransactionMessage();
946
if (unlikely(transaction == NULL))
949
* Allocate and initialize a new transaction message
950
* for this Session object. Session is responsible for
951
* deleting transaction message when done with it.
953
transaction= new (nothrow) message::Transaction();
954
initTransactionMessage(*transaction, in_session, should_inc_trx_id);
955
in_session->setTransactionMessage(transaction);
962
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
964
bool should_inc_trx_id)
966
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
967
trx->set_server_id(in_session->getServerId());
969
if (should_inc_trx_id)
970
trx->set_transaction_id(getNextTransactionId());
972
trx->set_start_timestamp(in_session->getCurrentTimestamp());
975
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
978
message::TransactionContext *trx= in_transaction.mutable_transaction_context();
979
trx->set_end_timestamp(in_session->getCurrentTimestamp());
982
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
985
delete in_transaction;
986
in_session->setStatementMessage(NULL);
987
in_session->setTransactionMessage(NULL);
990
int TransactionServices::commitTransactionMessage(Session *in_session)
992
ReplicationServices &replication_services= ReplicationServices::singleton();
993
if (! replication_services.isActive())
996
/* If there is an active statement message, finalize it */
997
message::Statement *statement= in_session->getStatementMessage();
999
if (statement != NULL)
1001
finalizeStatementMessage(*statement, in_session);
1004
return 0; /* No data modification occurred inside the transaction */
1006
message::Transaction* transaction= getActiveTransactionMessage(in_session);
1008
finalizeTransactionMessage(*transaction, in_session);
1010
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1012
cleanupTransactionMessage(transaction, in_session);
1014
return static_cast<int>(result);
1017
void TransactionServices::initStatementMessage(message::Statement &statement,
1018
message::Statement::Type in_type,
1019
Session *in_session)
1021
statement.set_type(in_type);
1022
statement.set_start_timestamp(in_session->getCurrentTimestamp());
1023
/** @TODO Set sql string optionally */
1026
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1027
Session *in_session)
1029
statement.set_end_timestamp(in_session->getCurrentTimestamp());
1030
in_session->setStatementMessage(NULL);
1033
void TransactionServices::rollbackTransactionMessage(Session *in_session)
1035
ReplicationServices &replication_services= ReplicationServices::singleton();
1036
if (! replication_services.isActive())
1039
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1042
* OK, so there are two situations that we need to deal with here:
1044
* 1) We receive an instruction to ROLLBACK the current transaction
1045
* and the currently-stored Transaction message is *self-contained*,
1046
* meaning that no Statement messages in the Transaction message
1047
* contain a message having its segment_id member greater than 1. If
1048
* no non-segment ID 1 members are found, we can simply clear the
1049
* current Transaction message and remove it from memory.
1051
* 2) If the Transaction message does indeed have a non-end segment, that
1052
* means that a bulk update/delete/insert Transaction message segment
1053
* has previously been sent over the wire to replicators. In this case,
1054
* we need to package a Transaction with a Statement message of type
1055
* ROLLBACK to indicate to replicators that previously-transmitted
1056
* messages must be un-applied.
1058
if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1060
/* Remember the transaction ID so we can re-use it */
1061
uint64_t trx_id= transaction->transaction_context().transaction_id();
1064
* Clear the transaction, create a Rollback statement message,
1065
* attach it to the transaction, and push it to replicators.
1067
transaction->Clear();
1068
initTransactionMessage(*transaction, in_session, false);
1070
/* Set the transaction ID to match the previous messages */
1071
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1073
message::Statement *statement= transaction->add_statement();
1075
initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
1076
finalizeStatementMessage(*statement, in_session);
1078
finalizeTransactionMessage(*transaction, in_session);
1080
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1082
cleanupTransactionMessage(transaction, in_session);
1085
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1087
uint32_t *next_segment_id)
1089
message::Statement *statement= in_session->getStatementMessage();
1090
message::Transaction *transaction= NULL;
1093
* Check the type for the current Statement message, if it is anything
1094
* other then INSERT we need to call finalize, this will ensure a
1095
* new InsertStatement is created. If it is of type INSERT check
1096
* what table the INSERT belongs to, if it is a different table
1097
* call finalize, so a new InsertStatement can be created.
1099
if (statement != NULL && statement->type() != message::Statement::INSERT)
1101
finalizeStatementMessage(*statement, in_session);
1102
statement= in_session->getStatementMessage();
1104
else if (statement != NULL)
1106
transaction= getActiveTransactionMessage(in_session);
1109
* If we've passed our threshold for the statement size (possible for
1110
* a bulk insert), we'll finalize the Statement and Transaction (doing
1111
* the Transaction will keep it from getting huge).
1113
if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
1115
/* Remember the transaction ID so we can re-use it */
1116
uint64_t trx_id= transaction->transaction_context().transaction_id();
1118
message::InsertData *current_data= statement->mutable_insert_data();
1120
/* Caller should use this value when adding a new record */
1121
*next_segment_id= current_data->segment_id() + 1;
1123
current_data->set_end_segment(false);
1126
* Send the trx message to replicators after finalizing the
1127
* statement and transaction. This will also set the Transaction
1128
* and Statement objects in Session to NULL.
1130
commitTransactionMessage(in_session);
1133
* Statement and Transaction should now be NULL, so new ones will get
1134
* created. We reuse the transaction id since we are segmenting
1137
statement= in_session->getStatementMessage();
1138
transaction= getActiveTransactionMessage(in_session, false);
1139
assert(transaction != NULL);
1141
/* Set the transaction ID to match the previous messages */
1142
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1146
const message::InsertHeader &insert_header= statement->insert_header();
1147
string old_table_name= insert_header.table_metadata().table_name();
1149
string current_table_name;
1150
(void) in_table->getShare()->getTableName(current_table_name);
1152
if (current_table_name.compare(old_table_name))
1154
finalizeStatementMessage(*statement, in_session);
1155
statement= in_session->getStatementMessage();
1159
/* carry forward the existing segment id */
1160
const message::InsertData ¤t_data= statement->insert_data();
1161
*next_segment_id= current_data.segment_id();
1166
if (statement == NULL)
1169
* Transaction will be non-NULL only if we had to segment it due to
1170
* transaction size above.
1172
if (transaction == NULL)
1173
transaction= getActiveTransactionMessage(in_session);
1176
* Transaction message initialized and set, but no statement created
1177
* yet. We construct one and initialize it, here, then return the
1178
* message after attaching the new Statement message pointer to the
1179
* Session for easy retrieval later...
1181
statement= transaction->add_statement();
1182
setInsertHeader(*statement, in_session, in_table);
1183
in_session->setStatementMessage(statement);
1188
void TransactionServices::setInsertHeader(message::Statement &statement,
1189
Session *in_session,
1192
initStatementMessage(statement, message::Statement::INSERT, in_session);
1195
* Now we construct the specialized InsertHeader message inside
1196
* the generalized message::Statement container...
1198
/* Set up the insert header */
1199
message::InsertHeader *header= statement.mutable_insert_header();
1200
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1203
(void) in_table->getShare()->getSchemaName(schema_name);
1205
(void) in_table->getShare()->getTableName(table_name);
1207
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1208
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1210
Field *current_field;
1211
Field **table_fields= in_table->getFields();
1213
message::FieldMetadata *field_metadata;
1215
/* We will read all the table's fields... */
1216
in_table->setReadSet();
1218
while ((current_field= *table_fields++) != NULL)
1220
field_metadata= header->add_field_metadata();
1221
field_metadata->set_name(current_field->field_name);
1222
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1226
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
1228
ReplicationServices &replication_services= ReplicationServices::singleton();
1229
if (! replication_services.isActive())
1232
* We do this check here because we don't want to even create a
1233
* statement if there isn't a primary key on the table...
1237
* Multi-column primary keys are handled how exactly?
1239
if (not in_table->getShare()->hasPrimaryKey())
1241
my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1245
uint32_t next_segment_id= 1;
1246
message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1248
message::InsertData *data= statement.mutable_insert_data();
1249
data->set_segment_id(next_segment_id);
1250
data->set_end_segment(true);
1251
message::InsertRecord *record= data->add_record();
1253
Field *current_field;
1254
Field **table_fields= in_table->getFields();
1256
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1257
string_value->set_charset(system_charset_info);
1259
/* We will read all the table's fields... */
1260
in_table->setReadSet();
1262
while ((current_field= *table_fields++) != NULL)
1264
if (current_field->is_null())
1266
record->add_is_null(true);
1267
record->add_insert_value("", 0);
1271
string_value= current_field->val_str(string_value);
1272
record->add_is_null(false);
1273
record->add_insert_value(string_value->c_ptr(), string_value->length());
1274
string_value->free();
1280
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1282
const unsigned char *old_record,
1283
const unsigned char *new_record,
1284
uint32_t *next_segment_id)
1286
message::Statement *statement= in_session->getStatementMessage();
1287
message::Transaction *transaction= NULL;
1290
* Check the type for the current Statement message, if it is anything
1291
* other then UPDATE we need to call finalize, this will ensure a
1292
* new UpdateStatement is created. If it is of type UPDATE check
1293
* what table the UPDATE belongs to, if it is a different table
1294
* call finalize, so a new UpdateStatement can be created.
1296
if (statement != NULL && statement->type() != message::Statement::UPDATE)
1298
finalizeStatementMessage(*statement, in_session);
1299
statement= in_session->getStatementMessage();
1301
else if (statement != NULL)
1303
transaction= getActiveTransactionMessage(in_session);
1306
* If we've passed our threshold for the statement size (possible for
1307
* a bulk insert), we'll finalize the Statement and Transaction (doing
1308
* the Transaction will keep it from getting huge).
1310
if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
1312
/* Remember the transaction ID so we can re-use it */
1313
uint64_t trx_id= transaction->transaction_context().transaction_id();
1315
message::UpdateData *current_data= statement->mutable_update_data();
1317
/* Caller should use this value when adding a new record */
1318
*next_segment_id= current_data->segment_id() + 1;
1320
current_data->set_end_segment(false);
1323
* Send the trx message to replicators after finalizing the
1324
* statement and transaction. This will also set the Transaction
1325
* and Statement objects in Session to NULL.
1327
commitTransactionMessage(in_session);
1330
* Statement and Transaction should now be NULL, so new ones will get
1331
* created. We reuse the transaction id since we are segmenting
1334
statement= in_session->getStatementMessage();
1335
transaction= getActiveTransactionMessage(in_session, false);
1336
assert(transaction != NULL);
1338
/* Set the transaction ID to match the previous messages */
1339
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1343
const message::UpdateHeader &update_header= statement->update_header();
1344
string old_table_name= update_header.table_metadata().table_name();
1346
string current_table_name;
1347
(void) in_table->getShare()->getTableName(current_table_name);
1348
if (current_table_name.compare(old_table_name))
1350
finalizeStatementMessage(*statement, in_session);
1351
statement= in_session->getStatementMessage();
1355
/* carry forward the existing segment id */
1356
const message::UpdateData ¤t_data= statement->update_data();
1357
*next_segment_id= current_data.segment_id();
1362
if (statement == NULL)
1365
* Transaction will be non-NULL only if we had to segment it due to
1366
* transaction size above.
1368
if (transaction == NULL)
1369
transaction= getActiveTransactionMessage(in_session);
1372
* Transaction message initialized and set, but no statement created
1373
* yet. We construct one and initialize it, here, then return the
1374
* message after attaching the new Statement message pointer to the
1375
* Session for easy retrieval later...
1377
statement= transaction->add_statement();
1378
setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
1379
in_session->setStatementMessage(statement);
1384
void TransactionServices::setUpdateHeader(message::Statement &statement,
1385
Session *in_session,
1387
const unsigned char *old_record,
1388
const unsigned char *new_record)
1390
initStatementMessage(statement, message::Statement::UPDATE, in_session);
1393
* Now we construct the specialized UpdateHeader message inside
1394
* the generalized message::Statement container...
1396
/* Set up the update header */
1397
message::UpdateHeader *header= statement.mutable_update_header();
1398
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1401
(void) in_table->getShare()->getSchemaName(schema_name);
1403
(void) in_table->getShare()->getTableName(table_name);
1405
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1406
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1408
Field *current_field;
1409
Field **table_fields= in_table->getFields();
1411
message::FieldMetadata *field_metadata;
1413
/* We will read all the table's fields... */
1414
in_table->setReadSet();
1416
while ((current_field= *table_fields++) != NULL)
1419
* We add the "key field metadata" -- i.e. the fields which is
1420
* the primary key for the table.
1422
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1424
field_metadata= header->add_key_field_metadata();
1425
field_metadata->set_name(current_field->field_name);
1426
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1429
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1431
/* Field is changed from old to new */
1432
field_metadata= header->add_set_field_metadata();
1433
field_metadata->set_name(current_field->field_name);
1434
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1438
void TransactionServices::updateRecord(Session *in_session,
1440
const unsigned char *old_record,
1441
const unsigned char *new_record)
1443
ReplicationServices &replication_services= ReplicationServices::singleton();
1444
if (! replication_services.isActive())
1447
uint32_t next_segment_id= 1;
1448
message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1450
message::UpdateData *data= statement.mutable_update_data();
1451
data->set_segment_id(next_segment_id);
1452
data->set_end_segment(true);
1453
message::UpdateRecord *record= data->add_record();
1455
Field *current_field;
1456
Field **table_fields= in_table->getFields();
1457
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1458
string_value->set_charset(system_charset_info);
1460
while ((current_field= *table_fields++) != NULL)
1463
* Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1464
* but then realized that an UPDATE statement could potentially have different values for
1465
* the SET field. For instance, imagine this SQL scenario:
1467
* CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1468
* INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1469
* UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1471
* We will generate two UpdateRecord messages with different set_value byte arrays.
1473
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1475
/* Store the original "read bit" for this field */
1476
bool is_read_set= current_field->isReadSet();
1478
/* We need to mark that we will "read" this field... */
1479
in_table->setReadSet(current_field->field_index);
1481
/* Read the string value of this field's contents */
1482
string_value= current_field->val_str(string_value);
1485
* Reset the read bit after reading field to its original state. This
1486
* prevents the field from being included in the WHERE clause
1488
current_field->setReadSet(is_read_set);
1490
if (current_field->is_null())
1492
record->add_is_null(true);
1493
record->add_after_value("", 0);
1497
record->add_is_null(false);
1498
record->add_after_value(string_value->c_ptr(), string_value->length());
1500
string_value->free();
1504
* Add the WHERE clause values now...for now, this means the
1505
* primary key field value. Replication only supports tables
1506
* with a primary key.
1508
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1511
* To say the below is ugly is an understatement. But it works.
1513
* @todo Move this crap into a real Record API.
1515
string_value= current_field->val_str(string_value,
1517
current_field->offset(const_cast<unsigned char *>(new_record)));
1518
record->add_key_value(string_value->c_ptr(), string_value->length());
1519
string_value->free();
1525
bool TransactionServices::isFieldUpdated(Field *current_field,
1527
const unsigned char *old_record,
1528
const unsigned char *new_record)
1531
* The below really should be moved into the Field API and Record API. But for now
1532
* we do this crazy pointer fiddling to figure out if the current field
1533
* has been updated in the supplied record raw byte pointers.
1535
const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1536
const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
1538
uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
1540
bool old_value_is_null= current_field->is_null_in_record(old_record);
1541
bool new_value_is_null= current_field->is_null_in_record(new_record);
1543
bool isUpdated= false;
1544
if (old_value_is_null != new_value_is_null)
1546
if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1550
else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1558
if (memcmp(old_ptr, new_ptr, field_length) != 0)
1566
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1568
uint32_t *next_segment_id)
1570
message::Statement *statement= in_session->getStatementMessage();
1571
message::Transaction *transaction= NULL;
1574
* Check the type for the current Statement message, if it is anything
1575
* other then DELETE we need to call finalize, this will ensure a
1576
* new DeleteStatement is created. If it is of type DELETE check
1577
* what table the DELETE belongs to, if it is a different table
1578
* call finalize, so a new DeleteStatement can be created.
1580
if (statement != NULL && statement->type() != message::Statement::DELETE)
1582
finalizeStatementMessage(*statement, in_session);
1583
statement= in_session->getStatementMessage();
1585
else if (statement != NULL)
1587
transaction= getActiveTransactionMessage(in_session);
1590
* If we've passed our threshold for the statement size (possible for
1591
* a bulk insert), we'll finalize the Statement and Transaction (doing
1592
* the Transaction will keep it from getting huge).
1594
if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
1596
/* Remember the transaction ID so we can re-use it */
1597
uint64_t trx_id= transaction->transaction_context().transaction_id();
1599
message::DeleteData *current_data= statement->mutable_delete_data();
1601
/* Caller should use this value when adding a new record */
1602
*next_segment_id= current_data->segment_id() + 1;
1604
current_data->set_end_segment(false);
1607
* Send the trx message to replicators after finalizing the
1608
* statement and transaction. This will also set the Transaction
1609
* and Statement objects in Session to NULL.
1611
commitTransactionMessage(in_session);
1614
* Statement and Transaction should now be NULL, so new ones will get
1615
* created. We reuse the transaction id since we are segmenting
1618
statement= in_session->getStatementMessage();
1619
transaction= getActiveTransactionMessage(in_session, false);
1620
assert(transaction != NULL);
1622
/* Set the transaction ID to match the previous messages */
1623
transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1627
const message::DeleteHeader &delete_header= statement->delete_header();
1628
string old_table_name= delete_header.table_metadata().table_name();
1630
string current_table_name;
1631
(void) in_table->getShare()->getTableName(current_table_name);
1632
if (current_table_name.compare(old_table_name))
1634
finalizeStatementMessage(*statement, in_session);
1635
statement= in_session->getStatementMessage();
1639
/* carry forward the existing segment id */
1640
const message::DeleteData ¤t_data= statement->delete_data();
1641
*next_segment_id= current_data.segment_id();
1646
if (statement == NULL)
1649
* Transaction will be non-NULL only if we had to segment it due to
1650
* transaction size above.
1652
if (transaction == NULL)
1653
transaction= getActiveTransactionMessage(in_session);
1656
* Transaction message initialized and set, but no statement created
1657
* yet. We construct one and initialize it, here, then return the
1658
* message after attaching the new Statement message pointer to the
1659
* Session for easy retrieval later...
1661
statement= transaction->add_statement();
1662
setDeleteHeader(*statement, in_session, in_table);
1663
in_session->setStatementMessage(statement);
1668
void TransactionServices::setDeleteHeader(message::Statement &statement,
1669
Session *in_session,
1672
initStatementMessage(statement, message::Statement::DELETE, in_session);
1675
* Now we construct the specialized DeleteHeader message inside
1676
* the generalized message::Statement container...
1678
message::DeleteHeader *header= statement.mutable_delete_header();
1679
message::TableMetadata *table_metadata= header->mutable_table_metadata();
1682
(void) in_table->getShare()->getSchemaName(schema_name);
1684
(void) in_table->getShare()->getTableName(table_name);
1686
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1687
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1689
Field *current_field;
1690
Field **table_fields= in_table->getFields();
1692
message::FieldMetadata *field_metadata;
1694
while ((current_field= *table_fields++) != NULL)
1697
* Add the WHERE clause values now...for now, this means the
1698
* primary key field value. Replication only supports tables
1699
* with a primary key.
1701
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1703
field_metadata= header->add_key_field_metadata();
1704
field_metadata->set_name(current_field->field_name);
1705
field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1710
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1712
ReplicationServices &replication_services= ReplicationServices::singleton();
1713
if (! replication_services.isActive())
1716
uint32_t next_segment_id= 1;
1717
message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1719
message::DeleteData *data= statement.mutable_delete_data();
1720
data->set_segment_id(next_segment_id);
1721
data->set_end_segment(true);
1722
message::DeleteRecord *record= data->add_record();
1724
Field *current_field;
1725
Field **table_fields= in_table->getFields();
1726
String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1727
string_value->set_charset(system_charset_info);
1729
while ((current_field= *table_fields++) != NULL)
1732
* Add the WHERE clause values now...for now, this means the
1733
* primary key field value. Replication only supports tables
1734
* with a primary key.
1736
if (in_table->getShare()->fieldInPrimaryKey(current_field))
1738
if (use_update_record)
1741
* Temporarily point to the update record to get its value.
1742
* This is pretty much a hack in order to get the PK value from
1743
* the update record rather than the insert record. Field::val_str()
1744
* should not change anything in Field::ptr, so this should be safe.
1745
* We are careful not to change anything in old_ptr.
1747
const unsigned char *old_ptr= current_field->ptr;
1748
current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
1749
string_value= current_field->val_str(string_value);
1750
current_field->ptr= const_cast<unsigned char *>(old_ptr);
1754
string_value= current_field->val_str(string_value);
1756
* @TODO Store optional old record value in the before data member
1759
record->add_key_value(string_value->c_ptr(), string_value->length());
1760
string_value->free();
1765
void TransactionServices::createTable(Session *in_session,
1766
const message::Table &table)
1768
ReplicationServices &replication_services= ReplicationServices::singleton();
1769
if (! replication_services.isActive())
1772
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1773
message::Statement *statement= transaction->add_statement();
1775
initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
1778
* Construct the specialized CreateTableStatement message and attach
1779
* it to the generic Statement message
1781
message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1782
message::Table *new_table_message= create_table_statement->mutable_table();
1783
*new_table_message= table;
1785
finalizeStatementMessage(*statement, in_session);
1787
finalizeTransactionMessage(*transaction, in_session);
1789
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1791
cleanupTransactionMessage(transaction, in_session);
1795
void TransactionServices::createSchema(Session *in_session,
1796
const message::Schema &schema)
1798
ReplicationServices &replication_services= ReplicationServices::singleton();
1799
if (! replication_services.isActive())
1802
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1803
message::Statement *statement= transaction->add_statement();
1805
initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
1808
* Construct the specialized CreateSchemaStatement message and attach
1809
* it to the generic Statement message
1811
message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1812
message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1813
*new_schema_message= schema;
1815
finalizeStatementMessage(*statement, in_session);
1817
finalizeTransactionMessage(*transaction, in_session);
1819
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1821
cleanupTransactionMessage(transaction, in_session);
1825
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
1827
ReplicationServices &replication_services= ReplicationServices::singleton();
1828
if (! replication_services.isActive())
1831
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1832
message::Statement *statement= transaction->add_statement();
1834
initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
1837
* Construct the specialized DropSchemaStatement message and attach
1838
* it to the generic Statement message
1840
message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1842
drop_schema_statement->set_schema_name(schema_name);
1844
finalizeStatementMessage(*statement, in_session);
1846
finalizeTransactionMessage(*transaction, in_session);
1848
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1850
cleanupTransactionMessage(transaction, in_session);
1853
void TransactionServices::dropTable(Session *in_session,
1854
const string &schema_name,
1855
const string &table_name,
1858
ReplicationServices &replication_services= ReplicationServices::singleton();
1859
if (! replication_services.isActive())
1862
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1863
message::Statement *statement= transaction->add_statement();
1865
initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
1868
* Construct the specialized DropTableStatement message and attach
1869
* it to the generic Statement message
1871
message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1873
drop_table_statement->set_if_exists_clause(if_exists);
1875
message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1877
table_metadata->set_schema_name(schema_name);
1878
table_metadata->set_table_name(table_name);
1880
finalizeStatementMessage(*statement, in_session);
1882
finalizeTransactionMessage(*transaction, in_session);
1884
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1886
cleanupTransactionMessage(transaction, in_session);
1889
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
1891
ReplicationServices &replication_services= ReplicationServices::singleton();
1892
if (! replication_services.isActive())
1895
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1896
message::Statement *statement= transaction->add_statement();
1898
initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
1901
* Construct the specialized TruncateTableStatement message and attach
1902
* it to the generic Statement message
1904
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
1905
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
1908
(void) in_table->getShare()->getSchemaName(schema_name);
1910
(void) in_table->getShare()->getTableName(table_name);
1912
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1913
table_metadata->set_table_name(table_name.c_str(), table_name.length());
1915
finalizeStatementMessage(*statement, in_session);
1917
finalizeTransactionMessage(*transaction, in_session);
1919
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1921
cleanupTransactionMessage(transaction, in_session);
1924
void TransactionServices::rawStatement(Session *in_session, const string &query)
1926
ReplicationServices &replication_services= ReplicationServices::singleton();
1927
if (! replication_services.isActive())
1930
message::Transaction *transaction= getActiveTransactionMessage(in_session);
1931
message::Statement *statement= transaction->add_statement();
1933
initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
1934
statement->set_sql(query);
1935
finalizeStatementMessage(*statement, in_session);
1937
finalizeTransactionMessage(*transaction, in_session);
1939
(void) replication_services.pushTransactionMessage(*in_session, *transaction);
1941
cleanupTransactionMessage(transaction, in_session);
966
1944
} /* namespace drizzled */