1
# This program is copyright 2011 Percona Inc.
2
# This program is copyright 2007-2010 Baron Schwartz.
3
# Feedback and improvements are welcome.
5
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
6
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
7
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
9
# This program is free software; you can redistribute it and/or modify it under
10
# the terms of the GNU General Public License as published by the Free Software
11
# Foundation, version 2; OR the Perl Artistic License. On UNIX and similar
12
# systems, you can issue `man perlgpl' or `man perlartistic' to read these licenses.
15
# You should have received a copy of the GNU General Public License along with
16
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
17
# Place, Suite 330, Boston, MA 02111-1307 USA.
18
# ###########################################################################
19
# ChangeHandler package $Revision: 6785 $
20
# ###########################################################################
22
# Package: ChangeHandler
23
# ChangeHandler creates SQL statements for changing rows in a table.
25
package ChangeHandler;
28
use warnings FATAL => 'all';
29
use English qw(-no_match_vars);
30
use constant MKDEBUG => $ENV{MKDEBUG} || 0;
32
my $DUPE_KEY = qr/Duplicate entry/;
33
our @ACTIONS = qw(DELETE REPLACE INSERT UPDATE);
41
# left_db - Left database (src by default)
42
# left_tbl - Left table (src by default)
43
# right_db - Right database (dst by default)
44
# right_tbl - Right table (dst by default)
45
# actions - arrayref of subroutines to call when handling a change.
46
# replace - Do UPDATE/INSERT as REPLACE.
47
# queue - Queue changes until <process_rows()> is called with a greater or equal queue level.
49
# Quoter - <Quoter> object
52
# tbl_struct - Used to sort columns and detect binary columns
53
# hex_blob - HEX() BLOB columns (default yes)
56
# ChangeHandler object
58
my ( $class, %args ) = @_;
59
foreach my $arg ( qw(Quoter left_db left_tbl right_db right_tbl
61
die "I need a $arg argument" unless defined $args{$arg};
63
my $q = $args{Quoter};
68
left_db_tbl => $q->quote(@args{qw(left_db left_tbl)}),
69
right_db_tbl => $q->quote(@args{qw(right_db right_tbl)}),
72
# By default left is source and right is dest. With bidirectional
73
# syncing this can change. See set_src().
74
$self->{src_db_tbl} = $self->{left_db_tbl};
75
$self->{dst_db_tbl} = $self->{right_db_tbl};
77
# Init and zero changes for all actions.
78
map { $self->{$_} = [] } @ACTIONS;
79
$self->{changes} = { map { $_ => 0 } @ACTIONS };
81
return bless $self, $class;
85
# Set the fetch-back dbh. If I'm supposed to fetch-back, that means I have
86
# to get the full row from the database. For example, someone might call
87
# me like so: $me->change('UPDATE', { a => 1 }) But 'a' is only the primary
88
# key. I now need to select that row and make an UPDATE statement with all of its columns.
92
# $dbh - dbh to use for fetching-back values
94
my ( $self, $dbh ) = @_;
95
$self->{fetch_back} = $dbh;
96
MKDEBUG && _d('Set fetch back dbh', $dbh);
101
# Set which side of left-right pair is the source.
102
# For bidirectional syncing both tables are src and dst. Internally,
103
# we refer to the tables generically as the left and right. Either
104
# one can be src or dst, as set by this sub when called by the caller.
105
# Other subs don't know to which table src or dst point. They just
106
# fetchback from src and change dst. If the optional $dbh arg is
107
# given, fetch_back() is set with it, too.
110
# $src - Hashref with source host information
111
# $dbh - Set <fetch_back()> with this dbh if given
113
my ( $self, $src, $dbh ) = @_;
114
die "I need a src argument" unless $src;
115
if ( lc $src eq 'left' ) {
116
$self->{src_db_tbl} = $self->{left_db_tbl};
117
$self->{dst_db_tbl} = $self->{right_db_tbl};
119
elsif ( lc $src eq 'right' ) {
120
$self->{src_db_tbl} = $self->{right_db_tbl};
121
$self->{dst_db_tbl} = $self->{left_db_tbl};
124
die "src argument must be either 'left' or 'right'"
126
MKDEBUG && _d('Set src to', $src);
127
$self->fetch_back($dbh) if $dbh;
132
# Return current source db.tbl (could be left or right table).
135
# Source database-qualified table name
138
return $self->{src_db_tbl};
142
# Return current destination db.tbl (could be left or right table).
145
# Destination database-qualified table name
148
return $self->{dst_db_tbl};
152
# Call the user-provided actions. Actions are passed an action statement
153
# and an optional dbh. This sub is not called directly; it's called
154
# by <change()> or <process_rows()>.
157
# sql - A SQL statement
158
# dbh - optional dbh passed to the action callback
160
my ( $self, $sql, $dbh ) = @_;
161
MKDEBUG && _d('Calling subroutines on', $dbh, $sql);
162
foreach my $action ( @{$self->{actions}} ) {
163
$action->($sql, $dbh);
169
# Make an action SQL statement for the given parameters if not queueing.
170
# This sub calls <_take_action()>, passing the action statement and
171
# optional dbh. If queueing, the parameters are saved and the same work
172
# is done in <process_rows()>. Queueing does not work with bidirectional syncing.
176
# action - One of @ACTIONS
177
# row - Hashref of row data
178
# cols - Arrayref of column names
179
# dbh - Optional dbh passed to <_take_action()>
181
my ( $self, $action, $row, $cols, $dbh ) = @_;
182
MKDEBUG && _d($dbh, $action, 'where', $self->make_where_clause($row, $cols));
184
# Undef action means don't do anything. This allows deeply
185
# nested callers to avoid/skip a change without dying.
186
return unless $action;
189
$self->{replace} && $action ne 'DELETE' ? 'REPLACE' : $action
191
if ( $self->{queue} ) {
192
$self->__queue($action, $row, $cols, $dbh);
196
my $func = "make_$action";
197
$self->_take_action($self->$func($row, $cols), $dbh);
199
if ( $EVAL_ERROR =~ m/$DUPE_KEY/ ) {
200
MKDEBUG && _d('Duplicate key violation; will queue and rewrite');
202
$self->{replace} = 1;
203
$self->__queue($action, $row, $cols, $dbh);
205
elsif ( $EVAL_ERROR ) {
213
# Queue an action for later execution. This sub is called by <change()> and
214
# <process_rows()> to defer action.
217
# action - One of @ACTIONS
218
# row - Hashref of row data
219
# cols - Arrayref of column names
220
# dbh - Optional dbh passed to <_take_action()>
222
my ( $self, $action, $row, $cols, $dbh ) = @_;
223
MKDEBUG && _d('Queueing change for later');
224
if ( $self->{replace} ) {
225
$action = $action eq 'DELETE' ? $action : 'REPLACE';
227
push @{$self->{$action}}, [ $row, $cols, $dbh ];
231
# Make changes to rows created/queued earlier.
232
# If called with 1, will process rows that have been deferred from instant
233
# processing. If no arg, will process all rows.
236
# $queue_level - Queue level caller is in
237
# $trace_msg - Optional string to append to each SQL statement for
238
# tracing them in binary logs.
240
my ( $self, $queue_level, $trace_msg ) = @_;
243
if ( $queue_level && $queue_level < $self->{queue} ) { # see redo below!
244
MKDEBUG && _d('Not processing now', $queue_level, '<', $self->{queue});
247
MKDEBUG && _d('Processing rows:');
250
foreach my $action ( @ACTIONS ) {
251
my $func = "make_$action";
252
my $rows = $self->{$action};
253
MKDEBUG && _d(scalar(@$rows), 'to', $action);
256
# Each row is an arrayref like:
258
# { col1 => val1, colN => ... },
259
# [ col1, colN, ... ],
263
my $sql = $self->$func(@$row);
264
$sql .= " /*maatkit $trace_msg*/" if $trace_msg;
265
$self->_take_action($sql, $row->[2]);
270
if ( !$error_count++ && $EVAL_ERROR =~ m/$DUPE_KEY/ ) {
271
MKDEBUG && _d('Duplicate key violation; re-queueing and rewriting');
272
$self->{queue}++; # Defer rows to the very end
273
$self->{replace} = 1;
274
$self->__queue($cur_act, @$row);
277
elsif ( $EVAL_ERROR ) {
284
# Make a DELETE statement. DELETE never needs to be fetched back.
287
# $row - Hashref with row values
288
# $cols - Arrayref with column names
291
# A DELETE statement for the given row and columns
293
my ( $self, $row, $cols ) = @_;
294
MKDEBUG && _d('Make DELETE');
295
return "DELETE FROM $self->{dst_db_tbl} WHERE "
296
. $self->make_where_clause($row, $cols)
301
# Make an UPDATE statement.
304
# $row - Hashref with row values
305
# $cols - Arrayref with column names
308
# An UPDATE statement for the given row and columns
310
my ( $self, $row, $cols ) = @_;
311
MKDEBUG && _d('Make UPDATE');
312
if ( $self->{replace} ) {
313
return $self->make_row('REPLACE', $row, $cols);
315
my %in_where = map { $_ => 1 } @$cols;
316
my $where = $self->make_where_clause($row, $cols);
318
if ( my $dbh = $self->{fetch_back} ) {
319
my $sql = $self->make_fetch_back_query($where);
320
MKDEBUG && _d('Fetching data on dbh', $dbh, 'for UPDATE:', $sql);
321
my $res = $dbh->selectrow_hashref($sql);
322
@{$row}{keys %$res} = values %$res;
323
@cols = $self->sort_cols($res);
326
@cols = $self->sort_cols($row);
328
return "UPDATE $self->{dst_db_tbl} SET "
330
$self->{Quoter}->quote($_)
331
. '=' . $self->{Quoter}->quote_val($row->{$_})
332
} grep { !$in_where{$_} } @cols)
333
. " WHERE $where LIMIT 1";
337
# Make an INSERT statement. This sub is a stub for <make_row()> which
338
# does the real work.
341
# $row - Hashref with row values
342
# $cols - Arrayref with column names
345
# An INSERT statement for the given row and columns
347
my ( $self, $row, $cols ) = @_;
348
MKDEBUG && _d('Make INSERT');
349
if ( $self->{replace} ) {
350
return $self->make_row('REPLACE', $row, $cols);
352
return $self->make_row('INSERT', $row, $cols);
356
# Make a REPLACE statement. This sub is a stub for <make_row()> which
357
# does the real work.
360
# $row - Hashref with row values
361
# $cols - Arrayref with column names
364
# A REPLACE statement for the given row and columns
366
my ( $self, $row, $cols ) = @_;
367
MKDEBUG && _d('Make REPLACE');
368
return $self->make_row('REPLACE', $row, $cols);
372
# Make an INSERT or REPLACE statement. Values from $row are quoted
373
# with <Quoter::quote_val()>.
376
# $verb - "INSERT" or "REPLACE"
377
# $row - Hashref with row values
378
# $cols - Arrayref with column names
383
my ( $self, $verb, $row, $cols ) = @_;
385
if ( my $dbh = $self->{fetch_back} ) {
386
my $where = $self->make_where_clause($row, $cols);
387
my $sql = $self->make_fetch_back_query($where);
388
MKDEBUG && _d('Fetching data on dbh', $dbh, 'for', $verb, ':', $sql);
389
my $res = $dbh->selectrow_hashref($sql);
390
@{$row}{keys %$res} = values %$res;
391
@cols = $self->sort_cols($res);
394
@cols = $self->sort_cols($row);
396
my $q = $self->{Quoter};
397
return "$verb INTO $self->{dst_db_tbl}("
398
. join(', ', map { $q->quote($_) } @cols)
400
. join(', ', map { $q->quote_val($_) } @{$row}{@cols} )
404
# Sub: make_where_clause
405
# Make a WHERE clause. Values are quoted with <Quoter::quote_val()>.
408
# $row - Hashref with row values
409
# $cols - Arrayref with column names
412
# A WHERE clause without the word "WHERE"
413
sub make_where_clause {
   my ( $self, $row, $cols ) = @_;
   my $q = $self->{Quoter};

   # Build one comparison per column named in $cols, taking the value from
   # $row.  An undefined value is compared with " IS " instead of "=",
   # because in SQL "col = NULL" never matches a row.
   # NOTE(review): presumably Quoter::quote_val() renders undef as NULL;
   # confirm against the Quoter package.
   my @clauses = map {
      my $val = $row->{$_};
      my $sep = defined $val ? '=' : ' IS ';
      $q->quote($_) . $sep . $q->quote_val($val);
   } @$cols;

   # The caller supplies the word "WHERE"; only the joined conditions are
   # returned here.
   return join(' AND ', @clauses);
}
425
# Get a summary of changes made.
428
# Hash of changes where the keys are actions like "DELETE" and the values
429
# are how many of the action were made
432
return %{$self->{changes}};
437
# Sort a row's columns based on their real order in the table.
438
# This requires that the optional tbl_struct arg was passed to <new()>.
439
# If not, the rows are sorted alphabetically.
442
# $row - Hashref with row values
445
# Array of column names
447
my ( $self, $row ) = @_;
449
if ( $self->{tbl_struct} ) {
450
my $pos = $self->{tbl_struct}->{col_posn};
453
$pos->{$a} <=> $pos->{$b}
456
if ( !defined $pos->{$_} ) {
457
push @not_in_tbl, $_;
465
push @cols, @not_in_tbl if @not_in_tbl;
468
@cols = sort keys %$row;
473
# Sub: make_fetch_back_query
474
# Make a SELECT statement to fetch-back values.
475
# This requires that the optional tbl_struct arg was passed to <new()>.
478
# $where - Optional WHERE clause without the word "WHERE"
482
sub make_fetch_back_query {
   my ( $self, $where ) = @_;
   die "I need a where argument" unless $where;

   # Default to every column; replaced by an explicit list below when a
   # table structure is available.
   my $cols       = '*';
   my $tbl_struct = $self->{tbl_struct};
   if ( $tbl_struct ) {
      $cols = join(', ',
         map {
            my $col  = $_;
            # A column with no recorded type is treated as non-binary rather
            # than letting the pattern match die on an undefined value
            # (this package makes all warnings fatal).
            my $type = $tbl_struct->{type_for}->{$col} || '';
            if ( $self->{hex_blob} && $type =~ m/blob|text|binary/ ) {
               # Select the value as a 0x... hex literal, keeping the empty
               # string as-is, and alias it back to the column's own name.
               $col = "IF(`$col`='', '', CONCAT('0x', HEX(`$col`))) AS `$col`";
            }
            else {
               $col = "`$col`";
            }
            $col;
         } @{ $tbl_struct->{cols} || [] }
      );

      if ( !$cols ) {
         # This shouldn't happen in the real world.
         MKDEBUG && _d('Failed to make explicit columns list from tbl struct');
         $cols = '*';
      }
   }

   # Rows are always fetched back from the current source table.
   return "SELECT $cols FROM $self->{src_db_tbl} WHERE $where LIMIT 1";
}
512
my ($package, undef, $line) = caller 0;
513
@_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
514
map { defined $_ ? $_ : 'undef' }
516
print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
521
# ###########################################################################
522
# End ChangeHandler package
523
# ###########################################################################