~michaeleguo/ubuntu/trusty/percona-xtradb-cluster-5.5/arm64fix

« back to all changes in this revision

Viewing changes to plugin/semisync/semisync_master.cc

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-10 14:44:23 UTC
  • Revision ID: package-import@ubuntu.com-20140210144423-f2134l2gxuvq2m6l
Tags: upstream-5.5.34-25.9+dfsg
ImportĀ upstreamĀ versionĀ 5.5.34-25.9+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2007 Google Inc.
 
2
   Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
 
3
   Use is subject to license terms.
 
4
 
 
5
   This program is free software; you can redistribute it and/or modify
 
6
   it under the terms of the GNU General Public License as published by
 
7
   the Free Software Foundation; version 2 of the License.
 
8
 
 
9
   This program is distributed in the hope that it will be useful,
 
10
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
   GNU General Public License for more details.
 
13
 
 
14
   You should have received a copy of the GNU General Public License
 
15
   along with this program; if not, write to the Free Software
 
16
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
17
 
 
18
 
 
19
#include "semisync_master.h"
 
20
 
 
21
#define TIME_THOUSAND 1000
 
22
#define TIME_MILLION  1000000
 
23
#define TIME_BILLION  1000000000
 
24
 
 
25
/* This indicates whether semi-synchronous replication is enabled. */
 
26
char rpl_semi_sync_master_enabled;
 
27
unsigned long rpl_semi_sync_master_timeout;
 
28
unsigned long rpl_semi_sync_master_trace_level;
 
29
char rpl_semi_sync_master_status                    = 0;
 
30
unsigned long rpl_semi_sync_master_yes_transactions = 0;
 
31
unsigned long rpl_semi_sync_master_no_transactions  = 0;
 
32
unsigned long rpl_semi_sync_master_off_times        = 0;
 
33
unsigned long rpl_semi_sync_master_timefunc_fails   = 0;
 
34
unsigned long rpl_semi_sync_master_wait_timeouts     = 0;
 
35
unsigned long rpl_semi_sync_master_wait_sessions    = 0;
 
36
unsigned long rpl_semi_sync_master_wait_pos_backtraverse = 0;
 
37
unsigned long rpl_semi_sync_master_avg_trx_wait_time = 0;
 
38
unsigned long long rpl_semi_sync_master_trx_wait_num = 0;
 
39
unsigned long rpl_semi_sync_master_avg_net_wait_time    = 0;
 
40
unsigned long long rpl_semi_sync_master_net_wait_num = 0;
 
41
unsigned long rpl_semi_sync_master_clients          = 0;
 
42
unsigned long long rpl_semi_sync_master_net_wait_time = 0;
 
43
unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
 
44
char rpl_semi_sync_master_wait_no_slave = 1;
 
45
 
 
46
 
 
47
static int getWaitTime(const struct timespec& start_ts);
 
48
 
 
49
static unsigned long long timespec_to_usec(const struct timespec *ts)
 
50
{
 
51
#ifndef __WIN__
 
52
  return (unsigned long long) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
 
53
#else
 
54
  return ts->tv.i64 / 10;
 
55
#endif /* __WIN__ */
 
56
}
 
57
 
 
58
/*******************************************************************************
 
59
 *
 
60
 * <ActiveTranx> class : manage all active transaction nodes
 
61
 *
 
62
 ******************************************************************************/
 
63
 
 
64
ActiveTranx::ActiveTranx(mysql_mutex_t *lock,
 
65
                         unsigned long trace_level)
 
66
  : Trace(trace_level), allocator_(max_connections),
 
67
    num_entries_(max_connections << 1), /* Transaction hash table size
 
68
                                         * is set to double the size
 
69
                                         * of max_connections */
 
70
    lock_(lock)
 
71
{
 
72
  /* No transactions are in the list initially. */
 
73
  trx_front_ = NULL;
 
74
  trx_rear_  = NULL;
 
75
 
 
76
  /* Create the hash table to find a transaction's ending event. */
 
77
  trx_htb_ = new TranxNode *[num_entries_];
 
78
  for (int idx = 0; idx < num_entries_; ++idx)
 
79
    trx_htb_[idx] = NULL;
 
80
 
 
81
  sql_print_information("Semi-sync replication initialized for transactions.");
 
82
}
 
83
 
 
84
ActiveTranx::~ActiveTranx()
 
85
{
 
86
  delete [] trx_htb_;
 
87
  trx_htb_          = NULL;
 
88
  num_entries_      = 0;
 
89
}
 
90
 
 
91
unsigned int ActiveTranx::calc_hash(const unsigned char *key,
 
92
                                    unsigned int length)
 
93
{
 
94
  unsigned int nr = 1, nr2 = 4;
 
95
 
 
96
  /* The hash implementation comes from calc_hashnr() in mysys/hash.c. */
 
97
  while (length--)
 
98
  {
 
99
    nr  ^= (((nr & 63)+nr2)*((unsigned int) (unsigned char) *key++))+ (nr << 8);
 
100
    nr2 += 3;
 
101
  }
 
102
  return((unsigned int) nr);
 
103
}
 
104
 
 
105
unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
 
106
                                 my_off_t    log_file_pos)
 
107
{
 
108
  unsigned int hash1 = calc_hash((const unsigned char *)log_file_name,
 
109
                                 strlen(log_file_name));
 
110
  unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos),
 
111
                                 sizeof(log_file_pos));
 
112
 
 
113
  return (hash1 + hash2) % num_entries_;
 
114
}
 
115
 
 
116
int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
 
117
                         const char *log_file_name2, my_off_t log_file_pos2)
 
118
{
 
119
  int cmp = strcmp(log_file_name1, log_file_name2);
 
120
 
 
121
  if (cmp != 0)
 
122
    return cmp;
 
123
 
 
124
  if (log_file_pos1 > log_file_pos2)
 
125
    return 1;
 
126
  else if (log_file_pos1 < log_file_pos2)
 
127
    return -1;
 
128
  return 0;
 
129
}
 
