~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to drizzled/transaction_services.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
/**
22
22
 * @file Transaction processing code
 
23
 *
 
24
 * @note
 
25
 *
 
26
 * The TransactionServices component takes internal events (for instance the start of a 
 
27
 * transaction, the changing of a record, or the rollback of a transaction) 
 
28
 * and constructs GPB Messages that are passed to the ReplicationServices
 
29
 * component and used during replication.
 
30
 *
 
31
 * The reason for this functionality is to encapsulate all communication
 
32
 * between the kernel and the replicator/applier plugins into GPB Messages.
 
33
 * Instead of the plugin having to understand the (often fluidly changing)
 
34
 * mechanics of the kernel, all the plugin needs to understand is the message
 
35
 * format, and GPB messages provide a nice, clear, and versioned format for 
 
36
 * these messages.
 
37
 *
 
38
 * @see /drizzled/message/transaction.proto
 
39
 *
 
40
 * @todo
 
41
 *
 
42
 * We really should store the raw bytes in the messages, not the
 
43
 * String value of the Field.  But, to do that, the
 
44
 * statement_transform library needs first to be updated
 
45
 * to include the transformation code to convert raw
 
46
 * Drizzle-internal Field byte representation into something
 
47
 * plugins can understand.
23
48
 */
24
49
 
25
50
#include "config.h"
33
58
#include "drizzled/replication_services.h"
34
59
#include "drizzled/transaction_services.h"
35
60
#include "drizzled/transaction_context.h"
 
61
#include "drizzled/message/transaction.pb.h"
 
62
#include "drizzled/message/statement_transform.h"
36
63
#include "drizzled/resource_context.h"
37
64
#include "drizzled/lock.h"
38
65
#include "drizzled/item/int.h"
53
80
namespace drizzled
54
81
{
55
82
 
 
83
/** @TODO: Make this a system variable */
 
84
static const size_t trx_msg_threshold= 1024 * 1024;
 
85
 
56
86
/**
57
87
 * @defgroup Transactions
58
88
 *
400
430
}
401
431
 
402
432
/**
403
 
  Check if we can skip the two-phase commit.
404
 
 
405
 
  A helper function to evaluate if two-phase commit is mandatory.
406
 
  As a side effect, propagates the read-only/read-write flags
407
 
  of the statement transaction to its enclosing normal transaction.
408
 
 
409
 
  @retval true   we must run a two-phase commit. Returned
410
 
                 if we have at least two engines with read-write changes.
411
 
  @retval false  Don't need two-phase commit. Even if we have two
412
 
                 transactional engines, we can run two independent
413
 
                 commits if changes in one of the engines are read-only.
414
 
*/
415
 
static
416
 
bool
417
 
ha_check_and_coalesce_trx_read_only(Session *session,
418
 
                                    TransactionContext::ResourceContexts &resource_contexts,
419
 
                                    bool normal_transaction)
420
 
{
421
 
  /* The number of storage engines that have actual changes. */
422
 
  unsigned num_resources_modified_data= 0;
423
 
  ResourceContext *resource_context;
424
 
 
425
 
  for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
426
 
       it != resource_contexts.end();
427
 
       ++it)
428
 
  {
429
 
    resource_context= *it;
430
 
    if (resource_context->hasModifiedData())
431
 
      ++num_resources_modified_data;
432
 
 
433
 
    if (! normal_transaction)
434
 
    {
435
 
      ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
436
 
      assert(resource_context != resource_context_normal);
437
 
      /*
438
 
        Merge read-only/read-write information about statement
439
 
        transaction to its enclosing normal transaction. Do this
440
 
        only if in a real transaction -- that is, if we know
441
 
        that resource_context_all is registered in session->transaction.all.
442
 
        Since otherwise we only clutter the normal transaction flags.
443
 
      */
444
 
      if (resource_context_normal->isStarted()) /* false if autocommit. */
445
 
        resource_context_normal->coalesceWith(resource_context);
446
 
    }
447
 
    else if (num_resources_modified_data > 1)
448
 
    {
449
 
      /*
450
 
        It is a normal transaction, so we don't need to merge read/write
451
 
        information up, and the need for two-phase commit has been
452
 
        already established. Break the loop prematurely.
453
 
      */
454
 
      break;
455
 
    }
456
 
  }
457
 
  return num_resources_modified_data > 1;
458
 
}
459
 
 
460
 
 
461
 
/**
462
433
  @retval
463
434
    0   ok
464
435
  @retval
472
443
    stored functions or triggers. So we simply do nothing now.
473
444
    TODO: This should be fixed in later ( >= 5.1) releases.
474
445
*/
475
 
int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
 
446
int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
476
447
{
477
448
  int error= 0, cookie= 0;
478
449
  /*
495
466
 
496
467
  if (resource_contexts.empty() == false)
497
468
  {
498
 
    bool must_2pc;
499
 
 
500
469
    if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
501
470
    {
502
 
      ha_rollback_trans(session, normal_transaction);
 
471
      rollbackTransaction(session, normal_transaction);
503
472
      return 1;
504
473
    }
505
474
 
506
 
    must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
507
 
 
508
 
    if (! trans->no_2pc && must_2pc)
 
475
    /*
 
476
     * If replication is on, we do a PREPARE on the resource managers, push the
 
477
     * Transaction message across the replication stream, and then COMMIT if the
 
478
     * replication stream returned successfully.
 
479
     */
 
480
    if (shouldConstructMessages())
509
481
    {
510
482
      for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
511
483
           it != resource_contexts.end() && ! error;
532
504
          }
533
505
          else
534
506
          {
535
 
            status_var_increment(session->status_var.ha_prepare_count);
 
507
            session->status_var.ha_prepare_count++;
536
508
          }
537
509
        }
538
510
      }
 
511
      if (error == 0 && is_real_trans)
 
512
      {
 
513
        /*
 
514
         * Push the constructed Transaction messages across to
 
515
         * replicators and appliers.
 
516
         */
 
517
        error= commitTransactionMessage(session);
 
518
      }
539
519
      if (error)
540
520
      {
541
 
        ha_rollback_trans(session, normal_transaction);
 
521
        rollbackTransaction(session, normal_transaction);
542
522
        error= 1;
543
523
        goto end;
544
524
      }
545
525
    }
546
 
    error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
 
526
    error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
547
527
end:
548
528
    if (is_real_trans)
549
529
      start_waiting_global_read_lock(session);
555
535
  @note
556
536
  This function does not care about global read lock. A caller should.
557
537
*/
558
 
int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
 
538
int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
559
539
{
560
540
  int error=0;
561
541
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
583
563
        }
584
564
        else if (normal_transaction)
585
565
        {
586
 
          status_var_increment(session->status_var.ha_commit_count);
 
566
          session->status_var.ha_commit_count++;
587
567
        }
588
568
      }
