~skinny.moey/drizzle/innodb-replication

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-11-08 22:35:57 UTC
  • mfrom: (1802.1.114 trunk)
  • Revision ID: brian@tangent.org-20101108223557-w3xzwp9hjjtjhtc1
MergeĀ inĀ trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
69
69
#include "drizzled/plugin/monitored_in_transaction.h"
70
70
#include "drizzled/plugin/transactional_storage_engine.h"
71
71
#include "drizzled/plugin/xa_resource_manager.h"
 
72
#include "drizzled/plugin/xa_storage_engine.h"
72
73
#include "drizzled/internal/my_sys.h"
73
74
 
74
 
using namespace std;
75
 
 
76
75
#include <vector>
77
76
#include <algorithm>
78
77
#include <functional>
 
78
#include <google/protobuf/repeated_field.h>
 
79
 
 
80
using namespace std;
 
81
using namespace google;
79
82
 
80
83
namespace drizzled
81
84
{
82
85
 
83
 
/** @TODO: Make this a system variable */
84
 
static const size_t trx_msg_threshold= 1024 * 1024;
85
 
 
86
86
/**
87
87
 * @defgroup Transactions
88
88
 *
300
300
 * transaction after all DDLs, just like the statement transaction
301
301
 * is always committed at the end of all statements.
302
302
 */
 
303
TransactionServices::TransactionServices()
 
304
{
 
305
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
 
306
  if (engine)
 
307
  {
 
308
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
 
309
  }
 
310
  else 
 
311
  {
 
312
    xa_storage_engine= NULL;
 
313
  }
 
314
}
 
315
 
303
316
void TransactionServices::registerResourceForStatement(Session *session,
304
317
                                                       plugin::MonitoredInTransaction *monitored,
305
318
                                                       plugin::TransactionalStorageEngine *engine)
390
403
  if (session->transaction.xid_state.xid.is_null())
391
404
    session->transaction.xid_state.xid.set(session->getQueryId());
392
405
 
393
 
  engine->startTransaction(session, START_TRANS_NO_OPTIONS);
394
 
 
395
406
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
396
407
  if (! session->getResourceContext(monitored, 0)->isStarted())
397
408
    registerResourceForStatement(session, monitored, engine);
429
440
    registerResourceForStatement(session, monitored, engine, resource_manager);
430
441
}
431
442
 
 
443
void TransactionServices::allocateNewTransactionId()
 
444
{
 
445
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
446
  if (! replication_services.isActive())
 
447
  {
 
448
    return;
 
449
  }
 
450
 
 
451
  Session *my_session= current_session;
 
452
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
 
453
  my_session->setXaId(xa_id);
 
454
}
 
455
 
 
456
uint64_t TransactionServices::getCurrentTransactionId(Session *session)
 
457
{
 
458
  if (session->getXaId() == 0)
 
459
  {
 
460
    session->setXaId(xa_storage_engine->getNewTransactionId(session)); 
 
461
  }
 
462
 
 
463
  return session->getXaId();
 
464
}
 
465
 
432
466
/**
433
467
  @retval
434
468
    0   ok
466
500
 
467
501
  if (resource_contexts.empty() == false)
468
502
  {
469
 
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
 
503
    if (is_real_trans && session->wait_if_global_read_lock(false, false))
470
504
    {
471
505
      rollbackTransaction(session, normal_transaction);
472
506
      return 1;
526
560
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
527
561
end:
528
562
    if (is_real_trans)
529
 
      start_waiting_global_read_lock(session);
 
563
      session->startWaitingGlobalReadLock();
530
564
  }
531
565
  return error;
532
566
}
673
707
   */
674
708
  if (is_real_trans &&
675
709
      session->transaction.all.hasModifiedNonTransData() &&
676
 
      session->killed != Session::KILL_CONNECTION)
 
710
      session->getKilled() != Session::KILL_CONNECTION)