130
 
 
131
int ActiveTranx::insert_tranx_node(const char *log_file_name,
 
132
                                   my_off_t log_file_pos)
 
133
{
 
134
  const char *kWho = "ActiveTranx:insert_tranx_node";
 
135
  TranxNode  *ins_node;
 
136
  int         result = 0;
 
137
  unsigned int        hash_val;
 
138
 
 
139
  function_enter(kWho);
 
140
 
 
141
  ins_node = allocator_.allocate_node();
 
142
  if (!ins_node)
 
143
  {
 
144
    sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
 
145
                    kWho, log_file_name, (unsigned long)log_file_pos);
 
146
    result = -1;
 
147
    goto l_end;
 
148
  }
 
149
 
 
150
  /* insert the binlog position in the active transaction list. */
 
151
  strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
 
152
  ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
 
153
  ins_node->log_pos_ = log_file_pos;
 
154
 
 
155
  if (!trx_front_)
 
156
  {
 
157
    /* The list is empty. */
 
158
    trx_front_ = trx_rear_ = ins_node;
 
159
  }
 
160
  else
 
161
  {
 
162
    int cmp = compare(ins_node, trx_rear_);
 
163
    if (cmp > 0)
 
164
    {
 
165
      /* Compare with the tail first.  If the transaction happens later in
 
166
       * binlog, then make it the new tail.
 
167
       */
 
168
      trx_rear_->next_ = ins_node;
 
169
      trx_rear_        = ins_node;
 
170
    }
 
171
    else
 
172
    {
 
173
      /* Otherwise, it is an error because the transaction should hold the
 
174
       * mysql_bin_log.LOCK_log when appending events.
 
175
       */
 
176
      sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), "
 
177
                      "new node (%s, %lu)", kWho,
 
178
                      trx_rear_->log_name_, (unsigned long)trx_rear_->log_pos_,
 
179
                      ins_node->log_name_, (unsigned long)ins_node->log_pos_);
 
180
      result = -1;
 
181
      goto l_end;
 
182
    }
 
183
  }
 
184
 
 
185
  hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_);
 
186
  ins_node->hash_next_ = trx_htb_[hash_val];
 
187
  trx_htb_[hash_val]   = ins_node;
 
188
 
 
189
  if (trace_level_ & kTraceDetail)
 
190
    sql_print_information("%s: insert (%s, %lu) in entry(%u)", kWho,
 
191
                          ins_node->log_name_, (unsigned long)ins_node->log_pos_,
 
192
                          hash_val);
 
193
 
 
194
 l_end:
 
195
  return function_exit(kWho, result);
 
196
}
 
197
 
 
198
bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
 
199
                                   my_off_t    log_file_pos)
 
200
{
 
201
  const char *kWho = "ActiveTranx::is_tranx_end_pos";
 
202
  function_enter(kWho);
 
203
 
 
204
  unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
 
205
  TranxNode *entry = trx_htb_[hash_val];
 
206
 
 
207
  while (entry != NULL)
 
208
  {
 
209
    if (compare(entry, log_file_name, log_file_pos) == 0)
 
210
      break;
 
211
 
 
212
    entry = entry->hash_next_;
 
213
  }
 
214
 
 
215
  if (trace_level_ & kTraceDetail)
 
216
    sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
 
217
                          log_file_name, (unsigned long)log_file_pos, hash_val);
 
218
 
 
219
  function_exit(kWho, (entry != NULL));
 
220
  return (entry != NULL);
 
221
}
 
222
 
 
223
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
 
224
                                          my_off_t log_file_pos)
 
225
{
 
226
  const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
 
227
  TranxNode *new_front;
 
228
 
 
229
  function_enter(kWho);
 
230
 
 
231
  if (log_file_name != NULL)
 
232
  {
 
233
    new_front = trx_front_;
 
234
 
 
235
    while (new_front)
 
236
    {
 
237
      if (compare(new_front, log_file_name, log_file_pos) > 0)
 
238
        break;
 
239
      new_front = new_front->next_;
 
240
    }
 
241
  }
 
242
  else
 
243
  {
 
244
    /* If log_file_name is NULL, clear everything. */
 
245
    new_front = NULL;
 
246
  }
 
247
 
 
248
  if (new_front == NULL)
 
249
  {
 
250
    /* No active transaction nodes after the call. */
 
251
 
 
252
    /* Clear the hash table. */
 
253
    memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
 
254
    allocator_.free_all_nodes();
 
255
 
 
256
    /* Clear the active transaction list. */
 
257
    if (trx_front_ != NULL)
 
258
    {
 
259
      trx_front_ = NULL;
 
260
      trx_rear_  = NULL;
 
261
    }
 
262
 
 
263
    if (trace_level_ & kTraceDetail)
 
264
      sql_print_information("%s: cleared all nodes", kWho);
 
265
  }
 
266
  else if (new_front != trx_front_)
 
267
  {
 
268
    TranxNode *curr_node, *next_node;
 
269
 
 
270
    /* Delete all transaction nodes before the confirmation point. */
 
271
    int n_frees = 0;
 
272
    curr_node = trx_front_;
 
273
    while (curr_node != new_front)
 
274
    {
 
275
      next_node = curr_node->next_;
 
276
      n_frees++;
 
277
 
 
278
      /* Remove the node from the hash table. */
 
279
      unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_);
 
280
      TranxNode **hash_ptr = &(trx_htb_[hash_val]);
 
281
      while ((*hash_ptr) != NULL)
 
282
      {
 
283
        if ((*hash_ptr) == curr_node)
 
284
        {
 
285
          (*hash_ptr) = curr_node->hash_next_;
 
286
          break;
 
287
        }
 
288
        hash_ptr = &((*hash_ptr)->hash_next_);
 
289
      }
 
290
 
 
291
      curr_node = next_node;
 
292
    }
 
293
 
 
294
    trx_front_ = new_front;
 
295
    allocator_.free_nodes_before(trx_front_);
 
296
 
 
297
    if (trace_level_ & kTraceDetail)
 
298
      sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
 
299
                            kWho, n_frees,
 
300
                            trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
 
301
  }
 
