300
300
* transaction after all DDLs, just like the statement transaction
301
301
* is always committed at the end of all statements.
303
TransactionServices::TransactionServices()
305
plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
308
xa_storage_engine= (plugin::XaStorageEngine*)engine;
312
xa_storage_engine= NULL;
303
316
void TransactionServices::registerResourceForStatement(Session *session,
304
317
plugin::MonitoredInTransaction *monitored,
305
318
plugin::TransactionalStorageEngine *engine)
429
440
registerResourceForStatement(session, monitored, engine, resource_manager);
443
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
451
Session *my_session= current_session;
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
453
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
458
if (session->getXaId() == 0)
460
session->setXaId(xa_storage_engine->getNewTransactionId(session));
463
return session->getXaId();
697
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
699
734
if (session->transaction.stmt.getResourceContexts().empty() == false)
736
TransactionContext *trans = &session->transaction.stmt;
737
TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
738
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
739
it != resource_contexts.end();
742
ResourceContext *resource_context= *it;
744
resource_context->getTransactionalStorageEngine()->endStatement(session);
703
749
if (commitTransaction(session, false))
1343
const message::UpdateHeader &update_header= statement->update_header();
1344
string old_table_name= update_header.table_metadata().table_name();
1346
string current_table_name;
1347
(void) in_table->getShare()->getTableName(current_table_name);
1348
if (current_table_name.compare(old_table_name))
1350
finalizeStatementMessage(*statement, in_session);
1351
statement= in_session->getStatementMessage();
1397
if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1355
1399
/* carry forward the existing segment id */
1356
1400
const message::UpdateData ¤t_data= statement->update_data();
1357
1401
*next_segment_id= current_data.segment_id();
1405
finalizeStatementMessage(*statement, in_session);
1406
statement= in_session->getStatementMessage();
1381
1430
return *statement;
1433
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1435
const unsigned char *old_record,
1436
const unsigned char *new_record)
1438
const message::UpdateHeader &update_header= statement.update_header();
1439
string old_table_name= update_header.table_metadata().table_name();
1441
string current_table_name;
1442
(void) in_table->getShare()->getTableName(current_table_name);
1443
if (current_table_name.compare(old_table_name))
1449
/* Compare the set fields in the existing UpdateHeader and see if they
1450
* match the updated fields in the new record, if they do not we must
1451
* create a new UpdateHeader
1453
size_t num_set_fields= update_header.set_field_metadata_size();
1455
Field *current_field;
1456
Field **table_fields= in_table->getFields();
1457
in_table->setReadSet();
1459
size_t num_calculated_updated_fields= 0;
1461
while ((current_field= *table_fields++) != NULL)
1463
if (num_calculated_updated_fields > num_set_fields)
1468
if (isFieldUpdated(current_field, in_table, old_record, new_record))
1470
/* check that this field exists in the UpdateHeader record */
1473
for (size_t x= 0; x < num_set_fields; ++x)
1475
const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1476
string name= field_metadata.name();
1477
if (name.compare(current_field->field_name) == 0)
1480
++num_calculated_updated_fields;
1491
if ((num_calculated_updated_fields == num_set_fields) && found)
1384
1502
void TransactionServices::setUpdateHeader(message::Statement &statement,
1385
1503
Session *in_session,
1386
1504
Table *in_table,
1886
* Template for removing Statement records of different types.
1888
* The code for removing records from different Statement message types
1889
* is identical except for the class types that are embedded within the
1892
* There are 3 scenarios we need to look for:
1893
* - We've been asked to remove more records than exist in the Statement
1894
* - We've been asked to remove less records than exist in the Statement
1895
* - We've been asked to remove ALL records that exist in the Statement
1897
* If we are removing ALL records, then effectively we would be left with
1898
* an empty Statement message, so we should just remove it and clean up
1899
* message pointers in the Session object.
1901
template <class DataType, class RecordType>
1902
static bool removeStatementRecordsWithType(Session *session,
1906
uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
1908
/* If there aren't enough records to remove 'count' of them, error. */
1909
if (num_avail_recs < count)
1913
* If we are removing all of the data records, we'll just remove this
1914
* entire Statement message.
1916
if (num_avail_recs == count)
1918
message::Transaction *transaction= session->getTransactionMessage();
1919
protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
1920
statements->RemoveLast();
1923
* Now need to set the Session Statement pointer to either the previous
1924
* Statement, or NULL if there isn't one.
1926
if (statements->size() == 0)
1928
session->setStatementMessage(NULL);
1933
* There isn't a great way to get a pointer to the previous Statement
1934
* message using the RepeatedPtrField object, so we'll just get to it
1935
* using the Transaction message.
1937
int last_stmt_idx= transaction->statement_size() - 1;
1938
session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
1941
/* We only need to remove 'count' records */
1942
else if (num_avail_recs > count)
1944
protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
1946
records->RemoveLast();
1953
bool TransactionServices::removeStatementRecords(Session *session,
1956
ReplicationServices &replication_services= ReplicationServices::singleton();
1957
if (! replication_services.isActive())
1960
/* Get the most current Statement */
1961
message::Statement *statement= session->getStatementMessage();
1963
/* Make sure we have work to do */
1964
if (statement == NULL)
1969
switch (statement->type())
1971
case message::Statement::INSERT:
1973
message::InsertData *data= statement->mutable_insert_data();
1974
retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
1978
case message::Statement::UPDATE:
1980
message::UpdateData *data= statement->mutable_update_data();
1981
retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
1985
case message::Statement::DELETE: /* not sure if this one is possible... */
1987
message::DeleteData *data= statement->mutable_delete_data();
1988
retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
1765
2001
void TransactionServices::createTable(Session *in_session,
1766
2002
const message::Table &table)
1941
2177
cleanupTransactionMessage(transaction, in_session);
2180
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2182
ReplicationServices &replication_services= ReplicationServices::singleton();
2183
if (! replication_services.isActive())
2186
message::Transaction *transaction= new (nothrow) message::Transaction();
2188
// set server id, start timestamp
2189
initTransactionMessage(*transaction, session, true);
2191
// set end timestamp
2192
finalizeTransactionMessage(*transaction, session);
2194
message::Event *trx_event= transaction->mutable_event();
2196
trx_event->CopyFrom(event);
2198
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2202
return static_cast<int>(result);
2205
bool TransactionServices::sendStartupEvent(Session *session)
2207
message::Event event;
2208
event.set_type(message::Event::STARTUP);
2209
if (sendEvent(session, event) != 0)
2214
bool TransactionServices::sendShutdownEvent(Session *session)
2216
message::Event event;
2217
event.set_type(message::Event::SHUTDOWN);
2218
if (sendEvent(session, event) != 0)
1944
2223
} /* namespace drizzled */