~ubuntu-branches/ubuntu/saucy/drizzle/saucy-proposed

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2012-06-19 10:46:49 UTC
  • mfrom: (1.1.6)
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: package-import@ubuntu.com-20120619104649-e2l0ggd4oz3um0f4
Tags: upstream-7.1.36-stable
ImportĀ upstreamĀ versionĀ 7.1.36-stable

Show diffs side-by-side

added added

removed removed

Lines of Context:
54
54
#include <drizzled/probes.h>
55
55
#include <drizzled/sql_parse.h>
56
56
#include <drizzled/session.h>
 
57
#include <drizzled/session/times.h>
57
58
#include <drizzled/sql_base.h>
58
59
#include <drizzled/replication_services.h>
59
60
#include <drizzled/transaction_services.h>
71
72
#include <drizzled/plugin/xa_resource_manager.h>
72
73
#include <drizzled/plugin/xa_storage_engine.h>
73
74
#include <drizzled/internal/my_sys.h>
 
75
#include <drizzled/statistics_variables.h>
 
76
#include <drizzled/system_variables.h>
 
77
#include <drizzled/session/transactions.h>
74
78
 
75
79
#include <vector>
76
80
#include <algorithm>
80
84
using namespace std;
81
85
using namespace google;
82
86
 
83
 
namespace drizzled
84
 
{
 
87
namespace drizzled {
85
88
 
86
89
/**
87
90
 * @defgroup Transactions
300
303
 * transaction after all DDLs, just like the statement transaction
301
304
 * is always committed at the end of all statements.
302
305
 */
303
 
TransactionServices::TransactionServices()
 
306
 
 
307
static plugin::XaStorageEngine& xa_storage_engine()
304
308
{
305
 
  plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
306
 
  if (engine)
307
 
  {
308
 
    xa_storage_engine= (plugin::XaStorageEngine*)engine; 
309
 
  }
310
 
  else 
311
 
  {
312
 
    xa_storage_engine= NULL;
313
 
  }
 
309
  static plugin::XaStorageEngine& engine= static_cast<plugin::XaStorageEngine&>(*plugin::StorageEngine::findByName("InnoDB"));
 
310
  return engine;
314
311
}
315
312
 
316
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
313
void TransactionServices::registerResourceForStatement(Session& session,
317
314
                                                       plugin::MonitoredInTransaction *monitored,
318
315
                                                       plugin::TransactionalStorageEngine *engine)
319
316
{
329
326
    registerResourceForTransaction(session, monitored, engine);
330
327
  }
331
328
 
332
 
  TransactionContext *trans= &session.transaction.stmt;
333
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
329
  TransactionContext& trans= session.transaction.stmt;
 
330
  ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
334
331
 
335
 
  if (resource_context->isStarted())
 
332
  if (resource_context.isStarted())
336
333
    return; /* already registered, return */
337
334
 
338
335
  assert(monitored->participatesInSqlTransaction());
339
336
  assert(not monitored->participatesInXaTransaction());
340
337
 
341
 
  resource_context->setMonitored(monitored);
342
 
  resource_context->setTransactionalStorageEngine(engine);
343
 
  trans->registerResource(resource_context);
344
 
 
345
 
  trans->no_2pc|= true;
 
338
  resource_context.setMonitored(monitored);
 
339
  resource_context.setTransactionalStorageEngine(engine);
 
340
  trans.registerResource(&resource_context);
 
341
  trans.no_2pc= true;
346
342
}
347
343
 
348
 
void TransactionServices::registerResourceForStatement(Session::reference session,
 
344
void TransactionServices::registerResourceForStatement(Session& session,
349
345
                                                       plugin::MonitoredInTransaction *monitored,
350
346
                                                       plugin::TransactionalStorageEngine *engine,
351
347
                                                       plugin::XaResourceManager *resource_manager)
362
358
    registerResourceForTransaction(session, monitored, engine, resource_manager);
363
359
  }
364
360
 
365
 
  TransactionContext *trans= &session.transaction.stmt;
366
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 0);
 
361
  TransactionContext& trans= session.transaction.stmt;
 
362
  ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
367
363
 
368
 
  if (resource_context->isStarted())
 
364
  if (resource_context.isStarted())
369
365
    return; /* already registered, return */
370
366
 
371
367
  assert(monitored->participatesInXaTransaction());
372
368
  assert(monitored->participatesInSqlTransaction());
373
369
 
374
 
  resource_context->setMonitored(monitored);
375
 
  resource_context->setTransactionalStorageEngine(engine);
376
 
  resource_context->setXaResourceManager(resource_manager);
377
 
  trans->registerResource(resource_context);