302
 
 
303
  return function_exit(kWho, 0);
 
304
}
 
305
 
 
306
 
 
307
/*******************************************************************************
 
308
 *
 
309
 * <ReplSemiSyncMaster> class: the basic code layer for sync-replication master.
 
310
 * <ReplSemiSyncSlave>  class: the basic code layer for sync-replication slave.
 
311
 *
 
312
 * The most important functions during semi-syn replication listed:
 
313
 *
 
314
 * Master:
 
315
 *  . reportReplyBinlog():  called by the binlog dump thread when it receives
 
316
 *                          the slave's status information.
 
317
 *  . updateSyncHeader():   based on transaction waiting information, decide
 
318
 *                          whether to request the slave to reply.
 
319
 *  . writeTranxInBinlog(): called by the transaction thread when it finishes
 
320
 *                          writing all transaction events in binlog.
 
321
 *  . commitTrx():          transaction thread wait for the slave reply.
 
322
 *
 
323
 * Slave:
 
324
 *  . slaveReadSyncHeader(): read the semi-sync header from the master, get the
 
325
 *                           sync status and get the payload for events.
 
326
 *  . slaveReply():          reply to the master about the replication progress.
 
327
 *
 
328
 ******************************************************************************/
 
329
 
 
330
ReplSemiSyncMaster::ReplSemiSyncMaster()
 
331
  : active_tranxs_(NULL),
 
332
    init_done_(false),
 
333
    reply_file_name_inited_(false),
 
334
    reply_file_pos_(0L),
 
335
    wait_file_name_inited_(false),
 
336
    wait_file_pos_(0),
 
337
    master_enabled_(false),
 
338
    wait_timeout_(0L),
 
339
    state_(0)
 
340
{
 
341
  strcpy(reply_file_name_, "");
 
342
  strcpy(wait_file_name_, "");
 
343
}
 
344
 
 
345
int ReplSemiSyncMaster::initObject()
 
346
{
 
347
  int result;
 
348
  const char *kWho = "ReplSemiSyncMaster::initObject";
 
349
 
 
350
  if (init_done_)
 
351
  {
 
352
    fprintf(stderr, "%s called twice\n", kWho);
 
353
    return 1;
 
354
  }
 
355
  init_done_ = true;
 
356
 
 
357
  /* References to the parameter works after set_options(). */
 
358
  setWaitTimeout(rpl_semi_sync_master_timeout);
 
359
  setTraceLevel(rpl_semi_sync_master_trace_level);
 
360
 
 
361
  /* Mutex initialization can only be done after MY_INIT(). */
 
362
  mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
 
363
                   &LOCK_binlog_, MY_MUTEX_INIT_FAST);
 
364
  mysql_cond_init(key_ss_cond_COND_binlog_send_,
 
365
                  &COND_binlog_send_, NULL);
 
366
 
 
367
  if (rpl_semi_sync_master_enabled)
 
368
    result = enableMaster();
 
369
  else
 
370
    result = disableMaster();
 
371
 
 
372
  return result;
 
373
}
 
374
 
 
375
int ReplSemiSyncMaster::enableMaster()
 
376
{
 
377
  int result = 0;
 
378
 
 
379
  /* Must have the lock when we do enable of disable. */
 
380
  lock();
 
381
 
 
382
  if (!getMasterEnabled())
 
383
  {
 
384
    active_tranxs_ = new ActiveTranx(&LOCK_binlog_, trace_level_);
 
385
    if (active_tranxs_ != NULL)
 
386
    {
 
387
      commit_file_name_inited_ = false;
 
388
      reply_file_name_inited_  = false;
 
389
      wait_file_name_inited_   = false;
 
390
 
 
391
      set_master_enabled(true);
 
392
      state_ = true;
 
393
      sql_print_information("Semi-sync replication enabled on the master.");
 
394
    }
 
395
    else
 
396
    {
 
397
      sql_print_error("Cannot allocate memory to enable semi-sync on the master.");
 
398
      result = -1;
 
399
    }
 
400
  }
 
401
 
 
402
  unlock();
 
403
 
 
404
  return result;
 
405
}
 
406
 
 
407
int ReplSemiSyncMaster::disableMaster()
 
408
{
 
409
  /* Must have the lock when we do enable of disable. */
 
410
  lock();
 
411
 
 
412
  if (getMasterEnabled())
 
413
  {
 
414
    /* Switch off the semi-sync first so that waiting transaction will be
 
415
     * waken up.
 
416
     */
 
417
    switch_off();
 
418
 
 
419
    assert(active_tranxs_ != NULL);
 
420
    delete active_tranxs_;
 
421
    active_tranxs_ = NULL;
 
422
 
 
423
    reply_file_name_inited_ = false;
 
424
    wait_file_name_inited_  = false;
 
425
    commit_file_name_inited_ = false;
 
426
 
 
427
    set_master_enabled(false);
 
428
    sql_print_information("Semi-sync replication disabled on the master.");
 
429
  }
 
430
 
 
431
  unlock();
 
432
 
 
433
  return 0;
 
434
}
 
435
 
 
436
ReplSemiSyncMaster::~ReplSemiSyncMaster()
 
437
{
 
438
  if (init_done_)
 
439
  {
 
440
    mysql_mutex_destroy(&LOCK_binlog_);
 
441
    mysql_cond_destroy(&COND_binlog_send_);
 
442
  }
 
443
 
 
444
  delete active_tranxs_;
 
445
}
 
446
 
 
447
void ReplSemiSyncMaster::lock()
 
448
{
 
449
  mysql_mutex_lock(&LOCK_binlog_);
 
450
}
 
451
 
 
452
void ReplSemiSyncMaster::unlock()
 