589
569
      else if (resource->participatesInSqlTransaction())
595
575
        }
596
576
        else if (normal_transaction)
597
577
        {
598
 
          status_var_increment(session->status_var.ha_commit_count);
 
578
          session->status_var.ha_commit_count++;
599
579
        }
600
580
      }
601
581
      resource_context->reset(); /* keep it conveniently zero-filled */
611
591
    }
612
592
  }
613
593
  trans->reset();
614
 
  if (error == 0)
615
 
  {
616
 
    if (is_real_trans)
617
 
    {
618
 
      /* 
619
 
       * We commit the normal transaction by finalizing the transaction message
620
 
       * and propogating the message to all registered replicators.
621
 
       */
622
 
      ReplicationServices &replication_services= ReplicationServices::singleton();
623
 
      replication_services.commitTransaction(session);
624
 
    }
625
 
  }
626
594
  return error;
627
595
}
628
596
 
629
 
int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
 
597
int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
630
598
{
631
599
  int error= 0;
632
600
  TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
661
629
        }
662
630
        else if (normal_transaction)
663
631
        {
664
 
          status_var_increment(session->status_var.ha_rollback_count);
 
632
          session->status_var.ha_rollback_count++;
665
633
        }
666
634
      }
667
635
      else if (resource->participatesInSqlTransaction())
673
641
        }
674
642
        else if (normal_transaction)
675
643
        {
676
 
          status_var_increment(session->status_var.ha_rollback_count);
 
644
          session->status_var.ha_rollback_count++;
677
645
        }
678
646
      }
679
647
      resource_context->reset(); /* keep it conveniently zero-filled */
686
654
     * a rollback statement with the corresponding transaction ID
687
655
     * to rollback.
688
656
     */
689
 
    ReplicationServices &replication_services= ReplicationServices::singleton();
690
 
    replication_services.rollbackTransaction(session);
 
657
    if (normal_transaction)
 
658
      rollbackTransactionMessage(session);
691
659
 
692
660
    if (is_real_trans)
693
661
      session->transaction.xid_state.xid.null();
726
694
    the user has used LOCK TABLES then that mechanism does not know to do the
727
695
    commit.
728
696
*/
729
 
int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
 
697
int TransactionServices::autocommitOrRollback(Session *session, int error)
730
698
{
731
699
  if (session->transaction.stmt.getResourceContexts().empty() == false)
732
700
  {
733
701
    if (! error)
734
702
    {
735
 
      if (ha_commit_trans(session, false))
 
703
      if (commitTransaction(session, false))
736
704
        error= 1;
737
705
    }
738
706
    else
739
707
    {
740
 
      (void) ha_rollback_trans(session, false);
 
708
      (void) rollbackTransaction(session, false);
741
709
      if (session->transaction_rollback_request)
742
 
        (void) ha_rollback_trans(session, true);
 
710
        (void) rollbackTransaction(session, true);
743
711
    }
744
712
 
745
713
    session->variables.tx_isolation= session->session_tx_isolation;
747
715
  return error;
748
716
}
749
717
 
750
 
/**
751
 
  return the list of XID's to a client, the same way SHOW commands do.
752
 
 
753
 
  @note
754
 
    I didn't find in XA specs that an RM cannot return the same XID twice,
755
 
    so mysql_xa_recover does not filter XID's to ensure uniqueness.
756
 
    It can be easily fixed later, if necessary.
757
 
*/
758
 
bool TransactionServices::mysql_xa_recover(Session *session)
759
 
{
760
 
  List<Item> field_list;
761
 
  int i= 0;
762
 
  XID_STATE *xs;
763
 
 
764
 
  field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
765
 
  field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
766
 
  field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
767
 
  field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
768
 
 
769
 
  if (session->client->sendFields(&field_list))
770
 
    return 1;
771
 
 
772
 
  pthread_mutex_lock(&LOCK_xid_cache);
773
 
  while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
774
 
  {
775
 
    if (xs->xa_state==XA_PREPARED)
776
 
    {
777
 
      session->client->store((int64_t)xs->xid.formatID);
778
 
      session->client->store((int64_t)xs->xid.gtrid_length);
779
 
      session->client->store((int64_t)xs->xid.bqual_length);
780
 
      session->client->store(xs->xid.data,
781
 
                             xs->xid.gtrid_length+xs->xid.bqual_length);
782
 
      if (session->client->flush())
783
 
      {
784
 
        pthread_mutex_unlock(&LOCK_xid_cache);
785
 
        return 1;
786
 
      }
787
 
    }
788
 
  }
789
 
 
790
 
  pthread_mutex_unlock(&LOCK_xid_cache);
791
 
  session->my_eof();
792
 
  return 0;
793
 
}
794
 
 
795
718
struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
796
719
{
797
720
  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
802
725
  }
803
726
};
804
727
 
805
 
int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
 
728
int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
806
729
{
807
730
  int error= 0;
808
731
  TransactionContext *trans= &session->transaction.all;
832
755
      }
833
756
      else
834
757
      {
835
 
        status_var_increment(session->status_var.ha_savepoint_rollback_count);
 
758
        session->status_var.ha_savepoint_rollback_count++;
836
759
      }
837
760
    }
838
761
    trans->no_2pc|= not resource->participatesInXaTransaction();
846
769
    TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
847
770
    TransactionContext::ResourceContexts set_difference_contexts;
848
771
 
 
772
    /* 
 
773
     * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
 
774
     * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
 
775
     * here
 
776
     */
 
777
    set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
 
778
 
849
779
    sort(sorted_tran_resource_contexts.begin(),
850
780
         sorted_tran_resource_contexts.end(),
851
781
         ResourceContextCompare());
882
812
        }
883
813
        else
884
814
        {
885
 
          status_var_increment(session->status_var.ha_rollback_count);
 
815
          session->status_var.ha_rollback_count++;
886
816
        }
887
817
      }
888
818
      resource_context->reset(); /* keep it conveniently zero-filled */
889
819
    }
890
820
  }
891
821
  trans->setResourceContexts(sv_resource_contexts);
 
822
 
 
823
  if (shouldConstructMessages())
 
824
  {
 
825
    cleanupTransactionMessage(getActiveTransactionMessage(session), session);
 
826
    message::Transaction *savepoint_transaction= sv.getTransactionMessage();
 
827
    if (savepoint_transaction != NULL)
 
828
    {
 
829
      /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
 
830
         Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
 
831
         cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
 
832
         up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
 
833
      */ 
 
834
      message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
 
835
      uint32_t num_statements = savepoint_transaction_copy->statement_size();
 
836
      if (num_statements == 0)
 
837
      {    
 
838
        session->setStatementMessage(NULL);
 
839
      }    
 
840
      else 
 
841
      {
 
842
        session->setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
 
843
      }    
 
844
      session->setTransactionMessage(savepoint_transaction_copy);
 
845
    }
 
846
  }
 
