~percona-toolkit-dev/percona-toolkit/fix-log-parser-writer-bug-963225

« back to all changes in this revision

Viewing changes to lib/ChangeHandler.pm

  • Committer: Daniel Nichter
  • Date: 2011-06-24 17:22:06 UTC
  • Revision ID: daniel@percona.com-20110624172206-c7q4s4ad6r260zz6
Add lib/, t/lib/, and sandbox/.  All modules are updated and passing on MySQL 5.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# This program is copyright 2011 Percona Inc.
 
2
# This program is copyright 2007-2010 Baron Schwartz.
 
3
# Feedback and improvements are welcome.
 
4
#
 
5
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
 
6
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
 
7
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
 
8
#
 
9
# This program is free software; you can redistribute it and/or modify it under
 
10
# the terms of the GNU General Public License as published by the Free Software
 
11
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
 
12
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
 
13
# licenses.
 
14
#
 
15
# You should have received a copy of the GNU General Public License along with
 
16
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
 
17
# Place, Suite 330, Boston, MA  02111-1307  USA.
 
18
# ###########################################################################
 
19
# ChangeHandler package $Revision: 6785 $
 
20
# ###########################################################################
 
21
 
 
22
# Package: ChangeHandler
 
23
# ChangeHandler creates SQL statements for changing rows in a table.
 