453
{
 
454
  mysql_mutex_unlock(&LOCK_binlog_);
 
455
}
 
456
 
 
457
void ReplSemiSyncMaster::cond_broadcast()
 
458
{
 
459
  mysql_cond_broadcast(&COND_binlog_send_);
 
460
}
 
461
 
 
462
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
 
463
{
 
464
  const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
 
465
  int wait_res;
 
466
 
 
467
  function_enter(kWho);
 
468
  wait_res= mysql_cond_timedwait(&COND_binlog_send_,
 
469
                                 &LOCK_binlog_, wait_time);
 
470
  return function_exit(kWho, wait_res);
 
471
}
 
472
 
 
473
void ReplSemiSyncMaster::add_slave()
 
474
{
 
475
  lock();
 
476
  rpl_semi_sync_master_clients++;
 
477
  unlock();
 
478
}
 
479
 
 
480
void ReplSemiSyncMaster::remove_slave()
 
481
{
 
482
  lock();
 
483
  rpl_semi_sync_master_clients--;
 
484
 
 
485
  /* If user has chosen not to wait if no semi-sync slave available
 
486
     and the last semi-sync slave exits, turn off semi-sync on master
 
487
     immediately.
 
488
   */
 
489
  if (!rpl_semi_sync_master_wait_no_slave &&
 
490
      rpl_semi_sync_master_clients == 0)
 
491
    switch_off();
 
492
  unlock();
 
493
}
 
494
 
 
495
bool ReplSemiSyncMaster::is_semi_sync_slave()
 
496
{
 
497
  int null_value;
 
498
  long long val= 0;
 
499
  get_user_var_int("rpl_semi_sync_slave", &val, &null_value);
 
500
  return val;
 
501
}
 
502
 
 
503
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
 
504
                                          const char *log_file_name,
 
505
                                          my_off_t log_file_pos)
 
506
{
 
507
  const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
 
508
  int   cmp;
 
509
  bool  can_release_threads = false;
 
510
  bool  need_copy_send_pos = true;
 
511
 
 
512
  if (!(getMasterEnabled()))
 
513
    return 0;
 
514
 
 
515
  function_enter(kWho);
 
516
 
 
517
  lock();
 
518
 
 
519
  /* This is the real check inside the mutex. */
 
520
  if (!getMasterEnabled())
 
521
    goto l_end;
 
522
 
 
523
  if (!is_on())
 
524
    /* We check to see whether we can switch semi-sync ON. */
 
525
    try_switch_on(server_id, log_file_name, log_file_pos);
 
526
 
 
527
  /* The position should increase monotonically, if there is only one
 
528
   * thread sending the binlog to the slave.
 
529
   * In reality, to improve the transaction availability, we allow multiple
 
530
   * sync replication slaves.  So, if any one of them get the transaction,
 
531
   * the transaction session in the primary can move forward.
 
532
   */
 
533
  if (reply_file_name_inited_)
 
534
  {
 
535
    cmp = ActiveTranx::compare(log_file_name, log_file_pos,
 
536
                               reply_file_name_, reply_file_pos_);
 
537
 
 
538
    /* If the requested position is behind the sending binlog position,
 
539
     * would not adjust sending binlog position.
 
540
     * We based on the assumption that there are multiple semi-sync slave,
 
541
     * and at least one of them shou/ld be up to date.
 
542
     * If all semi-sync slaves are behind, at least initially, the primary
 
543
     * can find the situation after the waiting timeout.  After that, some
 
544
     * slaves should catch up quickly.
 
545
     */
 
546
    if (cmp < 0)
 
547
    {
 
548
      /* If the position is behind, do not copy it. */
 
549
      need_copy_send_pos = false;
 
550
    }
 
551
  }
 
552
 
 
553
  if (need_copy_send_pos)
 
554
  {
 
555
    strcpy(reply_file_name_, log_file_name);
 
556
    reply_file_pos_ = log_file_pos;
 
557
    reply_file_name_inited_ = true;
 
558
 
 
559
    /* Remove all active transaction nodes before this point. */
 
560
    assert(active_tranxs_ != NULL);
 
561
    active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
 
562
 
 
563
    if (trace_level_ & kTraceDetail)
 
564
      sql_print_information("%s: Got reply at (%s, %lu)", kWho,
 
565
                            log_file_name, (unsigned long)log_file_pos);
 
566
  }
 
567
 
 
568
  if (rpl_semi_sync_master_wait_sessions > 0)
 
569
  {
 
570
    /* Let us check if some of the waiting threads doing a trx
 
571
     * commit can now proceed.
 
572
     */
 
573
    cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
 
574
                               wait_file_name_, wait_file_pos_);
 
575
    if (cmp >= 0)
 
576
    {
 
577
      /* Yes, at least one waiting thread can now proceed:
 
578
       * let us release all waiting threads with a broadcast
 
579
       */
 
580
      can_release_threads = true;
 
581
      wait_file_name_inited_ = false;
 
582
    }
 
583
  }
 
584
 
 
585
 l_end:
 
586
  unlock();
 
587
 
 
588
  if (can_release_threads)
 
589
  {
 
590
    if (trace_level_ & kTraceDetail)
 
591
      sql_print_information("%s: signal all waiting threads.", kWho);
 
592
 
 
593
    cond_broadcast();
 
594
  }
 
595
 
 
596
  return function_exit(kWho, 0);
 
597
}
 
598
 
 
599
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
 
600
                                  my_off_t trx_wait_binlog_pos)
 