677
711
  {
678
712
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
679
713
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
696
730
*/
697
731
int TransactionServices::autocommitOrRollback(Session *session, int error)
698
732
{
 
733
 
699
734
  if (session->transaction.stmt.getResourceContexts().empty() == false)
700
735
  {
 
736
    TransactionContext *trans = &session->transaction.stmt;
 
737
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
 
738
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
 
739
         it != resource_contexts.end();
 
740
         ++it)
 
741
    {
 
742
      ResourceContext *resource_context= *it;
 
743
 
 
744
      resource_context->getTransactionalStorageEngine()->endStatement(session);
 
745
    }
 
746
 
701
747
    if (! error)
702
748
    {
703
749
      if (commitTransaction(session, false))
967
1013
  trx->set_server_id(in_session->getServerId());
968
1014
 
969
1015
  if (should_inc_trx_id)
970
 
    trx->set_transaction_id(getNextTransactionId());
 
1016
  {
 
1017
    trx->set_transaction_id(getCurrentTransactionId(in_session));
 
1018
    in_session->setXaId(0);
 
1019
  }  
 
1020
  else
 
1021
  { 
 
1022
    trx->set_transaction_id(0);
 
1023
  }
971
1024
 
972
1025
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
973
1026
}
1020
1073
{
1021
1074
  statement.set_type(in_type);
1022
1075
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
1023
 
  /** @TODO Set sql string optionally */
1024
1076
}
1025
1077
 
1026
1078
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1110
1162
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1111
1163
     * the Transaction will keep it from getting huge).
1112
1164
     */
1113
 
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
 
1165
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1166
      in_session->variables.transaction_message_threshold)
1114
1167
    {
1115
1168
      /* Remember the transaction ID so we can re-use it */
1116
1169
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1307
1360
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1308
1361
     * the Transaction will keep it from getting huge).
1309
1362
     */
1310
 
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
 
1363
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1364
      in_session->variables.transaction_message_threshold)
1311
1365
    {
1312
1366
      /* Remember the transaction ID so we can re-use it */
1313
1367
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1340
1394
    }
1341
1395
    else
1342
1396
    {
1343
 
      const message::UpdateHeader &update_header= statement->update_header();
1344
 
      string old_table_name= update_header.table_metadata().table_name();
1345
 
 
1346
 
      string current_table_name;
1347
 
      (void) in_table->getShare()->getTableName(current_table_name);
1348
 
      if (current_table_name.compare(old_table_name))
1349
 
      {
1350
 
        finalizeStatementMessage(*statement, in_session);
1351
 
        statement= in_session->getStatementMessage();
1352
 
      }
1353
 
      else
 
1397
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
1354
1398
      {
1355
1399
        /* carry forward the existing segment id */
1356
1400
        const message::UpdateData &current_data= statement->update_data();
1357
1401
        *next_segment_id= current_data.segment_id();
 
1402
      } 
 
1403
      else 
 
1404
      {
 
1405
        finalizeStatementMessage(*statement, in_session);
 
1406
        statement= in_session->getStatementMessage();
1358
1407
      }
1359
1408
    }
1360
1409
  }
1381
1430
  return *statement;
1382
1431
}
1383
1432
 
 
1433
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
 
1434
                                                  Table *in_table,
 
1435
                                                  const unsigned char *old_record,
 
1436
                                                  const unsigned char *new_record)
 
1437
{
 
1438
  const message::UpdateHeader &update_header= statement.update_header();
 
1439
  string old_table_name= update_header.table_metadata().table_name();
 
1440
 
 
1441
  string current_table_name;
 
1442
  (void) in_table->getShare()->getTableName(current_table_name);
 
1443
  if (current_table_name.compare(old_table_name))
 
1444
  {
 
1445
    return false;
 
1446
  }
 
1447
  else
 
1448
  {
 
1449
    /* Compare the set fields in the existing UpdateHeader and see if they
 
1450
     * match the updated fields in the new record, if they do not we must
 
1451
     * create a new UpdateHeader 
 
1452
     */
 
1453
    size_t num_set_fields= update_header.set_field_metadata_size();
 
1454
 
 
1455
    Field *current_field;
 
1456
    Field **table_fields= in_table->getFields();
 
1457
    in_table->setReadSet();
 
1458
 
 
1459
    size_t num_calculated_updated_fields= 0;
 
1460
    bool found= false;
 
1461
    while ((current_field= *table_fields++) != NULL)
 
1462
    {
 
1463
      if (num_calculated_updated_fields > num_set_fields)
 
1464
      {
 
1465
        break;
 
1466
      }
 
1467
 
 
1468
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1469
      {
 
1470
        /* check that this field exists in the UpdateHeader record */
 
1471
        found= false;
 
1472
 
 
1473
        for (size_t x= 0; x < num_set_fields; ++x)
 
1474
        {
 
1475
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
 
1476
          string name= field_metadata.name();
 
1477
          if (name.compare(current_field->field_name) == 0)
 
1478
          {
 
1479
            found= true;
 
1480
            ++num_calculated_updated_fields;
 
1481
            break;
 
1482
          } 
 
1483
        }
 
1484
        if (! found)
 
1485
        {
 
1486
          break;
 
1487
        } 
 
1488
      }
 
1489
    }
 
1490
 
 
1491
    if ((num_calculated_updated_fields == num_set_fields) && found)
 
1492
    {
 
1493
      return true;
 
1494
    } 
 
1495
    else 
 
1496
    {
 
1497
      return false;
 
1498
    }
 
1499
  }
 
1500
}  
 