847
 
892
848
  return error;
893
849
}
894
850
 
898
854
  section "4.33.4 SQL-statements and transaction states",
899
855
  NamedSavepoint is *not* transaction-initiating SQL-statement
900
856
*/
901
 
int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
 
857
int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
902
858
{
903
859
  int error= 0;
904
860
  TransactionContext *trans= &session->transaction.all;
924
880
        }
925
881
        else
926
882
        {
927
 
          status_var_increment(session->status_var.ha_savepoint_count);
 
883
          session->status_var.ha_savepoint_count++;
928
884
        }
929
885
      }
930
886
    }
933
889
    Remember the list of registered storage engines.
934
890
  */
935
891
  sv.setResourceContexts(resource_contexts);
 
892
 
 
893
  if (shouldConstructMessages())
 
894
  {
 
895
    message::Transaction *transaction= session->getTransactionMessage();
 
896
                  
 
897
    if (transaction != NULL)
 
898
    {
 
899
      message::Transaction *transaction_savepoint= 
 
900
        new message::Transaction(*transaction);
 
901
      sv.setTransactionMessage(transaction_savepoint);
 
902
    }
 
903
  } 
 
904
 
936
905
  return error;
937
906
}
938
907
 
939
 
int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
 
908
int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
940
909
{
941
910
  int error= 0;
942
911
 
960
929
      }
961
930
    }
962
931
  }
 
932
  
963
933
  return error;
964
934
}
965
935
 
 
936
bool TransactionServices::shouldConstructMessages()
 
937
{
 
938
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
939
  return replication_services.isActive();
 
940
}
 
941
 
 
942
message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session, bool should_inc_trx_id)
 
943
{
 
944
  message::Transaction *transaction= in_session->getTransactionMessage();
 
945
 
 
946
  if (unlikely(transaction == NULL))
 
947
  {
 
948
    /* 
 
949
     * Allocate and initialize a new transaction message 
 
950
     * for this Session object.  Session is responsible for
 
951
     * deleting transaction message when done with it.
 
952
     */
 
953
    transaction= new (nothrow) message::Transaction();
 
954
    initTransactionMessage(*transaction, in_session, should_inc_trx_id);
 
955
    in_session->setTransactionMessage(transaction);
 
956
    return transaction;
 
957
  }
 
958
  else
 
959
    return transaction;
 
960
}
 
961
 
 
962
void TransactionServices::initTransactionMessage(message::Transaction &in_transaction,
 
963
                                                 Session *in_session,
 
964
                                                 bool should_inc_trx_id)
 
965
{
 
966
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
967
  trx->set_server_id(in_session->getServerId());
 
968
 
 
969
  if (should_inc_trx_id)
 
970
    trx->set_transaction_id(getNextTransactionId());
 
971
 
 
972
  trx->set_start_timestamp(in_session->getCurrentTimestamp());
 
973
}
 
974
 
 
975
void TransactionServices::finalizeTransactionMessage(message::Transaction &in_transaction,
 
976
                                              Session *in_session)
 
977
{
 
978
  message::TransactionContext *trx= in_transaction.mutable_transaction_context();
 
979
  trx->set_end_timestamp(in_session->getCurrentTimestamp());
 
980
}
 
981
 
 
982
void TransactionServices::cleanupTransactionMessage(message::Transaction *in_transaction,
 
983
                                             Session *in_session)
 
984
{
 
985
  delete in_transaction;
 
986
  in_session->setStatementMessage(NULL);
 
987
  in_session->setTransactionMessage(NULL);
 
988
}
 
989
 
 
990
int TransactionServices::commitTransactionMessage(Session *in_session)
 
991
{
 
992
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
993
  if (! replication_services.isActive())
 
994
    return 0;
 
995
 
 
996
  /* If there is an active statement message, finalize it */
 
997
  message::Statement *statement= in_session->getStatementMessage();
 
998
 
 
999
  if (statement != NULL)
 
1000
  {
 
1001
    finalizeStatementMessage(*statement, in_session);
 
1002
  }
 
1003
  else
 
1004
    return 0; /* No data modification occurred inside the transaction */
 
1005
  
 
1006
  message::Transaction* transaction= getActiveTransactionMessage(in_session);
 
1007
 
 
1008
  finalizeTransactionMessage(*transaction, in_session);
 
1009
  
 
1010
  plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
 
1011
 
 
1012
  cleanupTransactionMessage(transaction, in_session);
 
1013
 
 
1014
  return static_cast<int>(result);
 
1015
}
 
1016
 
 
1017
void TransactionServices::initStatementMessage(message::Statement &statement,
 
1018
                                        message::Statement::Type in_type,
 
1019
                                        Session *in_session)
 
1020
{
 
1021
  statement.set_type(in_type);
 
1022
  statement.set_start_timestamp(in_session->getCurrentTimestamp());
 
1023
  /** @TODO Set sql string optionally */
 
1024
}
 
1025
 
 
1026
void TransactionServices::finalizeStatementMessage(message::Statement &statement,
 
1027
                                            Session *in_session)
 
1028
{
 
1029
  statement.set_end_timestamp(in_session->getCurrentTimestamp());
 
1030
  in_session->setStatementMessage(NULL);
 
1031
}
 
1032
 
 
1033
void TransactionServices::rollbackTransactionMessage(Session *in_session)
 
