~drizzle-developers/ubuntu/natty/drizzle/natty

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Monty Taylor
  • Date: 2010-09-09 18:35:25 UTC
  • mto: (1308.1.63 trunk)
  • mto: This revision was merged to the branch mainline in revision 1312.
  • Revision ID: mordred@inaugust.com-20100909183525-6l72i1glofd3qe5m
Tags: upstream-2010.08.1742
ImportĀ upstreamĀ versionĀ 2010.08.1742

Show diffs side-by-side

added added

removed removed

Lines of Context:
80
80
namespace drizzled
81
81
{
82
82
 
 
83
/** @TODO: Make this a system variable */
 
84
static const size_t trx_msg_threshold= 1024 * 1024;
 
85
 
83
86
/**
84
87
 * @defgroup Transactions
85
88
 *
895
898
  return replication_services.isActive();
896
899
}
897
900
 
898
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
 
901
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
899
902
{
900
903
  message::Transaction *transaction= in_session->getTransactionMessage();
901
904
 
907
910
     * deleting transaction message when done with it.
908
911
     */
909
912
    transaction= new (nothrow) message::Transaction();
910
 
    initTransactionMessage(*transaction, in_session);
 
913
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
911
914
    in_session->setTransactionMessage(transaction);
912
915
    return transaction;
913
916
  }
916
919
}
917
920
 
918
921
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
919
 
                                          Session *in_session)
 
922
                                                 Session *in_session,
 
923
                                                 bool should_inc_trx_id)
920
924
{
921
925
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
922
926
  trx->set_server_id(in_session->getServerId());
923
 
  trx->set_transaction_id(getNextTransactionId());
 
927
 
 
928
  if (should_inc_trx_id)
 
929
    trx->set_transaction_id(getNextTransactionId());
 
930
  else
 
931
    trx->set_transaction_id(getCurrentTransactionId());
 
932
 
924
933
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
925
934
}
926
935
 
1014
1023
     * attach it to the transaction, and push it to replicators.
1015
1024
     */
1016
1025
    transaction->Clear();
1017
 
    initTransactionMessage(*transaction, in_session);
 
1026
    initTransactionMessage(*transaction, in_session, true);
1018
1027
 
1019
1028
    message::Statement *statement= transaction->add_statement();
1020
1029
 
1029
1038
}
1030
1039
 
1031
1040
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
1032
 
                                                                 Table *in_table)
 
1041
                                                            Table *in_table,
 
1042
                                                            uint32_t *next_segment_id)
