~percona-toolkit-dev/percona-toolkit/pt-agent

« back to all changes in this revision

Viewing changes to lib/TableSyncNibble.pm

  • Committer: Daniel Nichter
  • Date: 2011-06-24 17:22:06 UTC
  • Revision ID: daniel@percona.com-20110624172206-c7q4s4ad6r260zz6
Add lib/, t/lib/, and sandbox/.  All modules are updated and passing on MySQL 5.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# This program is copyright 2011 Percona Inc.
 
2
# This program is copyright 2007-2010 Baron Schwartz.
 
3
# Feedback and improvements are welcome.
 
4
#
 
5
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
 
6
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
 
7
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
 
8
#
 
9
# This program is free software; you can redistribute it and/or modify it under
 
10
# the terms of the GNU General Public License as published by the Free Software
 
11
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
 
12
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
 
13
# licenses.
 
14
#
 
15
# You should have received a copy of the GNU General Public License along with
 
16
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 
17
# Place, Suite 330, Boston, MA  02111-1307  USA.
 
18
# ###########################################################################
 
19
# TableSyncNibble package $Revision: 6511 $
 
20
# ###########################################################################
 
21
 
 
22
# Package: TableSyncNibble
 
23
# TableSyncNibble is a table sync algo that uses <TableNibbler>.
 
24
# This package implements a moderately complex sync algorithm:
 
25
# * Prepare to nibble the table (see TableNibbler.pm)
 
26
# * Fetch the nibble'th next row (say the 500th) from the current row
 
27
# * Checksum from the current row to the nibble'th as a chunk
 
28
# * If nibble differs, make a note to checksum the rows in the nibble (state 1)
 
29
# * Checksum them (state 2)
 
30
# * If a row differs, it must be synced
 
31
# See TableSyncStream for the TableSync interface this conforms to.
 