1034
{
 
1035
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1036
  if (! replication_services.isActive())
 
1037
    return;
 
1038
  
 
1039
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1040
 
 
1041
  /*
 
1042
   * OK, so there are two situations that we need to deal with here:
 
1043
   *
 
1044
   * 1) We receive an instruction to ROLLBACK the current transaction
 
1045
   *    and the currently-stored Transaction message is *self-contained*, 
 
1046
   *    meaning that no Statement messages in the Transaction message
 
1047
   *    contain a message having its segment_id member greater than 1.  If
 
1048
   *    no non-segment ID 1 members are found, we can simply clear the
 
1049
   *    current Transaction message and remove it from memory.
 
1050
   *
 
1051
   * 2) If the Transaction message does indeed have a non-end segment, that
 
1052
   *    means that a bulk update/delete/insert Transaction message segment
 
1053
   *    has previously been sent over the wire to replicators.  In this case, 
 
1054
   *    we need to package a Transaction with a Statement message of type
 
1055
   *    ROLLBACK to indicate to replicators that previously-transmitted
 
1056
   *    messages must be un-applied.
 
1057
   */
 
1058
  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
 
1059
  {
 
1060
    /* Remember the transaction ID so we can re-use it */
 
1061
    uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1062
 
 
1063
    /*
 
1064
     * Clear the transaction, create a Rollback statement message, 
 
1065
     * attach it to the transaction, and push it to replicators.
 
1066
     */
 
1067
    transaction->Clear();
 
1068
    initTransactionMessage(*transaction, in_session, false);
 
1069
 
 
1070
    /* Set the transaction ID to match the previous messages */
 
1071
    transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1072
 
 
1073
    message::Statement *statement= transaction->add_statement();
 
1074
 
 
1075
    initStatementMessage(*statement, message::Statement::ROLLBACK, in_session);
 
1076
    finalizeStatementMessage(*statement, in_session);
 
1077
 
 
1078
    finalizeTransactionMessage(*transaction, in_session);
 
1079
    
 
1080
    (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1081
  }
 
1082
  cleanupTransactionMessage(transaction, in_session);
 
1083
}
 
1084
 
 
1085
message::Statement &TransactionServices::getInsertStatement(Session *in_session,
 
1086
                                                            Table *in_table,
 
1087
                                                            uint32_t *next_segment_id)
 
1088
{
 
1089
  message::Statement *statement= in_session->getStatementMessage();
 
1090
  message::Transaction *transaction= NULL;
 
1091
 
 
1092
  /* 
 
1093
   * Check the type for the current Statement message, if it is anything
 
1094
   * other then INSERT we need to call finalize, this will ensure a 
 
1095
   * new InsertStatement is created. If it is of type INSERT check
 
1096
   * what table the INSERT belongs to, if it is a different table
 
1097
   * call finalize, so a new InsertStatement can be created. 
 
1098
   */
 
1099
  if (statement != NULL && statement->type() != message::Statement::INSERT)
 
1100
  {
 
1101
    finalizeStatementMessage(*statement, in_session);
 
1102
    statement= in_session->getStatementMessage();
 
1103
  } 
 
1104
  else if (statement != NULL)
 
1105
  {
 
1106
    transaction= getActiveTransactionMessage(in_session);
 
1107
 
 
1108
    /*
 
1109
     * If we've passed our threshold for the statement size (possible for
 
1110
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1111
     * the Transaction will keep it from getting huge).
 
1112
     */
 
1113
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
 
1114
    {
 
1115
      /* Remember the transaction ID so we can re-use it */
 
1116
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1117
 
 
1118
      message::InsertData *current_data= statement->mutable_insert_data();
 
1119
 
 
1120
      /* Caller should use this value when adding a new record */
 
1121
      *next_segment_id= current_data->segment_id() + 1;
 
1122
 
 
1123
      current_data->set_end_segment(false);
 
1124
 
 
1125
      /* 
 
1126
       * Send the trx message to replicators after finalizing the 
 
1127
       * statement and transaction. This will also set the Transaction
 
1128
       * and Statement objects in Session to NULL.
 
1129
       */
 
1130
      commitTransactionMessage(in_session);
 
1131
 
 
1132
      /*
 
1133
       * Statement and Transaction should now be NULL, so new ones will get
 
1134
       * created. We reuse the transaction id since we are segmenting
 
1135
       * one transaction.
 
1136
       */
 
1137
      statement= in_session->getStatementMessage();
 
1138
      transaction= getActiveTransactionMessage(in_session, false);
 
1139
      assert(transaction != NULL);
 
1140
 
 
1141
      /* Set the transaction ID to match the previous messages */
 
1142
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1143
    }
 
1144
    else
 
1145
    {
 
1146
      const message::InsertHeader &insert_header= statement->insert_header();
 
1147
      string old_table_name= insert_header.table_metadata().table_name();
 
1148
     
 
1149
      string current_table_name;
 
1150
      (void) in_table->getShare()->getTableName(current_table_name);
 
1151
 
 
1152
      if (current_table_name.compare(old_table_name))
 
1153
      {
 
1154
        finalizeStatementMessage(*statement, in_session);
 
1155
        statement= in_session->getStatementMessage();
 
1156
      }
 
1157
      else
 
1158
      {
 
1159
        /* carry forward the existing segment id */
 
1160
        const message::InsertData &current_data= statement->insert_data();
 
1161
        *next_segment_id= current_data.segment_id();
 
1162
      }
 
1163
    }
 
1164
  } 
 
1165
 
 
1166
  if (statement == NULL)
 
1167
  {
 
1168
    /*
 
1169
     * Transaction will be non-NULL only if we had to segment it due to
 
1170
     * transaction size above.
 
1171
     */
 
1172
    if (transaction == NULL)
 
1173
      transaction= getActiveTransactionMessage(in_session);
 
1174
 
 
1175
    /* 
 
1176
     * Transaction message initialized and set, but no statement created
 
1177
     * yet.  We construct one and initialize it, here, then return the
 
1178
     * message after attaching the new Statement message pointer to the 
 
1179
     * Session for easy retrieval later...
 
1180
     */
 
1181
    statement= transaction->add_statement();
 
1182
    setInsertHeader(*statement, in_session, in_table);
 
1183
    in_session->setStatementMessage(statement);
 
1184
  }
 
1185
  return *statement;
 
1186
}
 
1187
 
 
1188
void TransactionServices::setInsertHeader(message::Statement &statement,
 
1189
                                          Session *in_session,
 
1190
                                          Table *in_table)
 
1191
{
 
1192
  initStatementMessage(statement, message::Statement::INSERT, in_session);
 
1193
 
 
1194
  /* 
 
1195
   * Now we construct the specialized InsertHeader message inside
 
1196
   * the generalized message::Statement container...
 
1197
   */
 
1198
  /* Set up the insert header */
 
1199
  message::InsertHeader *header= statement.mutable_insert_header();
 
1200
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1201
 
 
1202
  string schema_name;
 
1203
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1204
  string table_name;
 
1205
  (void) in_table->getShare()->getTableName(table_name);
 
1206
 
 
1207
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1208
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1209
 
 
1210
  Field *current_field;
 
1211
  Field **table_fields= in_table->getFields();
 
1212
 
 
1213
  message::FieldMetadata *field_metadata;
 
1214
 
 
1215
  /* We will read all the table's fields... */
 
1216
  in_table->setReadSet();
 
1217
 
 
1218
  while ((current_field= *table_fields++) != NULL) 
 
1219
  {
 
1220
    field_metadata= header->add_field_metadata();
 
1221
    field_metadata->set_name(current_field->field_name);
 
1222
    field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1223
  }
 
1224
}
 
1225
 
 
1226
bool TransactionServices::insertRecord(Session *in_session, Table *in_table)
 
1227
{
 
1228
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1229
  if (! replication_services.isActive())
 
1230
    return false;
 
1231
  /**
 
1232
   * We do this check here because we don't want to even create a 
 
1233
   * statement if there isn't a primary key on the table...
 
1234
   *
 
1235
   * @todo
 
1236
   *
 
1237
   * Multi-column primary keys are handled how exactly?
 
1238
   */
 
1239
  if (not in_table->getShare()->hasPrimaryKey())
 
1240
  {
 
1241
    my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
 
1242
    return true;
 
1243
  }
 
1244
 
 
1245
  uint32_t next_segment_id= 1;
 
1246
  message::Statement &statement= getInsertStatement(in_session, in_table, &next_segment_id);
 
1247
 
 
1248
  message::InsertData *data= statement.mutable_insert_data();
 
1249
  data->set_segment_id(next_segment_id);
 
1250
  data->set_end_segment(true);
 
1251
  message::InsertRecord *record= data->add_record();
 
1252
 
 
1253
  Field *current_field;
 
1254
  Field **table_fields= in_table->getFields();
 
1255
 
 
1256
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1257
  string_value->set_charset(system_charset_info);
 
1258
 
 
1259
  /* We will read all the table's fields... */
 
1260
  in_table->setReadSet();
 
1261
 
 
1262
  while ((current_field= *table_fields++) != NULL) 
 
1263
  {
 
1264
    if (current_field->is_null())
 
1265
    {
 
1266
      record->add_is_null(true);
 
1267
      record->add_insert_value("", 0);
 
1268
    } 
 
1269
    else 
 
1270
    {
 
1271
      string_value= current_field->val_str(string_value);
 
1272
      record->add_is_null(false);
 
1273
      record->add_insert_value(string_value->c_ptr(), string_value->length());
 
1274
      string_value->free();
 
1275
    }
 
1276
  }
 
1277
  return false;
 
1278
}
 
1279
 
 
1280
message::Statement &TransactionServices::getUpdateStatement(Session *in_session,
 
1281
                                                            Table *in_table,
 
1282
                                                            const unsigned char *old_record, 
 
1283
                                                            const unsigned char *new_record,
 
1284
                                                            uint32_t *next_segment_id)
 
1285
{
 
1286
  message::Statement *statement= in_session->getStatementMessage();
 
1287
  message::Transaction *transaction= NULL;
 
1288
 
 
1289
  /*
 
1290
   * Check the type for the current Statement message, if it is anything
 
1291
   * other then UPDATE we need to call finalize, this will ensure a
 
1292
   * new UpdateStatement is created. If it is of type UPDATE check
 
1293
   * what table the UPDATE belongs to, if it is a different table
 
1294
   * call finalize, so a new UpdateStatement can be created.
 
1295
   */
 
1296
  if (statement != NULL && statement->type() != message::Statement::UPDATE)
 
1297
  {
 
1298
    finalizeStatementMessage(*statement, in_session);
 
1299
    statement= in_session->getStatementMessage();
 
1300
  }
 
1301
  else if (statement != NULL)
 
1302
  {
 
1303
    transaction= getActiveTransactionMessage(in_session);
 
1304
 
 
1305
    /*
 
1306
     * If we've passed our threshold for the statement size (possible for
 
1307
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1308
     * the Transaction will keep it from getting huge).
 
1309
     */
 
1310
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
 
1311
    {
 
1312
      /* Remember the transaction ID so we can re-use it */
 
1313
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1314
 
 
1315
      message::UpdateData *current_data= statement->mutable_update_data();
 
1316
 
 
1317
      /* Caller should use this value when adding a new record */
 
1318
      *next_segment_id= current_data->segment_id() + 1;
 
1319
 
 
1320
      current_data->set_end_segment(false);
 
1321
 
 
1322
      /*
 
1323
       * Send the trx message to replicators after finalizing the 
 
1324
       * statement and transaction. This will also set the Transaction
 
1325
       * and Statement objects in Session to NULL.
 
1326
       */
 
1327
      commitTransactionMessage(in_session);
 
1328
 
 
1329
      /*
 
1330
       * Statement and Transaction should now be NULL, so new ones will get
 
1331
       * created. We reuse the transaction id since we are segmenting
 
1332
       * one transaction.
 
1333
       */
 
1334
      statement= in_session->getStatementMessage();
 
1335
      transaction= getActiveTransactionMessage(in_session, false);
 
1336
      assert(transaction != NULL);
 
1337
 
 
1338
      /* Set the transaction ID to match the previous messages */
 
1339
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1340
    }
 
1341
    else
 
1342
    {
 
1343
      const message::UpdateHeader &update_header= statement->update_header();
 
1344
      string old_table_name= update_header.table_metadata().table_name();
 
1345
 
 
1346
      string current_table_name;
 
1347
      (void) in_table->getShare()->getTableName(current_table_name);
 
1348
      if (current_table_name.compare(old_table_name))
 
1349
      {
 
1350
        finalizeStatementMessage(*statement, in_session);
 
1351
        statement= in_session->getStatementMessage();
 
1352
      }
 
1353
      else
 
1354
      {
 
1355
        /* carry forward the existing segment id */
 
1356
        const message::UpdateData &current_data= statement->update_data();
 
1357
        *next_segment_id= current_data.segment_id();
 
1358
      }
 
1359
    }
 
1360
  }
 
1361
 
 
1362
  if (statement == NULL)
 
1363
  {
 
1364
    /*
 
1365
     * Transaction will be non-NULL only if we had to segment it due to
 
1366
     * transaction size above.
 
1367
     */
 
1368
    if (transaction == NULL)
 
1369
      transaction= getActiveTransactionMessage(in_session);
 
1370
 
 
1371
    /* 
 
1372
     * Transaction message initialized and set, but no statement created
 
1373
     * yet.  We construct one and initialize it, here, then return the
 
1374
     * message after attaching the new Statement message pointer to the 
 
1375
     * Session for easy retrieval later...
 
1376
     */
 
1377
    statement= transaction->add_statement();
 
1378
    setUpdateHeader(*statement, in_session, in_table, old_record, new_record);
 
1379
    in_session->setStatementMessage(statement);
 
1380
  }
 
1381
  return *statement;
 
1382
}
 
1383
 
 
1384
void TransactionServices::setUpdateHeader(message::Statement &statement,
 
1385
                                          Session *in_session,
 
1386
                                          Table *in_table,
 
1387
                                          const unsigned char *old_record, 
 
1388
                                          const unsigned char *new_record)
 
1389
{
 
1390
  initStatementMessage(statement, message::Statement::UPDATE, in_session);
 
1391
 
 
1392
  /* 
 
1393
   * Now we construct the specialized UpdateHeader message inside
 
1394
   * the generalized message::Statement container...
 
1395
   */
 
1396
  /* Set up the update header */
 
1397
  message::UpdateHeader *header= statement.mutable_update_header();
 
1398
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1399
 
 
1400
  string schema_name;
 
1401
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1402
  string table_name;
 
1403
  (void) in_table->getShare()->getTableName(table_name);
 
1404
 
 
1405
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1406
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1407
 
 
1408
  Field *current_field;
 
1409
  Field **table_fields= in_table->getFields();
 
1410
 
 
1411
  message::FieldMetadata *field_metadata;
 
1412
 
 
1413
  /* We will read all the table's fields... */
 
1414
  in_table->setReadSet();
 
1415
 
 
1416
  while ((current_field= *table_fields++) != NULL) 
 
1417
  {
 
1418
    /*
 
1419
     * We add the "key field metadata" -- i.e. the fields which is
 
1420
     * the primary key for the table.
 
1421
     */
 
1422
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1423
    {
 
1424
      field_metadata= header->add_key_field_metadata();
 
1425
      field_metadata->set_name(current_field->field_name);
 
1426
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1427
    }
 
1428
 
 
1429
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1430
    {
 
1431
      /* Field is changed from old to new */
 
1432
      field_metadata= header->add_set_field_metadata();
 
1433
      field_metadata->set_name(current_field->field_name);
 
1434
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1435
    }
 
1436
  }
 
1437
}
 
1438
void TransactionServices::updateRecord(Session *in_session,
 
1439
                                       Table *in_table, 
 
1440
                                       const unsigned char *old_record, 
 
1441
                                       const unsigned char *new_record)
 
1442
{
 
1443
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1444
  if (! replication_services.isActive())
 
1445
    return;
 
1446
 
 
1447
  uint32_t next_segment_id= 1;
 
1448
  message::Statement &statement= getUpdateStatement(in_session, in_table, old_record, new_record, &next_segment_id);
 
1449
 
 
1450
  message::UpdateData *data= statement.mutable_update_data();
 
1451
  data->set_segment_id(next_segment_id);
 
1452
  data->set_end_segment(true);
 
1453
  message::UpdateRecord *record= data->add_record();
 
1454
 
 
1455
  Field *current_field;
 
1456
  Field **table_fields= in_table->getFields();
 
1457
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1458
  string_value->set_charset(system_charset_info);
 
1459
 
 
1460
  while ((current_field= *table_fields++) != NULL) 
 
1461
  {
 
1462
    /*
 
1463
     * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
 
1464
     * but then realized that an UPDATE statement could potentially have different values for
 
1465
     * the SET field.  For instance, imagine this SQL scenario:
 
1466
     *
 
1467
     * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
 
1468
     * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
 
1469
     * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
 
1470
     *
 
1471
     * We will generate two UpdateRecord messages with different set_value byte arrays.
 
1472
     */
 
1473
    if (isFieldUpdated(current_field, in_table, old_record, new_record))
 
1474
    {
 
1475
      /* Store the original "read bit" for this field */
 
1476
      bool is_read_set= current_field->isReadSet();
 
1477
 
 
1478
      /* We need to mark that we will "read" this field... */
 
1479
      in_table->setReadSet(current_field->field_index);
 
1480
 
 
1481
      /* Read the string value of this field's contents */
 
1482
      string_value= current_field->val_str(string_value);
 
1483
 
 
1484
      /* 
 
1485
       * Reset the read bit after reading field to its original state.  This 
 
1486
       * prevents the field from being included in the WHERE clause
 
1487
       */
 
1488
      current_field->setReadSet(is_read_set);
 
1489
 
 
1490
      if (current_field->is_null())
 
1491
      {
 
1492
        record->add_is_null(true);
 
1493
        record->add_after_value("", 0);
 
1494
      }
 
1495
      else
 
1496
      {
 
1497
        record->add_is_null(false);
 
1498
        record->add_after_value(string_value->c_ptr(), string_value->length());
 
1499
      }
 
1500
      string_value->free();
 
1501
    }
 
1502
 
 
1503
    /* 
 
1504
     * Add the WHERE clause values now...for now, this means the
 
1505
     * primary key field value.  Replication only supports tables
 
1506
     * with a primary key.
 
1507
     */
 
1508
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1509
    {
 
1510
      /**
 
1511
       * To say the below is ugly is an understatement. But it works.
 
1512
       * 
 
1513
       * @todo Move this crap into a real Record API.
 
1514
       */
 
1515
      string_value= current_field->val_str(string_value,
 
1516
                                           old_record + 
 
1517
                                           current_field->offset(const_cast<unsigned char *>(new_record)));
 
1518
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1519
      string_value->free();
 
1520
    }
 
1521
 
 
1522
  }
 
1523
}
 
1524
 
 
1525
bool TransactionServices::isFieldUpdated(Field *current_field,
 
1526
                                         Table *in_table,
 
1527
                                         const unsigned char *old_record,
 
1528
                                         const unsigned char *new_record)
 
1529
{
 
1530
  /*
 
1531
   * The below really should be moved into the Field API and Record API.  But for now
 
1532
   * we do this crazy pointer fiddling to figure out if the current field
 
1533
   * has been updated in the supplied record raw byte pointers.
 
1534
   */
 
1535
  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1536
  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - in_table->getInsertRecord());
 
1537
 
 
1538
  uint32_t field_length= current_field->pack_length(); /** @TODO This isn't always correct...check varchar diffs. */
 
1539
 
 
1540
  bool old_value_is_null= current_field->is_null_in_record(old_record);
 
1541
  bool new_value_is_null= current_field->is_null_in_record(new_record);
 
1542
 
 
1543
  bool isUpdated= false;
 
1544
  if (old_value_is_null != new_value_is_null)
 
1545
  {
 
1546
    if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
 
1547
    {
 
1548
      isUpdated= true;
 
1549
    }
 
1550
    else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
 
1551
    {
 
1552
      isUpdated= true;
 
1553
    }
 
1554
  }
 
1555
 
 
1556
  if (! isUpdated)
 
1557
  {
 
1558
    if (memcmp(old_ptr, new_ptr, field_length) != 0)
 
1559
    {
 
1560
      isUpdated= true;
 
1561
    }
 
1562
  }
 
1563
  return isUpdated;
 
1564
}  
 
