~thomir-deactivatedaccount/drizzle/drizzle-fix-bug653747

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Brian Aker
  • Date: 2010-10-10 02:07:52 UTC
  • mfrom: (1827.2.3 staging)
  • Revision ID: brian@tangent.org-20101010020752-ktv73isay5dxtvp3
Merge in switch on table_share_instance inheritance.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1339
1339
    }
1340
1340
    else
1341
1341
    {
1342
 
      if (useExistingUpdateHeader(*statement, in_table, old_record, new_record))
 
1342
      const message::UpdateHeader &update_header= statement->update_header();
 
1343
      string old_table_name= update_header.table_metadata().table_name();
 
1344
 
 
1345
      string current_table_name;
 
1346
      (void) in_table->getShare()->getTableName(current_table_name);
 
1347
      if (current_table_name.compare(old_table_name))
 
1348
      {
 
1349
        finalizeStatementMessage(*statement, in_session);
 
1350
        statement= in_session->getStatementMessage();
 
1351
      }
 
1352
      else
1343
1353
      {
1344
1354
        /* carry forward the existing segment id */
1345
1355
        const message::UpdateData &current_data= statement->update_data();
1346
1356
        *next_segment_id= current_data.segment_id();
1347
 
      } 
1348
 
      else 
1349
 
      {
1350
 
        finalizeStatementMessage(*statement, in_session);
1351
 
        statement= in_session->getStatementMessage();
1352
1357
      }
1353
1358
    }
1354
1359
  }
1375
1380
  return *statement;
1376
1381
}
1377
1382
 
1378
 
bool TransactionServices::useExistingUpdateHeader(message::Statement &statement,
1379
 
                                                  Table *in_table,
1380
 
                                                  const unsigned char *old_record,
1381
 
                                                  const unsigned char *new_record)
1382
 
{
1383
 
  const message::UpdateHeader &update_header= statement.update_header();
1384
 
  string old_table_name= update_header.table_metadata().table_name();
1385
 
 
1386
 
  string current_table_name;
1387
 
  (void) in_table->getShare()->getTableName(current_table_name);
1388
 
  if (current_table_name.compare(old_table_name))
1389
 
  {
1390
 
    return false;
1391
 
  }
1392
 
  else
1393
 
  {
1394
 
    /* Compare the set fields in the existing UpdateHeader and see if they
1395
 
     * match the updated fields in the new record, if they do not we must
1396
 
     * create a new UpdateHeader 
1397
 
     */
1398
 
    size_t num_set_fields= update_header.set_field_metadata_size();
1399
 
 
1400
 
    Field *current_field;
1401
 
    Field **table_fields= in_table->getFields();
1402
 
    in_table->setReadSet();
1403
 
 
1404
 
    size_t num_calculated_updated_fields= 0;
1405
 
    bool found= false;
1406
 
    while ((current_field= *table_fields++) != NULL)
1407
 
    {
1408
 
      if (num_calculated_updated_fields > num_set_fields)
1409
 
      {
1410
 
        break;
1411
 
      }
1412
 
 
1413
 
      if (isFieldUpdated(current_field, in_table, old_record, new_record))
1414
 
      {
1415
 
        /* check that this field exists in the UpdateHeader record */
1416
 
        found= false;
1417
 
 
1418
 
        for (size_t x= 0; x < num_set_fields; ++x)
1419
 
        {
1420
 
          const message::FieldMetadata &field_metadata= update_header.set_field_metadata(x);
1421
 
          string name= field_metadata.name();
1422
 
          if (name.compare(current_field->field_name) == 0)
1423
 
          {
1424
 
            found= true;
1425
 
            ++num_calculated_updated_fields;
1426
 
            break;
1427
 
          } 
1428
 
        }
1429
 
        if (! found)
1430
 
        {
1431
 
          break;
1432
 
        } 
1433
 
      }
1434
 
    }
1435
 
 
1436
 
    if ((num_calculated_updated_fields == num_set_fields) && found)
1437
 
    {
1438
 
      return true;
1439
 
    } 
1440
 
    else 
1441
 
    {
1442
 
      return false;
1443
 
    }
1444
 
  }
1445
 
}  
1446
 
 
1447
1383
void TransactionServices::setUpdateHeader(message::Statement &statement,
1448
1384
                                          Session *in_session,
1449
1385
                                          Table *in_table,
2005
1941
  cleanupTransactionMessage(transaction, in_session);
2006
1942
}
2007
1943
 
2008
 
int TransactionServices::sendEvent(Session *session, const message::Event &event)
2009
 
{
2010
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2011
 
  if (! replication_services.isActive())
2012
 
    return 0;
2013
 
 
2014
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
2015
 
 
2016
 
  // set server id, start timestamp
2017
 
  initTransactionMessage(*transaction, session, true);
2018
 
 
2019
 
  // set end timestamp
2020
 
  finalizeTransactionMessage(*transaction, session);
2021
 
 
2022
 
  message::Event *trx_event= transaction->mutable_event();
2023
 
 
2024
 
  trx_event->CopyFrom(event);
2025
 
 
2026
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*session, *transaction);
2027
 
 
2028
 
  delete transaction;
2029
 
 
2030
 
  return static_cast<int>(result);
2031
 
}
2032
 
 
2033
 
bool TransactionServices::sendStartupEvent(Session *session)
2034
 
{
2035
 
  message::Event event;
2036
 
  event.set_type(message::Event::STARTUP);
2037
 
  if (sendEvent(session, event) != 0)
2038
 
    return false;
2039
 
  return true;
2040
 
}
2041
 
 
2042
 
bool TransactionServices::sendShutdownEvent(Session *session)
2043
 
{
2044
 
  message::Event event;
2045
 
  event.set_type(message::Event::SHUTDOWN);
2046
 
  if (sendEvent(session, event) != 0)
2047
 
    return false;
2048
 
  return true;
2049
 
}
2050
 
 
2051
1944
} /* namespace drizzled */