1
/* Copyright (C) 2007 Google Inc.
2
Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3
Use is subject to license terms.
5
This program is free software; you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation; version 2 of the License.
9
This program is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU General Public License for more details.
14
You should have received a copy of the GNU General Public License
15
along with this program; if not, write to the Free Software
16
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
19
#include "semisync_master.h"
/* Time unit conversion factors used by the timespec arithmetic in this file. */
#define TIME_THOUSAND 1000
#define TIME_MILLION  1000000
#define TIME_BILLION  1000000000

/*
  Configuration variables and status counters of the semi-sync master.
  The counters are updated by the ReplSemiSyncMaster methods below and are
  zeroed by ReplSemiSyncMaster::resetMaster().
*/

/* This indicates whether semi-synchronous replication is enabled. */
char rpl_semi_sync_master_enabled;
/* Passed to setWaitTimeout() by initObject(); presumably milliseconds,
   since commitTrx() scales wait_timeout_ by TIME_MILLION into ns. */
unsigned long rpl_semi_sync_master_timeout;
/* Passed to setTraceLevel() by initObject(). */
unsigned long rpl_semi_sync_master_trace_level;
/* Copied from state_ by setExportStats(). */
char rpl_semi_sync_master_status = 0;
/* Transactions committed with / without a semi-sync acknowledgement. */
unsigned long rpl_semi_sync_master_yes_transactions = 0;
unsigned long rpl_semi_sync_master_no_transactions = 0;
/* Number of times semi-sync was switched off (see switch_off()). */
unsigned long rpl_semi_sync_master_off_times = 0;
/* Number of times getWaitTime() failed to measure a wait. */
unsigned long rpl_semi_sync_master_timefunc_fails = 0;
/* Number of transaction waits that ended in a timeout. */
unsigned long rpl_semi_sync_master_wait_timeouts = 0;
/* Number of sessions currently blocked waiting for a slave reply. */
unsigned long rpl_semi_sync_master_wait_sessions = 0;
/* Times a waiting transaction moved the minimum wait position backwards. */
unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
/* Average/total/count of transaction wait times, in microseconds. */
unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
/* Average/total/count of network waits for slave replies, in microseconds. */
unsigned long rpl_semi_sync_master_avg_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_net_wait_num = 0;
/* Number of connected semi-sync slaves (add_slave()/remove_slave()). */
unsigned long rpl_semi_sync_master_clients = 0;
unsigned long long rpl_semi_sync_master_net_wait_time = 0;
unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
/* When 0, losing the last semi-sync slave switches semi-sync off. */
char rpl_semi_sync_master_wait_no_slave = 1;

/* Elapsed time since start_ts in microseconds; negative on failure.
   Defined at the end of this file. */