1501
 
1384
1502
void TransactionServices::setUpdateHeader(message::Statement &statement,
1385
1503
                                          Session *in_session,
1386
1504
                                          Table *in_table,
1591
1709
     * a bulk insert), we'll finalize the Statement and Transaction (doing
1592
1710
     * the Transaction will keep it from getting huge).
1593
1711
     */
1594
 
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
 
1712
    if (static_cast<size_t>(transaction->ByteSize()) >= 
 
1713
      in_session->variables.transaction_message_threshold)
1595
1714
    {
1596
1715
      /* Remember the transaction ID so we can re-use it */
1597
1716
      uint64_t trx_id= transaction->transaction_context().transaction_id();
1762
1881
  }
1763
1882
}
1764
1883
 
 
1884
 
 
1885
/**
 
1886
 * Template for removing Statement records of different types.
 
1887
 *
 
1888
 * The code for removing records from different Statement message types
 
1889
 * is identical except for the class types that are embedded within the
 
1890
 * Statement.
 
1891
 *
 
1892
 * There are 3 scenarios we need to look for:
 
1893
 *   - We've been asked to remove more records than exist in the Statement
 
1894
 *   - We've been asked to remove less records than exist in the Statement
 
1895
 *   - We've been asked to remove ALL records that exist in the Statement
 
1896
 *
 
1897
 * If we are removing ALL records, then effectively we would be left with
 
1898
 * an empty Statement message, so we should just remove it and clean up
 
1899
 * message pointers in the Session object.
 
1900
 */
 
1901
template <class DataType, class RecordType>
 
1902
static bool removeStatementRecordsWithType(Session *session,
 
1903
                                           DataType *data,
 
1904
                                           uint32_t count)
 
1905
{
 
1906
  uint32_t num_avail_recs= static_cast<uint32_t>(data->record_size());
 
1907
 
 
1908
  /* If there aren't enough records to remove 'count' of them, error. */
 
1909
  if (num_avail_recs < count)
 
1910
    return false;
 
1911
 
 
1912
  /*
 
1913
   * If we are removing all of the data records, we'll just remove this
 
1914
   * entire Statement message.
 
1915
   */
 
1916
  if (num_avail_recs == count)
 
1917
  {
 
1918
    message::Transaction *transaction= session->getTransactionMessage();
 
1919
    protobuf::RepeatedPtrField<message::Statement> *statements= transaction->mutable_statement();
 
1920
    statements->RemoveLast();
 
1921
 
 
1922
    /*
 
1923
     * Now need to set the Session Statement pointer to either the previous
 
1924
     * Statement, or NULL if there isn't one.
 
1925
     */
 
1926
    if (statements->size() == 0)
 
1927
    {
 
1928
      session->setStatementMessage(NULL);
 
1929
    }
 
1930
    else
 
1931
    {
 
1932
      /*
 
1933
       * There isn't a great way to get a pointer to the previous Statement
 
1934
       * message using the RepeatedPtrField object, so we'll just get to it
 
1935
       * using the Transaction message.
 
1936
       */
 
1937
      int last_stmt_idx= transaction->statement_size() - 1;
 
1938
      session->setStatementMessage(transaction->mutable_statement(last_stmt_idx));
 
1939
    }
 
1940
  }
 
1941
  /* We only need to remove 'count' records */
 
1942
  else if (num_avail_recs > count)
 
1943
  {
 
1944
    protobuf::RepeatedPtrField<RecordType> *records= data->mutable_record();
 
1945
    while (count--)
 
1946
      records->RemoveLast();
 
1947
  }
 
1948
 
 
1949
  return true;
 
1950
}
 
