~ubuntu-branches/ubuntu/wily/mysql-5.6/wily

« back to all changes in this revision

Viewing changes to plugin/semisync/semisync_master.cc

  • Committer: Package Import Robot
  • Author(s): Marc Deslauriers
  • Date: 2015-07-21 07:09:29 UTC
  • mto: This revision was merged to the branch mainline in revision 14.
  • Revision ID: package-import@ubuntu.com-20150721070929-mg4dpqkgg3it1ajf
Tags: upstream-5.6.25
ImportĀ upstreamĀ versionĀ 5.6.25

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* Copyright (C) 2007 Google Inc.
2
 
   Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3
 
   Use is subject to license terms.
 
2
   Copyright (c) 2008, 2015, Oracle and/or its affiliates. All rights reserved.
4
3
 
5
4
   This program is free software; you can redistribute it and/or modify
6
5
   it under the terms of the GNU General Public License as published by
224
223
  return (entry != NULL);
225
224
}
226
225
 
 
226
int ActiveTranx::signal_waiting_sessions_all()
 
227
{
 
228
  const char *kWho = "ActiveTranx::signal_waiting_sessions_all";
 
229
  function_enter(kWho);
 
230
  for (TranxNode* entry= trx_front_; entry; entry=entry->next_)
 
231
    mysql_cond_broadcast(&entry->cond);
 
232
 
 
233
  return function_exit(kWho, 0);
 
234
}
 
235
 
 
236
int ActiveTranx::signal_waiting_sessions_up_to(const char *log_file_name,
 
237
                                               my_off_t log_file_pos)
 
238
{
 
239
  const char *kWho = "ActiveTranx::signal_waiting_sessions_up_to";
 
240
  function_enter(kWho);
 
241
 
 
242
  TranxNode* entry= trx_front_;
 
243
  int cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
 
244
  while (entry && cmp <= 0)
 
245
  {
 
246
    mysql_cond_broadcast(&entry->cond);
 
247
    entry= entry->next_;
 
248
    if (entry)
 
249
      cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
 
250
  }
 
251
 
 
252
  return function_exit(kWho, (entry != NULL));
 
253
}
 
254
 
 
255
TranxNode * ActiveTranx::find_active_tranx_node(const char *log_file_name,
 
256
                                                my_off_t log_file_pos)
 
257
{
 
258
  const char *kWho = "ActiveTranx::find_active_tranx_node";
 
259
  function_enter(kWho);
 
260
 
 
261
  TranxNode* entry= trx_front_;
 
262
 
 
263
  while (entry)
 
264
  {
 
265
    if (ActiveTranx::compare(log_file_name, log_file_pos, entry->log_name_,
 
266
                             entry->log_pos_) <= 0)
 
267
      break;
 
268
    entry= entry->next_;
 
269
  }
 
270
  function_exit(kWho, 0);
 
271
  return entry;
 
272
}
 
273
 
227
274
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
228
275
                                          my_off_t log_file_pos)
229
276
{
238
285
 
239
286
    while (new_front)
240
287
    {
241
 
      if (compare(new_front, log_file_name, log_file_pos) > 0)
 
288
      if (compare(new_front, log_file_name, log_file_pos) > 0 ||
 
289
          new_front->n_waiters > 0)
242
290
        break;
243
291
      new_front = new_front->next_;
244
292
    }
365
413
  /* Mutex initialization can only be done after MY_INIT(). */
366
414
  mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
367
415
                   &LOCK_binlog_, MY_MUTEX_INIT_FAST);
368
 
  mysql_cond_init(key_ss_cond_COND_binlog_send_,
369
 
                  &COND_binlog_send_, NULL);
370
416
 
371
417
  if (rpl_semi_sync_master_enabled)
372
418
    result = enableMaster();
385
431
 
386
432
  if (!getMasterEnabled())