32
{
 
33
package TableSyncNibble;
 
34
 
 
35
use strict;
 
36
use warnings FATAL => 'all';
 
37
use English qw(-no_match_vars);
 
38
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
 
39
 
 
40
use Data::Dumper;
 
41
$Data::Dumper::Indent    = 1;
 
42
$Data::Dumper::Sortkeys  = 1;
 
43
$Data::Dumper::Quotekeys = 0;
 
44
 
 
45
sub new {
 
46
   my ( $class, %args ) = @_;
 
47
   foreach my $arg ( qw(TableNibbler TableChunker TableParser Quoter) ) {
 
48
      die "I need a $arg argument" unless defined $args{$arg};
 
49
   }
 
50
   my $self = { %args };
 
51
   return bless $self, $class;
 
52
}
 
53
 
 
54
sub name {
 
55
   return 'Nibble';
 
56
}
 
57
 
 
58
# Returns a hash (true) with a chunk_index that can be used to sync
 
59
# the given tbl_struct.  Else, returns nothing (false) if the table
 
60
# cannot be synced.  Arguments:
 
61
#   * tbl_struct    Return value of TableParser::parse()
 
62
#   * chunk_index   (optional) Index to use for chunking
 
63
# If chunk_index is given, then it is required so the return value will
 
64
# only be true if it's the best possible index.  If it's not given, then
 
65
# the best possible index is returned.  The return value should be passed
 
66
# back to prepare_to_sync().  -- nibble_index is the same as chunk_index:
 
67
# both are used to select multiple rows at once in state 0.
 
68
sub can_sync {
 
69
   my ( $self, %args ) = @_;
 
70
   foreach my $arg ( qw(tbl_struct) ) {
 
71
      die "I need a $arg argument" unless defined $args{$arg};
 
72
   }
 
73
 
 
74
   # If there's an index, TableNibbler::generate_asc_stmt() will use it,
 
75
   # so it is an indication that the nibble algorithm will work.
 
76
   my $nibble_index = $self->{TableParser}->find_best_index($args{tbl_struct});
 
77
   if ( $nibble_index ) {
 
78
      MKDEBUG && _d('Best nibble index:', Dumper($nibble_index));
 
79
      if ( !$args{tbl_struct}->{keys}->{$nibble_index}->{is_unique} ) {
 
80
         MKDEBUG && _d('Best nibble index is not unique');
 
81
         return;
 
82
      }
 
83
      if ( $args{chunk_index} && $args{chunk_index} ne $nibble_index ) {
 
84
         MKDEBUG && _d('Best nibble index is not requested index',
 
85
            $args{chunk_index});
 
86
         return;
 
87
      }
 
88
   }
 
89
   else {
 
90
      MKDEBUG && _d('No best nibble index returned');
 
91
      return;
 
92
   }
 
93
 
 
94
   # MySQL may choose to use no index for small tables because it's faster.
 
95
   # However, this will cause __get_boundaries() to die with a "Cannot nibble
 
96
   # table" error.  So we check if the table is small and if it is then we
 
97
   # let MySQL do whatever it wants and let ORDER BY keep us safe.
 
98
   my $small_table = 0;
 
99
   if ( $args{src} && $args{src}->{dbh} ) {
 
100
      my $dbh = $args{src}->{dbh};
 
101
      my $db  = $args{src}->{db};
 
102
      my $tbl = $args{src}->{tbl};
 
103
      my $table_status;
 
104
      eval {
 
105
         my $sql = "SHOW TABLE STATUS FROM `$db` LIKE "
 
106
                 . $self->{Quoter}->literal_like($tbl);
 
107
         MKDEBUG && _d($sql);
 
108
         $table_status = $dbh->selectrow_hashref($sql);
 
109
      };
 
110
      MKDEBUG && $EVAL_ERROR && _d($EVAL_ERROR);
 
111
      if ( $table_status ) {
 
112
         my $n_rows   = defined $table_status->{Rows} ? $table_status->{Rows}
 
113
                      : defined $table_status->{rows} ? $table_status->{rows}
 
114
                      : undef;
 
115
         $small_table = 1 if defined $n_rows && $n_rows <= 100;
 
116
      }
 
117
   }
 
118
   MKDEBUG && _d('Small table:', $small_table);
 
119
 
 
120
   MKDEBUG && _d('Can nibble using index', $nibble_index);
 
121
   return (
 
122
      1,
 
123
      chunk_index => $nibble_index,
 
124
      key_cols    => $args{tbl_struct}->{keys}->{$nibble_index}->{cols},
 
125
      small_table => $small_table,
 
126
   );
 
127
}
 
128
 
 
129
sub prepare_to_sync {
 
130
   my ( $self, %args ) = @_;
 
131
   my @required_args = qw(dbh db tbl tbl_struct chunk_index key_cols chunk_size
 
132
                          crc_col ChangeHandler);
 
133
   foreach my $arg ( @required_args ) {
 
134
      die "I need a $arg argument" unless defined $args{$arg};
 
135
   }
 
136
 
 
137
   $self->{dbh}             = $args{dbh};
 
138
   $self->{tbl_struct}      = $args{tbl_struct};
 
139
   $self->{crc_col}         = $args{crc_col};
 
140
   $self->{index_hint}      = $args{index_hint};
 
141
   $self->{key_cols}        = $args{key_cols};
 
142
   ($self->{chunk_size})    = $self->{TableChunker}->size_to_rows(%args);
 
143
   $self->{buffer_in_mysql} = $args{buffer_in_mysql};
 
144
   $self->{small_table}     = $args{small_table};
 
145
   $self->{ChangeHandler}   = $args{ChangeHandler};
 
146
 
 
147
   $self->{ChangeHandler}->fetch_back($args{dbh});
 
148
 
 
149
   # Make sure our chunk col is in the list of comparison columns
 
150
   # used by TableChecksum::make_row_checksum() to create $row_sql.
 
151
   # Normally that sub removes dupes, but the code to make boundary
 
152
   # sql does not, so we do it here.
 
153
   my %seen;
 
154
   my @ucols = grep { !$seen{$_}++ } @{$args{cols}}, @{$args{key_cols}};
 
155
   $args{cols} = \@ucols;
 
156
 
 
157
   $self->{sel_stmt} = $self->{TableNibbler}->generate_asc_stmt(
 
158
      %args,
 
159
      index    => $args{chunk_index}, # expects an index arg, not chunk_index
 
160
      asc_only => 1,
 
161
   );
 
162
 
 
163
   $self->{nibble}            = 0;
 
164
   $self->{cached_row}        = undef;
 
165
   $self->{cached_nibble}     = undef;
 
166
   $self->{cached_boundaries} = undef;
 
167
   $self->{state}             = 0;
 
168
 
 
169
   return;
 
170
}
 
171
 
 
172
sub uses_checksum {
 
173
   return 1;
 
174
}
 
175
 
 
176
sub set_checksum_queries {
 
177
   my ( $self, $nibble_sql, $row_sql ) = @_;
 
178
   die "I need a nibble_sql argument" unless $nibble_sql;
 
179
   die "I need a row_sql argument" unless $row_sql;
 
180
   $self->{nibble_sql} = $nibble_sql;
 
181
   $self->{row_sql} = $row_sql;
 
182
   return;
 
183
}
 
184
 
 
185
sub prepare_sync_cycle {
 
186
   my ( $self, $host ) = @_;
 
187
   my $sql = 'SET @crc := "", @cnt := 0';
 
188
   MKDEBUG && _d($sql);
 
189
   $host->{dbh}->do($sql);
 
190
   return;
 
191
}
 
192
 
 
193
# Returns a SELECT statement that either gets a nibble of rows (state=0) or,
 
194
# if the last nibble was bad (state=1 or 2), gets the rows in the nibble.
 
195
# This way we can sync part of the table before moving on to the next part.
 
196
# Required args: database, table
 
197
# Optional args: where
 
198
sub get_sql {
 
199
   my ( $self, %args ) = @_;
 
200
   if ( $self->{state} ) {
 
201
      # Selects the individual rows so that they can be compared.
 
202
      my $q = $self->{Quoter};
 
203
      return 'SELECT /*rows in nibble*/ '
 
204
         . ($self->{buffer_in_mysql} ? 'SQL_BUFFER_RESULT ' : '')
 
205
         . $self->{row_sql} . " AS $self->{crc_col}"
 
206
         . ' FROM ' . $q->quote(@args{qw(database table)})
 
207
         . ' ' . ($self->{index_hint} ? $self->{index_hint} : '')
 
208
         . ' WHERE (' . $self->__get_boundaries(%args) . ')'
 
209
         . ($args{where} ? " AND ($args{where})" : '')
 
210
         . ' ORDER BY ' . join(', ', map {$q->quote($_) } @{$self->key_cols()});
 
211
   }
 
212
   else {
 
213
      # Selects the rows as a nibble (aka a chunk).
 
214
      my $where = $self->__get_boundaries(%args);
 
215
      return $self->{TableChunker}->inject_chunks(
 
216
         database   => $args{database},
 
217
         table      => $args{table},
 
218
         chunks     => [ $where ],
 
219
         chunk_num  => 0,
 
220
         query      => $self->{nibble_sql},
 
221
         index_hint => $self->{index_hint},
 
222
         where      => [ $args{where} ],
 
223
      );
 
224
   }
 
225
}
 
226
 
 
227
# Returns a WHERE clause for selecting rows in a nibble relative to lower
 
228
# and upper boundary rows.  Initially neither boundary is defined, so we
 
229
# get the first upper boundary row and return a clause like:
 
230
#   WHERE rows < upper_boundary_row1
 
231
# This selects all "lowest" rows: those before/below the first nibble
 
232
# boundary.  The upper boundary row is saved (as cached_row) so that on the
 
233
# next call it becomes the lower boundary and we get the next upper boundary,
 
234
# resulting in a clause like:
 
235
#   WHERE rows > cached_row && col < upper_boundary_row2
 
236
# This process repeats for subsequent calls. Assuming that the source and
 
237
# destination tables have different data, executing the same query against
 
238
# them might give back a different boundary row, which is not what we want,
 
239
# so each boundary needs to be cached until the nibble increases.
 
240
sub __get_boundaries {
 
241
   my ( $self, %args ) = @_;
 
242
   my $q = $self->{Quoter};
 
243
   my $s = $self->{sel_stmt};
 
244
 
 
245
   my $lb;   # Lower boundary part of WHERE
 
246
   my $ub;   # Upper boundary part of WHERE
 
247
   my $row;  # Next upper boundary row or cached_row
 
248
 
 
249
   if ( $self->{cached_boundaries} ) {
 
250
      MKDEBUG && _d('Using cached boundaries');
 
251
      return $self->{cached_boundaries};
 
252
   }
 
253
 
 
254
   if ( $self->{cached_row} && $self->{cached_nibble} == $self->{nibble} ) {
 
255
      # If there's a cached (last) row and the nibble number hasn't increased
 
256
      # then a differing row was found in this nibble.  We re-use its
 
257
      # boundaries so that instead of advancing to the next nibble we'll look
 
258
      # at the row in this nibble (get_sql() will return its SELECT
 
259
      # /*rows in nibble*/ query).
 
260
      MKDEBUG && _d('Using cached row for boundaries');
 
261
      $row = $self->{cached_row};
 
262
   }
 
263
   else {
 
264
      MKDEBUG && _d('Getting next upper boundary row');
 
265
      my $sql;
 
266
      ($sql, $lb) = $self->__make_boundary_sql(%args);  # $lb from outer scope!
 
267
 
 
268
      # Check that $sql will use the index chosen earlier in new().
 
269
      # Only do this for the first nibble.  I assume this will be safe
 
270
      # enough since the WHERE should use the same columns.
 
271
      if ( $self->{nibble} == 0 && !$self->{small_table} ) {
 
272
         my $explain_index = $self->__get_explain_index($sql);
 
273
         if ( lc($explain_index || '') ne lc($s->{index}) ) {
 
274
            die 'Cannot nibble table '.$q->quote($args{database}, $args{table})
 
275
               . " because MySQL chose "
 
276
               . ($explain_index ? "the `$explain_index`" : 'no') . ' index'
 
277
               . " instead of the `$s->{index}` index";
 
278
         }
 
279
      }
 
280
 
 
281
      $row = $self->{dbh}->selectrow_hashref($sql);
 
282
      MKDEBUG && _d($row ? 'Got a row' : "Didn't get a row");
 
283
   }
 
284
 
 
285
   if ( $row ) {
 
286
      # Add the row to the WHERE clause as the upper boundary.  As such,
 
287
      # the table rows should be <= to this boundary.  (Conversely, for
 
288
      # any lower boundary the table rows should be > the lower boundary.)
 
289
      my $i = 0;
 
290
      $ub   = $s->{boundaries}->{'<='};
 
291
      $ub   =~ s/\?/$q->quote_val($row->{$s->{scols}->[$i]}, $self->{tbl_struct}->{is_numeric}->{$s->{scols}->[$i++]} || 0)/eg;
 
292
   }
 
293
   else {
 
294
      # This usually happens at the end of the table, after we've nibbled
 
295
      # all the rows.
 
296
      MKDEBUG && _d('No upper boundary');
 
297
      $ub = '1=1';
 
298
   }
 
299
 
 
300
   # If $lb is defined, then this is the 2nd or subsequent nibble and
 
301
   # $ub should be the previous boundary.  Else, this is the first nibble.
 
302
   # Do not append option where arg here; it is added by the caller.
 
303
   my $where = $lb ? "($lb AND $ub)" : $ub;
 
304
 
 
305
   $self->{cached_row}        = $row;
 
306
   $self->{cached_nibble}     = $self->{nibble};
 
307
   $self->{cached_boundaries} = $where;
 
308
 
 
309
   MKDEBUG && _d('WHERE clause:', $where);
 
310
   return $where;
 
311
}
 
312
 
 
313
# Returns a SELECT statement for the next upper boundary row and the
 
314
# lower boundary part of WHERE if this is the 2nd or subsequent nibble.
 
315
# (The first nibble doesn't have a lower boundary.)  The returned SELECT
 
316
# is largely responsible for nibbling the table because if the boundaries
 
317
# are off then the nibble may not advance properly and we'll get stuck
 
318
# in an infinite loop (issue 96).
 
319
sub __make_boundary_sql {
 
320
   my ( $self, %args ) = @_;
 
321
   my $lb;
 
322
   my $q   = $self->{Quoter};
 
323
   my $s   = $self->{sel_stmt};
 
324
   my $sql = "SELECT /*nibble boundary $self->{nibble}*/ "
 
325
      . join(',', map { $q->quote($_) } @{$s->{cols}})
 
326
      . " FROM " . $q->quote($args{database}, $args{table})
 
327
      . ' ' . ($self->{index_hint} || '')
 
328
      . ($args{where} ? " WHERE ($args{where})" : "");
 
329
 
 
330
   if ( $self->{nibble} ) {
 
331
      # The lower boundaries of the nibble must be defined, based on the last
 
332
      # remembered row.
 
333
      my $tmp = $self->{cached_row};
 
334
      my $i   = 0;
 
335
      $lb     = $s->{boundaries}->{'>'};
 
336
      $lb     =~ s/\?/$q->quote_val($tmp->{$s->{scols}->[$i]}, $self->{tbl_struct}->{is_numeric}->{$s->{scols}->[$i++]} || 0)/eg;
 
337
      $sql   .= $args{where} ? " AND $lb" : " WHERE $lb";
 
338
   }
 
339
   $sql .= " ORDER BY " . join(',', map { $q->quote($_) } @{$self->{key_cols}})
 
340
         . ' LIMIT ' . ($self->{chunk_size} - 1) . ', 1';
 
341
   MKDEBUG && _d('Lower boundary:', $lb);
 
342
   MKDEBUG && _d('Next boundary sql:', $sql);
 
343
   return $sql, $lb;
 
344
}
 
345
 
 
346
# Returns just the index value from EXPLAIN for the given query (sql).
 
347
sub __get_explain_index {
 
348
   my ( $self, $sql ) = @_;
 
349
   return unless $sql;
 
350
   my $explain;
 
351
   eval {
 
352
      $explain = $self->{dbh}->selectall_arrayref("EXPLAIN $sql",{Slice => {}});
 
353
   };
 
354
   if ( $EVAL_ERROR ) {
 
355
      MKDEBUG && _d($EVAL_ERROR);
 
356
      return;
 
357
   }
 
358
   MKDEBUG && _d('EXPLAIN key:', $explain->[0]->{key}); 
 
359
   return $explain->[0]->{key};
 
360
}
 
361
 
 
362
sub same_row {
 
363
   my ( $self, %args ) = @_;
 
364
   my ($lr, $rr) = @args{qw(lr rr)};
 
365
   if ( $self->{state} ) {
 
366
      if ( $lr->{$self->{crc_col}} ne $rr->{$self->{crc_col}} ) {
 
367
         $self->{ChangeHandler}->change('UPDATE', $lr, $self->key_cols());
 
368
      }
 
369
   }
 
370
   elsif ( $lr->{cnt} != $rr->{cnt} || $lr->{crc} ne $rr->{crc} ) {
 
371
      MKDEBUG && _d('Rows:', Dumper($lr, $rr));
 
372
      MKDEBUG && _d('Will examine this nibble before moving to next');
 
373
      $self->{state} = 1; # Must examine this nibble row-by-row
 
374
   }
 
375
}
 
376
 
 
377
# This (and not_in_left) should NEVER be called in state 0.  If there are
 
378
# missing rows in state 0 in one of the tables, the CRC will be all 0's and the
 
379
# cnt will be 0, but the result set should still come back.
 
380
sub not_in_right {
 
381
   my ( $self, %args ) = @_;
 
382
   die "Called not_in_right in state 0" unless $self->{state};
 
383
   $self->{ChangeHandler}->change('INSERT', $args{lr}, $self->key_cols());
 
384
}
 
385
 
 
386
sub not_in_left {
 
387
   my ( $self, %args ) = @_;
 
388
   die "Called not_in_left in state 0" unless $self->{state};
 
389
   $self->{ChangeHandler}->change('DELETE', $args{rr}, $self->key_cols());
 
390
}
 
391
 
 
392
sub done_with_rows {
 
393
   my ( $self ) = @_;
 
394
   if ( $self->{state} == 1 ) {
 
395
      $self->{state} = 2;
 
396
      MKDEBUG && _d('Setting state =', $self->{state});
 
397
   }
 
398
   else {
 
399
      $self->{state} = 0;
 
400
      $self->{nibble}++;
 
401
      delete $self->{cached_boundaries};
 
402
      MKDEBUG && _d('Setting state =', $self->{state},
 
403
         ', nibble =', $self->{nibble});
 
404
   }
 
405
}
 
406
 
 
407
sub done {
 
408
   my ( $self ) = @_;
 
409
   MKDEBUG && _d('Done with nibble', $self->{nibble});
 
410
   MKDEBUG && $self->{state} && _d('Nibble differs; must examine rows');
 
411
   return $self->{state} == 0 && $self->{nibble} && !$self->{cached_row};
 
412
}
 
413
 
 
414
sub pending_changes {
 
415
   my ( $self ) = @_;
 
416
   if ( $self->{state} ) {
 
417
      MKDEBUG && _d('There are pending changes');
 
418
      return 1;
 
419
   }
 
420
   else {
 
421
      MKDEBUG && _d('No pending changes');
 
422
      return 0;
 
423
   }
 
424
}
 
425
 
 
426
sub key_cols {
 
427
   my ( $self ) = @_;
 
428
   my @cols;
 
429
   if ( $self->{state} == 0 ) {
 
430
      @cols = qw(chunk_num);
 
431
   }
 
432
   else {
 
433
      @cols = @{$self->{key_cols}};
 
434
   }
 
435
   MKDEBUG && _d('State', $self->{state},',', 'key cols', join(', ', @cols));
 
436
   return \@cols;
 
437
}
 
438
 
 
439
sub _d {
 
440
   my ($package, undef, $line) = caller 0;
 
441
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
 
442
        map { defined $_ ? $_ : 'undef' }
 
443
        @_;
 
444
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
 
445
}
 
446
 
 
447
1;
 
448
}
 
449
# ###########################################################################
 
450
# End TableSyncNibble package
 
451
# ###########################################################################