1565
 
 
1566
message::Statement &TransactionServices::getDeleteStatement(Session *in_session,
 
1567
                                                            Table *in_table,
 
1568
                                                            uint32_t *next_segment_id)
 
1569
{
 
1570
  message::Statement *statement= in_session->getStatementMessage();
 
1571
  message::Transaction *transaction= NULL;
 
1572
 
 
1573
  /*
 
1574
   * Check the type for the current Statement message, if it is anything
 
1575
   * other then DELETE we need to call finalize, this will ensure a
 
1576
   * new DeleteStatement is created. If it is of type DELETE check
 
1577
   * what table the DELETE belongs to, if it is a different table
 
1578
   * call finalize, so a new DeleteStatement can be created.
 
1579
   */
 
1580
  if (statement != NULL && statement->type() != message::Statement::DELETE)
 
1581
  {
 
1582
    finalizeStatementMessage(*statement, in_session);
 
1583
    statement= in_session->getStatementMessage();
 
1584
  }
 
1585
  else if (statement != NULL)
 
1586
  {
 
1587
    transaction= getActiveTransactionMessage(in_session);
 
1588
 
 
1589
    /*
 
1590
     * If we've passed our threshold for the statement size (possible for
 
1591
     * a bulk insert), we'll finalize the Statement and Transaction (doing
 
1592
     * the Transaction will keep it from getting huge).
 
1593
     */
 
1594
    if (static_cast<size_t>(transaction->ByteSize()) >= trx_msg_threshold)
 
1595
    {
 
1596
      /* Remember the transaction ID so we can re-use it */
 
1597
      uint64_t trx_id= transaction->transaction_context().transaction_id();
 
1598
 
 
1599
      message::DeleteData *current_data= statement->mutable_delete_data();
 
1600
 
 
1601
      /* Caller should use this value when adding a new record */
 
1602
      *next_segment_id= current_data->segment_id() + 1;
 
1603
 
 
1604
      current_data->set_end_segment(false);
 
1605
 
 
1606
      /* 
 
1607
       * Send the trx message to replicators after finalizing the 
 
1608
       * statement and transaction. This will also set the Transaction
 
1609
       * and Statement objects in Session to NULL.
 
1610
       */
 
1611
      commitTransactionMessage(in_session);
 
1612
 
 
1613
      /*
 
1614
       * Statement and Transaction should now be NULL, so new ones will get
 
1615
       * created. We reuse the transaction id since we are segmenting
 
1616
       * one transaction.
 
1617
       */
 
1618
      statement= in_session->getStatementMessage();
 
1619
      transaction= getActiveTransactionMessage(in_session, false);
 
1620
      assert(transaction != NULL);
 
1621
 
 
1622
      /* Set the transaction ID to match the previous messages */
 
1623
      transaction->mutable_transaction_context()->set_transaction_id(trx_id);
 
1624
    }
 
1625
    else
 
1626
    {
 
1627
      const message::DeleteHeader &delete_header= statement->delete_header();
 
1628
      string old_table_name= delete_header.table_metadata().table_name();
 
1629
 
 
1630
      string current_table_name;
 
1631
      (void) in_table->getShare()->getTableName(current_table_name);
 
1632
      if (current_table_name.compare(old_table_name))
 
1633
      {
 
1634
        finalizeStatementMessage(*statement, in_session);
 
1635
        statement= in_session->getStatementMessage();
 
1636
      }
 
1637
      else
 
1638
      {
 
1639
        /* carry forward the existing segment id */
 
1640
        const message::DeleteData &current_data= statement->delete_data();
 
1641
        *next_segment_id= current_data.segment_id();
 
1642
      }
 
1643
    }
 
1644
  }
 
1645
 
 
1646
  if (statement == NULL)
 
1647
  {
 
1648
    /*
 
1649
     * Transaction will be non-NULL only if we had to segment it due to
 
1650
     * transaction size above.
 
1651
     */
 
1652
    if (transaction == NULL)
 
1653
      transaction= getActiveTransactionMessage(in_session);
 
1654
 
 
1655
    /* 
 
1656
     * Transaction message initialized and set, but no statement created
 
1657
     * yet.  We construct one and initialize it, here, then return the
 
1658
     * message after attaching the new Statement message pointer to the 
 
1659
     * Session for easy retrieval later...
 
1660
     */
 
1661
    statement= transaction->add_statement();
 
1662
    setDeleteHeader(*statement, in_session, in_table);
 
1663
    in_session->setStatementMessage(statement);
 
1664
  }
 
1665
  return *statement;
 
1666
}
 