378
 
 
379
 
  trans->no_2pc|= false;
 
370
  resource_context.setMonitored(monitored);
 
371
  resource_context.setTransactionalStorageEngine(engine);
 
372
  resource_context.setXaResourceManager(resource_manager);
 
373
  trans.registerResource(&resource_context);
380
374
}
381
375
 
382
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
376
void TransactionServices::registerResourceForTransaction(Session& session,
383
377
                                                         plugin::MonitoredInTransaction *monitored,
384
378
                                                         plugin::TransactionalStorageEngine *engine)
385
379
{
386
 
  TransactionContext *trans= &session.transaction.all;
387
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
380
  TransactionContext& trans= session.transaction.all;
 
381
  ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
388
382
 
389
 
  if (resource_context->isStarted())
 
383
  if (resource_context.isStarted())
390
384
    return; /* already registered, return */
391
385
 
392
386
  session.server_status|= SERVER_STATUS_IN_TRANS;
393
387
 
394
 
  trans->registerResource(resource_context);
 
388
  trans.registerResource(&resource_context);
395
389
 
396
390
  assert(monitored->participatesInSqlTransaction());
397
391
  assert(not monitored->participatesInXaTransaction());
398
392
 
399
 
  resource_context->setMonitored(monitored);
400
 
  resource_context->setTransactionalStorageEngine(engine);
401
 
  trans->no_2pc|= true;
 
393
  resource_context.setMonitored(monitored);
 
394
  resource_context.setTransactionalStorageEngine(engine);
 
395
  trans.no_2pc= true;
402
396
 
403
397
  if (session.transaction.xid_state.xid.is_null())
404
398
    session.transaction.xid_state.xid.set(session.getQueryId());
405
399
 
406
400
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
407
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
401
  if (not session.getResourceContext(*monitored, 0).isStarted())
408
402
    registerResourceForStatement(session, monitored, engine);
409
403
}
410
404
 
411
 
