1
1
/* Copyright (C) 2007 Google Inc.
2
Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3
Use is subject to license terms.
2
Copyright (c) 2008, 2015, Oracle and/or its affiliates. All rights reserved.
5
4
This program is free software; you can redistribute it and/or modify
6
5
it under the terms of the GNU General Public License as published by
224
223
return (entry != NULL);
226
int ActiveTranx::signal_waiting_sessions_all()
228
const char *kWho = "ActiveTranx::signal_waiting_sessions_all";
229
function_enter(kWho);
230
for (TranxNode* entry= trx_front_; entry; entry=entry->next_)
231
mysql_cond_broadcast(&entry->cond);
233
return function_exit(kWho, 0);
236
int ActiveTranx::signal_waiting_sessions_up_to(const char *log_file_name,
237
my_off_t log_file_pos)
239
const char *kWho = "ActiveTranx::signal_waiting_sessions_up_to";
240
function_enter(kWho);
242
TranxNode* entry= trx_front_;
243
int cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
244
while (entry && cmp <= 0)
246
mysql_cond_broadcast(&entry->cond);
249
cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
252
return function_exit(kWho, (entry != NULL));
255
TranxNode * ActiveTranx::find_active_tranx_node(const char *log_file_name,
256
my_off_t log_file_pos)
258
const char *kWho = "ActiveTranx::find_active_tranx_node";
259
function_enter(kWho);
261
TranxNode* entry= trx_front_;
265
if (ActiveTranx::compare(log_file_name, log_file_pos, entry->log_name_,
266
entry->log_pos_) <= 0)
270
function_exit(kWho, 0);
227
274
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
228
275
my_off_t log_file_pos)
239
286
while (new_front)
241
if (compare(new_front, log_file_name, log_file_pos) > 0)
288
if (compare(new_front, log_file_name, log_file_pos) > 0 ||
289
new_front->n_waiters > 0)
243
291
new_front = new_front->next_;
365
413
/* Mutex initialization can only be done after MY_INIT(). */
366
414
mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
367
415
&LOCK_binlog_, MY_MUTEX_INIT_FAST);
368
mysql_cond_init(key_ss_cond_COND_binlog_send_,
369
&COND_binlog_send_, NULL);
371
417
if (rpl_semi_sync_master_enabled)
372
418
result = enableMaster();
386
432
if (!getMasterEnabled())
388
active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
434
if (active_tranxs_ == NULL)
435
active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
389
437
if (active_tranxs_ != NULL)
391
439
commit_file_name_inited_ = false;
458
507
mysql_mutex_unlock(&LOCK_binlog_);
461
void ReplSemiSyncMaster::cond_broadcast()
463
mysql_cond_broadcast(&COND_binlog_send_);
466
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
468
const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
471
function_enter(kWho);
472
wait_res= mysql_cond_timedwait(&COND_binlog_send_,
473
&LOCK_binlog_, wait_time);
474
return function_exit(kWho, wait_res);
477
510
void ReplSemiSyncMaster::add_slave()
579
612
reply_file_pos_ = log_file_pos;
580
613
reply_file_name_inited_ = true;
582
/* Remove all active transaction nodes before this point. */
583
assert(active_tranxs_ != NULL);
584
active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
586
615
if (trace_level_ & kTraceDetail)
588
617
if(!skipped_event)
617
645
if (can_release_threads)
619
647
if (trace_level_ & kTraceDetail)
620
648
sql_print_information("%s: signal all waiting threads.", kWho);
649
active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
625
652
return function_exit(kWho, 0);
631
658
const char *kWho = "ReplSemiSyncMaster::commitTrx";
633
660
function_enter(kWho);
661
PSI_stage_info old_stage;
663
#if defined(ENABLED_DEBUG_SYNC)
664
/* debug sync may not be initialized for a master */
665
if (current_thd->debug_sync_control)
666
DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
668
/* Acquire the mutex. */
671
TranxNode* entry= NULL;
672
mysql_cond_t* thd_cond= NULL;
673
if (active_tranxs_ != NULL && trx_wait_binlog_name)
676
active_tranxs_->find_active_tranx_node(trx_wait_binlog_name,
677
trx_wait_binlog_pos);
679
thd_cond= &entry->cond;
681
/* This must be called after acquired the lock */
682
THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_,
683
& stage_waiting_for_semi_sync_ack_from_slave,
635
686
if (getMasterEnabled() && trx_wait_binlog_name)
637
688
struct timespec start_ts;
638
689
struct timespec abstime;
640
PSI_stage_info old_stage;
642
692
set_timespec(start_ts, 0);
643
#if defined(ENABLED_DEBUG_SYNC)
644
/* debug sync may not be initialized for a master */
645
if (current_thd->debug_sync_control)
646
DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
648
/* Acquire the mutex. */
651
/* This must be called after acquired the lock */
652
THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
653
& stage_waiting_for_semi_sync_ack_from_slave,
656
693
/* This is the real check inside the mutex. */
657
694
if (!getMasterEnabled() || !is_on())
751
788
kWho, wait_timeout_,
752
789
wait_file_name_, (unsigned long)wait_file_pos_);
754
wait_result = cond_timewait(&abstime);
791
/* wait for the position to be ACK'ed back */
794
wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
755
796
rpl_semi_sync_master_wait_sessions--;
757
798
if (wait_result != 0)
794
At this point, the binlog file and position of this transaction
795
must have been removed from ActiveTranx.
797
assert(!getMasterEnabled() ||
798
!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
799
trx_wait_binlog_pos));
801
835
/* Update the status counter. */
803
837
rpl_semi_sync_master_yes_transactions++;
805
839
rpl_semi_sync_master_no_transactions++;
807
/* The lock held will be released by thd_exit_cond, so no need to
808
call unlock() here */
809
THD_EXIT_COND(NULL, & old_stage);
843
/* Last waiter removes the TranxNode */
844
if (trx_wait_binlog_name && active_tranxs_
845
&& entry && entry->n_waiters == 0)
846
active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name,
847
trx_wait_binlog_pos);
849
/* The lock held will be released by thd_exit_cond, so no need to
850
call unlock() here */
851
THD_EXIT_COND(NULL, & old_stage);
812
852
return function_exit(kWho, 0);
826
866
* If semi-sync is disabled, all transactions still update the wait
827
867
* position with the last position in binlog. But no transactions will
828
* wait for confirmations and the active transaction list would not be
829
* maintained. In binlog dump thread, updateSyncHeader() checks whether
830
* the current sending event catches up with last wait position. If it
831
* does match, semi-sync will be switched on again.
868
* wait for confirmations maintained. In binlog dump thread,
869
* updateSyncHeader() checks whether the current sending event catches
870
* up with last wait position. If it does match, semi-sync will be
833
873
int ReplSemiSyncMaster::switch_off()
835
875
const char *kWho = "ReplSemiSyncMaster::switch_off";
838
877
function_enter(kWho);
841
/* Clear the active transaction list. */
842
assert(active_tranxs_ != NULL);
843
result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
845
880
rpl_semi_sync_master_off_times++;
846
881
wait_file_name_inited_ = false;
847
882
reply_file_name_inited_ = false;
848
883
sql_print_information("Semi-sync replication switched OFF.");
849
cond_broadcast(); /* wake up all waiting threads */
851
return function_exit(kWho, result);
885
/* signal waiting sessions */
886
active_tranxs_->signal_waiting_sessions_all();
888
return function_exit(kWho, 0);
854
891
int ReplSemiSyncMaster::try_switch_on(int server_id,