601
{
 
602
  const char *kWho = "ReplSemiSyncMaster::commitTrx";
 
603
 
 
604
  function_enter(kWho);
 
605
 
 
606
  if (getMasterEnabled() && trx_wait_binlog_name)
 
607
  {
 
608
    struct timespec start_ts;
 
609
    struct timespec abstime;
 
610
    int wait_result;
 
611
    const char *old_msg= 0;
 
612
 
 
613
    set_timespec(start_ts, 0);
 
614
 
 
615
    /* Acquire the mutex. */
 
616
    lock();
 
617
 
 
618
    /* This must be called after acquired the lock */
 
619
    old_msg= thd_enter_cond(NULL, &COND_binlog_send_, &LOCK_binlog_,
 
620
                            "Waiting for semi-sync ACK from slave");
 
621
 
 
622
    /* This is the real check inside the mutex. */
 
623
    if (!getMasterEnabled() || !is_on())
 
624
      goto l_end;
 
625
 
 
626
    if (trace_level_ & kTraceDetail)
 
627
    {
 
628
      sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
 
629
                            trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
 
630
                            (int)is_on());
 
631
    }
 
632
 
 
633
    while (is_on())
 
634
    {
 
635
      if (reply_file_name_inited_)
 
636
      {
 
637
        int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
 
638
                                       trx_wait_binlog_name, trx_wait_binlog_pos);
 
639
        if (cmp >= 0)
 
640
        {
 
641
          /* We have already sent the relevant binlog to the slave: no need to
 
642
           * wait here.
 
643
           */
 
644
          if (trace_level_ & kTraceDetail)
 
645
            sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
 
646
                                  kWho, reply_file_name_, (unsigned long)reply_file_pos_);
 
647
          break;
 
648
        }
 
649
      }
 
650
 
 
651
      /* Let us update the info about the minimum binlog position of waiting
 
652
       * threads.
 
653
       */
 
654
      if (wait_file_name_inited_)
 
655
      {
 
656
        int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
 
657
                                       wait_file_name_, wait_file_pos_);
 
658
        if (cmp <= 0)
 
659
        {
 
660
          /* This thd has a lower position, let's update the minimum info. */
 
661
          strcpy(wait_file_name_, trx_wait_binlog_name);
 
662
          wait_file_pos_ = trx_wait_binlog_pos;
 
663
 
 
664
          rpl_semi_sync_master_wait_pos_backtraverse++;
 
665
          if (trace_level_ & kTraceDetail)
 
666
            sql_print_information("%s: move back wait position (%s, %lu),",
 
667
                                  kWho, wait_file_name_, (unsigned long)wait_file_pos_);
 
668
        }
 
669
      }
 
670
      else
 
671
      {
 
672
        strcpy(wait_file_name_, trx_wait_binlog_name);
 
673
        wait_file_pos_ = trx_wait_binlog_pos;
 
674
        wait_file_name_inited_ = true;
 
675
 
 
676
        if (trace_level_ & kTraceDetail)
 
677
          sql_print_information("%s: init wait position (%s, %lu),",
 
678
                                kWho, wait_file_name_, (unsigned long)wait_file_pos_);
 
679
      }
 
680
 
 
681
      /* Calcuate the waiting period. */
 
682
#ifdef __WIN__
 
683
      abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
 
684
      abstime.max_timeout_msec= (long)wait_timeout_;
 
685
#else
 
686
      unsigned long long diff_nsecs =
 
687
        start_ts.tv_nsec + (unsigned long long)wait_timeout_ * TIME_MILLION;
 
688
      abstime.tv_sec = start_ts.tv_sec;
 
689
      while (diff_nsecs >= TIME_BILLION)
 
690
      {
 
691
        abstime.tv_sec++;
 
692
        diff_nsecs -= TIME_BILLION;
 
693
      }
 
694
      abstime.tv_nsec = diff_nsecs;
 
695
#endif /* __WIN__ */
 
696
      
 
697
      /* In semi-synchronous replication, we wait until the binlog-dump
 
698
       * thread has received the reply on the relevant binlog segment from the
 
699
       * replication slave.
 
700
       *
 
701
       * Let us suspend this thread to wait on the condition;
 
702
       * when replication has progressed far enough, we will release
 
703
       * these waiting threads.
 
704
       */
 
705
      rpl_semi_sync_master_wait_sessions++;
 
706
      
 
707
      if (trace_level_ & kTraceDetail)
 
708
        sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
 
709
                              kWho, wait_timeout_,
 
710
                              wait_file_name_, (unsigned long)wait_file_pos_);
 
711
      
 
712
      wait_result = cond_timewait(&abstime);
 
713
      rpl_semi_sync_master_wait_sessions--;
 
714
      
 
715
      if (wait_result != 0)
 
716
      {
 
717
        /* This is a real wait timeout. */
 
718
        sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
 
719
                          "semi-sync up to file %s, position %lu.",
 
720
                          trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
 
721
                          reply_file_name_, (unsigned long)reply_file_pos_);
 
722
        rpl_semi_sync_master_wait_timeouts++;
 
723
        
 
724
        /* switch semi-sync off */
 
725
        switch_off();
 
726
      }
 
727
      else
 
728
      {
 
729
        int wait_time;
 
730
        
 
731
        wait_time = getWaitTime(start_ts);
 
732
        if (wait_time < 0)
 
733
        {
 
734
          if (trace_level_ & kTraceGeneral)
 
735
          {
 
736
            sql_print_error("Replication semi-sync getWaitTime fail at "
 
737
                            "wait position (%s, %lu)",
 
738
                            trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos);
 
739
          }
 
740
          rpl_semi_sync_master_timefunc_fails++;
 
741
        }
 
742
        else
 
743
        {
 
744
          rpl_semi_sync_master_trx_wait_num++;
 
745
          rpl_semi_sync_master_trx_wait_time += wait_time;
 
746
        }
 
747
      }
 
748
    }
 
749
 
 
750
  l_end:
 
751
    /*
 
752
      At this point, the binlog file and position of this transaction
 
753
      must have been removed from ActiveTranx.
 
754
    */
 
755
    assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
 
756
                                             trx_wait_binlog_pos));
 
757
    
 
758
    /* Update the status counter. */
 
759
    if (is_on())
 
760
      rpl_semi_sync_master_yes_transactions++;
 
761
    else
 
762
      rpl_semi_sync_master_no_transactions++;
 
