1342
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1342
const message::UpdateHeader &update_header= statement->update_header();
1343
string old_table_name= update_header.table_metadata().table_name();
1345
string current_table_name;
1346
(void) in_table->getShare()->getTableName(current_table_name);
1347
if (current_table_name.compare(old_table_name))
1349
finalizeStatementMessage(*statement, in_session);
1350
statement= in_session->getStatementMessage();
1344
1354
/* carry forward the existing segment id */
1345
1355
const message::UpdateData ¤t_data= statement->update_data();
1346
1356
*next_segment_id= current_data.segment_id();
1350
finalizeStatementMessage(*statement, in_session);
1351
statement= in_session->getStatementMessage();
1375
1380
return *statement;
1378
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1380
const unsigned char *old_record,
1381
const unsigned char *new_record)
1383
const message::UpdateHeader &update_header= statement.update_header();
1384
string old_table_name= update_header.table_metadata().table_name();
1386
string current_table_name;
1387
(void) in_table->getShare()->getTableName(current_table_name);
1388
if (current_table_name.compare(old_table_name))
1394
/* Compare the set fields in the existing UpdateHeader and see if they
1395
* match the updated fields in the new record, if they do not we must
1396
* create a new UpdateHeader
1398
size_t num_set_fields= update_header.set_field_metadata_size();
1400
Field *current_field;
1401
Field **table_fields= in_table->getFields();
1402
in_table->setReadSet();
1404
size_t num_calculated_updated_fields= 0;
1406
while ((current_field= *table_fields++) != NULL)
1408
if (num_calculated_updated_fields > num_set_fields)
1413
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1415
/* check that this field exists in the UpdateHeader record */
1418
for (size_t x= 0; x < num_set_fields; ++x)
1420
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1421
string name= field_metadata.name();
1422
if (name.compare(current_field->field_name) == 0)
1425
++num_calculated_updated_fields;
1436
if ((num_calculated_updated_fields == num_set_fields) && found)
1447
1383
void TransactionServices::setUpdateHeader(message::Statement &statement,
1448
1384
Session *in_session,
1449
1385
Table *in_table,
2005
1941
cleanupTransactionMessage(transaction, in_session);
2008
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2010
ReplicationServices &replication_services= ReplicationServices::singleton();
2011
if (! replication_services.isActive())
2014
message::Transaction *transaction= new (nothrow) message::Transaction();
2016
// set server id, start timestamp
2017
initTransactionMessage(*transaction, session, true);
2019
// set end timestamp
2020
finalizeTransactionMessage(*transaction, session);
2022
message::Event *trx_event= transaction->mutable_event();
2024
trx_event->CopyFrom(event);
2026
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2030
return static_cast<int>(result);
2033
bool TransactionServices::sendStartupEvent(Session *session)
2035
message::Event event;
2036
event.set_type(message::Event::STARTUP);
2037
if (sendEvent(session, event) != 0)
2042
bool TransactionServices::sendShutdownEvent(Session *session)
2044
message::Event event;
2045
event.set_type(message::Event::SHUTDOWN);
2046
if (sendEvent(session, event) != 0)
2051
1944
} /* namespace drizzled */