void TransactionServices::registerResourceForTransaction(Session::reference session,
 
405
void TransactionServices::registerResourceForTransaction(Session& session,
412
406
                                                         plugin::MonitoredInTransaction *monitored,
413
407
                                                         plugin::TransactionalStorageEngine *engine,
414
408
                                                         plugin::XaResourceManager *resource_manager)
415
409
{
416
410
  TransactionContext *trans= &session.transaction.all;
417
 
  ResourceContext *resource_context= session.getResourceContext(monitored, 1);
 
411
  ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
418
412
 
419
 
  if (resource_context->isStarted())
 
413
  if (resource_context.isStarted())
420
414
    return; /* already registered, return */
421
415
 
422
416
  session.server_status|= SERVER_STATUS_IN_TRANS;
423
417
 
424
 
  trans->registerResource(resource_context);
 
418
  trans->registerResource(&resource_context);
425
419
 
426
420
  assert(monitored->participatesInSqlTransaction());
427
421
 
428
 
  resource_context->setMonitored(monitored);
429
 
  resource_context->setXaResourceManager(resource_manager);
430
 
  resource_context->setTransactionalStorageEngine(engine);
431
 
  trans->no_2pc|= true;
 
422
  resource_context.setMonitored(monitored);
 
423
  resource_context.setXaResourceManager(resource_manager);
 
424
  resource_context.setTransactionalStorageEngine(engine);
 
425
  trans->no_2pc= true;
432
426
 
433
427
  if (session.transaction.xid_state.xid.is_null())
434
428
    session.transaction.xid_state.xid.set(session.getQueryId());
436
430
  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
437
431
 
438
432
  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
439
 
  if (! session.getResourceContext(monitored, 0)->isStarted())
 
433
  if (! session.getResourceContext(*monitored, 0).isStarted())
440
434
    registerResourceForStatement(session, monitored, engine, resource_manager);
441
435
}
442
436
 
443
437
void TransactionServices::allocateNewTransactionId()
444
438
{
445
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
446
 
  if (! replication_services.isActive())
 
439
  if (! ReplicationServices::isActive())
447
440
  {
448
441
    return;
449
442
  }
450
443
 
451
444
  Session *my_session= current_session;
452
 
  uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
 
445
  uint64_t xa_id= xa_storage_engine().getNewTransactionId(my_session);
453
446
  my_session->setXaId(xa_id);
454
447
}
455
448
 
456
 
uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
 
449
uint64_t TransactionServices::getCurrentTransactionId(Session& session)
457
450
{
458
451
  if (session.getXaId() == 0)
459
452
  {
460
 
    session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
 
453
    session.setXaId(xa_storage_engine().getNewTransactionId(&session)); 
461
454
  }
462
455
 
463
456
  return session.getXaId();
464
457
}
465
458
 
466
 
int TransactionServices::commitTransaction(Session::reference session,
 
459
int TransactionServices::commitTransaction(Session& session,
467
460
                                           bool normal_transaction)
468
461
{
469
462
  int error= 0, cookie= 0;
500
493
     */
501
494
    if (shouldConstructMessages())
502
495
    {
503
 
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
504
 
           it != resource_contexts.end() && ! error;
505
 
           ++it)
 
496
      BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
506
497
      {
507
 
        ResourceContext *resource_context= *it;
508
 
        int err;
 
498
        if (error)
 
499
          break;
509
500
        /*
510
501
          Do not call two-phase commit if this particular
511
502
          transaction is read-only. This allows for simpler
518
509
 
519
510
        if (resource->participatesInXaTransaction())
520
511
        {
521
 
          if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
 
512
          if (int err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction))
522
513
          {
523
514
            my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
524
515
            error= 1;
556
547
  @note
557
548
  This function does not care about global read lock. A caller should.
558
549
*/
559
 
int TransactionServices::commitPhaseOne(Session::reference session,
 
550
int TransactionServices::commitPhaseOne(Session& session,
560
551
                                        bool normal_transaction)
561
552
{
562
553
  int error=0;
574
565
 
575
566
  if (resource_contexts.empty() == false)
576
567
  {
577
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
578
 
         it != resource_contexts.end();
579
 
         ++it)
 
568
    BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
580
569
    {
581
 
      int err;
582
 
      ResourceContext *resource_context= *it;
583
 
 
584
570
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
585
571
 
586
572
      if (resource->participatesInXaTransaction())
587
573
      {
588
 
        if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
 
574
        if (int err= resource_context->getXaResourceManager()->xaCommit(&session, all))
589
575
        {
590
576
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
591
577
          error= 1;
597
583
      }
598
584
      else if (resource->participatesInSqlTransaction())
599
585
      {
600
 
        if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
 
586
        if (int err= resource_context->getTransactionalStorageEngine()->commit(&session, all))
601
587
        {
602
588
          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
603
589
          error= 1;
611
597
    }
612
598
 
613
599
    if (is_real_trans)
614
 
      session.transaction.xid_state.xid.null();
 
600
      session.transaction.xid_state.xid.set_null();
615
601
 
616
602
    if (normal_transaction)
617
603
    {
623
609
  return error;
624
610
}
625
611
 
626
 
int TransactionServices::rollbackTransaction(Session::reference session,
 
612
int TransactionServices::rollbackTransaction(Session& session,
627
613
                                             bool normal_transaction)
628
614
{
629
615
  int error= 0;
637
623
    We must not rollback the normal transaction if a statement
638
624
    transaction is pending.
639
625
  */
640
 
  assert(session.transaction.stmt.getResourceContexts().empty() ||
641
 
              trans == &session.transaction.stmt);
 
626
  assert(session.transaction.stmt.getResourceContexts().empty() || trans == &session.transaction.stmt);
642
627
 
643
628
  if (resource_contexts.empty() == false)
644
629
  {
645
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
646
 
         it != resource_contexts.end();
647
 
         ++it)
 
630
    BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
648
631
    {
649
 
      int err;
650
 
      ResourceContext *resource_context= *it;
651
 
 
652
632
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
653
633
 
654
634
      if (resource->participatesInXaTransaction())
655
635
      {
656
 
        if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
 
636
        if (int err= resource_context->getXaResourceManager()->xaRollback(&session, all))
657
637
        {
658
638
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
659
639
          error= 1;
665
645
      }
666
646
      else if (resource->participatesInSqlTransaction())
667
647
      {
668
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
 
648
        if (int err= resource_context->getTransactionalStorageEngine()->rollback(&session, all))
669
649
        {
670
650
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
671
651
          error= 1;
691
671
      rollbackStatementMessage(session);
692
672
 
693
673
    if (is_real_trans)
694
 
      session.transaction.xid_state.xid.null();
 
674
      session.transaction.xid_state.xid.set_null();
695
675
    if (normal_transaction)
696
676
    {
697
677
      session.variables.tx_isolation=session.session_tx_isolation;
716
696
  return error;
717
697
}
718
698
 
719
 
int TransactionServices::autocommitOrRollback(Session::reference session,
 
699
int TransactionServices::autocommitOrRollback(Session& session,
720
700
                                              int error)
721
701
{
722
702
  /* One GPB Statement message per SQL statement */
728
708
  {
729
709
    TransactionContext *trans = &session.transaction.stmt;
730
710
    TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
731
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
732
 
         it != resource_contexts.end();
733
 
         ++it)
 
711
    BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
734
712
    {
735
 
      ResourceContext *resource_context= *it;
736
 
 
737
713
      resource_context->getTransactionalStorageEngine()->endStatement(&session);
738
714
    }
739
715
 
768
744
  }
769
745
};
770
746
 
771
 
int TransactionServices::rollbackToSavepoint(Session::reference session,
 
747
int TransactionServices::rollbackToSavepoint(Session& session,
772
748
                                             NamedSavepoint &sv)
773
749
{
774
750
  int error= 0;
781
757
    rolling back to savepoint in all storage engines that were part of the
782
758
    transaction when the savepoint was set
783
759
  */
784
 
  for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
785
 
       it != sv_resource_contexts.end();
786
 
       ++it)
 
760
  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, sv_resource_contexts)
787
761
  {
788
 
    int err;
789
 
    ResourceContext *resource_context= *it;
790
 
 
791
762
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
792
763
 
793
764
    if (resource->participatesInSqlTransaction())
794
765
    {
795
 
      if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
 
766
      if (int err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv))
796
767
      {
797
768
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
798
769
        error= 1;
838
809
     * savepoint's resource contexts.
839
810
     */
840
811
        
841
 
    for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
842
 
         it != set_difference_contexts.end();
843
 
         ++it)
 
812
    BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, set_difference_contexts)
844
813
    {
845
 
      ResourceContext *resource_context= *it;
846
 
      int err;
847
 
 
848
814
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
849
815
 
850
816
      if (resource->participatesInSqlTransaction())
851
817
      {
852
 
        if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
 
818
        if (int err= resource_context->getTransactionalStorageEngine()->rollback(&session, true))
853
819
        {
854
820
          my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
855
821
          error= 1;
898
864
  section "4.33.4 SQL-statements and transaction states",
899
865
  NamedSavepoint is *not* transaction-initiating SQL-statement
900
866
*/
901
 
int TransactionServices::setSavepoint(Session::reference session,
 
867
int TransactionServices::setSavepoint(Session& session,
902
868
                                      NamedSavepoint &sv)
903
869
{
904
870
  int error= 0;
907
873
 
908
874
  if (resource_contexts.empty() == false)
909
875
  {
910
 
    for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
911
 
         it != resource_contexts.end();
912
 
         ++it)
 
876
    BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
913
877
    {
914
 
      ResourceContext *resource_context= *it;
915
 
      int err;
916
 
 
917
878
      plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
918
879
 
919
880
      if (resource->participatesInSqlTransaction())
920
881
      {
921
 
        if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
 
882
        if (int err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv))
922
883
        {
923
884
          my_error(ER_GET_ERRNO, MYF(0), err);
924
885
          error= 1;
950
911
  return error;
951
912
}
952
913
 
953
 
int TransactionServices::releaseSavepoint(Session::reference session,
 
914
int TransactionServices::releaseSavepoint(Session& session,
954
915
                                          NamedSavepoint &sv)
955
916
{
956
917
  int error= 0;
957
918
 
958
919
  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
959
920
 
960
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
961
 
       it != resource_contexts.end();
962
 
       ++it)
 
921
  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
963
922
  {
964
 
    int err;
965
 
    ResourceContext *resource_context= *it;
966
 
 
967
923
    plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
968
924
 
969
925
    if (resource->participatesInSqlTransaction())
970
926
    {
971
 
      if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
 
927
      if (int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv))
972
928
      {
973
929
        my_error(ER_GET_ERRNO, MYF(0), err);
974
930
        error= 1;
981
937
 
982
938
bool TransactionServices::shouldConstructMessages()
983
939
{
984
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
985
 
  return replication_services.isActive();
 
940
  return ReplicationServices::isActive();
986
941
}
987
942
 
988
 
message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
 
943
message::Transaction *TransactionServices::getActiveTransactionMessage(Session& session,
989
944
                                                                       bool should_inc_trx_id)
990
945
{
991
946
  message::Transaction *transaction= session.getTransactionMessage();
997
952
     * for this Session object.  Session is responsible for
998
953
     * deleting transaction message when done with it.
999
954
     */
1000
 
    transaction= new (nothrow) message::Transaction();
 
955
    transaction= new message::Transaction();
1001
956
    initTransactionMessage(*transaction, session, should_inc_trx_id);
1002
957
    session.setTransactionMessage(transaction);
1003
 
    return transaction;
1004
958
  }
1005
 
  else
1006
 
    return transaction;
 
959
  return transaction;
1007
960
}
1008
961
 
1009
962
void TransactionServices::initTransactionMessage(message::Transaction &transaction,
1010
 
                                                 Session::reference session,
 
963
                                                 Session& session,
1011
964
                                                 bool should_inc_trx_id)
1012
965
{
1013
966
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1024
977
    trx->set_transaction_id(0);
1025
978
  }
1026
979
 
1027
 
  trx->set_start_timestamp(session.getCurrentTimestamp());
 
980
  trx->set_start_timestamp(session.times.getCurrentTimestamp());
1028
981
  
1029
982
  /* segment info may get set elsewhere as needed */
1030
983
  transaction.set_segment_id(1);
1032
985
}
1033
986
 
1034
987
void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
1035
 
                                                     Session::const_reference session)
 
988
                                                     const Session& session)
1036
989
{
1037
990
  message::TransactionContext *trx= transaction.mutable_transaction_context();
1038
 
  trx->set_end_timestamp(session.getCurrentTimestamp());
 
991
  trx->set_end_timestamp(session.times.getCurrentTimestamp());
1039
992
}
1040
993
 
1041
994
void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
1042
 
                                                    Session::reference session)
 
995
                                                    Session& session)
1043
996
{
1044
997
  delete transaction;
1045
998
  session.setStatementMessage(NULL);
1047
1000
  session.setXaId(0);
1048
1001
}
1049
1002
 
1050
 
int TransactionServices::commitTransactionMessage(Session::reference session)
 
1003
int TransactionServices::commitTransactionMessage(Session& session)
1051
1004
{
1052
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1053
 
  if (! replication_services.isActive())
 
1005
  if (! ReplicationServices::isActive())
1054
1006
    return 0;
1055
1007
 
1056
1008
  /*
1084
1036
  
1085
1037
  finalizeTransactionMessage(*transaction, session);
1086
1038
  
1087
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
 
1039
  plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, *transaction);
1088
1040
 
1089
1041
  cleanupTransactionMessage(transaction, session);
1090
1042
 
1093
1045
 
1094
1046
void TransactionServices::initStatementMessage(message::Statement &statement,
1095
1047
                                               message::Statement::Type type,
1096
 
                                               Session::const_reference session)
 
1048
                                               const Session& session)
1097
1049
{
1098
1050
  statement.set_type(type);
1099
 
  statement.set_start_timestamp(session.getCurrentTimestamp());
 
1051
  statement.set_start_timestamp(session.times.getCurrentTimestamp());
1100
1052
 
1101
1053
  if (session.variables.replicate_query)
1102
1054
    statement.set_sql(session.getQueryString()->c_str());
1103
1055
}
1104
1056
 
1105
1057
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1106
 
                                                   Session::reference session)
 
1058
                                                   Session& session)
1107
1059
{
1108
 
  statement.set_end_timestamp(session.getCurrentTimestamp());
 
1060
  statement.set_end_timestamp(session.times.getCurrentTimestamp());
1109
1061
  session.setStatementMessage(NULL);
1110
1062
}
1111
1063
 
1112
 
void TransactionServices::rollbackTransactionMessage(Session::reference session)
 
1064
void TransactionServices::rollbackTransactionMessage(Session& session)
1113
1065
{
1114
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1115
 
  if (! replication_services.isActive())
 
1066
  if (! ReplicationServices::isActive())
1116
1067
    return;
1117
1068
  
1118
1069
  message::Transaction *transaction= getActiveTransactionMessage(session);
1159
1110
 
1160
1111
    finalizeTransactionMessage(*transaction, session);
1161
1112
    
1162
 
    (void) replication_services.pushTransactionMessage(session, *transaction);
 
1113
    (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1163
1114
  }
1164
1115
 
1165
1116
  cleanupTransactionMessage(transaction, session);
1166
1117
}
1167
1118
 
1168
 
void TransactionServices::rollbackStatementMessage(Session::reference session)
 
1119
void TransactionServices::rollbackStatementMessage(Session& session)
1169
1120
{
1170
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1171
 
  if (! replication_services.isActive())
 
1121
  if (! ReplicationServices::isActive())
1172
1122
    return;
1173
1123
 
1174
1124
  message::Statement *current_statement= session.getStatementMessage();
1231
1181
  }
1232
1182
}
1233
1183
 
1234
 
message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
 
1184
message::Transaction *TransactionServices::segmentTransactionMessage(Session& session,
1235
1185
                                                                     message::Transaction *transaction)
1236
1186
{
1237
1187
  uint64_t trx_id= transaction->transaction_context().transaction_id();
1249
1199
  return transaction;
1250
1200
}
1251
1201
 
1252
 
message::Statement &TransactionServices::getInsertStatement(Session::reference session,
 
1202
message::Statement &TransactionServices::getInsertStatement(Session& session,
1253
1203
                                                            Table &table,
1254
1204
                                                            uint32_t *next_segment_id)
1255
1205
{
1340
1290
}
1341
1291
 
1342
1292
void TransactionServices::setInsertHeader(message::Statement &statement,
1343
 
                                          Session::const_reference session,
 
1293
                                          const Session& session,
1344
1294
                                          Table &table)
1345
1295
{
1346
1296
  initStatementMessage(statement, message::Statement::INSERT, session);
1353
1303
  message::InsertHeader *header= statement.mutable_insert_header();
1354
1304
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1355
1305
 
1356
 
  string schema_name;
1357
 
  (void) table.getShare()->getSchemaName(schema_name);
1358
 
  string table_name;
1359
 
  (void) table.getShare()->getTableName(table_name);
1360
 
 
1361
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1362
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
1363
 
 
1364
 
  Field *current_field;
 
1306
  table_metadata->set_schema_name(table.getShare()->getSchemaName());
 
1307
  table_metadata->set_table_name(table.getShare()->getTableName());
 
1308
 
1365
1309
  Field **table_fields= table.getFields();
1366
1310
 
1367
1311
  message::FieldMetadata *field_metadata;
1369
1313
  /* We will read all the table's fields... */
1370
1314
  table.setReadSet();
1371
1315
 
1372
 
  while ((current_field= *table_fields++) != NULL) 
 
1316
  while (Field* current_field= *table_fields++) 
1373
1317
  {
1374
1318
    field_metadata= header->add_field_metadata();
1375
1319
    field_metadata->set_name(current_field->field_name);
1377
1321
  }
1378
1322
}
1379
1323
 
1380
 
bool TransactionServices::insertRecord(Session::reference session,
 
1324
bool TransactionServices::insertRecord(Session& session,
1381
1325
                                       Table &table)
1382
1326
{
1383
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1384
 
  if (! replication_services.isActive())
 
1327
  if (! ReplicationServices::isActive())
1385
1328
    return false;
1386
1329
 
1387
1330
  if (not table.getShare()->is_replicated())
1436
1379
  return false;
1437
1380
}
1438
1381
 
1439
 
message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
 
1382
message::Statement &TransactionServices::getUpdateStatement(Session& session,
1440
1383
                                                            Table &table,
1441
1384
                                                            const unsigned char *old_record, 
1442
1385
                                                            const unsigned char *new_record,
1529
1472
}
1530
1473
 
1531
1474
void TransactionServices::setUpdateHeader(message::Statement &statement,
1532
 
                                          Session::const_reference session,
 
1475
                                          const Session& session,
1533
1476
                                          Table &table,
1534
1477
                                          const unsigned char *old_record, 
1535
1478
                                          const unsigned char *new_record)
1544
1487
  message::UpdateHeader *header= statement.mutable_update_header();
1545
1488
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1546
1489
 
1547
 
  string schema_name;
1548
 
  (void) table.getShare()->getSchemaName(schema_name);
1549
 
  string table_name;
1550
 
  (void) table.getShare()->getTableName(table_name);
1551
 
 
1552
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1553
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1490
  table_metadata->set_schema_name(table.getShare()->getSchemaName());
 
1491
  table_metadata->set_table_name(table.getShare()->getTableName());
1554
1492
 
1555
1493
  Field *current_field;
1556
1494
  Field **table_fields= table.getFields();
1583
1521
  }
1584
1522
}
1585
1523
 
1586
 
void TransactionServices::updateRecord(Session::reference session,
 
1524
void TransactionServices::updateRecord(Session& session,
1587
1525
                                       Table &table, 
1588
1526
                                       const unsigned char *old_record, 
1589
1527
                                       const unsigned char *new_record)
1590
1528
{
1591
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1592
 
  if (! replication_services.isActive())
 
1529
  if (! ReplicationServices::isActive())
1593
1530
    return;
1594
1531
 
1595
1532
  if (not table.getShare()->is_replicated())
1714
1651
  return isUpdated;
1715
1652
}  
1716
1653
 
1717
 
message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
 
1654
message::Statement &TransactionServices::getDeleteStatement(Session& session,
1718
1655
                                                            Table &table,
1719
1656
                                                            uint32_t *next_segment_id)
1720
1657
{
1805
1742
}
1806
1743
 
1807
1744
void TransactionServices::setDeleteHeader(message::Statement &statement,
1808
 
                                          Session::const_reference session,
 
1745
                                          const Session& session,
1809
1746
                                          Table &table)
1810
1747
{
1811
1748
  initStatementMessage(statement, message::Statement::DELETE, session);
1817
1754
  message::DeleteHeader *header= statement.mutable_delete_header();
1818
1755
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1819
1756
 
1820
 
  string schema_name;
1821
 
  (void) table.getShare()->getSchemaName(schema_name);
1822
 
  string table_name;
1823
 
  (void) table.getShare()->getTableName(table_name);
1824
 
 
1825
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
1826
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1757
  table_metadata->set_schema_name(table.getShare()->getSchemaName());
 
1758
  table_metadata->set_table_name(table.getShare()->getTableName());
1827
1759
 
1828
1760
  Field *current_field;
1829
1761
  Field **table_fields= table.getFields();
1846
1778
  }
1847
1779
}
1848
1780
 
1849
 
void TransactionServices::deleteRecord(Session::reference session,
 
1781
void TransactionServices::deleteRecord(Session& session,
1850
1782
                                       Table &table,
1851
1783
                                       bool use_update_record)
1852
1784
{
1853
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1854
 
  if (! replication_services.isActive())
 
1785
  if (! ReplicationServices::isActive())
1855
1786
    return;
1856
1787
 
1857
1788
  if (not table.getShare()->is_replicated())
1906
1837
  }
1907
1838
}
1908
1839
 
1909
 
void TransactionServices::createTable(Session::reference session,
 
1840
void TransactionServices::createTable(Session& session,
1910
1841
                                      const message::Table &table)
1911
1842
{
1912
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1913
 
  if (not replication_services.isActive())
 
1843
  if (not ReplicationServices::isActive())
1914
1844
    return;
1915
1845
 
1916
1846
  if (not message::is_replicated(table))
1933
1863
 
1934
1864
  finalizeTransactionMessage(*transaction, session);
1935
1865
  
1936
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1866
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1937
1867
 
1938
1868
  cleanupTransactionMessage(transaction, session);
1939
1869
 
1940
1870
}
1941
1871
 
1942
 
void TransactionServices::createSchema(Session::reference session,
 
1872
void TransactionServices::createSchema(Session& session,
1943
1873
                                       const message::Schema &schema)
1944
1874
{
1945
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1946
 
  if (! replication_services.isActive())
 
1875
  if (! ReplicationServices::isActive())
1947
1876
    return;
1948
1877
 
1949
1878
  if (not message::is_replicated(schema))
1966
1895
 
1967
1896
  finalizeTransactionMessage(*transaction, session);
1968
1897
  
1969
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1898
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1970
1899
 
1971
1900
  cleanupTransactionMessage(transaction, session);
1972
1901
 
1973
1902
}
1974
1903
 
1975
 
void TransactionServices::dropSchema(Session::reference session,
1976
 
                                     identifier::Schema::const_reference identifier,
 
1904
void TransactionServices::dropSchema(Session& session,
 
1905
                                     const identifier::Schema& identifier,
1977
1906
                                     message::schema::const_reference schema)
1978
1907
{
1979
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
1980
 
  if (not replication_services.isActive())
 
1908
  if (not ReplicationServices::isActive())
1981
1909
    return;
1982
1910
 
1983
1911
  if (not message::is_replicated(schema))
2000
1928
 
2001
1929
  finalizeTransactionMessage(*transaction, session);
2002
1930
  
2003
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1931
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2004
1932
 
2005
1933
  cleanupTransactionMessage(transaction, session);
2006
1934
}
2007
1935
 
2008
 
void TransactionServices::alterSchema(Session::reference session,
 
1936
void TransactionServices::alterSchema(Session& session,
2009
1937
                                      const message::Schema &old_schema,
2010
1938
                                      const message::Schema &new_schema)
2011
1939
{
2012
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2013
 
  if (! replication_services.isActive())
 
1940
  if (! ReplicationServices::isActive())
2014
1941
    return;
2015
1942
 
2016
1943
  if (not message::is_replicated(old_schema))
2037
1964
 
2038
1965
  finalizeTransactionMessage(*transaction, session);
2039
1966
  
2040
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
1967
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2041
1968
 
2042
1969
  cleanupTransactionMessage(transaction, session);
2043
1970
}
2044
1971
 
2045
 
void TransactionServices::dropTable(Session::reference session,
2046
 
                                    identifier::Table::const_reference identifier,
 
1972
void TransactionServices::dropTable(Session& session,
 
1973
                                    const identifier::Table& identifier,
2047
1974
                                    message::table::const_reference table,
2048
1975
                                    bool if_exists)
2049
1976
{
2050
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2051
 
  if (! replication_services.isActive())
 
1977
  if (! ReplicationServices::isActive())
2052
1978
    return;
2053
1979
 
2054
1980
  if (not message::is_replicated(table))
2076
2002
 
2077
2003
  finalizeTransactionMessage(*transaction, session);
2078
2004
  
2079
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2005
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2080
2006
 
2081
2007
  cleanupTransactionMessage(transaction, session);
2082
2008
}
2083
2009
 
2084
 
void TransactionServices::truncateTable(Session::reference session,
2085
 
                                        Table &table)
 
2010
void TransactionServices::truncateTable(Session& session, Table &table)
2086
2011
{
2087
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2088
 
  if (! replication_services.isActive())
 
2012
  if (! ReplicationServices::isActive())
2089
2013
    return;
2090
2014
 
2091
2015
  if (not table.getShare()->is_replicated())
2103
2027
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2104
2028
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2105
2029
 
2106
 
  string schema_name;
2107
 
  (void) table.getShare()->getSchemaName(schema_name);
2108
 
 
2109
 
  string table_name;
2110
 
  (void) table.getShare()->getTableName(table_name);
2111
 
 
2112
 
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
2113
 
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
2030
  table_metadata->set_schema_name(table.getShare()->getSchemaName());
 
2031
  table_metadata->set_table_name(table.getShare()->getTableName());
2114
2032
 
2115
2033
  finalizeStatementMessage(*statement, session);
2116
2034
 
2117
2035
  finalizeTransactionMessage(*transaction, session);
2118
2036
  
2119
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2037
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2120
2038
 
2121
2039
  cleanupTransactionMessage(transaction, session);
2122
2040
}
2123
2041
 
2124
 
void TransactionServices::rawStatement(Session::reference session,
 
2042
void TransactionServices::rawStatement(Session& session,
2125
2043
                                       const string &query,
2126
2044
                                       const string &schema)
2127
2045
{
2128
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2129
 
  if (! replication_services.isActive())
 
2046
  if (! ReplicationServices::isActive())
2130
2047
    return;
2131
2048
 
2132
2049
  message::Transaction *transaction= getActiveTransactionMessage(session);
2140
2057
 
2141
2058
  finalizeTransactionMessage(*transaction, session);
2142
2059
  
2143
 
  (void) replication_services.pushTransactionMessage(session, *transaction);
 
2060
  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2144
2061
 
2145
2062
  cleanupTransactionMessage(transaction, session);
2146
2063
}
2147
2064
 
2148
 
int TransactionServices::sendEvent(Session::reference session,
2149
 
                                   const message::Event &event)
 
2065
int TransactionServices::sendEvent(Session& session, const message::Event &event)
2150
2066
{
2151
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
2152
 
  if (! replication_services.isActive())
 
2067
  if (not ReplicationServices::isActive())
2153
2068
    return 0;
2154
 
 
2155
 
  message::Transaction *transaction= new (nothrow) message::Transaction();
 
2069
  message::Transaction transaction;
2156
2070
 
2157
2071
  // set server id, start timestamp
2158
 
  initTransactionMessage(*transaction, session, true);
 
2072
  initTransactionMessage(transaction, session, true);
2159
2073
 
2160
2074
  // set end timestamp
2161
 
  finalizeTransactionMessage(*transaction, session);
2162
 
 
2163
 
  message::Event *trx_event= transaction->mutable_event();
2164
 
 
 
2075
  finalizeTransactionMessage(transaction, session);
 
2076
 
 
2077
  message::Event *trx_event= transaction.mutable_event();
2165
2078
  trx_event->CopyFrom(event);
2166
 
 
2167
 
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
2168
 
 
2169
 
  delete transaction;
2170
 
 
2171
 
  return static_cast<int>(result);
 
2079
  plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, transaction);
 
2080
  return result;
2172
2081
}
2173
2082
 
2174
 
bool TransactionServices::sendStartupEvent(Session::reference session)
 
2083
bool TransactionServices::sendStartupEvent(Session& session)
2175
2084
{
2176
2085
  message::Event event;
2177
2086
  event.set_type(message::Event::STARTUP);
2178
 
  if (sendEvent(session, event) != 0)
2179
 
    return false;
2180
 
  return true;
 
2087
  return not sendEvent(session, event);
2181
2088
}
2182
2089
 
2183
 
bool TransactionServices::sendShutdownEvent(Session::reference session)
 
2090
bool TransactionServices::sendShutdownEvent(Session& session)
2184
2091
{
2185
2092
  message::Event event;
2186
2093
  event.set_type(message::Event::SHUTDOWN);
2187
 
  if (sendEvent(session, event) != 0)
2188
 
    return false;
2189
 
  return true;
 
2094
  return not sendEvent(session, event);
2190
2095
}
2191
2096
 
2192
2097
} /* namespace drizzled */