1667
 
 
1668
void TransactionServices::setDeleteHeader(message::Statement &statement,
 
1669
                                          Session *in_session,
 
1670
                                          Table *in_table)
 
1671
{
 
1672
  initStatementMessage(statement, message::Statement::DELETE, in_session);
 
1673
 
 
1674
  /* 
 
1675
   * Now we construct the specialized DeleteHeader message inside
 
1676
   * the generalized message::Statement container...
 
1677
   */
 
1678
  message::DeleteHeader *header= statement.mutable_delete_header();
 
1679
  message::TableMetadata *table_metadata= header->mutable_table_metadata();
 
1680
 
 
1681
  string schema_name;
 
1682
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1683
  string table_name;
 
1684
  (void) in_table->getShare()->getTableName(table_name);
 
1685
 
 
1686
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1687
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1688
 
 
1689
  Field *current_field;
 
1690
  Field **table_fields= in_table->getFields();
 
1691
 
 
1692
  message::FieldMetadata *field_metadata;
 
1693
 
 
1694
  while ((current_field= *table_fields++) != NULL) 
 
1695
  {
 
1696
    /* 
 
1697
     * Add the WHERE clause values now...for now, this means the
 
1698
     * primary key field value.  Replication only supports tables
 
1699
     * with a primary key.
 
1700
     */
 
1701
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1702
    {
 
1703
      field_metadata= header->add_key_field_metadata();
 
1704
      field_metadata->set_name(current_field->field_name);
 
1705
      field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
 
1706
    }
 
1707
  }
 
1708
}
 
