300
303
* transaction after all DDLs, just like the statement transaction
301
304
* is always committed at the end of all statements.
303
TransactionServices::TransactionServices()
307
static plugin::XaStorageEngine& xa_storage_engine()
305
plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
308
xa_storage_engine= (plugin::XaStorageEngine*)engine;
312
xa_storage_engine= NULL;
309
static plugin::XaStorageEngine& engine= static_cast<plugin::XaStorageEngine&>(*plugin::StorageEngine::findByName("InnoDB"));
316
void TransactionServices::registerResourceForStatement(Session::reference session,
313
void TransactionServices::registerResourceForStatement(Session& session,
317
314
plugin::MonitoredInTransaction *monitored,
318
315
plugin::TransactionalStorageEngine *engine)
329
326
registerResourceForTransaction(session, monitored, engine);
332
TransactionContext *trans= &session.transaction.stmt;
333
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
329
TransactionContext& trans= session.transaction.stmt;
330
ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
335
if (resource_context->isStarted())
332
if (resource_context.isStarted())
336
333
return; /* already registered, return */
338
335
assert(monitored->participatesInSqlTransaction());
339
336
assert(not monitored->participatesInXaTransaction());
341
resource_context->setMonitored(monitored);
342
resource_context->setTransactionalStorageEngine(engine);
343
trans->registerResource(resource_context);
345
trans->no_2pc|= true;
338
resource_context.setMonitored(monitored);
339
resource_context.setTransactionalStorageEngine(engine);
340
trans.registerResource(&resource_context);
348
void TransactionServices::registerResourceForStatement(Session::reference session,
344
void TransactionServices::registerResourceForStatement(Session& session,
349
345
plugin::MonitoredInTransaction *monitored,
350
346
plugin::TransactionalStorageEngine *engine,
351
347
plugin::XaResourceManager *resource_manager)
362
358
registerResourceForTransaction(session, monitored, engine, resource_manager);
365
TransactionContext *trans= &session.transaction.stmt;
366
ResourceContext *resource_context= session.getResourceContext(monitored, 0);
361
TransactionContext& trans= session.transaction.stmt;
362
ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
368
if (resource_context->isStarted())
364
if (resource_context.isStarted())
369
365
return; /* already registered, return */
371
367
assert(monitored->participatesInXaTransaction());
372
368
assert(monitored->participatesInSqlTransaction());
374
resource_context->setMonitored(monitored);
375
resource_context->setTransactionalStorageEngine(engine);
376
resource_context->setXaResourceManager(resource_manager);
377
trans->registerResource(resource_context);
379
trans->no_2pc|= false;
370
resource_context.setMonitored(monitored);
371
resource_context.setTransactionalStorageEngine(engine);
372
resource_context.setXaResourceManager(resource_manager);
373
trans.registerResource(&resource_context);
382
void TransactionServices::registerResourceForTransaction(Session::reference session,
376
void TransactionServices::registerResourceForTransaction(Session& session,
383
377
plugin::MonitoredInTransaction *monitored,
384
378
plugin::TransactionalStorageEngine *engine)
386
TransactionContext *trans= &session.transaction.all;
387
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
380
TransactionContext& trans= session.transaction.all;
381
ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
389
if (resource_context->isStarted())
383
if (resource_context.isStarted())
390
384
return; /* already registered, return */
392
386
session.server_status|= SERVER_STATUS_IN_TRANS;
394
trans->registerResource(resource_context);
388
trans.registerResource(&resource_context);
396
390
assert(monitored->participatesInSqlTransaction());
397
391
assert(not monitored->participatesInXaTransaction());
399
resource_context->setMonitored(monitored);
400
resource_context->setTransactionalStorageEngine(engine);
401
trans->no_2pc|= true;
393
resource_context.setMonitored(monitored);
394
resource_context.setTransactionalStorageEngine(engine);
403
397
if (session.transaction.xid_state.xid.is_null())
404
398
session.transaction.xid_state.xid.set(session.getQueryId());
406
400
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
if (! session.getResourceContext(monitored, 0)->isStarted())
401
if (not session.getResourceContext(*monitored, 0).isStarted())
408
402
registerResourceForStatement(session, monitored, engine);
411
void TransactionServices::registerResourceForTransaction(Session::reference session,
405
void TransactionServices::registerResourceForTransaction(Session& session,
412
406
plugin::MonitoredInTransaction *monitored,
413
407
plugin::TransactionalStorageEngine *engine,
414
408
plugin::XaResourceManager *resource_manager)
416
410
TransactionContext *trans= &session.transaction.all;
417
ResourceContext *resource_context= session.getResourceContext(monitored, 1);
411
ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
419
if (resource_context->isStarted())
413
if (resource_context.isStarted())
420
414
return; /* already registered, return */
422
416
session.server_status|= SERVER_STATUS_IN_TRANS;
424
trans->registerResource(resource_context);
418
trans->registerResource(&resource_context);
426
420
assert(monitored->participatesInSqlTransaction());
428
resource_context->setMonitored(monitored);
429
resource_context->setXaResourceManager(resource_manager);
430
resource_context->setTransactionalStorageEngine(engine);
431
trans->no_2pc|= true;
422
resource_context.setMonitored(monitored);
423
resource_context.setXaResourceManager(resource_manager);
424
resource_context.setTransactionalStorageEngine(engine);
433
427
if (session.transaction.xid_state.xid.is_null())
434
428
session.transaction.xid_state.xid.set(session.getQueryId());
436
430
engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
438
432
/* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
if (! session.getResourceContext(monitored, 0)->isStarted())
433
if (! session.getResourceContext(*monitored, 0).isStarted())
440
434
registerResourceForStatement(session, monitored, engine, resource_manager);
443
437
void TransactionServices::allocateNewTransactionId()
445
ReplicationServices &replication_services= ReplicationServices::singleton();
446
if (! replication_services.isActive())
439
if (! ReplicationServices::isActive())
451
444
Session *my_session= current_session;
452
uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
445
uint64_t xa_id= xa_storage_engine().getNewTransactionId(my_session);
453
446
my_session->setXaId(xa_id);
456
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
449
uint64_t TransactionServices::getCurrentTransactionId(Session& session)
458
451
if (session.getXaId() == 0)
460
session.setXaId(xa_storage_engine->getNewTransactionId(&session));
453
session.setXaId(xa_storage_engine().getNewTransactionId(&session));
463
456
return session.getXaId();
466
int TransactionServices::commitTransaction(Session::reference session,
459
int TransactionServices::commitTransaction(Session& session,
467
460
bool normal_transaction)
469
462
int error= 0, cookie= 0;
575
566
if (resource_contexts.empty() == false)
577
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
578
it != resource_contexts.end();
568
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
582
ResourceContext *resource_context= *it;
584
570
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
586
572
if (resource->participatesInXaTransaction())
588
if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
574
if (int err= resource_context->getXaResourceManager()->xaCommit(&session, all))
590
576
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
637
623
We must not rollback the normal transaction if a statement
638
624
transaction is pending.
640
assert(session.transaction.stmt.getResourceContexts().empty() ||
641
trans == &session.transaction.stmt);
626
assert(session.transaction.stmt.getResourceContexts().empty() || trans == &session.transaction.stmt);
643
628
if (resource_contexts.empty() == false)
645
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
646
it != resource_contexts.end();
630
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
650
ResourceContext *resource_context= *it;
652
632
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
654
634
if (resource->participatesInXaTransaction())
656
if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
636
if (int err= resource_context->getXaResourceManager()->xaRollback(&session, all))
658
638
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
781
757
rolling back to savepoint in all storage engines that were part of the
782
758
transaction when the savepoint was set
784
for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
785
it != sv_resource_contexts.end();
760
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, sv_resource_contexts)
789
ResourceContext *resource_context= *it;
791
762
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
793
764
if (resource->participatesInSqlTransaction())
795
if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
766
if (int err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv))
797
768
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
838
809
* savepoint's resource contexts.
841
for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
842
it != set_difference_contexts.end();
812
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, set_difference_contexts)
845
ResourceContext *resource_context= *it;
848
814
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
850
816
if (resource->participatesInSqlTransaction())
852
if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
818
if (int err= resource_context->getTransactionalStorageEngine()->rollback(&session, true))
854
820
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
908
874
if (resource_contexts.empty() == false)
910
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
911
it != resource_contexts.end();
876
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
914
ResourceContext *resource_context= *it;
917
878
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
919
880
if (resource->participatesInSqlTransaction())
921
if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
882
if (int err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv))
923
884
my_error(ER_GET_ERRNO, MYF(0), err);
953
int TransactionServices::releaseSavepoint(Session::reference session,
914
int TransactionServices::releaseSavepoint(Session& session,
954
915
NamedSavepoint &sv)
958
919
TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
960
for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
961
it != resource_contexts.end();
921
BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
965
ResourceContext *resource_context= *it;
967
923
plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
969
925
if (resource->participatesInSqlTransaction())
971
if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
927
if (int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv))
973
929
my_error(ER_GET_ERRNO, MYF(0), err);
1094
1046
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
1047
message::Statement::Type type,
1096
Session::const_reference session)
1048
const Session& session)
1098
1050
statement.set_type(type);
1099
statement.set_start_timestamp(session.getCurrentTimestamp());
1051
statement.set_start_timestamp(session.times.getCurrentTimestamp());
1101
1053
if (session.variables.replicate_query)
1102
1054
statement.set_sql(session.getQueryString()->c_str());
1105
1057
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
Session::reference session)
1108
statement.set_end_timestamp(session.getCurrentTimestamp());
1060
statement.set_end_timestamp(session.times.getCurrentTimestamp());
1109
1061
session.setStatementMessage(NULL);
1112
void TransactionServices::rollbackTransactionMessage(Session::reference session)
1064
void TransactionServices::rollbackTransactionMessage(Session& session)
1114
ReplicationServices &replication_services= ReplicationServices::singleton();
1115
if (! replication_services.isActive())
1066
if (! ReplicationServices::isActive())
1118
1069
message::Transaction *transaction= getActiveTransactionMessage(session);
1934
1864
finalizeTransactionMessage(*transaction, session);
1936
(void) replication_services.pushTransactionMessage(session, *transaction);
1866
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
1938
1868
cleanupTransactionMessage(transaction, session);
1942
void TransactionServices::createSchema(Session::reference session,
1872
void TransactionServices::createSchema(Session& session,
1943
1873
const message::Schema &schema)
1945
ReplicationServices &replication_services= ReplicationServices::singleton();
1946
if (! replication_services.isActive())
1875
if (! ReplicationServices::isActive())
1949
1878
if (not message::is_replicated(schema))
1967
1896
finalizeTransactionMessage(*transaction, session);
1969
(void) replication_services.pushTransactionMessage(session, *transaction);
1898
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
1971
1900
cleanupTransactionMessage(transaction, session);
1975
void TransactionServices::dropSchema(Session::reference session,
1976
identifier::Schema::const_reference identifier,
1904
void TransactionServices::dropSchema(Session& session,
1905
const identifier::Schema& identifier,
1977
1906
message::schema::const_reference schema)
1979
ReplicationServices &replication_services= ReplicationServices::singleton();
1980
if (not replication_services.isActive())
1908
if (not ReplicationServices::isActive())
1983
1911
if (not message::is_replicated(schema))
2001
1929
finalizeTransactionMessage(*transaction, session);
2003
(void) replication_services.pushTransactionMessage(session, *transaction);
1931
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2005
1933
cleanupTransactionMessage(transaction, session);
2008
void TransactionServices::alterSchema(Session::reference session,
1936
void TransactionServices::alterSchema(Session& session,
2009
1937
const message::Schema &old_schema,
2010
1938
const message::Schema &new_schema)
2012
ReplicationServices &replication_services= ReplicationServices::singleton();
2013
if (! replication_services.isActive())
1940
if (! ReplicationServices::isActive())
2016
1943
if (not message::is_replicated(old_schema))
2038
1965
finalizeTransactionMessage(*transaction, session);
2040
(void) replication_services.pushTransactionMessage(session, *transaction);
1967
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2042
1969
cleanupTransactionMessage(transaction, session);
2045
void TransactionServices::dropTable(Session::reference session,
2046
identifier::Table::const_reference identifier,
1972
void TransactionServices::dropTable(Session& session,
1973
const identifier::Table& identifier,
2047
1974
message::table::const_reference table,
2048
1975
bool if_exists)
2050
ReplicationServices &replication_services= ReplicationServices::singleton();
2051
if (! replication_services.isActive())
1977
if (! ReplicationServices::isActive())
2054
1980
if (not message::is_replicated(table))
2077
2003
finalizeTransactionMessage(*transaction, session);
2079
(void) replication_services.pushTransactionMessage(session, *transaction);
2005
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2081
2007
cleanupTransactionMessage(transaction, session);
2084
void TransactionServices::truncateTable(Session::reference session,
2010
void TransactionServices::truncateTable(Session& session, Table &table)
2087
ReplicationServices &replication_services= ReplicationServices::singleton();
2088
if (! replication_services.isActive())
2012
if (! ReplicationServices::isActive())
2091
2015
if (not table.getShare()->is_replicated())
2103
2027
message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2104
2028
message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2107
(void) table.getShare()->getSchemaName(schema_name);
2110
(void) table.getShare()->getTableName(table_name);
2112
table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2113
table_metadata->set_table_name(table_name.c_str(), table_name.length());
2030
table_metadata->set_schema_name(table.getShare()->getSchemaName());
2031
table_metadata->set_table_name(table.getShare()->getTableName());
2115
2033
finalizeStatementMessage(*statement, session);
2117
2035
finalizeTransactionMessage(*transaction, session);
2119
(void) replication_services.pushTransactionMessage(session, *transaction);
2037
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2121
2039
cleanupTransactionMessage(transaction, session);
2124
void TransactionServices::rawStatement(Session::reference session,
2042
void TransactionServices::rawStatement(Session& session,
2125
2043
const string &query,
2126
2044
const string &schema)
2128
ReplicationServices &replication_services= ReplicationServices::singleton();
2129
if (! replication_services.isActive())
2046
if (! ReplicationServices::isActive())
2132
2049
message::Transaction *transaction= getActiveTransactionMessage(session);
2141
2058
finalizeTransactionMessage(*transaction, session);
2143
(void) replication_services.pushTransactionMessage(session, *transaction);
2060
(void) ReplicationServices::pushTransactionMessage(session, *transaction);
2145
2062
cleanupTransactionMessage(transaction, session);
2148
int TransactionServices::sendEvent(Session::reference session,
2149
const message::Event &event)
2065
int TransactionServices::sendEvent(Session& session, const message::Event &event)
2151
ReplicationServices &replication_services= ReplicationServices::singleton();
2152
if (! replication_services.isActive())
2067
if (not ReplicationServices::isActive())
2155
message::Transaction *transaction= new (nothrow) message::Transaction();
2069
message::Transaction transaction;
2157
2071
// set server id, start timestamp
2158
initTransactionMessage(*transaction, session, true);
2072
initTransactionMessage(transaction, session, true);
2160
2074
// set end timestamp
2161
finalizeTransactionMessage(*transaction, session);
2163
message::Event *trx_event= transaction->mutable_event();
2075
finalizeTransactionMessage(transaction, session);
2077
message::Event *trx_event= transaction.mutable_event();
2165
2078
trx_event->CopyFrom(event);
2167
plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2171
return static_cast<int>(result);
2079
plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, transaction);
2174
bool TransactionServices::sendStartupEvent(Session::reference session)
2083
bool TransactionServices::sendStartupEvent(Session& session)
2176
2085
message::Event event;
2177
2086
event.set_type(message::Event::STARTUP);
2178
if (sendEvent(session, event) != 0)
2087
return not sendEvent(session, event);
2183
bool TransactionServices::sendShutdownEvent(Session::reference session)
2090
bool TransactionServices::sendShutdownEvent(Session& session)
2185
2092
message::Event event;
2186
2093
event.set_type(message::Event::SHUTDOWN);
2187
if (sendEvent(session, event) != 0)
2094
return not sendEvent(session, event);
2192
2097
} /* namespace drizzled */