static int getWaitTime(const struct timespec& start_ts);
static unsigned long long timespec_to_usec(const struct timespec *ts)
52
return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
54
return ts->tv.i64 / 10;
58
/*******************************************************************************
60
* <ActiveTranx> class : manage all active transaction nodes
62
******************************************************************************/
64
ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
65
unsigned long trace_level)
66
: Trace(trace_level), allocator_(max_connections),
67
num_entries_(max_connections << 1), /* Transaction hash table size
68
* is set to double the size
69
* of max_connections */
72
/* No transactions are in the list initially. */
76
/* Create the hash table to find a transaction's ending event. */
77
trx_htb_ = new TranxNode *[num_entries_];
78
for (int idx = 0; idx < num_entries_; ++idx)
81
sql_print_information("Semi-sync replication initialized for transactions.");
84
ActiveTranx::~ActiveTranx()
91
unsigned int ActiveTranx::calc_hash(const unsigned char *key,
94
unsigned int nr = 1, nr2 = 4;
96
/* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
99
nr ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
102
return((unsigned int) nr);
105
unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
106
my_off_t log_file_pos)
108
unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
109
strlen(log_file_name));
110
unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
111
sizeof(log_file_pos));
113
return (hash1 + hash2) % num_entries_;
116
int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
117
const char *log_file_name2, my_off_t log_file_pos2)
119
int cmp = strcmp(log_file_name1, log_file_name2);
124
if (log_file_pos1 > log_file_pos2)
126
else if (log_file_pos1 < log_file_pos2)
131
int ActiveTranx::insert_tranx_node(const char *log_file_name,
132
my_off_t log_file_pos)
134
const char *kWho = "ActiveTranx:insert_tranx_node";
137
unsigned int hash_val;
139
function_enter(kWho);
141
ins_node = allocator_.allocate_node();
144
sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
145
kWho, log_file_name, (unsigned long)log_file_pos);
150
/* insert the binlog position in the active transaction list. */
151
strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
152
ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
153
ins_node->log_pos_ = log_file_pos;
157
/* The list is empty. */
158
trx_front_ = trx_rear_ = ins_node;
162
int cmp = compare(ins_node, trx_rear_);
165
/* Compare with the tail first. If the transaction happens later in
166
* binlog, then make it the new tail.
168
trx_rear_->next_ = ins_node;
169
trx_rear_ = ins_node;
173
/* Otherwise, it is an error because the transaction should hold the
174
* mysql_bin_log.LOCK_log when appending events.
176
sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
177
"new node (%s, %lu)", kWho,
178
trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
179
ins_node->log_name_, (unsigned long)ins_node->log_pos_);
185
hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
186
ins_node->hash_next_ = trx_htb_[hash_val];
187
trx_htb_[hash_val] = ins_node;
189
if (trace_level_ & kTraceDetail)
190
sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
191
ins_node->log_name_, (unsigned long)ins_node->log_pos_,
195
return function_exit(kWho, result);
198
bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
199
my_off_t log_file_pos)
201
const char *kWho = "ActiveTranx::is_tranx_end_pos";
202
function_enter(kWho);
204
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
205
TranxNode *entry = trx_htb_[hash_val];
207
while (entry != NULL)
209
if (compare(entry, log_file_name, log_file_pos) == 0)
212
entry = entry->hash_next_;
215
if (trace_level_ & kTraceDetail)
216
sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
217
log_file_name, (unsigned long)log_file_pos, hash_val);
219
function_exit(kWho, (entry != NULL));
220
return (entry != NULL);
223
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
224
my_off_t log_file_pos)
226
const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
227
TranxNode *new_front;
229
function_enter(kWho);
231
if (log_file_name != NULL)
233
new_front = trx_front_;
237
if (compare(new_front, log_file_name, log_file_pos) > 0)
239
new_front = new_front->next_;
244
/* If log_file_name is NULL, clear everything. */
248
if (new_front == NULL)
250
/* No active transaction nodes after the call. */
252
/* Clear the hash table. */
253
memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
254
allocator_.free_all_nodes();
256
/* Clear the active transaction list. */
257
if (trx_front_ != NULL)
263
if (trace_level_ & kTraceDetail)
264
sql_print_information("%s: cleared all nodes", kWho);
266
else if (new_front != trx_front_)
268
TranxNode *curr_node, *next_node;
270
/* Delete all transaction nodes before the confirmation point. */
272
curr_node = trx_front_;
273
while (curr_node != new_front)
275
next_node = curr_node->next_;
278
/* Remove the node from the hash table. */
279
unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
280
TranxNode **hash_ptr = &(trx_htb_[hash_val]);
281
while ((*hash_ptr) != NULL)
283
if ((*hash_ptr) == curr_node)
285
(*hash_ptr) = curr_node->hash_next_;
288
hash_ptr = &((*hash_ptr)->hash_next_);
291
curr_node = next_node;
294
trx_front_ = new_front;
295
allocator_.free_nodes_before(trx_front_);
297
if (trace_level_ & kTraceDetail)
298
sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
300
trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
303
return function_exit(kWho, 0);
307
/*******************************************************************************
309
* <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
310
* <ReplSemiSyncSlave> class: the basic code layer for sync-replication slave.
312
* The most important functions during semi-sync replication are listed below:
315
* . reportReplyBinlog(): called by the binlog dump thread when it receives
316
* the slave's status information.
317
* . updateSyncHeader(): based on transaction waiting information, decide
318
* whether to request the slave to reply.
319
* . writeTranxInBinlog(): called by the transaction thread when it finishes
320
* writing all transaction events in binlog.
321
* . commitTrx(): transaction thread wait for the slave reply.
324
* . slaveReadSyncHeader(): read the semi-sync header from the master, get the
325
* sync status and get the payload for events.
326
* . slaveReply(): reply to the master about the replication progress.
328
******************************************************************************/
330
ReplSemiSyncMaster::ReplSemiSyncMaster()
331
: active_tranxs_(NULL),
333
reply_file_name_inited_(false),
335
wait_file_name_inited_(false),
337
master_enabled_(false),
341
strcpy(reply_file_name_, "");
342
strcpy(wait_file_name_, "");
345
int ReplSemiSyncMaster::initObject()
348
const char *kWho = "ReplSemiSyncMaster::initObject";
352
fprintf(stderr, "%s called twice\n", kWho);
357
/* References to the parameter works after set_options(). */
358
setWaitTimeout(rpl_semi_sync_master_timeout);
359
setTraceLevel(rpl_semi_sync_master_trace_level);
361
/* Mutex initialization can only be done after MY_INIT(). */
362
mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
363
&LOCK_binlog_, MY_MUTEX_INIT_FAST);
364
mysql_cond_init(key_ss_cond_COND_binlog_send_,
365
&COND_binlog_send_, NULL);
367
if (rpl_semi_sync_master_enabled)
368
result = enableMaster();
370
result = disableMaster();
375
int ReplSemiSyncMaster::enableMaster()
379
/* Must have the lock when we do enable of disable. */
382
if (!getMasterEnabled())
384
active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
385
if (active_tranxs_ != NULL)
387
commit_file_name_inited_ = false;
388
reply_file_name_inited_ = false;
389
wait_file_name_inited_ = false;
391
set_master_enabled(true);
393
sql_print_information("Semi-sync replication enabled on the master.");
397
sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
407
int ReplSemiSyncMaster::disableMaster()
409
/* Must have the lock when we do enable of disable. */
412
if (getMasterEnabled())
414
/* Switch off the semi-sync first so that waiting transaction will be
419
assert(active_tranxs_ != NULL);
420
delete active_tranxs_;
421
active_tranxs_ = NULL;
423
reply_file_name_inited_ = false;
424
wait_file_name_inited_ = false;
425
commit_file_name_inited_ = false;
427
set_master_enabled(false);
428
sql_print_information("Semi-sync replication disabled on the master.");
436
ReplSemiSyncMaster::~ReplSemiSyncMaster()
440
mysql_mutex_destroy(&LOCK_binlog_);
441
mysql_cond_destroy(&COND_binlog_send_);
444
delete active_tranxs_;
447
void ReplSemiSyncMaster::lock()
449
mysql_mutex_lock(&LOCK_binlog_);
452
void ReplSemiSyncMaster::unlock()
454
mysql_mutex_unlock(&LOCK_binlog_);
457
void ReplSemiSyncMaster::cond_broadcast()
459
mysql_cond_broadcast(&COND_binlog_send_);
462
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
464
const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
467
function_enter(kWho);
468
wait_res= mysql_cond_timedwait(&COND_binlog_send_,
469
&LOCK_binlog_, wait_time);
470
return function_exit(kWho, wait_res);
473
void ReplSemiSyncMaster::add_slave()
476
rpl_semi_sync_master_clients++;
480
void ReplSemiSyncMaster::remove_slave()
483
rpl_semi_sync_master_clients--;
485
/* If user has chosen not to wait if no semi-sync slave available
486
and the last semi-sync slave exits, turn off semi-sync on master
489
if (!rpl_semi_sync_master_wait_no_slave &&
490
rpl_semi_sync_master_clients == 0)
495
bool ReplSemiSyncMaster::is_semi_sync_slave()
499
get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
503
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
504
const char *log_file_name,
505
my_off_t log_file_pos)
507
const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
509
bool can_release_threads = false;
510
bool need_copy_send_pos = true;
512
if (!(getMasterEnabled()))
515
function_enter(kWho);
519
/* This is the real check inside the mutex. */
520
if (!getMasterEnabled())
524
/* We check to see whether we can switch semi-sync ON. */
525
try_switch_on(server_id, log_file_name, log_file_pos);
527
/* The position should increase monotonically, if there is only one
528
* thread sending the binlog to the slave.
529
* In reality, to improve the transaction availability, we allow multiple
530
* sync replication slaves. So, if any one of them get the transaction,
531
* the transaction session in the primary can move forward.
533
if (reply_file_name_inited_)
535
cmp = ActiveTranx::compare(log_file_name, log_file_pos,
536
reply_file_name_, reply_file_pos_);
538
/* If the requested position is behind the sending binlog position,
539
* would not adjust sending binlog position.
540
* We based on the assumption that there are multiple semi-sync slave,
541
* and at least one of them shou/ld be up to date.
542
* If all semi-sync slaves are behind, at least initially, the primary
543
* can find the situation after the waiting timeout. After that, some
544
* slaves should catch up quickly.
548
/* If the position is behind, do not copy it. */
549
need_copy_send_pos = false;
553
if (need_copy_send_pos)
555
strcpy(reply_file_name_, log_file_name);
556
reply_file_pos_ = log_file_pos;
557
reply_file_name_inited_ = true;
559
/* Remove all active transaction nodes before this point. */
560
assert(active_tranxs_ != NULL);
561
active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
563
if (trace_level_ & kTraceDetail)
564
sql_print_information("%s: Got reply at (%s, %lu)", kWho,
565
log_file_name, (unsigned long)log_file_pos);
568
if (rpl_semi_sync_master_wait_sessions > 0)
570
/* Let us check if some of the waiting threads doing a trx
571
* commit can now proceed.
573
cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
574
wait_file_name_, wait_file_pos_);
577
/* Yes, at least one waiting thread can now proceed:
578
* let us release all waiting threads with a broadcast
580
can_release_threads = true;
581
wait_file_name_inited_ = false;
588
if (can_release_threads)
590
if (trace_level_ & kTraceDetail)
591
sql_print_information("%s: signal all waiting threads.", kWho);
596
return function_exit(kWho, 0);
599
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
600
my_off_t trx_wait_binlog_pos)
602
const char *kWho = "ReplSemiSyncMaster::commitTrx";
604
function_enter(kWho);
606
if (getMasterEnabled() && trx_wait_binlog_name)
608
struct timespec start_ts;
609
struct timespec abstime;
611
const char *old_msg= 0;
613
set_timespec(start_ts, 0);
615
/* Acquire the mutex. */
618
/* This must be called after acquired the lock */
619
old_msg= thd_enter_cond(NULL, &COND_binlog_send_, &LOCK_binlog_,
620
"Waiting for semi-sync ACK from slave");
622
/* This is the real check inside the mutex. */
623
if (!getMasterEnabled() || !is_on())
626
if (trace_level_ & kTraceDetail)
628
sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
629
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
635
if (reply_file_name_inited_)
637
int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
638
trx_wait_binlog_name, trx_wait_binlog_pos);
641
/* We have already sent the relevant binlog to the slave: no need to
644
if (trace_level_ & kTraceDetail)
645
sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
646
kWho, reply_file_name_, (unsigned long)reply_file_pos_);
651
/* Let us update the info about the minimum binlog position of waiting
654
if (wait_file_name_inited_)
656
int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
657
wait_file_name_, wait_file_pos_);
660
/* This thd has a lower position, let's update the minimum info. */
661
strcpy(wait_file_name_, trx_wait_binlog_name);
662
wait_file_pos_ = trx_wait_binlog_pos;
664
rpl_semi_sync_master_wait_pos_backtraverse++;
665
if (trace_level_ & kTraceDetail)
666
sql_print_information("%s: move back wait position (%s, %lu),",
667
kWho, wait_file_name_, (unsigned long)wait_file_pos_);
672
strcpy(wait_file_name_, trx_wait_binlog_name);
673
wait_file_pos_ = trx_wait_binlog_pos;
674
wait_file_name_inited_ = true;
676
if (trace_level_ & kTraceDetail)
677
sql_print_information("%s: init wait position (%s, %lu),",
678
kWho, wait_file_name_, (unsigned long)wait_file_pos_);
681
/* Calcuate the waiting period. */
683
abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
684
abstime.max_timeout_msec= (long)wait_timeout_;
686
unsigned long long diff_nsecs =
687
start_ts.tv_nsec + (unsigned long long)wait_timeout_ * TIME_MILLION;
688
abstime.tv_sec = start_ts.tv_sec;
689
while (diff_nsecs >= TIME_BILLION)
692
diff_nsecs -= TIME_BILLION;
694
abstime.tv_nsec = diff_nsecs;
697
/* In semi-synchronous replication, we wait until the binlog-dump
698
* thread has received the reply on the relevant binlog segment from the
701
* Let us suspend this thread to wait on the condition;
702
* when replication has progressed far enough, we will release
703
* these waiting threads.
705
rpl_semi_sync_master_wait_sessions++;
707
if (trace_level_ & kTraceDetail)
708
sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
710
wait_file_name_, (unsigned long)wait_file_pos_);
712
wait_result = cond_timewait(&abstime);
713
rpl_semi_sync_master_wait_sessions--;
715
if (wait_result != 0)
717
/* This is a real wait timeout. */
718
sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
719
"semi-sync up to file %s, position %lu.",
720
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
721
reply_file_name_, (unsigned long)reply_file_pos_);
722
rpl_semi_sync_master_wait_timeouts++;
724
/* switch semi-sync off */
731
wait_time = getWaitTime(start_ts);
734
if (trace_level_ & kTraceGeneral)
736
sql_print_error("Replication semi-sync getWaitTime fail at "
737
"wait position (%s, %lu)",
738
trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
740
rpl_semi_sync_master_timefunc_fails++;
744
rpl_semi_sync_master_trx_wait_num++;
745
rpl_semi_sync_master_trx_wait_time += wait_time;
752
At this point, the binlog file and position of this transaction
753
must have been removed from ActiveTranx.
755
assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
756
trx_wait_binlog_pos));
758
/* Update the status counter. */
760
rpl_semi_sync_master_yes_transactions++;
762
rpl_semi_sync_master_no_transactions++;
764
/* The lock held will be released by thd_exit_cond, so no need to
765
call unlock() here */
766
thd_exit_cond(NULL, old_msg);
769
return function_exit(kWho, 0);
772
/* Indicate that semi-sync replication is OFF now.
774
* What should we do when it is disabled? The problem is that we want
775
* the semi-sync replication enabled again when the slave catches up
776
* later. But, it is not that easy to detect that the slave has caught
777
* up. This is caused by the fact that MySQL's replication protocol is
778
* asynchronous, meaning that if the master does not use the semi-sync
779
* protocol, the slave would not send anything to the master.
780
* Still, if the master is sending (N+1)-th event, we assume that it is
781
* an indicator that the slave has received N-th event and earlier ones.
783
* If semi-sync is disabled, all transactions still update the wait
784
* position with the last position in binlog. But no transactions will
785
* wait for confirmations and the active transaction list would not be
786
* maintained. In binlog dump thread, updateSyncHeader() checks whether
787
* the current sending event catches up with last wait position. If it
788
* does match, semi-sync will be switched on again.
790
int ReplSemiSyncMaster::switch_off()
792
const char *kWho = "ReplSemiSyncMaster::switch_off";
795
function_enter(kWho);
798
/* Clear the active transaction list. */
799
assert(active_tranxs_ != NULL);
800
result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
802
rpl_semi_sync_master_off_times++;
803
wait_file_name_inited_ = false;
804
reply_file_name_inited_ = false;
805
sql_print_information("Semi-sync replication switched OFF.");
806
cond_broadcast(); /* wake up all waiting threads */
808
return function_exit(kWho, result);
811
int ReplSemiSyncMaster::try_switch_on(int server_id,
812
const char *log_file_name,
813
my_off_t log_file_pos)
815
const char *kWho = "ReplSemiSyncMaster::try_switch_on";
816
bool semi_sync_on = false;
818
function_enter(kWho);
820
/* If the current sending event's position is larger than or equal to the
821
* 'largest' commit transaction binlog position, the slave is already
822
* catching up now and we can switch semi-sync on here.
823
* If commit_file_name_inited_ indicates there are no recent transactions,
824
* we can enable semi-sync immediately.
826
if (commit_file_name_inited_)
828
int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
829
commit_file_name_, commit_file_pos_);
830
semi_sync_on = (cmp >= 0);
839
/* Switch semi-sync replication on. */
842
sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
844
server_id, log_file_name,
845
(unsigned long)log_file_pos);
848
return function_exit(kWho, 0);
851
int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
854
const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
855
function_enter(kWho);
858
if (!is_semi_sync_slave())
864
/* No enough space for the extra header, disable semi-sync master */
865
if (sizeof(kSyncHeader) > size)
867
sql_print_warning("No enough space in the packet "
868
"for semi-sync extra header, "
869
"semi-sync replication disabled");
874
/* Set the magic number and the sync status. By default, no sync
877
memcpy(header, kSyncHeader, sizeof(kSyncHeader));
878
hlen= sizeof(kSyncHeader);
880
return function_exit(kWho, hlen);
883
int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
884
const char *log_file_name,
885
my_off_t log_file_pos,
888
const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
892
/* If the semi-sync master is not enabled, or the slave is not a semi-sync
893
* target, do not request replies from the slave.
895
if (!getMasterEnabled() || !is_semi_sync_slave())
901
function_enter(kWho);
905
/* This is the real check inside the mutex. */
906
if (!getMasterEnabled())
914
/* semi-sync is ON */
915
sync = false; /* No sync unless a transaction is involved. */
917
if (reply_file_name_inited_)
919
cmp = ActiveTranx::compare(log_file_name, log_file_pos,
920
reply_file_name_, reply_file_pos_);
923
/* If we have already got the reply for the event, then we do
924
* not need to sync the transaction again.
930
if (wait_file_name_inited_)
932
cmp = ActiveTranx::compare(log_file_name, log_file_pos,
933
wait_file_name_, wait_file_pos_);
940
/* If we are already waiting for some transaction replies which
941
* are later in binlog, do not wait for this one event.
946
* We only wait if the event is a transaction's ending event.
948
assert(active_tranxs_ != NULL);
949
sync = active_tranxs_->is_tranx_end_pos(log_file_name,
955
if (commit_file_name_inited_)
957
int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
958
commit_file_name_, commit_file_pos_);
967
if (trace_level_ & kTraceDetail)
968
sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
969
kWho, server_id, log_file_name,
970
(unsigned long)log_file_pos, sync, (int)is_on());
975
/* We do not need to clear sync flag because we set it to 0 when we
976
* reserve the packet header.
980
(packet)[2] = kPacketFlagSync;
983
return function_exit(kWho, 0);
986
int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
987
my_off_t log_file_pos)
989
const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
992
function_enter(kWho);
996
/* This is the real check inside the mutex. */
997
if (!getMasterEnabled())
1000
/* Update the 'largest' transaction commit position seen so far even
1001
* though semi-sync is switched off.
1002
* It is much better that we update commit_file_* here, instead of
1003
* inside commitTrx(). This is mostly because updateSyncHeader()
1004
* will watch for commit_file_* to decide whether to switch semi-sync
1005
* on. The detailed reason is explained in function updateSyncHeader().
1007
if (commit_file_name_inited_)
1009
int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
1010
commit_file_name_, commit_file_pos_);
1013
/* This is a larger position, let's update the maximum info. */
1014
strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1015
commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1016
commit_file_pos_ = log_file_pos;
1021
strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
1022
commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
1023
commit_file_pos_ = log_file_pos;
1024
commit_file_name_inited_ = true;
1029
assert(active_tranxs_ != NULL);
1030
if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
1033
if insert tranx_node failed, print a warning message
1034
and turn off semi-sync
1036
sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
1037
log_file_name, (ulong)log_file_pos);
1045
return function_exit(kWho, result);
1048
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
1049
const char *event_buf)
1051
const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
1052
const unsigned char *packet;
1053
char log_file_name[FN_REFLEN];
1054
my_off_t log_file_pos;
1055
ulong log_file_len = 0;
1059
struct timespec start_ts;
1060
ulong trc_level = trace_level_;
1062
function_enter(kWho);
1064
assert((unsigned char)event_buf[1] == kPacketMagicNum);
1065
if ((unsigned char)event_buf[2] != kPacketFlagSync)
1067
/* current event does not require reply */
1072
if (trc_level & kTraceNetWait)
1073
set_timespec(start_ts, 0);
1075
/* We flush to make sure that the current event is sent to the network,
1076
* instead of being buffered in the TCP/IP stack.
1080
sql_print_error("Semi-sync master failed on net_flush() "
1081
"before waiting for slave reply");
1086
if (trc_level & kTraceDetail)
1087
sql_print_information("%s: Wait for replica's reply", kWho);
1089
/* Wait for the network here. Though binlog dump thread can indefinitely wait
1090
* here, transactions would not wait indefintely.
1091
* Transactions wait on binlog replies detected by binlog dump threads. If
1092
* binlog dump threads wait too long, transactions will timeout and continue.
1094
packet_len = my_net_read(net);
1096
if (trc_level & kTraceNetWait)
1098
int wait_time = getWaitTime(start_ts);
1101
sql_print_error("Semi-sync master wait for reply "
1102
"fail to get wait time.");
1103
rpl_semi_sync_master_timefunc_fails++;
1107
rpl_semi_sync_master_net_wait_num++;
1108
rpl_semi_sync_master_net_wait_time += wait_time;
1112
if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
1114
if (packet_len == packet_error)
1115
sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
1116
net->last_error, net->last_errno);
1118
sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
1119
net->last_error, net->last_errno);
1123
packet = net->read_pos;
1124
if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
1126
sql_print_error("Read semi-sync reply magic number error");
1130
log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
1131
log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
1132
if (log_file_len >= FN_REFLEN)
1134
sql_print_error("Read semi-sync reply binlog file length too large");
1137
strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
1138
log_file_name[log_file_len] = 0;
1140
if (trc_level & kTraceDetail)
1141
sql_print_information("%s: Got reply (%s, %lu)",
1142
kWho, log_file_name, (ulong)log_file_pos);
1144
result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
1147
return function_exit(kWho, result);
1151
int ReplSemiSyncMaster::resetMaster()
1153
const char *kWho = "ReplSemiSyncMaster::resetMaster";
1156
function_enter(kWho);
1161
state_ = getMasterEnabled()? 1 : 0;
1163
wait_file_name_inited_ = false;
1164
reply_file_name_inited_ = false;
1165
commit_file_name_inited_ = false;
1167
rpl_semi_sync_master_yes_transactions = 0;
1168
rpl_semi_sync_master_no_transactions = 0;
1169
rpl_semi_sync_master_off_times = 0;
1170
rpl_semi_sync_master_timefunc_fails = 0;
1171
rpl_semi_sync_master_wait_sessions = 0;
1172
rpl_semi_sync_master_wait_pos_backtraverse = 0;
1173
rpl_semi_sync_master_trx_wait_num = 0;
1174
rpl_semi_sync_master_trx_wait_time = 0;
1175
rpl_semi_sync_master_net_wait_num = 0;
1176
rpl_semi_sync_master_net_wait_time = 0;
1180
return function_exit(kWho, result);
1183
void ReplSemiSyncMaster::setExportStats()
1187
rpl_semi_sync_master_status = state_;
1188
rpl_semi_sync_master_avg_trx_wait_time=
1189
((rpl_semi_sync_master_trx_wait_num) ?
1190
(unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
1191
((double)rpl_semi_sync_master_trx_wait_num)) : 0);
1192
rpl_semi_sync_master_avg_net_wait_time=
1193
((rpl_semi_sync_master_net_wait_num) ?
1194
(unsigned long)((double)rpl_semi_sync_master_net_wait_time /
1195
((double)rpl_semi_sync_master_net_wait_num)) : 0);
1200
/* Get the waiting time given the wait's starting time.
1203
* >= 0: the waiting time in microseconds (us)
1204
* < 0: error in get time or time back traverse
1206
static int getWaitTime(const struct timespec& start_ts)
1208
unsigned long long start_usecs, end_usecs;
1209
struct timespec end_ts;
1211
/* Starting time in microseconds(us). */
1212
start_usecs = timespec_to_usec(&start_ts);
1214
/* Get the wait time interval. */
1215
set_timespec(end_ts, 0);
1217
/* Ending time in microseconds(us). */
1218
end_usecs = timespec_to_usec(&end_ts);
1220
if (end_usecs < start_usecs)
1223
return (int)(end_usecs - start_usecs);