1709
 
 
1710
void TransactionServices::deleteRecord(Session *in_session, Table *in_table, bool use_update_record)
 
1711
{
 
1712
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1713
  if (! replication_services.isActive())
 
1714
    return;
 
1715
 
 
1716
  uint32_t next_segment_id= 1;
 
1717
  message::Statement &statement= getDeleteStatement(in_session, in_table, &next_segment_id);
 
1718
 
 
1719
  message::DeleteData *data= statement.mutable_delete_data();
 
1720
  data->set_segment_id(next_segment_id);
 
1721
  data->set_end_segment(true);
 
1722
  message::DeleteRecord *record= data->add_record();
 
1723
 
 
1724
  Field *current_field;
 
1725
  Field **table_fields= in_table->getFields();
 
1726
  String *string_value= new (in_session->mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
 
1727
  string_value->set_charset(system_charset_info);
 
1728
 
 
1729
  while ((current_field= *table_fields++) != NULL) 
 
1730
  {
 
1731
    /* 
 
1732
     * Add the WHERE clause values now...for now, this means the
 
1733
     * primary key field value.  Replication only supports tables
 
1734
     * with a primary key.
 
1735
     */
 
1736
    if (in_table->getShare()->fieldInPrimaryKey(current_field))
 
1737
    {
 
1738
      if (use_update_record)
 
1739
      {
 
1740
        /*
 
1741
         * Temporarily point to the update record to get its value.
 
1742
         * This is pretty much a hack in order to get the PK value from
 
1743
         * the update record rather than the insert record. Field::val_str()
 
1744
         * should not change anything in Field::ptr, so this should be safe.
 
1745
         * We are careful not to change anything in old_ptr.
 
1746
         */
 
1747
        const unsigned char *old_ptr= current_field->ptr;
 
1748
        current_field->ptr= in_table->getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - in_table->getInsertRecord());
 
1749
        string_value= current_field->val_str(string_value);
 
1750
        current_field->ptr= const_cast<unsigned char *>(old_ptr);
 
1751
      }
 
1752
      else
 
1753
      {
 
1754
        string_value= current_field->val_str(string_value);
 
1755
        /**
 
1756
         * @TODO Store optional old record value in the before data member
 
1757
         */
 
1758
      }
 
1759
      record->add_key_value(string_value->c_ptr(), string_value->length());
 
1760
      string_value->free();
 
1761
    }
 
1762
  }
 
1763
}
 