763
 
 
764
    /* The lock held will be released by thd_exit_cond, so no need to
 
765
       call unlock() here */
 
766
    thd_exit_cond(NULL, old_msg);
 
767
  }
 
768
 
 
769
  return function_exit(kWho, 0);
 
770
}
 
771
 
 
772
/* Indicate that semi-sync replication is OFF now.
 
773
 * 
 
774
 * What should we do when it is disabled?  The problem is that we want
 
775
 * the semi-sync replication enabled again when the slave catches up
 
776
 * later.  But, it is not that easy to detect that the slave has caught
 
777
 * up.  This is caused by the fact that MySQL's replication protocol is
 
778
 * asynchronous, meaning that if the master does not use the semi-sync
 
779
 * protocol, the slave would not send anything to the master.
 
780
 * Still, if the master is sending (N+1)-th event, we assume that it is
 
781
 * an indicator that the slave has received N-th event and earlier ones.
 
782
 *
 
783
 * If semi-sync is disabled, all transactions still update the wait
 
784
 * position with the last position in binlog.  But no transactions will
 
785
 * wait for confirmations and the active transaction list would not be
 
786
 * maintained.  In binlog dump thread, updateSyncHeader() checks whether
 
787
 * the current sending event catches up with last wait position.  If it
 
788
 * does match, semi-sync will be switched on again.
 
789
 */
 
790
int ReplSemiSyncMaster::switch_off()
 
791
{
 
792
  const char *kWho = "ReplSemiSyncMaster::switch_off";
 
793
  int result;
 
794
 
 
795
  function_enter(kWho);
 
796
  state_ = false;
 
797
 
 
798
  /* Clear the active transaction list. */
 
799
  assert(active_tranxs_ != NULL);
 
800
  result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
 
801
 
 
802
  rpl_semi_sync_master_off_times++;
 
803
  wait_file_name_inited_   = false;
 
804
  reply_file_name_inited_  = false;
 
805
  sql_print_information("Semi-sync replication switched OFF.");
 
806
  cond_broadcast();                            /* wake up all waiting threads */
 
807
 
 
808
  return function_exit(kWho, result);
 
809
}
 
810
 
 
811
int ReplSemiSyncMaster::try_switch_on(int server_id,
 
812
                                      const char *log_file_name,
 
813
                                      my_off_t log_file_pos)
 
814
{
 
815
  const char *kWho = "ReplSemiSyncMaster::try_switch_on";
 
816
  bool semi_sync_on = false;
 
817
 
 
818
  function_enter(kWho);
 
819
 
 
820
  /* If the current sending event's position is larger than or equal to the
 
821
   * 'largest' commit transaction binlog position, the slave is already
 
822
   * catching up now and we can switch semi-sync on here.
 
823
   * If commit_file_name_inited_ indicates there are no recent transactions,
 
824
   * we can enable semi-sync immediately.
 
825
   */
 
826
  if (commit_file_name_inited_)
 
827
  {
 
828
    int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
 
829
                                   commit_file_name_, commit_file_pos_);
 
830
    semi_sync_on = (cmp >= 0);
 
831
  }
 
832
  else
 
833
  {
 
834
    semi_sync_on = true;
 
835
  }
 
836
 
 
837
  if (semi_sync_on)
 
838
  {
 
839
    /* Switch semi-sync replication on. */
 
840
    state_ = true;
 
841
 
 
842
    sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) "
 
843
                          "at (%s, %lu)",
 
844
                          server_id, log_file_name,
 
845
                          (unsigned long)log_file_pos);
 
846
  }
 
847
 
 
848
  return function_exit(kWho, 0);
 
849
}
 
850
 
 
851
int ReplSemiSyncMaster::reserveSyncHeader(unsigned char *header,
 
852
                                          unsigned long size)
 
853
{
 
854
  const char *kWho = "ReplSemiSyncMaster::reserveSyncHeader";
 
855
  function_enter(kWho);
 
856
 
 
857
  int hlen=0;
 
858
  if (!is_semi_sync_slave())
 
859
  {
 
860
    hlen= 0;
 
861
  }
 
862
  else
 
863
  {
 
864
    /* No enough space for the extra header, disable semi-sync master */
 
865
    if (sizeof(kSyncHeader) > size)
 
866
    {
 
867
      sql_print_warning("No enough space in the packet "
 
868
                        "for semi-sync extra header, "
 
869
                        "semi-sync replication disabled");
 
870
      disableMaster();
 
871
      return 0;
 
872
    }
 
873
    
 
874
    /* Set the magic number and the sync status.  By default, no sync
 
875
     * is required.
 
876
     */
 
877
    memcpy(header, kSyncHeader, sizeof(kSyncHeader));
 
878
    hlen= sizeof(kSyncHeader);
 
879
  }
 
880
  return function_exit(kWho, hlen);
 
881
}
 
882
 
 
883
int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
 
884
                                         const char *log_file_name,
 
885
                                         my_off_t log_file_pos,
 
886
                                         uint32 server_id)
 