1951
 
 
1952
 
 
1953
bool TransactionServices::removeStatementRecords(Session *session,
 
1954
                                                 uint32_t count)
 
1955
{
 
1956
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1957
  if (! replication_services.isActive())
 
1958
    return false;
 
1959
 
 
1960
  /* Get the most current Statement */
 
1961
  message::Statement *statement= session->getStatementMessage();
 
1962
 
 
1963
  /* Make sure we have work to do */
 
1964
  if (statement == NULL)
 
1965
    return false;
 
1966
 
 
1967
  bool retval= false;
 
1968
 
 
1969
  switch (statement->type())
 
1970
  {
 
1971
    case message::Statement::INSERT:
 
1972
    {
 
1973
      message::InsertData *data= statement->mutable_insert_data();
 
1974
      retval= removeStatementRecordsWithType<message::InsertData, message::InsertRecord>(session, data, count);
 
1975
      break;
 
1976
    }
 
1977
 
 
1978
    case message::Statement::UPDATE:
 
1979
    {
 
1980
      message::UpdateData *data= statement->mutable_update_data();
 
1981
      retval= removeStatementRecordsWithType<message::UpdateData, message::UpdateRecord>(session, data, count);
 
1982
      break;
 
1983
    }
 
1984
 
 
1985
    case message::Statement::DELETE:  /* not sure if this one is possible... */
 
1986
    {
 
1987
      message::DeleteData *data= statement->mutable_delete_data();
 
1988
      retval= removeStatementRecordsWithType<message::DeleteData, message::DeleteRecord>(session, data, count);
 
1989
      break;
 
1990
    }
 
1991
 
 
1992
    default:
 
1993
      retval= false;
 
1994
      break;
 
1995
  }
 
1996
 
 
1997
  return retval;
 
1998
}
 
1999
 
 
2000
 
1765
2001
void TransactionServices::createTable(Session *in_session,
1766
2002
                                      const message::Table &table)
1767
2003
{
1941
2177
  cleanupTransactionMessage(transaction, in_session);
1942
2178
}
1943
2179
 
 
2180
int TransactionServices::sendEvent(Session *session, const message::Event &event)
 
2181
{
 
2182
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
2183
  if (! replication_services.isActive())
 
2184
    return 0;
 
2185
 
 
2186
  message::Transaction *transaction= new (nothrow) message::Transaction();
 
2187
 
 
2188
  // set server id, start timestamp
 
2189
  initTransactionMessage(*transaction, session, true);
 
2190
 
 
2191
  // set end timestamp
 
2192
  finalizeTransactionMessage(*transaction, session);
 
2193
 
 
2194
  message::Event *trx_event= transaction->mutable_event();
 
2195
 
 
2196
  trx_event->CopyFrom(event);
 
2197
 
 
2198
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
 
2199
 
 
2200
  delete transaction;
 
2201
 
 
2202
  return static_cast<int>(result);
 
2203
}
 
2204
 
 
2205
bool TransactionServices::sendStartupEvent(Session *session)
 
2206
{
 
2207
  message::Event event;
 
2208
  event.set_type(message::Event::STARTUP);
 
2209
  if (sendEvent(session, event) != 0)
 
2210
    return false;
 
2211
  return true;
 
2212
}
 
2213
 
 
2214
bool TransactionServices::sendShutdownEvent(Session *session)
 
2215
{
 
2216
  message::Event event;
 
2217
  event.set_type(message::Event::SHUTDOWN);
 
2218
  if (sendEvent(session, event) != 0)
 
2219
    return false;
 
2220
  return true;
 
2221
}
 
2222
 
1944
2223
} /* namespace drizzled */