1764
 
 
1765
void TransactionServices::createTable(Session *in_session,
 
1766
                                      const message::Table &table)
 
1767
{
 
1768
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1769
  if (! replication_services.isActive())
 
1770
    return;
 
1771
  
 
1772
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1773
  message::Statement *statement= transaction->add_statement();
 
1774
 
 
1775
  initStatementMessage(*statement, message::Statement::CREATE_TABLE, in_session);
 
1776
 
 
1777
  /* 
 
1778
   * Construct the specialized CreateTableStatement message and attach
 
1779
   * it to the generic Statement message
 
1780
   */
 
1781
  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
 
1782
  message::Table *new_table_message= create_table_statement->mutable_table();
 
1783
  *new_table_message= table;
 
1784
 
 
1785
  finalizeStatementMessage(*statement, in_session);
 
1786
 
 
1787
  finalizeTransactionMessage(*transaction, in_session);
 
1788
  
 
1789
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1790
 
 
1791
  cleanupTransactionMessage(transaction, in_session);
 
1792
 
 
1793
}
 
1794
 
 
1795
void TransactionServices::createSchema(Session *in_session,
 
1796
                                       const message::Schema &schema)
 
1797
{
 
1798
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1799
  if (! replication_services.isActive())
 
1800
    return;
 
1801
  
 
1802
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1803
  message::Statement *statement= transaction->add_statement();
 
1804
 
 
1805
  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, in_session);
 
1806
 
 
1807
  /* 
 
1808
   * Construct the specialized CreateSchemaStatement message and attach
 
1809
   * it to the generic Statement message
 
1810
   */
 
1811
  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
 
1812
  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
 
1813
  *new_schema_message= schema;
 
1814
 
 
1815
  finalizeStatementMessage(*statement, in_session);
 
1816
 
 
1817
  finalizeTransactionMessage(*transaction, in_session);
 
1818
  
 
1819
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1820
 
 
1821
  cleanupTransactionMessage(transaction, in_session);
 
1822
 
 
1823
}
 
1824
 
 
1825
void TransactionServices::dropSchema(Session *in_session, const string &schema_name)
 
1826
{
 
1827
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1828
  if (! replication_services.isActive())
 
1829
    return;
 
1830
  
 
1831
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1832
  message::Statement *statement= transaction->add_statement();
 
1833
 
 
1834
  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, in_session);
 
1835
 
 
1836
  /* 
 
1837
   * Construct the specialized DropSchemaStatement message and attach
 
1838
   * it to the generic Statement message
 
1839
   */
 
1840
  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
 
1841
 
 
1842
  drop_schema_statement->set_schema_name(schema_name);
 
1843
 
 
1844
  finalizeStatementMessage(*statement, in_session);
 
1845
 
 
1846
  finalizeTransactionMessage(*transaction, in_session);
 
1847
  
 
1848
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1849
 
 
1850
  cleanupTransactionMessage(transaction, in_session);
 
1851
}
 
1852
 
 
1853
void TransactionServices::dropTable(Session *in_session,
 
1854
                                    const string &schema_name,
 
1855
                                    const string &table_name,
 
1856
                                    bool if_exists)
 
1857
{
 
1858
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1859
  if (! replication_services.isActive())
 
1860
    return;
 
1861
  
 
1862
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1863
  message::Statement *statement= transaction->add_statement();
 
1864
 
 
1865
  initStatementMessage(*statement, message::Statement::DROP_TABLE, in_session);
 
1866
 
 
1867
  /* 
 
1868
   * Construct the specialized DropTableStatement message and attach
 
1869
   * it to the generic Statement message
 
1870
   */
 
1871
  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
 
1872
 
 
1873
  drop_table_statement->set_if_exists_clause(if_exists);
 
1874
 
 
1875
  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
 
1876
 
 
1877
  table_metadata->set_schema_name(schema_name);
 
1878
  table_metadata->set_table_name(table_name);
 
1879
 
 
1880
  finalizeStatementMessage(*statement, in_session);
 
1881
 
 
1882
  finalizeTransactionMessage(*transaction, in_session);
 
1883
  
 
1884
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1885
 
 
1886
  cleanupTransactionMessage(transaction, in_session);
 
1887
}
 
1888
 
 
1889
void TransactionServices::truncateTable(Session *in_session, Table *in_table)
 
1890
{
 
1891
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1892
  if (! replication_services.isActive())
 
1893
    return;
 
1894
  
 
1895
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1896
  message::Statement *statement= transaction->add_statement();
 
1897
 
 
1898
  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, in_session);
 
1899
 
 
1900
  /* 
 
1901
   * Construct the specialized TruncateTableStatement message and attach
 
1902
   * it to the generic Statement message
 
1903
   */
 
1904
  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
 
1905
  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
 
1906
 
 
1907
  string schema_name;
 
1908
  (void) in_table->getShare()->getSchemaName(schema_name);
 
1909
  string table_name;
 
1910
  (void) in_table->getShare()->getTableName(table_name);
 
1911
 
 
1912
  table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
 
1913
  table_metadata->set_table_name(table_name.c_str(), table_name.length());
 
1914
 
 
1915
  finalizeStatementMessage(*statement, in_session);
 
1916
 
 
1917
  finalizeTransactionMessage(*transaction, in_session);
 
1918
  
 
1919
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1920
 
 
1921
  cleanupTransactionMessage(transaction, in_session);
 
1922
}
 
1923
 
 
1924
void TransactionServices::rawStatement(Session *in_session, const string &query)
 
1925
{
 
1926
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
1927
  if (! replication_services.isActive())
 
1928
    return;
 
1929
  
 
1930
  message::Transaction *transaction= getActiveTransactionMessage(in_session);
 
1931
  message::Statement *statement= transaction->add_statement();
 
1932
 
 
1933
  initStatementMessage(*statement, message::Statement::RAW_SQL, in_session);
 
1934
  statement->set_sql(query);
 
1935
  finalizeStatementMessage(*statement, in_session);
 
1936
 
 
1937
  finalizeTransactionMessage(*transaction, in_session);
 
1938
  
 
1939
  (void) replication_services.pushTransactionMessage(*in_session, *transaction);
 
1940
 
 
1941
  cleanupTransactionMessage(transaction, in_session);
 
1942
}
 
1943
 
966
1944
} /* namespace drizzled */