1033
1043
{
1034
1044
  message::Statement *statement= in_session->getStatementMessage();
 
1045
  message::Transaction *transaction= NULL;
1035
1046
 
1036
1047
  /* 
1037
1048
   * Check the type for the current Statement message, if it is anything
1047
1058
  } 
1048
1059
  else if (statement != NULL)
1049
1060
  {
1050
 
    const message::InsertHeader &insert_header= statement->insert_header();
1051
 
    string old_table_name= insert_header.table_metadata().table_name();
 
1061
    /*
 
1062
     * If we've passed our threshold for the statement size (possible for
 
1063
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1064
     * the Transaction will keep it from getting huge).
 
1065
     */
 
1066
    if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
 
1067
    {
 
1068
      message::InsertData *current_data= statement->mutable_insert_data();
 
1069
 
 
1070
      /* Caller should use this value when adding a new record */
 
1071
      *next_segment_id= current_data->segment_id() + 1;
 
1072
 
 
1073
      current_data->set_end_segment(false);
 
1074
 
 
1075
      /* 
 
1076
       * Send the trx message to replicators after finalizing the 
 
1077
       * statement and transaction. This will also set the Transaction
 
1078
       * and Statement objects in Session to NULL.
 
1079
       */
 
1080
      commitTransactionMessage(in_session);
 
1081
 
 
1082
      /*
 
1083
       * Statement and Transaction should now be NULL, so new ones will get
 
1084
       * created. We reuse the transaction id since we are segmenting
 
1085
       * one transaction.
 
1086
       */
 
1087
      statement= in_session->getStatementMessage();
 
1088
      transaction= getActiveTransactionMessage(in_session, false);
 
1089
    }
 
1090
    else
 
1091
    {
 
1092
      const message::InsertHeader &insert_header= statement->insert_header();
 
1093
      string old_table_name= insert_header.table_metadata().table_name();
1052
1094
     
1053
 
    string current_table_name;
1054
 
    (void) in_table->getShare()->getTableName(current_table_name);
1055
 
    if (current_table_name.compare(old_table_name))
1056
 
    {
1057
 
      finalizeStatementMessage(*statement, in_session);
1058
 
      statement= in_session->getStatementMessage();
 
1095
      string current_table_name;
 
1096
      (void) in_table->getShare()->getTableName(current_table_name);
 
1097
      if (current_table_name.compare(old_table_name))
 
1098
      {
 
1099
        finalizeStatementMessage(*statement, in_session);
 
1100
        statement= in_session->getStatementMessage();
 
1101
      }
1059
1102
    }
1060
1103
  } 
1061
1104
 
1062
1105
  if (statement == NULL)
1063
1106
  {
1064
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1107
    /*
 
1108
     * Transaction will be non-NULL only if we had to segment it due to
 
1109
     * transaction size above.
 
1110
     */
 
1111
    if (transaction == NULL)
 
1112
      transaction= getActiveTransactionMessage(in_session);
 
1113
 
1065
1114
    /* 
1066
1115
     * Transaction message initialized and set, but no statement created
1067
1116
     * yet.  We construct one and initialize it, here, then return the
1132
1181
    return true;
1133
1182
  }
1134
1183
 
1135
 
  message::Statement &statement= getInsertStatement(in_session, in_table);
 
1184
  uint32_t next_segment_id= 1;
 
1185
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
1136
1186
 
1137
1187
  message::InsertData *data= statement.mutable_insert_data();
1138
 
  data->set_segment_id(1);
 
1188
  data->set_segment_id(next_segment_id);
1139
1189
  data->set_end_segment(true);
1140
1190
  message::InsertRecord *record= data->add_record();
1141
1191
 
1169
1219
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
1170
1220
                                                            Table *in_table,
1171
1221
                                                            const unsigned char *old_record, 
1172
 
                                                            const unsigned char *new_record)
 
1222
                                                            const unsigned char *new_record,
 
1223
                                                            uint32_t *next_segment_id)
1173
1224
{
1174
1225
  message::Statement *statement= in_session->getStatementMessage();
 
1226
  message::Transaction *transaction= NULL;
1175
1227
 
1176
1228
  /*
1177
1229
   * Check the type for the current Statement message, if it is anything
1187
1239
  }
1188
1240
  else if (statement != NULL)
1189
1241
  {
1190
 
    const message::UpdateHeader &update_header= statement->update_header();
1191
 
    string old_table_name= update_header.table_metadata().table_name();
1192
 
 
1193
 
    string current_table_name;
1194
 
    (void) in_table->getShare()->getTableName(current_table_name);
1195
 
    if (current_table_name.compare(old_table_name))
 
1242
    /*
 
1243
     * If we've passed our threshold for the statement size (possible for
 
1244
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1245
     * the Transaction will keep it from getting huge).
 
1246
     */
 
1247
    if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1196
1248
    {
1197
 
      finalizeStatementMessage(*statement, in_session);
 
1249
      message::UpdateData *current_data= statement->mutable_update_data();
 
1250
 
 
1251
      /* Caller should use this value when adding a new record */
 
1252
      *next_segment_id= current_data->segment_id() + 1;
 
1253
 
 
1254
      current_data->set_end_segment(false);
 
1255
 
 
1256
      /*
 
1257
       * Send the trx message to replicators after finalizing the 
 
1258
       * statement and transaction. This will also set the Transaction
 
1259
       * and Statement objects in Session to NULL.
 
1260
       */
 
1261
      commitTransactionMessage(in_session);
 
1262
 
 
1263
      /*
 
1264
       * Statement and Transaction should now be NULL, so new ones will get
 
1265
       * created. We reuse the transaction id since we are segmenting
 
1266
       * one transaction.
 
1267
       */
1198
1268
      statement= in_session->getStatementMessage();
 
1269
      transaction= getActiveTransactionMessage(in_session, false);
 
1270
    }
 
1271
    else
 
1272
    {
 
1273
      const message::UpdateHeader &update_header= statement->update_header();
 
1274
      string old_table_name= update_header.table_metadata().table_name();
 
1275
 
 
1276
      string current_table_name;
 
1277
      (void) in_table->getShare()->getTableName(current_table_name);
 
1278
      if (current_table_name.compare(old_table_name))
 
1279
      {
 
1280
        finalizeStatementMessage(*statement, in_session);
 
1281
        statement= in_session->getStatementMessage();
 
1282
      }
1199
1283
    }
1200
1284
  }
1201
1285
 
1202
1286
  if (statement == NULL)
1203
1287
  {
1204
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1288
    /*
 
1289
     * Transaction will be non-NULL only if we had to segment it due to
 
1290
     * transaction size above.
 
1291
     */
 
1292
    if (transaction == NULL)
 
1293
      transaction= getActiveTransactionMessage(in_session);
 
1294
 
1205
1295
    /* 
1206
1296
     * Transaction message initialized and set, but no statement created
1207
1297
     * yet.  We construct one and initialize it, here, then return the
1288
1378
  if (! replication_services.isActive())
1289
1379
    return;
1290
1380
 
1291
 
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record);
 
1381
  uint32_t next_segment_id= 1;
 
1382
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
1292
1383
 
1293
1384
  message::UpdateData *data= statement.mutable_update_data();
1294
 
  data->set_segment_id(1);
 
1385
  data->set_segment_id(next_segment_id);
1295
1386
  data->set_end_segment(true);
1296
1387
  message::UpdateRecord *record= data->add_record();
1297
1388
 
1375
1466
}
1376
1467
 
1377
1468
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
1378
 
                                                            Table *in_table)
 
1469
                                                            Table *in_table,
 
1470
                                                            uint32_t *next_segment_id)
1379
1471
{
1380
1472
  message::Statement *statement= in_session->getStatementMessage();
 
1473
  message::Transaction *transaction= NULL;
1381
1474
 
1382
1475
  /*
1383
1476
   * Check the type for the current Statement message, if it is anything
1393
1486
  }
1394
1487
  else if (statement != NULL)
1395
1488
  {
1396
 
    const message::DeleteHeader &delete_header= statement->delete_header();
1397
 
    string old_table_name= delete_header.table_metadata().table_name();
1398
 
 
1399
 
    string current_table_name;
1400
 
    (void) in_table->getShare()->getTableName(current_table_name);
1401
 
    if (current_table_name.compare(old_table_name))
 
1489
    /*
 
1490
     * If we've passed our threshold for the statement size (possible for
 
1491
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1492
     * the Transaction will keep it from getting huge).
 
1493
     */
 
1494
    if (static_cast<size_t>(statement->ByteSize()) >= trx_msg_threshold)
1402
1495
    {
1403
 
      finalizeStatementMessage(*statement, in_session);
 
1496
      message::DeleteData *current_data= statement->mutable_delete_data();
 
1497
 
 
1498
      /* Caller should use this value when adding a new record */
 
1499
      *next_segment_id= current_data->segment_id() + 1;
 
1500
 
 
1501
      current_data->set_end_segment(false);
 
1502
 
 
1503
      /* 
 
1504
       * Send the trx message to replicators after finalizing the 
 
1505
       * statement and transaction. This will also set the Transaction
 
1506
       * and Statement objects in Session to NULL.
 
1507
       */
 
1508
      commitTransactionMessage(in_session);
 
1509
 
 
1510
      /*
 
1511
       * Statement and Transaction should now be NULL, so new ones will get
 
1512
       * created. We reuse the transaction id since we are segmenting
 
1513
       * one transaction.
 
1514
       */
1404
1515
      statement= in_session->getStatementMessage();
 
1516
      transaction= getActiveTransactionMessage(in_session, false);
 
1517
    }
 
1518
    else
 
1519
    {
 
1520
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1521
      string old_table_name= delete_header.table_metadata().table_name();
 
1522
 
 
1523
      string current_table_name;
 
1524
      (void) in_table->getShare()->getTableName(current_table_name);
 
1525
      if (current_table_name.compare(old_table_name))
 
1526
      {
 
1527
        finalizeStatementMessage(*statement, in_session);
 
1528
        statement= in_session->getStatementMessage();
 
1529
      }
1405
1530
    }
1406
1531
  }
1407
1532
 
1408
1533
  if (statement == NULL)
1409
1534
  {
1410
 
    message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1535
    /*
 
1536
     * Transaction will be non-NULL only if we had to segment it due to
 
1537
     * transaction size above.
 
1538
     */
 
1539
    if (transaction == NULL)
 
1540
      transaction= getActiveTransactionMessage(in_session);
 
1541
 
1411
1542
    /* 
1412
1543
     * Transaction message initialized and set, but no statement created
1413
1544
     * yet.  We construct one and initialize it, here, then return the
1463
1594
  }
1464
1595
}
1465
1596
 
1466
 
void TransactionServices::deleteRecord(Session *in_session, Table *in_table)
 
1597
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
1467
1598
{
1468
1599
  ReplicationServices &replication_services= ReplicationServices::singleton();
1469
1600
  if (! replication_services.isActive())
1470
1601
    return;
1471
1602
 
1472
 
  message::Statement &statement= getDeleteStatement(in_session, in_table);
 
1603
  uint32_t next_segment_id;
 
1604
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
1473
1605
 
1474
1606
  message::DeleteData *data= statement.mutable_delete_data();
1475
 
  data->set_segment_id(1);
 
1607
  data->set_segment_id(next_segment_id);
1476
1608
  data->set_end_segment(true);
1477
1609
  message::DeleteRecord *record= data->add_record();
1478
1610
 
1490
1622
     */
1491
1623
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
1492
1624
    {
1493
 
      string_value= current_field->val_str(string_value);
 
1625
      if (use_update_record)
 
1626
      {
 
1627
        /*
 
1628
         * Temporarily point to the update record to get its value.
 
1629
         * This is pretty much a hack in order to get the PK value from
 
1630
         * the update record rather than the insert record. Field::val_str()
 
1631
         * should not change anything in Field::ptr, so this should be safe.
 
1632
         * We are careful not to change anything in old_ptr.
 
1633
         */
 
1634
        const unsigned char *old_ptr= current_field->ptr;
 
1635
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1636
        string_value= current_field->val_str(string_value);
 
1637
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
 
1638
      }
 
1639
      else
 
1640
      {
 
1641
        string_value= current_field->val_str(string_value);
 
1642
        /**
 
1643
         * @TODO Store optional old record value in the before data member
 
1644
         */
 
1645
      }
1494
1646
      record->add_key_value(string_value->c_ptr(), string_value->length());
1495
 
      /**
1496
 
       * @TODO Store optional old record value in the before data member
1497
 
       */
1498
1647
      string_value->free();
1499
1648
    }
1500
1649
  }