24
{
 
25
package ChangeHandler;
 
26
 
 
27
use strict;
 
28
use warnings FATAL => 'all';
 
29
use English qw(-no_match_vars);
 
30
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
 
31
 
 
32
my $DUPE_KEY  = qr/Duplicate entry/;
 
33
our @ACTIONS  = qw(DELETE REPLACE INSERT UPDATE);
 
34
 
 
35
# Sub: new
 
36
#
 
37
# Parameters:
 
38
#   %args - Arguments
 
39
#
 
40
# Required Arguments:
 
41
#   left_db   - Left database (src by default)
 
42
#   left_tbl  - Left table (src by default)
 
43
#   right_db  - Right database (dst by default)
 
44
#   right_tbl - Right table (dst by default)
 
45
#   actions   - arrayref of subroutines to call when handling a change.
 
46
#   replace   - Do UPDATE/INSERT as REPLACE.
 
47
#   queue     - Queue changes until <process_rows()> is called with a greater
 
48
#               queue level.
 
49
#   Quoter    - <Quoter> object
 
50
#
 
51
# Optional Arguments:
 
52
#   tbl_struct - Used to sort columns and detect binary columns
 
53
#   hex_blob   - HEX() BLOB columns (default yes)
 
54
#
 
55
# Returns:
 
56
#   ChangeHandler object
 
57
sub new {
 
58
   my ( $class, %args ) = @_;
 
59
   foreach my $arg ( qw(Quoter left_db left_tbl right_db right_tbl
 
60
                        replace queue) ) {
 
61
      die "I need a $arg argument" unless defined $args{$arg};
 
62
   }
 
63
   my $q = $args{Quoter};
 
64
 
 
65
   my $self = {
 
66
      hex_blob     => 1,
 
67
      %args,
 
68
      left_db_tbl  => $q->quote(@args{qw(left_db left_tbl)}),
 
69
      right_db_tbl => $q->quote(@args{qw(right_db right_tbl)}),
 
70
   };
 
71
 
 
72
   # By default left is source and right is dest.  With bidirectional
 
73
   # syncing this can change.  See set_src().
 
74
   $self->{src_db_tbl} = $self->{left_db_tbl};
 
75
   $self->{dst_db_tbl} = $self->{right_db_tbl};
 
76
 
 
77
   # Init and zero changes for all actions.
 
78
   map { $self->{$_} = [] } @ACTIONS;
 
79
   $self->{changes} = { map { $_ => 0 } @ACTIONS };
 
80
 
 
81
   return bless $self, $class;
 
82
}
 
83
 
 
84
# Sub: fetch_back
 
85
#   Set the fetch-back dbh.  If I'm supposed to fetch-back, that means I have
 
86
#   to get the full row from the database.  For example, someone might call
 
87
#   me like so: $me->change('UPDATE', { a => 1 })  But 'a' is only the primary
 
88
#   key. I now need to select that row and make an UPDATE statement with all
 
89
#   of its columns.
 
90
#
 
91
# Parameters:
 
92
#   $dbh - dbh to use for fetching-back values
 
93
sub fetch_back {
 
94
   my ( $self, $dbh ) = @_;
 
95
   $self->{fetch_back} = $dbh;
 
96
   MKDEBUG && _d('Set fetch back dbh', $dbh);
 
97
   return;
 
98
}
 
99
 
 
100
# Sub: set_src
 
101
#   Set which side of left-right pair is the source.
 
102
#   For bidirectional syncing both tables are src and dst.  Internally,
 
103
#   we refer to the tables generically as the left and right.  Either
 
104
#   one can be src or dst, as set by this sub when called by the caller.
 
105
#   Other subs don't know to which table src or dst point.  They just
 
106
#   fetchback from src and change dst.  If the optional $dbh arg is
 
107
#   given, fetch_back() is set with it, too.
 
108
#
 
109
# Parameters:
 
110
#   $src - Hashref with source host information
 
111
#   $dbh - Set <fetch_back()> with this dbh if given
 
112
sub set_src {
 
113
   my ( $self, $src, $dbh ) = @_;
 
114
   die "I need a src argument" unless $src;
 
115
   if ( lc $src eq 'left' ) {
 
116
      $self->{src_db_tbl} = $self->{left_db_tbl};
 
117
      $self->{dst_db_tbl} = $self->{right_db_tbl};
 
118
   }
 
119
   elsif ( lc $src eq 'right' ) {
 
120
      $self->{src_db_tbl} = $self->{right_db_tbl};
 
121
      $self->{dst_db_tbl} = $self->{left_db_tbl}; 
 
122
   }
 
123
   else {
 
124
      die "src argument must be either 'left' or 'right'"
 
125
   }
 
126
   MKDEBUG && _d('Set src to', $src);
 
127
   $self->fetch_back($dbh) if $dbh;
 
128
   return;
 
129
}
 
130
 
 
131
# Sub: src
 
132
#   Return current source db.tbl (could be left or right table).
 
133
#
 
134
# Returns:
 
135
#   Source database-qualified table name
 
136
sub src {
 
137
   my ( $self ) = @_;
 
138
   return $self->{src_db_tbl};
 
139
}
 
140
 
 
141
# Sub: dst
 
142
#   Return current destination db.tbl (could be left or right table).
 
143
#
 
144
# Returns:
 
145
#   Destination database-qualified table name
 
146
sub dst {
 
147
   my ( $self ) = @_;
 
148
   return $self->{dst_db_tbl};
 
149
}
 
150
 
 
151
# Sub: _take_action
 
152
#   Call the user-provied actions.  Actions are passed an action statement
 
153
#   and an optional dbh.  This sub is not called directly; it's called
 
154
#   by <change()> or <process_rows()>.
 
155
#
 
156
# Parameters:
 
157
#   sql - A SQL statement
 
158
#   dbh - optional dbh passed to the action callback
 
159
sub _take_action {
 
160
   my ( $self, $sql, $dbh ) = @_;
 
161
   MKDEBUG && _d('Calling subroutines on', $dbh, $sql);
 
162
   foreach my $action ( @{$self->{actions}} ) {
 
163
      $action->($sql, $dbh);
 
164
   }
 
165
   return;
 
166
}
 
167
 
 
168
# Sub: change
 
169
#   Make an action SQL statment for the given parameters if not queueing.
 
170
#   This sub calls <_take_action()>, passing the action statement and
 
171
#   optional dbh.  If queueing, the parameters are saved and the same work
 
172
#   is done in <process_rows()>.  Queueing does not work with bidirectional
 
173
#   syncs.
 
174
#
 
175
# Parameters:
 
176
#   action - One of @ACTIONS
 
177
#   row    - Hashref of row data
 
178
#   cols   - Arrayref of column names
 
179
#   dbh    - Optional dbh passed to <_take_action()>
 
180
sub change {
 
181
   my ( $self, $action, $row, $cols, $dbh ) = @_;
 
182
   MKDEBUG && _d($dbh, $action, 'where', $self->make_where_clause($row, $cols));
 
183
 
 
184
   # Undef action means don't do anything.  This allows deeply
 
185
   # nested callers to avoid/skip a change without dying.
 
186
   return unless $action;
 
187
 
 
188
   $self->{changes}->{
 
189
      $self->{replace} && $action ne 'DELETE' ? 'REPLACE' : $action
 
190
   }++;
 
191
   if ( $self->{queue} ) {
 
192
      $self->__queue($action, $row, $cols, $dbh);
 
193
   }
 
194
   else {
 
195
      eval {
 
196
         my $func = "make_$action";
 
197
         $self->_take_action($self->$func($row, $cols), $dbh);
 
198
      };
 
199
      if ( $EVAL_ERROR =~ m/$DUPE_KEY/ ) {
 
200
         MKDEBUG && _d('Duplicate key violation; will queue and rewrite');
 
201
         $self->{queue}++;
 
202
         $self->{replace} = 1;
 
203
         $self->__queue($action, $row, $cols, $dbh);
 
204
      }
 
205
      elsif ( $EVAL_ERROR ) {
 
206
         die $EVAL_ERROR;
 
207
      }
 
208
   }
 
209
   return;
 
210
}
 
211
 
 
212
# Sub: __queue
 
213
#   Queue an action for later execution.  This sub is called by <change()>
 
214
#   <process_rows()> to defer action.
 
215
#
 
216
# Parameters:
 
217
#   action - One of @ACTIONS
 
218
#   row    - Hashref of row data
 
219
#   cols   - Arrayref of column names
 
220
#   dbh    - Optional dbh passed to <_take_action()>
 
221
sub __queue {
 
222
   my ( $self, $action, $row, $cols, $dbh ) = @_;
 
223
   MKDEBUG && _d('Queueing change for later');
 
224
   if ( $self->{replace} ) {
 
225
      $action = $action eq 'DELETE' ? $action : 'REPLACE';
 
226
   }
 
227
   push @{$self->{$action}}, [ $row, $cols, $dbh ];
 
228
}
 
229
 
 
230
# Sub: process_rows
 
231
#   Make changes to rows created/queued earlier.
 
232
#   If called with 1, will process rows that have been deferred from instant
 
233
#   processing.  If no arg, will process all rows.
 
234
#
 
235
# Parameters:
 
236
#   $queue_level - Queue level caller is in
 
237
#   $trace_msg   - Optional string to append to each SQL statement for
 
238
#                  tracing them in binary logs.
 
239
sub process_rows {
 
240
   my ( $self, $queue_level, $trace_msg ) = @_;
 
241
   my $error_count = 0;
 
242
   TRY: {
 
243
      if ( $queue_level && $queue_level < $self->{queue} ) { # see redo below!
 
244
         MKDEBUG && _d('Not processing now', $queue_level, '<', $self->{queue});
 
245
         return;
 
246
      }
 
247
      MKDEBUG && _d('Processing rows:');
 
248
      my ($row, $cur_act);
 
249
      eval {
 
250
         foreach my $action ( @ACTIONS ) {
 
251
            my $func = "make_$action";
 
252
            my $rows = $self->{$action};
 
253
            MKDEBUG && _d(scalar(@$rows), 'to', $action);
 
254
            $cur_act = $action;
 
255
            while ( @$rows ) {
 
256
               # Each row is an arrayref like:
 
257
               # [
 
258
               #   { col1 => val1, colN => ... },
 
259
               #   [ col1, colN, ... ],
 
260
               #   dbh,  # optional
 
261
               # ]
 
262
               $row    = shift @$rows;
 
263
               my $sql = $self->$func(@$row);
 
264
               $sql   .= " /*maatkit $trace_msg*/" if $trace_msg;
 
265
               $self->_take_action($sql, $row->[2]);
 
266
            }
 
267
         }
 
268
         $error_count = 0;
 
269
      };
 
270
      if ( !$error_count++ && $EVAL_ERROR =~ m/$DUPE_KEY/ ) {
 
271
         MKDEBUG && _d('Duplicate key violation; re-queueing and rewriting');
 
272
         $self->{queue}++; # Defer rows to the very end
 
273
         $self->{replace} = 1;
 
274
         $self->__queue($cur_act, @$row);
 
275
         redo TRY;
 
276
      }
 
277
      elsif ( $EVAL_ERROR ) {
 
278
         die $EVAL_ERROR;
 
279
      }
 
280
   }
 
281
}
 
282
 
 
283
# Sub: make_DELETE
 
284
#   Make a DELETE statement.  DELETE never needs to be fetched back.
 
285
#
 
286
# Parameters:
 
287
#   $row  - Hashref with row values
 
288
#   $cols - Arrayref with column names
 
289
#
 
290
# Returns:
 
291
#   A DELETE statement for the given row and columns
 
292
sub make_DELETE {
 
293
   my ( $self, $row, $cols ) = @_;
 
294
   MKDEBUG && _d('Make DELETE');
 
295
   return "DELETE FROM $self->{dst_db_tbl} WHERE "
 
296
      . $self->make_where_clause($row, $cols)
 
297
      . ' LIMIT 1';
 
298
}
 
299
 
 
300
# Sub: make_UPDATE
 
301
#   Make an UPDATE statement.
 
302
#
 
303
# Parameters:
 
304
#   $row  - Hashref with row values
 
305
#   $cols - Arrayref with column names
 
306
#
 
307
# Returns:
 
308
#   An UPDATE statement for the given row and columns
 
309
sub make_UPDATE {
 
310
   my ( $self, $row, $cols ) = @_;
 
311
   MKDEBUG && _d('Make UPDATE');
 
312
   if ( $self->{replace} ) {
 
313
      return $self->make_row('REPLACE', $row, $cols);
 
314
   }
 
315
   my %in_where = map { $_ => 1 } @$cols;
 
316
   my $where = $self->make_where_clause($row, $cols);
 
317
   my @cols;
 
318
   if ( my $dbh = $self->{fetch_back} ) {
 
319
      my $sql = $self->make_fetch_back_query($where);
 
320
      MKDEBUG && _d('Fetching data on dbh', $dbh, 'for UPDATE:', $sql);
 
321
      my $res = $dbh->selectrow_hashref($sql);
 
322
      @{$row}{keys %$res} = values %$res;
 
323
      @cols = $self->sort_cols($res);
 
324
   }
 
325
   else {
 
326
      @cols = $self->sort_cols($row);
 
327
   }
 
328
   return "UPDATE $self->{dst_db_tbl} SET "
 
329
      . join(', ', map {
 
330
            $self->{Quoter}->quote($_)
 
331
            . '=' .  $self->{Quoter}->quote_val($row->{$_})
 
332
         } grep { !$in_where{$_} } @cols)
 
333
      . " WHERE $where LIMIT 1";
 
334
}
 
335
 
 
336
# Sub: make_INSERT
 
337
#   Make an INSERT statement.  This sub is stub for <make_row()> which
 
338
#   does the real work.
 
339
#
 
340
# Parameters:
 
341
#   $row  - Hashref with row values
 
342
#   $cols - Arrayref with column names
 
343
#
 
344
# Returns:
 
345
#   An INSERT statement for the given row and columns
 
346
sub make_INSERT {
 
347
   my ( $self, $row, $cols ) = @_;
 
348
   MKDEBUG && _d('Make INSERT');
 
349
   if ( $self->{replace} ) {
 
350
      return $self->make_row('REPLACE', $row, $cols);
 
351
   }
 
352
   return $self->make_row('INSERT', $row, $cols);
 
353
}
 
354
 
 
355
# Sub: make_REPLACE
 
356
#   Make a REPLACE statement.  This sub is a stub for <make_row()> which
 
357
#   does the real work.
 
358
#
 
359
# Parameters:
 
360
#   $row  - Hashref with row values
 
361
#   $cols - Arrayref with column names
 
362
#
 
363
# Returns:
 
364
#   A REPLACE statement for the given row and columns
 
365
sub make_REPLACE {
 
366
   my ( $self, $row, $cols ) = @_;
 
367
   MKDEBUG && _d('Make REPLACE');
 
368
   return $self->make_row('REPLACE', $row, $cols);
 
369
}
 
370
 
 
371
# Sub: make_row
 
372
#   Make an INSERT or REPLACE statement.  Values from $row are quoted
 
373
#   with <Quoter::quote_val()>.
 
374
#
 
375
# Parameters:
 
376
#   $verb - "INSERT" or "REPLACE"
 
377
#   $row  - Hashref with row values
 
378
#   $cols - Arrayref with column names
 
379
#
 
380
# Returns:
 
381
#   A SQL statement
 
382
sub make_row {
 
383
   my ( $self, $verb, $row, $cols ) = @_;
 
384
   my @cols; 
 
385
   if ( my $dbh = $self->{fetch_back} ) {
 
386
      my $where = $self->make_where_clause($row, $cols);
 
387
      my $sql   = $self->make_fetch_back_query($where);
 
388
      MKDEBUG && _d('Fetching data on dbh', $dbh, 'for', $verb, ':', $sql);
 
389
      my $res = $dbh->selectrow_hashref($sql);
 
390
      @{$row}{keys %$res} = values %$res;
 
391
      @cols = $self->sort_cols($res);
 
392
   }
 
393
   else {
 
394
      @cols = $self->sort_cols($row);
 
395
   }
 
396
   my $q = $self->{Quoter};
 
397
   return "$verb INTO $self->{dst_db_tbl}("
 
398
      . join(', ', map { $q->quote($_) } @cols)
 
399
      . ') VALUES ('
 
400
      . join(', ', map { $q->quote_val($_) } @{$row}{@cols} )
 
401
      . ')';
 
402
}
 
403
 
 
404
# Sub: make_where_clause
 
405
#   Make a WHERE clause.  Values are quoted with <Quoter::quote_val()>.
 
406
#
 
407
# Parameters:
 
408
#   $row  - Hashref with row values
 
409
#   $cols - Arrayref with column names
 
410
#
 
411
# Returns:
 
412
#   A WHERE clause without the word "WHERE"
 
413
sub make_where_clause {
 
414
   my ( $self, $row, $cols ) = @_;
 
415
   my @clauses = map {
 
416
      my $val = $row->{$_};
 
417
      my $sep = defined $val ? '=' : ' IS ';
 
418
      $self->{Quoter}->quote($_) . $sep . $self->{Quoter}->quote_val($val);
 
419
   } @$cols;
 
420
   return join(' AND ', @clauses);
 
421
}
 
422
 
 
423
 
 
424
# Sub: get_changes
 
425
#   Get a summary of changes made.
 
426
#
 
427
# Returns:
 
428
#   Hash of changes where the keys are actions like "DELETE" and the values
 
429
#   are how many of the action were made
 
430
sub get_changes {
 
431
   my ( $self ) = @_;
 
432
   return %{$self->{changes}};
 
433
}
 
434
 
 
435
 
 
436
# Sub: sort_cols
 
437
#   Sort a row's columns based on their real order in the table.
 
438
#   This requires that the optional tbl_struct arg was passed to <new()>.
 
439
#   If not, the rows are sorted alphabetically.
 
440
#
 
441
# Parameters:
 
442
#   $row - Hashref with row values
 
443
#
 
444
# Returns:
 
445
#   Array of column names
 
446
sub sort_cols {
 
447
   my ( $self, $row ) = @_;
 
448
   my @cols;
 
449
   if ( $self->{tbl_struct} ) { 
 
450
      my $pos = $self->{tbl_struct}->{col_posn};
 
451
      my @not_in_tbl;
 
452
      @cols = sort {
 
453
            $pos->{$a} <=> $pos->{$b}
 
454
         }
 
455
         grep {
 
456
            if ( !defined $pos->{$_} ) {
 
457
               push @not_in_tbl, $_;
 
458
               0;
 
459
            }
 
460
            else {
 
461
               1;
 
462
            }
 
463
         }
 
464
         keys %$row;
 
465
      push @cols, @not_in_tbl if @not_in_tbl;
 
466
   }
 
467
   else {
 
468
      @cols = sort keys %$row;
 
469
   }
 
470
   return @cols;
 
471
}
 
472
 
 
473
# Sub: make_fetch_back_query
 
474
#   Make a SELECT statement to fetch-back values.
 
475
#   This requires that the optional tbl_struct arg was passed to <new()>.
 
476
#
 
477
# Parameters:
 
478
#   $where - Optional WHERE clause without the word "WHERE"
 
479
#
 
480
# Returns:
 
481
#   A SELECT statement
 
482
sub make_fetch_back_query {
 
483
   my ( $self, $where ) = @_;
 
484
   die "I need a where argument" unless $where;
 
485
   my $cols       = '*';
 
486
   my $tbl_struct = $self->{tbl_struct};
 
487
   if ( $tbl_struct ) {
 
488
      $cols = join(', ',
 
489
         map {
 
490
            my $col = $_;
 
491
            if (    $self->{hex_blob}
 
492
                 && $tbl_struct->{type_for}->{$col} =~ m/blob|text|binary/ ) {
 
493
               $col = "IF(`$col`='', '', CONCAT('0x', HEX(`$col`))) AS `$col`";
 
494
            }
 
495
            else {
 
496
               $col = "`$col`";
 
497
            }
 
498
            $col;
 
499
         } @{ $tbl_struct->{cols} }
 
500
      );
 
501
 
 
502
      if ( !$cols ) {
 
503
         # This shouldn't happen in the real world.
 
504
         MKDEBUG && _d('Failed to make explicit columns list from tbl struct');
 
505
         $cols = '*';
 
506
      }
 
507
   }
 
508
   return "SELECT $cols FROM $self->{src_db_tbl} WHERE $where LIMIT 1";
 
509
}
 
510
 
 
511
sub _d {
 
512
   my ($package, undef, $line) = caller 0;
 
513
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
 
514
        map { defined $_ ? $_ : 'undef' }
 
515
        @_;
 
516
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
 
517
}
 
518
 
 
519
1;
 
520
}
 
521
# ###########################################################################
 
522
# End ChangeHandler package
 
523
# ###########################################################################