~percona-toolkit-dev/percona-toolkit/docu-ptc-rbr-limitation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
# This program is copyright 2007-2011 Baron Schwartz, 2011 Percona Inc.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA  02111-1307  USA.
# ###########################################################################
# TableSyncer package
# ###########################################################################
{
# Package: TableSyncer
# TableSyncer helps sync tables with various table sync algo modules.
package TableSyncer;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use constant PTDEBUG => $ENV{PTDEBUG} || 0;

use Data::Dumper;
$Data::Dumper::Indent    = 1;
$Data::Dumper::Sortkeys  = 1;
$Data::Dumper::Quotekeys = 0;

# Constructor.  All named arguments are copied onto the new object as-is.
#
# Required arguments:
#   * MasterSlave    A MasterSlave module
#   * Quoter         A Quoter module
#   * TableChecksum  A TableChecksum module
#   * Retry          A Retry module
# Optional arguments:
#   * DSNParser      Used by sync_table() to stringify DSNs in trace messages
# Returns the blessed object; dies if a required argument is not defined.
sub new {
   my ( $class, %args ) = @_;
   for my $required ( qw(MasterSlave Quoter TableChecksum Retry) ) {
      next if defined $args{$required};
      die "I need a $required argument";
   }
   return bless { %args }, $class;
}

# Find the first TableSync* plugin that is able to sync the given table.
#
# Required arguments:
#   * plugins     Arrayref of plugin objects, tried in the order given
#   * tbl_struct  Table struct, handed to each plugin along with all other args
#
# Each plugin is asked via plugin->can_sync(%args).  A plugin that can sync
# the table returns a true value, optionally followed by key/value pairs that
# it wants back later (sync_table() passes them to plugin->prepare_to_sync()).
# Returns the chosen plugin followed by those pairs as a flat list, or nothing
# if no plugin can sync the table.
sub get_best_plugin {
   my ( $self, %args ) = @_;
   for my $required ( qw(plugins tbl_struct) ) {
      $args{$required} or die "I need a $required argument";
   }
   PTDEBUG && _d('Getting best plugin');

   foreach my $candidate ( @{ $args{plugins} } ) {
      PTDEBUG && _d('Trying plugin', $candidate->name);
      my ( $can_sync, %extra_args ) = $candidate->can_sync(%args);
      next unless $can_sync;

      PTDEBUG && _d('Can sync with', $candidate->name, Dumper(\%extra_args));
      return $candidate, %extra_args;
   }

   PTDEBUG && _d('No plugin can sync the table');
   return;
}

# Sync one table from src to dst using the first plugin (see
# get_best_plugin()) that says it can sync the table.
#
# Required arguments:
#   * plugins         Arrayref of TableSync* modules, in order of preference
#   * src             Hashref with source (aka left) dbh, db, tbl
#   * dst             Hashref with destination (aka right) dbh, db, tbl
#   * tbl_struct      Return val from TableParser::parser() for src and dst tbl
#   * cols            Arrayref of column names to checksum/compare
#   * chunk_size      Size/number of rows to select in each chunk
#   * RowDiff         A RowDiff module
#   * ChangeHandler   A ChangeHandler module
# Optional arguments:
#   * where           WHERE clause to restrict synced rows (default none)
#   * bidirectional   If doing bidirectional sync (default no)
#   * changing_src    If making changes on src (default no)
#   * replicate       Checksum table if syncing via replication (default no)
#   * function        Crypto hash func for checksumming chunks (default CRC32)
#   * dry_run         Prepare to sync but don't actually sync (default no)
#   * chunk_col       Column name to chunk table on (default auto-choose)
#   * chunk_index     Index name to use for chunking table (default auto-choose)
#   * index_hint      Use FORCE/USE INDEX (chunk_index) (default yes)
#   * buffer_in_mysql  Use SQL_BUFFER_RESULT (default no)
#   * buffer_to_client Use mysql_use_result (default no)
#   * callback        Sub called before executing the sql (default none)
#   * trace           Append trace message to change statements (default yes)
#   * transaction     Lock rows with FOR UPDATE/LOCK IN SHARE MODE inside
#                     transactions instead of using table locks (default no)
#   * change_dbh      locking (not read in this sub; only passed along with
#                     the other args)
#   * lock            Lock level: 0=none, 1=per sync cycle, 2=per table,
#                     3=global (default 0); see lock_and_wait()
#   * wait            locking; passed through to lock_and_wait() (default 0)
#   * timeout_ok      Defaulted to 0 here and passed along with the other args
# Returns the list from ChangeHandler->get_changes() followed by the pair
# ALGORITHM => name of the plugin that was used (also for a dry run).
# Dies if a required argument is missing, if no plugin can sync the table,
# or if preparing the plugin, making the checksum queries, or USE db fails.
sub sync_table {
   my ( $self, %args ) = @_;
   my @required_args = qw(plugins src dst tbl_struct cols chunk_size
                          RowDiff ChangeHandler);
   foreach my $arg ( @required_args ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   PTDEBUG && _d('Syncing table with args:',
      map { "$_: " . Dumper($args{$_}) }
      qw(plugins src dst tbl_struct cols chunk_size));

   # Unpack the required args in the same order as @required_args.
   my ($plugins, $src, $dst, $tbl_struct, $cols, $chunk_size, $rd, $ch)
      = @args{@required_args};
   my $dp = $self->{DSNParser};
   $args{trace} = 1 unless defined $args{trace};

   if ( $args{bidirectional} && $args{ChangeHandler}->{queue} ) {
      # This should be checked by the caller but just in case...
      die "Queueing does not work with bidirectional syncing";
   }

   # Apply defaults.  These are stored back in %args because %args is
   # passed on to the plugin and to lock_and_wait()/unlock() below.
   $args{index_hint}    = 1 unless defined $args{index_hint};
   $args{lock}        ||= 0;
   $args{wait}        ||= 0;
   $args{transaction} ||= 0;
   $args{timeout_ok}  ||= 0;

   my $q  = $self->{Quoter};

   # ########################################################################
   # Get and prepare the first plugin that can sync this table.
   # ########################################################################
   my ($plugin, %plugin_args) = $self->get_best_plugin(%args);
   die "No plugin can sync $src->{db}.$src->{tbl}" unless $plugin;

   # The row-level (state 2) checksums use __crc, so the table can't use that.
   my $crc_col = '__crc';
   while ( $tbl_struct->{is_col}->{$crc_col} ) {
      $crc_col = "_$crc_col"; # Prepend more _ until not a column.
   }
   PTDEBUG && _d('CRC column:', $crc_col);

   # Make an index hint for either the explicitly given chunk_index
   # or the chunk_index chosen by the plugin if index_hint is true.
   # Note: an explicitly given chunk_index is hinted regardless of index_hint.
   my $index_hint;
   if ( $args{chunk_index} ) {
      PTDEBUG && _d('Using given chunk index for index hint');
      $index_hint = "FORCE INDEX (" . $q->quote($args{chunk_index}) . ")";
   }
   elsif ( $plugin_args{chunk_index} && $args{index_hint} ) {
      PTDEBUG && _d('Using chunk index chosen by plugin for index hint');
      $index_hint = "FORCE INDEX (" . $q->quote($plugin_args{chunk_index}) . ")";
   }
   PTDEBUG && _d('Index hint:', $index_hint);

   # The plugin is prepared against the source table; the keys listed after
   # %args and %plugin_args override any same-named keys in those hashes.
   eval {
      $plugin->prepare_to_sync(
         %args,
         %plugin_args,
         dbh        => $src->{dbh},
         db         => $src->{db},
         tbl        => $src->{tbl},
         crc_col    => $crc_col,
         index_hint => $index_hint,
      );
   };
   if ( $EVAL_ERROR ) {
      # At present, no plugin should fail to prepare, but just in case...
      die 'Failed to prepare TableSync', $plugin->name, ' plugin: ',
         $EVAL_ERROR;
   }

   # Some plugins like TableSyncChunk use checksum queries, others like
   # TableSyncGroupBy do not.  For those that do, make chunk (state 0)
   # and row (state 2) checksum queries.
   if ( $plugin->uses_checksum() ) {
      eval {
         my ($chunk_sql, $row_sql) = $self->make_checksum_queries(%args);
         $plugin->set_checksum_queries($chunk_sql, $row_sql);
      };
      if ( $EVAL_ERROR ) {
         # This happens if src and dst are really different and the same
         # checksum algo and hash func can't be used on both.
         die "Failed to make checksum queries: $EVAL_ERROR";
      }
   } 

   # ########################################################################
   # Plugin is ready, return now if this is a dry run.
   # ########################################################################
   if ( $args{dry_run} ) {
      return $ch->get_changes(), ALGORITHM => $plugin->name;
   }

   # ########################################################################
   # Start syncing the table.
   # ########################################################################

   # USE db on src and dst for cases like when replicate-do-db is being used.
   eval {
      $src->{dbh}->do("USE `$src->{db}`");
      $dst->{dbh}->do("USE `$dst->{db}`");
   };
   if ( $EVAL_ERROR ) {
      # This shouldn't happen, but just in case.  (The db and tbl on src
      # and dst should be checked before calling this sub.)
      die "Failed to USE database on source or destination: $EVAL_ERROR";
   }

   # For bidirectional syncing it's important to know on which dbh
   # changes are made or rows are fetched.  This identifies the dbhs,
   # then you can search for each one by its address like
   # "dbh DBI::db=HASH(0x1028b38)".
   PTDEBUG && _d('left dbh', $src->{dbh});
   PTDEBUG && _d('right dbh', $dst->{dbh});

   # Build the trace message that is handed to ChangeHandler->process_rows().
   # It is empty when trace is off.  The host name comes from running the
   # external `hostname` command.
   chomp(my $hostname = `hostname`);
   my $trace_msg
      = $args{trace} ? "src_db:$src->{db} src_tbl:$src->{tbl} "
         . ($dp && $src->{dsn} ? "src_dsn:".$dp->as_string($src->{dsn}) : "")
         . " dst_db:$dst->{db} dst_tbl:$dst->{tbl} "
         . ($dp && $dst->{dsn} ? "dst_dsn:".$dp->as_string($dst->{dsn}) : "")
         . " " . join(" ", map { "$_:" . ($args{$_} || 0) }
                     qw(lock transaction changing_src replicate bidirectional))
         . " pid:$PID "
         . ($ENV{USER} ? "user:$ENV{USER} " : "")
         . ($hostname  ? "host:$hostname"   : "")
      :                "";
   PTDEBUG && _d("Binlog trace message:", $trace_msg);

   $self->lock_and_wait(%args, lock_level => 2);  # per-table lock

   my $callback = $args{callback};
   my $cycle    = 0;
   while ( !$plugin->done() ) {

      # Do as much of the work as possible before opening a transaction or
      # locking the tables.
      PTDEBUG && _d('Beginning sync cycle', $cycle);
      my $src_sql = $plugin->get_sql(
         database => $src->{db},
         table    => $src->{tbl},
         where    => $args{where},
      );
      my $dst_sql = $plugin->get_sql(
         database => $dst->{db},
         table    => $dst->{tbl},
         where    => $args{where},
      );

      # With transactions, take a write lock (FOR UPDATE) on the side(s)
      # being changed and a shared lock on the side only being read.
      if ( $args{transaction} ) {
         if ( $args{bidirectional} ) {
            # Making changes on src and dst.
            $src_sql .= ' FOR UPDATE';
            $dst_sql .= ' FOR UPDATE';
         }
         elsif ( $args{changing_src} ) {
            # Making changes on master (src) which replicate to slave (dst).
            $src_sql .= ' FOR UPDATE';
            $dst_sql .= ' LOCK IN SHARE MODE';
         }
         else {
            # Making changes on slave (dst).
            $src_sql .= ' LOCK IN SHARE MODE';
            $dst_sql .= ' FOR UPDATE';
         }
      }
      PTDEBUG && _d('src:', $src_sql);
      PTDEBUG && _d('dst:', $dst_sql);

      # Give callback a chance to do something with the SQL statements.
      $callback->($src_sql, $dst_sql) if $callback;

      # Prepare each host for next sync cycle. This does stuff
      # like reset/init MySQL accumulator vars, etc.
      $plugin->prepare_sync_cycle($src);
      $plugin->prepare_sync_cycle($dst);

      # Prepare SQL statements on host.  These aren't real prepared
      # statements (i.e. no ? placeholders); we just need sths to
      # pass to compare_sets().  Also, we can control buffering
      # (mysql_use_result) on the sths.
      my $src_sth = $src->{dbh}->prepare($src_sql);
      my $dst_sth = $dst->{dbh}->prepare($dst_sql);
      if ( $args{buffer_to_client} ) {
         $src_sth->{mysql_use_result} = 1;
         $dst_sth->{mysql_use_result} = 1;
      }

      # The first cycle should lock to begin work; after that, unlock only if
      # the plugin says it's OK (it may want to dig deeper on the rows it
      # currently has locked).
      my $executed_src = 0;
      if ( !$cycle || !$plugin->pending_changes() ) {
         # per-sync cycle lock
         $executed_src
            = $self->lock_and_wait(%args, src_sth => $src_sth, lock_level => 1);
      }

      # The source sth might have already been executed by lock_and_wait().
      $src_sth->execute() unless $executed_src;
      $dst_sth->execute();

      # Compare rows in the two sths.  If any differences are found
      # (same_row, not_in_left, not_in_right), the appropriate $syncer
      # methods are called to handle them.  Changes may be immediate, or...
      $rd->compare_sets(
         left_sth   => $src_sth,
         right_sth  => $dst_sth,
         left_dbh   => $src->{dbh},
         right_dbh  => $dst->{dbh},
         syncer     => $plugin,
         tbl_struct => $tbl_struct,
      );
      # ...changes may be queued and executed now.
      $ch->process_rows(1, $trace_msg);

      PTDEBUG && _d('Finished sync cycle', $cycle);
      $cycle++;
   }

   # Final call after the last cycle, this time with 0 as the first argument.
   $ch->process_rows(0, $trace_msg);

   $self->unlock(%args, lock_level => 2);

   return $ch->get_changes(), ALGORITHM => $plugin->name;
}

# Build the chunk-level and row-level checksum queries for a table.
#
# Required arguments:
#   * src         Hashref with source dbh, db, tbl
#   * dst         Hashref with destination dbh, db, tbl
#   * tbl_struct  Table struct (passed on to the TableChecksum methods)
# All other args are passed through to TableChecksum as well, except that
# replicate is forced to undef for make_checksum_query().
#
# The checksum algorithm and hash function are chosen separately on src and
# dst; this dies if the two sides do not agree on either.  Returns the list
# (chunk_sql, row_sql).
sub make_checksum_queries {
   my ( $self, %args ) = @_;
   for my $required ( qw(src dst tbl_struct) ) {
      $args{$required} or die "I need a $required argument";
   }
   my $src = $args{src};
   my $dst = $args{dst};
   my $tc  = $self->{TableChecksum};

   # Ask for the best algorithm on a given dbh, preferring BIT_XOR.
   my $algo_for = sub {
      my ( $dbh ) = @_;
      return $tc->best_algorithm(
         algorithm => 'BIT_XOR',
         dbh       => $dbh,
         where     => 1,
         chunk     => 1,
         count     => 1,
      );
   };
   my $src_algo = $algo_for->($src->{dbh});
   my $dst_algo = $algo_for->($dst->{dbh});
   die "Source and destination checksum algorithms are different: "
      . "$src_algo on source, $dst_algo on destination"
      if $src_algo ne $dst_algo;
   PTDEBUG && _d('Chosen algo:', $src_algo);

   my $src_func = $tc->choose_hash_func(dbh => $src->{dbh}, %args);
   my $dst_func = $tc->choose_hash_func(dbh => $dst->{dbh}, %args);
   die "Source and destination hash functions are different: "
      . "$src_func on source, $dst_func on destination"
      if $src_func ne $dst_func;
   PTDEBUG && _d('Chosen hash func:', $src_func);

   # Both sides agree from here on, so the src values stand for both.
   my $crc_wid      = $tc->get_crc_wid($src->{dbh}, $src_func);
   my ( $crc_type ) = $tc->get_crc_type($src->{dbh}, $src_func);

   # Only BIT_XOR with a non-integer crc type gets an optimized slice.
   my $needs_slice = $src_algo eq 'BIT_XOR' && $crc_type !~ m/int$/;
   my $opt_slice   = $needs_slice
                   ? $tc->optimize_xor(dbh => $src->{dbh}, function => $src_func)
                   : undef;

   my %chunk_query_args = (
      %args,
      db        => $src->{db},
      tbl       => $src->{tbl},
      algorithm => $src_algo,
      function  => $src_func,
      crc_wid   => $crc_wid,
      crc_type  => $crc_type,
      opt_slice => $opt_slice,
      # replicate means something different to make_checksum_query()
      # than what we use it for; do not pass it!
      replicate => undef,
   );
   my $chunk_sql = $tc->make_checksum_query(%chunk_query_args);
   PTDEBUG && _d('Chunk sql:', $chunk_sql);

   my $row_sql = $tc->make_row_checksum(%args, function => $src_func);
   PTDEBUG && _d('Row sql:', $row_sql);

   return $chunk_sql, $row_sql;
}

# Run LOCK TABLES for one table on the given dbh.
#   $dbh     handle to lock the table on
#   $where   label used only in the debug message (callers pass
#            'source' or 'dest')
#   $db_tbl  table name, spliced into the statement as-is
#   $mode    lock mode (callers pass 'READ' or 'WRITE')
sub lock_table {
   my ( $self, $dbh, $where, $db_tbl, $mode ) = @_;
   my $lock_sql = sprintf 'LOCK TABLES %s %s', $db_tbl, $mode;
   PTDEBUG && _d($lock_sql);
   $dbh->do($lock_sql);
   PTDEBUG && _d('Acquired table lock on', $where, 'in', $mode, 'mode');
}

# Release locks on src and dst.  Unlike lock_and_wait(), which only acts when
# the user's lock level equals the calling level, this acts whenever the
# user's lock level is at or below the calling level.
#
# Required arguments (all must be defined):
#   * src          Hashref with dbh
#   * dst          Hashref with dbh
#   * lock         Lock level requested by user; false means do nothing
#   * transaction  True: commit() on each dbh; false: UNLOCK TABLES
#   * lock_level   Lock level the code is calling from
# Returns nothing.
sub unlock {
   my ( $self, %args ) = @_;

   for my $required ( qw(src dst lock transaction lock_level) ) {
      defined $args{$required} or die "I need a $required argument";
   }

   my ( $lock, $lock_level ) = @args{qw(lock lock_level)};
   return if !$lock || $lock > $lock_level;

   # Source first, then destination.
   my @dbhs = ( $args{src}->{dbh}, $args{dst}->{dbh} );
   if ( $args{transaction} ) {
      for my $handle ( @dbhs ) {
         PTDEBUG && _d('Committing', $handle);
         $handle->commit();
      }
   }
   else {
      my $unlock_sql = 'UNLOCK TABLES';
      for my $handle ( @dbhs ) {
         PTDEBUG && _d($handle, $unlock_sql);
         $handle->do($unlock_sql);
      }
   }

   return;
}

# Take the lock the user asked for and, if requested, wait for the slave
# (dst) to catch up to its master (src).  Nothing is done unless lock is
# true and equal to lock_level.
#
# Required arguments:
#   * lock         scalar: lock level requested by user
#   * lock_level   scalar: lock level code is calling from
#   * src          hashref with dbh, db, tbl (and misc_dbh if wait is used)
#   * dst          hashref with dbh, db, tbl
# Optional arguments:
#   * transaction      Use transactions instead of LOCK TABLES
#   * src_sth          Source statement handle; only used with transaction
#   * wait             Timeout in seconds for MasterSlave::wait_for_master();
#                      false means do not wait for the slave
#   * wait_retry_args  hashref: retry args for retrying wait/MASTER_POS_WAIT:
#                      tries (default 3) and wait, the seconds to sleep
#                      between tries (default 10)
#   * changing_src     Changes are made on src: lock src for WRITE and
#                      do not lock dst
#   * execute          Lock dst for WRITE instead of READ
# Lock levels:
#   0 => none
#   1 => per sync cycle
#   2 => per table
#   3 => global
# This function might actually execute the $src_sth.  If we're using
# transactions instead of table locks, the $src_sth has to be executed before
# the MASTER_POS_WAIT() on the slave.  The return value is whether the
# $src_sth was executed (1 or 0), or nothing if no locking was done.
# If waiting or locking dst fails, every handle is unlocked/committed and
# the original error is re-thrown.
sub lock_and_wait {
   my ( $self, %args ) = @_;
   my $result = 0;

   foreach my $arg ( qw(src dst lock lock_level) ) {
      die "I need a $arg argument" unless defined $args{$arg};
   }
   my $src = $args{src};
   my $dst = $args{dst};

   return unless $args{lock} && $args{lock} == $args{lock_level};
   PTDEBUG && _d('lock and wait, lock level', $args{lock});

   # First, commit/unlock the previous transaction/lock.
   foreach my $dbh ( $src->{dbh}, $dst->{dbh} ) {
      if ( $args{transaction} ) {
         PTDEBUG && _d('Committing', $dbh);
         $dbh->commit();
      }
      else {
         my $sql = 'UNLOCK TABLES';
         PTDEBUG && _d($dbh, $sql);
         $dbh->do($sql);
      }
   }

   # User wants us to lock for consistency.  But lock only on source initially;
   # might have to wait for the slave to catch up before locking on the dest.
   if ( $args{lock} == 3 ) {
      my $sql = 'FLUSH TABLES WITH READ LOCK';
      PTDEBUG && _d($src->{dbh}, $sql);
      $src->{dbh}->do($sql);
   }
   else {
      # Lock level 2 (per-table) or 1 (per-sync cycle)
      if ( $args{transaction} ) {
         if ( $args{src_sth} ) {
            # Execute the $src_sth on the source, so LOCK IN SHARE MODE/FOR
            # UPDATE will lock the rows examined.
            PTDEBUG && _d('Executing statement on source to lock rows');

            my $sql = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */";
            PTDEBUG && _d($src->{dbh}, $sql);
            $src->{dbh}->do($sql);

            $args{src_sth}->execute();
            $result = 1;
         }
      }
      else {
         $self->lock_table($src->{dbh}, 'source',
            $self->{Quoter}->quote($src->{db}, $src->{tbl}),
            $args{changing_src} ? 'WRITE' : 'READ');
      }
   }

   # If there is any error beyond this point, we need to unlock/commit.
   eval {
      if ( my $timeout = $args{wait} ) {
         my $ms    = $self->{MasterSlave};
         my $tries = $args{wait_retry_args}->{tries} || 3;
         # Resolve the sleep time once, out here: "sleep $x || 10" parses as
         # "sleep($x) || 10", so the default would never be applied, and the
         # callbacks below shadow %args with their own arguments.
         my $sleep = $args{wait_retry_args}->{wait}  || 10;
         my $wait;  # last result hashref from wait_for_master()
         $self->{Retry}->retry(
            tries => $tries,
            wait  => sub { sleep($sleep) },
            try   => sub {
               my ( %args ) = @_;
               # Be careful using $args{...} in this callback!  %args in
               # here are the passed-in args, not the args to lock_and_wait().

               if ( $args{tryno} > 1 ) {
                  warn "Retrying MASTER_POS_WAIT() for --wait $timeout...";
               }

               # Always use the misc_dbh dbh to check the master's position
               # because the main dbh might be in use due to executing
               # $src_sth.
               $wait = $ms->wait_for_master(
                  master_status => $ms->get_master_status($src->{misc_dbh}),
                  slave_dbh     => $dst->{dbh},
                  timeout       => $timeout,
               );
               if ( defined $wait->{result} && $wait->{result} != -1 ) {
                  return;  # slave caught up
               }
               die; # call fail
            },
            fail => sub {
               my (%args) = @_;
               if ( !defined $wait->{result} ) {
                  # Slave was stopped either before or during the wait.
                  # Wait a few seconds and try again in hopes that the
                  # slave is restarted.  This is the only case for which
                  # we wait and retry because the slave might have been
                  # stopped temporarily and/or unbeknownst to the user,
                  # so they'll be happy if we wait for slave to be restarted
                  # and then continue syncing.
                  my $msg;
                  if ( $wait->{waited}  ) {
                     $msg = "The slave was stopped while waiting with "
                          . "MASTER_POS_WAIT().";
                  }
                  else {
                     $msg = "MASTER_POS_WAIT() returned NULL.  Verify that "
                          . "the slave is running.";
                  }
                  if ( $tries - $args{tryno} ) {
                     # Report the sleep time, not the $wait result hashref.
                     $msg .= "  Sleeping $sleep seconds then retrying "
                           . ($tries - $args{tryno}) . " more times.";
                  }
                  warn "$msg\n";
                  return 1; # call wait, call try
               }
               elsif ( $wait->{result} == -1 ) {
                  # MASTER_POS_WAIT timed out, don't retry since we've
                  # already waited as long as the user specified with --wait.
                  return 0;  # call final_fail
               }
            },
            final_fail => sub {
               die "Slave did not catch up to its master after $tries attempts "
                  . "of waiting $timeout seconds with MASTER_POS_WAIT.  "
                  . "Check that the slave is running, increase the --wait "
                  . "time, or disable this feature by specifying --wait 0.";
            },
         );  # retry MasterSlave::wait_for_master()
      }

      # Don't lock the destination if we're making changes on the source
      # (for sync-to-master and sync via replicate) else the destination
      # won't be able to make the changes.
      if ( $args{changing_src} ) {
         PTDEBUG && _d('Not locking destination because changing source ',
            '(syncing via replication or sync-to-master)');
      }
      else {
         if ( $args{lock} == 3 ) {
            my $sql = 'FLUSH TABLES WITH READ LOCK';
            PTDEBUG && _d($dst->{dbh}, ',', $sql);
            $dst->{dbh}->do($sql);
         }
         elsif ( !$args{transaction} ) {
            $self->lock_table($dst->{dbh}, 'dest',
               $self->{Quoter}->quote($dst->{db}, $dst->{tbl}),
               $args{execute} ? 'WRITE' : 'READ');
         }
      }
   };
   # Copy the error right away; the cleanup below may reset $EVAL_ERROR.
   if ( my $error = $EVAL_ERROR ) {
      # Must abort/unlock/commit so that we don't interfere with any further
      # tables we try to do.
      # src_sth is optional (the per-table call does not pass one).
      if ( $args{src_sth} && $args{src_sth}->{Active} ) {
         $args{src_sth}->finish();
      }
      foreach my $dbh ( $src->{dbh}, $dst->{dbh}, $src->{misc_dbh} ) {
         next unless $dbh;
         PTDEBUG && _d('Caught error, unlocking/committing on', $dbh);
         # Best effort: a failure on one handle must neither stop the cleanup
         # of the others nor hide the original error.
         eval {
            $dbh->do('UNLOCK TABLES');
            $dbh->commit() unless $dbh->{AutoCommit};
         };
         if ( $EVAL_ERROR ) {
            PTDEBUG && _d('Cleanup failed on', $dbh, ':', $EVAL_ERROR);
         }
      }
      # ... and then re-throw the error.
      die $error;
   }

   return $result;
}

# This query will check all needed privileges on the table without actually
# changing anything in it.  We can't use REPLACE..SELECT because that doesn't
# work inside of LOCK TABLES.  Returns 1 if user has all needed privs to
# sync table, else returns 0.
#
# Arguments: $dbh, $db, $tbl.
# SELECT, INSERT and UPDATE are read from the privileges column of the first
# row of SHOW FULL COLUMNS; DELETE is probed with DELETE ... LIMIT 0, which
# counts as granted if the statement does not die.  If SHOW FULL COLUMNS
# returns no rows or no privileges column, the user is treated as having no
# column privileges and 0 is returned.
sub have_all_privs {
   my ( $self, $dbh, $db, $tbl ) = @_;
   my $db_tbl = $self->{Quoter}->quote($db, $tbl);
   my $sql    = "SHOW FULL COLUMNS FROM $db_tbl";
   PTDEBUG && _d('Permissions check:', $sql);
   my $cols = $dbh->selectall_arrayref($sql, {Slice => {}});

   # Don't assume a first row or a privileges header: an undef hash key or
   # an undef $privs would be an (uninitialized-value) warning below.
   my $first_col  = $cols && @$cols && $cols->[0] ? $cols->[0] : {};
   my ($hdr_name) = grep { m/privileges/i } keys %$first_col;
   my $privs      = defined $hdr_name ? $first_col->{$hdr_name} : undef;
   $privs         = '' unless defined $privs;

   $sql = "DELETE FROM $db_tbl LIMIT 0"; # FULL COLUMNS doesn't show all privs
   PTDEBUG && _d('Permissions check:', $sql);
   eval { $dbh->do($sql); };
   my $can_delete = $EVAL_ERROR ? 0 : 1;

   PTDEBUG && _d('User privs on', $db_tbl, ':', $privs,
      ($can_delete ? 'delete' : ''));
   if ( $privs =~ m/select/ && $privs =~ m/insert/ && $privs =~ m/update/
        && $can_delete ) {
      PTDEBUG && _d('User has all privs');
      return 1;
   }
   PTDEBUG && _d('User does not have all privs');
   return 0;
}

# Debug printer.  Writes one line to STDERR of the form
#   # <caller package>:<caller line> <pid> <args joined by spaces>
# Undefined args are printed as 'undef', and any newline inside an arg is
# followed by "# " so that continuation lines stay commented.
sub _d {
   my ( $package, undef, $line ) = caller 0;
   my @parts;
   for my $arg ( @_ ) {
      my $text = defined $arg ? $arg : 'undef';
      $text =~ s/\n/\n# /g;
      push @parts, $text;
   }
   print STDERR "# $package:$line $PID ", join(' ', @parts), "\n";
}

1;
}
# ###########################################################################
# End TableSyncer package
# ###########################################################################