387
433
  {
388
 
    active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
 
434
    if (active_tranxs_ == NULL)
 
435
      active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
 
436
 
389
437
    if (active_tranxs_ != NULL)
390
438
    {
391
439
      commit_file_name_inited_ = false;
420
468
     */
421
469
    switch_off();
422
470
 
423
 
    assert(active_tranxs_ != NULL);
424
 
    delete active_tranxs_;
425
 
    active_tranxs_ = NULL;
 
471
    if ( active_tranxs_ && active_tranxs_->is_empty())
 
472
    {
 
473
      delete active_tranxs_;
 
474
      active_tranxs_ = NULL;
 
475
    }
426
476
 
427
477
    reply_file_name_inited_ = false;
428
478
    wait_file_name_inited_  = false;
442
492
  if (init_done_)
443
493
  {
444
494
    mysql_mutex_destroy(&LOCK_binlog_);
445
 
    mysql_cond_destroy(&COND_binlog_send_);
446
495
  }
447
496
 
448
497
  delete active_tranxs_;
458
507
  mysql_mutex_unlock(&LOCK_binlog_);
459
508
}
460
509
 
461
 
void ReplSemiSyncMaster::cond_broadcast()
462
 
{
463
 
  mysql_cond_broadcast(&COND_binlog_send_);
464
 
}
465
 
 
466
 
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
467
 
{
468
 
  const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
469
 
  int wait_res;
470
 
 
471
 
  function_enter(kWho);
472
 
  wait_res= mysql_cond_timedwait(&COND_binlog_send_,
473
 
                                 &LOCK_binlog_, wait_time);
474
 
  return function_exit(kWho, wait_res);
475
 
}
476
 
 
477
510
void ReplSemiSyncMaster::add_slave()
478
511
{
479
512
  lock();
579
612
    reply_file_pos_ = log_file_pos;
580
613
    reply_file_name_inited_ = true;
581
614
 
582
 
    /* Remove all active transaction nodes before this point. */
583
 
    assert(active_tranxs_ != NULL);
584
 
    active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
585
 
 
586
615
    if (trace_level_ & kTraceDetail)
587
616
    {
588
617
      if(!skipped_event)
612
641
  }
613
642
 
614
643
 l_end:
615
 
  unlock();
616
644
 
617
645
  if (can_release_threads)
618
646
  {
619
647
    if (trace_level_ & kTraceDetail)
620
648
      sql_print_information("%s: signal all waiting threads.", kWho);
621
 
 
622
 
    cond_broadcast();
 
649
    active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
623
650
  }
624
 
 
 
651
  unlock();
625
652
  return function_exit(kWho, 0);
626
653
}
627
654
 
631
658
  const char *kWho = "ReplSemiSyncMaster::commitTrx";
632
659
 
633
660
  function_enter(kWho);
 
661
  PSI_stage_info old_stage;
 
662
 
 
663
#if defined(ENABLED_DEBUG_SYNC)
 
664
  /* debug sync may not be initialized for a master */
 
665
  if (current_thd->debug_sync_control)
 
666
    DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
 
667
#endif
 
668
  /* Acquire the mutex. */
 
669
  lock();
 
670
 
 
671
  TranxNode* entry= NULL;
 
672
  mysql_cond_t* thd_cond= NULL;
 
673
  if (active_tranxs_ != NULL && trx_wait_binlog_name)
 
674
  {
 
675
    entry=
 
676
      active_tranxs_->find_active_tranx_node(trx_wait_binlog_name,
 
677
                                             trx_wait_binlog_pos);
 
678
    if (entry)
 
679
      thd_cond= &entry->cond;
 
680
  }
 
681
  /* This must be called after acquired the lock */
 
682
  THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_,
 
683
                 & stage_waiting_for_semi_sync_ack_from_slave,
 
684
                 & old_stage);
634
685
 
635
686
  if (getMasterEnabled() && trx_wait_binlog_name)
636
687
  {
637
688
    struct timespec start_ts;
638
689
    struct timespec abstime;
639
690
    int wait_result;
640
 
    PSI_stage_info old_stage;
641
691
 
642
692
    set_timespec(start_ts, 0);
643
 
#if defined(ENABLED_DEBUG_SYNC)
644
 
    /* debug sync may not be initialized for a master */
645
 
    if (current_thd->debug_sync_control)
646
 
      DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
647
 
#endif
648
 
    /* Acquire the mutex. */
649
 
    lock();
650
 
 
651
 
    /* This must be called after acquired the lock */
652
 
    THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
653
 
                   & stage_waiting_for_semi_sync_ack_from_slave,
654
 
                   & old_stage);