887
{
 
888
  const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
 
889
  int  cmp = 0;
 
890
  bool sync = false;
 
891
 
 
892
  /* If the semi-sync master is not enabled, or the slave is not a semi-sync
 
893
   * target, do not request replies from the slave.
 
894
   */
 
895
  if (!getMasterEnabled() || !is_semi_sync_slave())
 
896
  {
 
897
    sync = false;
 
898
    return 0;
 
899
  }
 
900
 
 
901
  function_enter(kWho);
 
902
 
 
903
  lock();
 
904
 
 
905
  /* This is the real check inside the mutex. */
 
906
  if (!getMasterEnabled())
 
907
  {
 
908
    sync = false;
 
909
    goto l_end;
 
910
  }
 
911
 
 
912
  if (is_on())
 
913
  {
 
914
    /* semi-sync is ON */
 
915
    sync = false;     /* No sync unless a transaction is involved. */
 
916
 
 
917
    if (reply_file_name_inited_)
 
918
    {
 
919
      cmp = ActiveTranx::compare(log_file_name, log_file_pos,
 
920
                                 reply_file_name_, reply_file_pos_);
 
921
      if (cmp <= 0)
 
922
      {
 
923
        /* If we have already got the reply for the event, then we do
 
924
         * not need to sync the transaction again.
 
925
         */
 
926
        goto l_end;
 
927
      }
 
928
    }
 
929
 
 
930
    if (wait_file_name_inited_)
 
931
    {
 
932
      cmp = ActiveTranx::compare(log_file_name, log_file_pos,
 
933
                                 wait_file_name_, wait_file_pos_);
 
934
    }
 
935
    else
 
936
    {
 
937
      cmp = 1;
 
938
    }
 
939
    
 
940
    /* If we are already waiting for some transaction replies which
 
941
     * are later in binlog, do not wait for this one event.
 
942
     */
 
943
    if (cmp >= 0)
 
944
    {
 
945
      /* 
 
946
       * We only wait if the event is a transaction's ending event.
 
947
       */
 
948
      assert(active_tranxs_ != NULL);
 
949
      sync = active_tranxs_->is_tranx_end_pos(log_file_name,
 
950
                                               log_file_pos);
 
951
    }
 
952
  }
 
953
  else
 
954
  {
 
955
    if (commit_file_name_inited_)
 
956
    {
 
957
      int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
 
958
                                     commit_file_name_, commit_file_pos_);
 
959
      sync = (cmp >= 0);
 
960
    }
 
961
    else
 
962
    {
 
963
      sync = true;
 
964
    }
 
965
  }
 
966
 
 
967
  if (trace_level_ & kTraceDetail)
 
968
    sql_print_information("%s: server(%d), (%s, %lu) sync(%d), repl(%d)",
 
969
                          kWho, server_id, log_file_name,
 
970
                          (unsigned long)log_file_pos, sync, (int)is_on());
 
971
 
 
972
 l_end:
 
973
  unlock();
 
974
 
 
975
  /* We do not need to clear sync flag because we set it to 0 when we
 
976
   * reserve the packet header.
 
977
   */
 
978
  if (sync)
 
979
  {
 
980
    (packet)[2] = kPacketFlagSync;
 
981
  }
 
982
 
 
983
  return function_exit(kWho, 0);
 
984
}
 
985
 
 
986
int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
 
987
                                           my_off_t log_file_pos)
 
988
{
 
989
  const char *kWho = "ReplSemiSyncMaster::writeTranxInBinlog";
 
990
  int result = 0;
 
991
 
 
992
  function_enter(kWho);
 
993
 
 
994
  lock();
 
995
 
 
996
  /* This is the real check inside the mutex. */
 
997
  if (!getMasterEnabled())
 
998
    goto l_end;
 
999
 
 
1000
  /* Update the 'largest' transaction commit position seen so far even
 
1001
   * though semi-sync is switched off.
 
1002
   * It is much better that we update commit_file_* here, instead of
 
1003
   * inside commitTrx().  This is mostly because updateSyncHeader()
 
1004
   * will watch for commit_file_* to decide whether to switch semi-sync
 
1005
   * on. The detailed reason is explained in function updateSyncHeader().
 
1006
   */
 
1007
  if (commit_file_name_inited_)
 
1008
  {
 
1009
    int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
 
1010
                                   commit_file_name_, commit_file_pos_);
 
1011
    if (cmp > 0)
 
1012
    {
 
1013
      /* This is a larger position, let's update the maximum info. */
 
1014
      strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
 
1015
      commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
 
1016
      commit_file_pos_ = log_file_pos;
 
1017
    }
 
1018
  }
 
1019
  else
 
1020
  {
 
1021
    strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
 
1022
    commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
 
1023
    commit_file_pos_ = log_file_pos;
 
1024
    commit_file_name_inited_ = true;
 
1025
  }
 
1026
 
 
1027
  if (is_on())
 
1028
  {
 
1029
    assert(active_tranxs_ != NULL);
 
1030
    if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
 
1031
    {
 
1032
      /*
 
1033
        if insert tranx_node failed, print a warning message
 
1034
        and turn off semi-sync
 
1035
      */
 
1036
      sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
 
1037
                        log_file_name, (ulong)log_file_pos);
 
1038
      switch_off();
 
1039
    }
 
1040
  }
 
1041
 
 
1042
 l_end:
 
1043
  unlock();
 
1044
 
 
1045
  return function_exit(kWho, result);
 
1046
}
 
1047
 
 
1048
int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
 
1049
                                       const char *event_buf)
 