655
 
 
656
693
    /* This is the real check inside the mutex. */
657
694
    if (!getMasterEnabled() || !is_on())
658
695
      goto l_end;
751
788
                              kWho, wait_timeout_,
752
789
                              wait_file_name_, (unsigned long)wait_file_pos_);
753
790
      
754
 
      wait_result = cond_timewait(&abstime);
 
791
      /* wait for the position to be ACK'ed back */
 
792
      assert(entry);
 
793
      entry->n_waiters++;
 
794
      wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
 
795
      entry->n_waiters--;
755
796
      rpl_semi_sync_master_wait_sessions--;
756
797
      
757
798
      if (wait_result != 0)
790
831
      }
791
832
    }
792
833
 
793
 
    /*
794
 
      At this point, the binlog file and position of this transaction
795
 
      must have been removed from ActiveTranx.
796
 
    */
797
 
    assert(!getMasterEnabled() ||
798
 
           !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
799
 
                                             trx_wait_binlog_pos));
800
 
  l_end:
 
834
l_end:
801
835
    /* Update the status counter. */
802
836
    if (is_on())
803
837
      rpl_semi_sync_master_yes_transactions++;
804
838
    else
805
839
      rpl_semi_sync_master_no_transactions++;
806
840
 
807
 
    /* The lock held will be released by thd_exit_cond, so no need to
808
 
       call unlock() here */
809
 
    THD_EXIT_COND(NULL, & old_stage);
810
841
  }
811
842
 
 
843
  /* Last waiter removes the TranxNode */
 
844
  if (trx_wait_binlog_name && active_tranxs_
 
845
      && entry && entry->n_waiters == 0)
 
846
    active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name,
 
847
                                             trx_wait_binlog_pos);
 
848
 
 
849
  /* The lock held will be released by thd_exit_cond, so no need to
 
850
    call unlock() here */
 
851
  THD_EXIT_COND(NULL, & old_stage);
812
852
  return function_exit(kWho, 0);
813
853
}
814
854
 
825
865
 *
826
866
 * If semi-sync is disabled, all transactions still update the wait
827
867
 * position with the last position in binlog.  But no transactions will
828
 
 * wait for confirmations and the active transaction list would not be
829
 
 * maintained.  In binlog dump thread, updateSyncHeader() checks whether
830
 
 * the current sending event catches up with last wait position.  If it
831
 
 * does match, semi-sync will be switched on again.
 
868
 * wait for confirmations maintained.  In binlog dump thread,
 
869
 * updateSyncHeader() checks whether the current sending event catches
 
870
 * up with last wait position.  If it does match, semi-sync will be
 
871
 * switched on again.
832
872
 */
833
873
int ReplSemiSyncMaster::switch_off()
834
874
{
835
875
  const char *kWho = "ReplSemiSyncMaster::switch_off";
836
 
  int result;
837
876
 
838
877
  function_enter(kWho);
839
878
  state_ = false;
840
879
 
841
 
  /* Clear the active transaction list. */
842
 
  assert(active_tranxs_ != NULL);
843
 
  result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
844
 
 
845
880
  rpl_semi_sync_master_off_times++;
846
881
  wait_file_name_inited_   = false;
847
882
  reply_file_name_inited_  = false;
848
883
  sql_print_information("Semi-sync replication switched OFF.");
849
 
  cond_broadcast();                            /* wake up all waiting threads */
850
 
 
851
 
  return function_exit(kWho, result);
 
884
 
 
885
  /* signal waiting sessions */
 
886
  active_tranxs_->signal_waiting_sessions_all();
 
887
 
 
888
  return function_exit(kWho, 0);
852
889
}
853
890
 
854
891
int ReplSemiSyncMaster::try_switch_on(int server_id,