1050
{
 
1051
  const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
 
1052
  const unsigned char *packet;
 
1053
  char     log_file_name[FN_REFLEN];
 
1054
  my_off_t log_file_pos;
 
1055
  ulong    log_file_len = 0;
 
1056
  ulong    packet_len;
 
1057
  int      result = -1;
 
1058
 
 
1059
  struct timespec start_ts;
 
1060
  ulong trc_level = trace_level_;
 
1061
 
 
1062
  function_enter(kWho);
 
1063
 
 
1064
  assert((unsigned char)event_buf[1] == kPacketMagicNum);
 
1065
  if ((unsigned char)event_buf[2] != kPacketFlagSync)
 
1066
  {
 
1067
    /* current event does not require reply */
 
1068
    result = 0;
 
1069
    goto l_end;
 
1070
  }
 
1071
 
 
1072
  if (trc_level & kTraceNetWait)
 
1073
    set_timespec(start_ts, 0);
 
1074
 
 
1075
  /* We flush to make sure that the current event is sent to the network,
 
1076
   * instead of being buffered in the TCP/IP stack.
 
1077
   */
 
1078
  if (net_flush(net))
 
1079
  {
 
1080
    sql_print_error("Semi-sync master failed on net_flush() "
 
1081
                    "before waiting for slave reply");
 
1082
    goto l_end;
 
1083
  }
 
1084
 
 
1085
  net_clear(net, 0);
 
1086
  if (trc_level & kTraceDetail)
 
1087
    sql_print_information("%s: Wait for replica's reply", kWho);
 
1088
 
 
1089
  /* Wait for the network here.  Though binlog dump thread can indefinitely wait
 
1090
   * here, transactions would not wait indefintely.
 
1091
   * Transactions wait on binlog replies detected by binlog dump threads.  If
 
1092
   * binlog dump threads wait too long, transactions will timeout and continue.
 
1093
   */
 
1094
  packet_len = my_net_read(net);
 
1095
 
 
1096
  if (trc_level & kTraceNetWait)
 
1097
  {
 
1098
    int wait_time = getWaitTime(start_ts);
 
1099
    if (wait_time < 0)
 
1100
    {
 
1101
      sql_print_error("Semi-sync master wait for reply "
 
1102
                      "fail to get wait time.");
 
1103
      rpl_semi_sync_master_timefunc_fails++;
 
1104
    }
 
1105
    else
 
1106
    {
 
1107
      rpl_semi_sync_master_net_wait_num++;
 
1108
      rpl_semi_sync_master_net_wait_time += wait_time;
 
1109
    }
 
1110
  }
 
1111
 
 
1112
  if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
 
1113
  {
 
1114
    if (packet_len == packet_error)
 
1115
      sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
 
1116
                      net->last_error, net->last_errno);
 
1117
    else
 
1118
      sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
 
1119
                      net->last_error, net->last_errno);
 
1120
    goto l_end;
 
1121
  }
 
1122
 
 
1123
  packet = net->read_pos;
 
1124
  if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
 
1125
  {
 
1126
    sql_print_error("Read semi-sync reply magic number error");
 
1127
    goto l_end;
 
1128
  }
 
1129
 
 
1130
  log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
 
1131
  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
 
1132
  if (log_file_len >= FN_REFLEN)
 
1133
  {
 
1134
    sql_print_error("Read semi-sync reply binlog file length too large");
 
1135
    goto l_end;
 
1136
  }
 
1137
  strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
 
1138
  log_file_name[log_file_len] = 0;
 
1139
 
 
1140
  if (trc_level & kTraceDetail)
 
1141
    sql_print_information("%s: Got reply (%s, %lu)",
 
1142
                          kWho, log_file_name, (ulong)log_file_pos);
 
1143
 
 
1144
  result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
 
1145
 
 
1146
 l_end:
 
1147
  return function_exit(kWho, result);
 
1148
}
 
1149
 
 
1150
 
 
1151
int ReplSemiSyncMaster::resetMaster()
 
1152
{
 
1153
  const char *kWho = "ReplSemiSyncMaster::resetMaster";
 
1154
  int result = 0;
 
1155
 
 
1156
  function_enter(kWho);
 
1157
 
 
1158
 
 
1159
  lock();
 
1160
 
 
1161
  state_ = getMasterEnabled()? 1 : 0;
 
1162
 
 
1163
  wait_file_name_inited_   = false;
 
1164
  reply_file_name_inited_  = false;
 
1165
  commit_file_name_inited_ = false;
 
1166
 
 
1167
  rpl_semi_sync_master_yes_transactions = 0;
 
1168
  rpl_semi_sync_master_no_transactions = 0;
 
1169
  rpl_semi_sync_master_off_times = 0;
 
1170
  rpl_semi_sync_master_timefunc_fails = 0;
 
1171
  rpl_semi_sync_master_wait_sessions = 0;
 
1172
  rpl_semi_sync_master_wait_pos_backtraverse = 0;
 
1173
  rpl_semi_sync_master_trx_wait_num = 0;
 
1174
  rpl_semi_sync_master_trx_wait_time = 0;
 
1175
  rpl_semi_sync_master_net_wait_num = 0;
 
1176
  rpl_semi_sync_master_net_wait_time = 0;
 
1177
 
 
1178
  unlock();
 
1179
 
 
1180
  return function_exit(kWho, result);
 
1181
}
 
1182
 
 
1183
void ReplSemiSyncMaster::setExportStats()
 
1184
{
 
1185
  lock();
 
1186
 
 
1187
  rpl_semi_sync_master_status           = state_;
 
1188
  rpl_semi_sync_master_avg_trx_wait_time=
 
1189
    ((rpl_semi_sync_master_trx_wait_num) ?
 
1190
     (unsigned long)((double)rpl_semi_sync_master_trx_wait_time /
 
1191
                     ((double)rpl_semi_sync_master_trx_wait_num)) : 0);
 
1192
  rpl_semi_sync_master_avg_net_wait_time=
 
1193
    ((rpl_semi_sync_master_net_wait_num) ?
 
1194
     (unsigned long)((double)rpl_semi_sync_master_net_wait_time /
 
1195
                     ((double)rpl_semi_sync_master_net_wait_num)) : 0);
 
1196
 
 
1197
  unlock();
 
1198
}
 
1199
 
 
1200
/* Get the waiting time given the wait's staring time.
 
1201
 * 
 
1202
 * Return:
 
1203
 *  >= 0: the waiting time in microsecons(us)
 
1204
 *   < 0: error in get time or time back traverse
 
1205
 */
 
1206
static int getWaitTime(const struct timespec& start_ts)
 
1207
{
 
1208
  unsigned long long start_usecs, end_usecs;
 
1209
  struct timespec end_ts;
 
1210
  
 
1211
  /* Starting time in microseconds(us). */
 
1212
  start_usecs = timespec_to_usec(&start_ts);
 
1213
 
 
1214
  /* Get the wait time interval. */
 
1215
  set_timespec(end_ts, 0);
 
1216
 
 
1217
  /* Ending time in microseconds(us). */
 
1218
  end_usecs = timespec_to_usec(&end_ts);
 
1219
 
 
1220
  if (end_usecs < start_usecs)
 
1221
    return -1;
 
1222
 
 
1223
  return (int)(end_usecs - start_usecs);
 
1224
}