~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to contrib/dbmirror/DBMirror.pl

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/perl
 
2
#############################################################################
 
3
#
 
4
# DBMirror.pl
 
5
# Contains the Database mirroring script.
 
6
# This script queries the pending table off the database specified
 
7
# (along with the associated schema) for updates that are pending on a 
 
8
# specific host.  The database on that host is then updated with the changes.
 
9
#
 
10
#
 
11
#    Written by Steven Singer (ssinger@navtechinc.com)
 
12
#    (c) 2001-2002 Navtech Systems Support Inc.
 
13
# ALL RIGHTS RESERVED;
 
14
#
 
15
# Permission to use, copy, modify, and distribute this software and its
 
16
# documentation for any purpose, without fee, and without a written agreement
 
17
# is hereby granted, provided that the above copyright notice and this
 
18
# paragraph and the following two paragraphs appear in all copies.
 
19
#
 
20
# IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 
21
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 
22
# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 
23
# DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 
24
# POSSIBILITY OF SUCH DAMAGE.
 
25
#
 
26
# THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 
27
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 
28
# AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 
29
# ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 
30
# PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 
31
#
 
32
#
 
33
 
34
#
 
35
##############################################################################
 
36
# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.10 2004-07-02 00:58:09 joe Exp $ 
 
37
#
 
38
##############################################################################
 
39
 
 
40
=head1 NAME
 
41
 
 
42
DBMirror.pl - A Perl module to mirror database changes from a master database
 
43
to a slave.
 
44
 
 
45
=head1 SYNPOSIS
 
46
 
 
47
 
 
48
DBMirror.pl slaveConfigfile.conf
 
49
 
 
50
 
 
51
=head1 DESCRIPTION
 
52
 
 
53
This Perl script will connect to the master database and query its pending 
 
54
table for a list of pending changes.
 
55
 
 
56
The transactions of the original changes to the master will be preserved
 
57
when sending things to the slave.
 
58
 
 
59
=cut
 
60
 
 
61
 
 
62
=head1 METHODS
 
63
 
 
64
=over 4
 
65
 
 
66
=cut
 
67
 
 
68
 
 
69
BEGIN {
 
70
  # add in a global path to files
 
71
  # Pg should be included. 
 
72
}
 
73
 
 
74
 
 
75
use strict;
 
76
use Pg;
 
77
use IO::Handle;
 
78
sub mirrorCommand($$$$$$);
 
79
sub mirrorInsert($$$$$);
 
80
sub mirrorDelete($$$$$);
 
81
sub mirrorUpdate($$$$$);
 
82
sub logErrorMessage($);
 
83
sub setupSlave($);
 
84
sub updateMirrorHostTable($$);
 
85
sub extractData($$);
 
86
local $::masterHost;
 
87
local $::masterDb; 
 
88
local $::masterUser; 
 
89
local $::masterPassword; 
 
90
local $::errorThreshold=5;
 
91
local $::errorEmailAddr=undef;
 
92
local $::sleepInterval=60;
 
93
 
 
94
my %slaveInfoHash;
 
95
local $::slaveInfo = \%slaveInfoHash;
 
96
 
 
97
my $lastErrorMsg;
 
98
my $repeatErrorCount=0;
 
99
 
 
100
my $lastXID;
 
101
my $commandCount=0;
 
102
 
 
103
my $masterConn;
 
104
 
 
105
Main();
 
106
 
 
107
sub Main() {
 
108
  
 
109
#run the configuration file.
 
110
  if ($#ARGV != 0) {
 
111
    die "usage: DBMirror.pl configFile\n";
 
112
  }
 
113
  if( ! defined do $ARGV[0]) {
 
114
    logErrorMessage("Invalid Configuration file $ARGV[0]");
 
115
    die;
 
116
  }
 
117
  
 
118
  if (defined($::syslog))
 
119
  {
 
120
      # log with syslog
 
121
      require Sys::Syslog; 
 
122
      import Sys::Syslog qw(openlog syslog);
 
123
      openlog($0, 'cons,pid', 'user');
 
124
      syslog("info", '%s', "starting $0 script with $ARGV[0]");
 
125
  }
 
126
 
 
127
  my $connectString;
 
128
  if(defined($::masterHost))
 
129
  {
 
130
      $connectString .= "host=$::masterHost ";
 
131
  }
 
132
  if(defined($::masterPort))
 
133
  {
 
134
      $connectString .= "port=$::masterPort ";
 
135
  }
 
136
  $connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword";
 
137
  
 
138
  $masterConn = Pg::connectdb($connectString);
 
139
  
 
140
  unless($masterConn->status == PGRES_CONNECTION_OK) {
 
141
    logErrorMessage("Can't connect to master database\n" .
 
142
                    $masterConn->errorMessage);
 
143
    die;
 
144
  }
 
145
    
 
146
  my $setQuery;
 
147
  $setQuery = "SET search_path = public";
 
148
  my $setResult = $masterConn->exec($setQuery);
 
149
  if($setResult->resultStatus!=PGRES_COMMAND_OK) { 
 
150
    logErrorMessage($masterConn->errorMessage . "\n" . 
 
151
                    $setQuery);
 
152
    die;
 
153
  }
 
154
    
 
155
  my $firstTime = 1;
 
156
  while(1) {
 
157
    if($firstTime == 0) {
 
158
      sleep $::sleepInterval; 
 
159
    } 
 
160
    $firstTime = 0;
 
161
    
 
162
    setupSlave($::slaveInfo);
 
163
   
 
164
   
 
165
    
 
166
    
 
167
    #Obtain a list of pending transactions using ordering by our approximation
 
168
    #to the commit time.  The commit time approximation is taken to be the
 
169
    #SeqId of the last row edit in the transaction.
 
170
    my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd";
 
171
    $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN";
 
172
    $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = ";
 
173
    $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName=";
 
174
    $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' "; 
 
175
    $pendingTransQuery .= " ON pd.XID";
 
176
    $pendingTransQuery .= " = mt.XID WHERE mt.XID is null ";
 
177
   
 
178
 
 
179
    $pendingTransQuery .= " GROUP BY pd.XID";
 
180
    $pendingTransQuery .= " ORDER BY MAX(pd.SeqId)";
 
181
    
 
182
    
 
183
    my $pendingTransResults = $masterConn->exec($pendingTransQuery);
 
184
    unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) {
 
185
      logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage);
 
186
      die;
 
187
    }
 
188
    
 
189
    my $numPendingTrans = $pendingTransResults->ntuples;
 
190
    my $curTransTuple = 0;
 
191
    
 
192
    
 
193
    #
 
194
    # This loop loops through each pending transaction in the proper order.
 
195
    # The Pending row edits for that transaction will be queried from the 
 
196
    # master and sent + committed to the slaves.
 
197
    while($curTransTuple < $numPendingTrans) {
 
198
      my $XID = $pendingTransResults->getvalue($curTransTuple,0);
 
199
      my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1);
 
200
      my $seqId;
 
201
 
 
202
     
 
203
      if($::slaveInfo->{'status'} eq 'FileClosed')
 
204
      {
 
205
          openTransactionFile($::slaveInfo,$XID);
 
206
      }
 
207
 
 
208
 
 
209
 
 
210
      my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,";
 
211
      $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data ";
 
212
      $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata ";
 
213
      $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId ";
 
214
     
 
215
      $pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC";
 
216
      
 
217
      
 
218
      my $pendingResults = $masterConn->exec($pendingQuery);
 
219
      unless($pendingResults->resultStatus==PGRES_TUPLES_OK) {
 
220
        logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage);
 
221
        die;
 
222
      }
 
223
      
 
224
      sendQueryToSlaves($XID,"BEGIN");
 
225
            
 
226
      my $numPending = $pendingResults->ntuples;
 
227
      my $curTuple = 0;
 
228
      while ($curTuple < $numPending) {
 
229
        $seqId = $pendingResults->getvalue($curTuple,0);
 
230
        my $tableName = $pendingResults->getvalue($curTuple,1);
 
231
        my $op = $pendingResults->getvalue($curTuple,2);
 
232
        $curTuple = mirrorCommand($seqId,$tableName,$op,$XID,
 
233
                                  $pendingResults,$curTuple) +1;
 
234
        
 
235
      }
 
236
 
 
237
      if($::slaveInfo->{'status'} ne 'DBOpen' &&
 
238
         $::slaveInfo->{'status'} ne 'FileOpen')
 
239
      {
 
240
          last;
 
241
      }
 
242
      sendQueryToSlaves(undef,"COMMIT");
 
243
      #Now commit the transaction.
 
244
      updateMirrorHostTable($XID,$seqId);
 
245
      
 
246
      $pendingResults = undef;
 
247
      $curTransTuple = $curTransTuple +1;
 
248
 
 
249
      if($::slaveInfo->{'status'} eq 'FileOpen')
 
250
      {
 
251
          close ($::slaveInfo->{'TransactionFile'});
 
252
           $::slaveInfo->{"status"} = 'FileClosed';
 
253
 
 
254
      }
 
255
      elsif($::slaveInfo->{'status'} eq 'DBOpen')
 
256
      {
 
257
          if($commandCount > 5000) {
 
258
              $commandCount = 0;
 
259
              $::slaveInfo->{"status"} = 'DBClosed';
 
260
              $::slaveInfo->{"slaveConn"}->reset;
 
261
              #Open the connection right away.
 
262
              openSlaveConnection($::slaveInfo);
 
263
              
 
264
          }
 
265
      }
 
266
 
 
267
    }#while transactions left.
 
268
        
 
269
        $pendingTransResults = undef;
 
270
    
 
271
  }#while(1)
 
272
}#Main
 
273
 
 
274
 
 
275
 
 
276
=item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple)
 
277
 
 
278
Mirrors a single SQL Command(change to a single row) to the slave.
 
279
 
 
280
=over 4
 
281
 
 
282
=item * SeqId
 
283
 
 
284
The id number of the change to mirror.  This is the
 
285
primary key of the pending table.
 
286
 
 
287
 
 
288
=item * tableName
 
289
 
 
290
The name of the table the transaction takes place on.
 
291
 
 
292
=item * op
 
293
 
 
294
The type of operation this transaction is.  'i' for insert, 'u' for update or
 
295
'd' for delete.
 
296
 
 
297
=item * transId
 
298
 
 
299
The Transaction of of the Transaction that this command is part of.
 
300
 
 
301
=item * pendingResults
 
302
 
 
303
A Results set structure returned from Pg::execute that contains the 
 
304
join of the Pending and PendingData tables for all of the pending row
 
305
edits in this transaction. 
 
306
 
 
307
=item * currentTuple 
 
308
 
 
309
 
 
310
The tuple(or row) number of the pendingRow for the command that is about
 
311
to be edited.   If the command is an update then this points to the row
 
312
with IsKey equal to true.  The next row, curTuple+1 is the contains the
 
313
PendingData with IsKey false for the update.
 
314
 
 
315
 
 
316
=item returns
 
317
 
 
318
 
 
319
The tuple number of last tuple for this command.  This might be equal to
 
320
currentTuple or it might be larger (+1 in the case of an Update).
 
321
 
 
322
 
 
323
=back
 
324
 
 
325
=cut
 
326
 
 
327
 
 
328
sub mirrorCommand($$$$$$) {
 
329
    my $seqId = $_[0];
 
330
    my $tableName = $_[1];
 
331
    my $op = $_[2];
 
332
    my $transId = $_[3];
 
333
    my $pendingResults = $_[4];
 
334
    my $currentTuple = $_[5];
 
335
 
 
336
 
 
337
    if($op eq 'i') {
 
338
      $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults
 
339
                               ,$currentTuple);
 
340
    }
 
341
    if($op eq 'd') {
 
342
      $currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults,
 
343
                               $currentTuple);
 
344
    }
 
345
    if($op eq 'u') {
 
346
      $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults,
 
347
                   $currentTuple);
 
348
    }
 
349
    if($op eq 's')  {
 
350
        $currentTuple = mirrorSequence($seqId,$tableName,$transId,$pendingResults,
 
351
                                       $currentTuple);
 
352
    }
 
353
    $commandCount = $commandCount +1;
 
354
    if($commandCount % 100 == 0) {
 
355
    #  print "Sent 100 commmands on SeqId $seqId \n";
 
356
    #  flush STDOUT;
 
357
    }
 
358
    return $currentTuple
 
359
  }
 
360
 
 
361
 
 
362
=item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple)
 
363
 
 
364
Mirrors an INSERT operation to the slave database.  A new row is placed
 
365
in the slave database containing the primary key from pendingKeys along with
 
366
the data fields contained in the row identified by sourceOid.
 
367
 
 
368
=over 4
 
369
 
 
370
=item * transId
 
371
 
 
372
The sequence id of the INSERT operation being mirrored. This is the primary
 
373
key of the pending table.
 
374
 
 
375
=item * tableName
 
376
 
 
377
 
 
378
The name of the table the transaction takes place on.
 
379
 
 
380
=item * sourceOid
 
381
 
 
382
The OID of the row in the master database for which this transaction effects.
 
383
If the transaction is a delete then the operation is not valid.
 
384
 
 
385
=item * transId 
 
386
 
 
387
The Transaction Id of transaction that this insert is part of.
 
388
 
 
389
 
 
390
 
 
391
=item * pendingResults
 
392
 
 
393
A Results set structure returned from Pg::execute that contains the 
 
394
join of the Pending and PendingData tables for all of the pending row
 
395
edits in this transaction. 
 
396
 
 
397
=item * currentTuple 
 
398
 
 
399
 
 
400
The tuple(or row) number of the pendingRow for the command that is about
 
401
to be edited.   In the case of an insert this should point to the one 
 
402
row for the row edit.
 
403
 
 
404
=item returns
 
405
 
 
406
The tuple number of the last tuple for the row edit.  This should be 
 
407
currentTuple.
 
408
 
 
409
 
 
410
=back
 
411
 
 
412
=cut
 
413
 
 
414
 
 
415
sub mirrorInsert($$$$$) {
 
416
    my $seqId = $_[0];
 
417
    my $tableName = $_[1];
 
418
    my $transId = $_[2];
 
419
    my $pendingResults = $_[3];
 
420
    my $currentTuple = $_[4];
 
421
    my $counter;
 
422
    my $column;
 
423
 
 
424
    my $firstIteration=1;
 
425
    my %recordValues = extractData($pendingResults,$currentTuple);
 
426
 
 
427
        
 
428
    #Now build the insert query.
 
429
    my $insertQuery = "INSERT INTO $tableName (";
 
430
    my $valuesQuery = ") VALUES (";
 
431
    foreach $column (keys (%recordValues)) {
 
432
        if($firstIteration==0) {
 
433
            $insertQuery .= " ,";
 
434
            $valuesQuery .= " ,";
 
435
        }
 
436
      $insertQuery .= "\"$column\"";
 
437
      if(defined $recordValues{$column}) {
 
438
        my $quotedValue = $recordValues{$column};
 
439
        $quotedValue =~ s/\\/\\\\/g;
 
440
        $quotedValue =~ s/'/\\'/g;
 
441
        $valuesQuery .= "'$quotedValue'";
 
442
      }
 
443
      else {
 
444
        $valuesQuery .= "null";
 
445
      }
 
446
        $firstIteration=0;
 
447
    }
 
448
    $valuesQuery .= ")";
 
449
    sendQueryToSlaves($transId,$insertQuery . $valuesQuery);
 
450
    return $currentTuple;
 
451
}
 
452
 
 
453
=item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple)
 
454
 
 
455
Deletes a single row from the slave database.  The row is identified by the
 
456
primary key for the transaction in the pendingKeys table.
 
457
 
 
458
=over 4
 
459
 
 
460
=item * SeqId
 
461
 
 
462
The Sequence id for this delete request.
 
463
 
 
464
=item * tableName
 
465
 
 
466
The name of the table to delete the row from.
 
467
 
 
468
=item * transId 
 
469
 
 
470
The Transaction Id of the transaction that this command is part of.
 
471
 
 
472
 
 
473
 
 
474
=item * pendingResults
 
475
 
 
476
A Results set structure returned from Pg::execute that contains the 
 
477
join of the Pending and PendingData tables for all of the pending row
 
478
edits in this transaction. 
 
479
 
 
480
=item * currentTuple 
 
481
 
 
482
 
 
483
The tuple(or row) number of the pendingRow for the command that is about
 
484
to be edited.   In the case of a  delete this should point to the one 
 
485
row for the row edit.
 
486
 
 
487
=item returns
 
488
 
 
489
The tuple number of the last tuple for the row edit.  This should be 
 
490
currentTuple.
 
491
 
 
492
 
 
493
=back
 
494
 
 
495
=cut
 
496
 
 
497
 
 
498
sub mirrorDelete($$$$$) {
 
499
    my $seqId = $_[0];
 
500
    my $tableName = $_[1];
 
501
    my $transId = $_[2];
 
502
    my $pendingResult = $_[3];
 
503
    my $currentTuple = $_[4];
 
504
    my %dataHash;
 
505
    my $currentField;
 
506
    my $firstField=1;
 
507
    %dataHash = extractData($pendingResult,$currentTuple);
 
508
 
 
509
    my $counter=0;
 
510
    my $deleteQuery = "DELETE FROM $tableName WHERE ";
 
511
    foreach $currentField (keys %dataHash) {
 
512
      if($firstField==0) {
 
513
        $deleteQuery .= " AND ";
 
514
      }
 
515
      my $currentValue = $dataHash{$currentField};
 
516
      $deleteQuery .= "\"";
 
517
      $deleteQuery .= $currentField;
 
518
      if(defined $currentValue) {
 
519
        $deleteQuery .= "\"='";
 
520
        $deleteQuery .= $currentValue;
 
521
        $deleteQuery .= "'";
 
522
      }
 
523
      else {
 
524
        $deleteQuery .= " is null ";
 
525
      }
 
526
      $counter++;
 
527
      $firstField=0;
 
528
    }
 
529
    sendQueryToSlaves($transId,$deleteQuery);
 
530
    return $currentTuple;
 
531
}
 
532
 
 
533
 
 
534
=item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple)
 
535
 
 
536
Mirrors over an edit request to a single row of the database.
 
537
The primary key from before the edit is used to determine which row in the
 
538
slave should be changed.  
 
539
 
 
540
After the edit takes place on the slave its primary key will match the primary 
 
541
key the master had immediatly following the edit.  All other fields will be set
 
542
to the current values.   
 
543
 
 
544
Data integrity is maintained because the mirroring is performed in an 
 
545
SQL transcation so either all pending changes are made or none are.
 
546
 
 
547
=over 4
 
548
 
 
549
=item * seqId 
 
550
 
 
551
The Sequence id of the update.
 
552
 
 
553
=item * tableName
 
554
 
 
555
The name of the table to perform the update on.
 
556
 
 
557
=item * transId
 
558
 
 
559
The transaction Id for the transaction that this command is part of.
 
560
 
 
561
 
 
562
=item * pendingResults
 
563
 
 
564
A Results set structure returned from Pg::execute that contains the 
 
565
join of the Pending and PendingData tables for all of the pending row
 
566
edits in this transaction. 
 
567
 
 
568
=item * currentTuple 
 
569
 
 
570
 
 
571
The tuple(or row) number of the pendingRow for the command that is about
 
572
to be edited.   In the case of a  delete this should point to the one 
 
573
row for the row edit.
 
574
 
 
575
=item returns
 
576
 
 
577
The tuple number of the last tuple for the row edit.  This should be 
 
578
currentTuple +1.  Which points to the non key row of the update.
 
579
 
 
580
 
 
581
=back
 
582
 
 
583
=cut
 
584
 
 
585
sub mirrorUpdate($$$$$) {
 
586
    my $seqId = $_[0];
 
587
    my $tableName = $_[1];
 
588
    my $transId = $_[2];
 
589
    my $pendingResult = $_[3];
 
590
    my $currentTuple = $_[4];
 
591
  
 
592
    my $counter;
 
593
    my $quotedValue;
 
594
    my $updateQuery = "UPDATE $tableName SET ";
 
595
    my $currentField;
 
596
 
 
597
    my %keyValueHash;
 
598
    my %dataValueHash;
 
599
    my $firstIteration=1;
 
600
 
 
601
    #Extract the Key values. This row contains the values of the
 
602
    # key fields before the update occours(the WHERE clause)
 
603
    %keyValueHash = extractData($pendingResult,$currentTuple);
 
604
 
 
605
 
 
606
    #Extract the data values.  This is a SET clause that contains 
 
607
    #values for the entire row AFTER the update.    
 
608
    %dataValueHash = extractData($pendingResult,$currentTuple+1);
 
609
 
 
610
    $firstIteration=1;
 
611
    foreach $currentField (keys (%dataValueHash)) {
 
612
      if($firstIteration==0) {
 
613
        $updateQuery .= ", ";
 
614
      }
 
615
      $updateQuery .= " \"$currentField\"=";
 
616
      my $currentValue = $dataValueHash{$currentField};
 
617
      if(defined $currentValue ) {
 
618
        $quotedValue = $currentValue;
 
619
        $quotedValue =~ s/\\/\\\\/g;
 
620
        $quotedValue =~ s/'/\\'/g;
 
621
        $updateQuery .= "'$quotedValue'";
 
622
        }
 
623
      else {
 
624
        $updateQuery .= "null ";
 
625
      }
 
626
      $firstIteration=0;
 
627
    }
 
628
 
 
629
   
 
630
    $updateQuery .= " WHERE ";
 
631
    $firstIteration=1;
 
632
    foreach $currentField (keys (%keyValueHash)) {   
 
633
      my $currentValue;
 
634
      if($firstIteration==0) {
 
635
        $updateQuery .= " AND ";
 
636
      }
 
637
      $updateQuery .= "\"$currentField\"=";
 
638
      $currentValue = $keyValueHash{$currentField};
 
639
      if(defined $currentValue) {
 
640
        $quotedValue = $currentValue;
 
641
        $quotedValue =~ s/\\/\\\\/g;
 
642
        $quotedValue =~ s/'/\\'/g;
 
643
        $updateQuery .= "'$quotedValue'";
 
644
      }
 
645
      else {
 
646
        $updateQuery .= " null ";
 
647
      }
 
648
      $firstIteration=0;
 
649
    }
 
650
    sendQueryToSlaves($transId,$updateQuery);
 
651
    return $currentTuple+1;
 
652
}
 
653
 
 
654
 
 
655
sub mirrorSequence($$$$$) {
 
656
    my $seqId = $_[0];
 
657
    my $sequenceName = $_[1];
 
658
    my $transId = $_[2];
 
659
    my $pendingResult = $_[3];
 
660
    my $currentTuple = $_[4];
 
661
 
 
662
 
 
663
    my $query;
 
664
    my $sequenceValue = $pendingResult->getvalue($currentTuple,4);
 
665
    $query = sprintf("select setval('%s',%s)",$sequenceName,$sequenceValue);
 
666
 
 
667
    sendQueryToSlaves($transId,$query);
 
668
    return $currentTuple;
 
669
 
 
670
}
 
671
 
 
672
=item sendQueryToSlaves(seqId,sqlQuery)
 
673
 
 
674
Sends an SQL query to the slave.
 
675
 
 
676
 
 
677
=over 4
 
678
 
 
679
=item * seqId
 
680
 
 
681
The sequence Id of the command being sent. Undef if no command is associated 
 
682
with the query being sent.
 
683
 
 
684
=item * sqlQuery
 
685
 
 
686
 
 
687
SQL operation to perform on the slave.
 
688
 
 
689
=back
 
690
 
 
691
=cut
 
692
 
 
693
sub sendQueryToSlaves($$) {
 
694
    my $seqId = $_[0];
 
695
    my  $sqlQuery = $_[1];
 
696
       
 
697
   if($::slaveInfo->{"status"} eq 'DBOpen') {
 
698
       my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery);
 
699
       unless($queryResult->resultStatus == PGRES_COMMAND_OK) {
 
700
           my $errorMessage;
 
701
           $errorMessage = "Error sending query  $seqId to " ;
 
702
           $errorMessage .= $::slaveInfo->{"slaveHost"};
 
703
           $errorMessage .=$::slaveInfo->{"slaveConn"}->errorMessage;
 
704
           $errorMessage .= "\n" . $sqlQuery;
 
705
           logErrorMessage($errorMessage);
 
706
           $::slaveInfo->{"slaveConn"}->exec("ROLLBACK");
 
707
           $::slaveInfo->{"status"} = -1;
 
708
       }
 
709
   }
 
710
    elsif($::slaveInfo->{"status"} eq 'FileOpen' ) {
 
711
        my $xfile = $::slaveInfo->{'TransactionFile'};
 
712
        print $xfile  $sqlQuery . ";\n";
 
713
    }
 
714
    
 
715
    
 
716
 
 
717
}
 
718
 
 
719
 
 
720
 
 
721
 
 
722
=item logErrorMessage(error)
 
723
 
 
724
Mails an error message to the users specified $errorEmailAddr
 
725
The error message is also printed to STDERR.
 
726
 
 
727
=over 4
 
728
 
 
729
=item * error
 
730
 
 
731
The error message to log.
 
732
 
 
733
=back
 
734
 
 
735
=cut
 
736
 
 
737
sub logErrorMessage($) {
 
738
    my $error = $_[0];
 
739
 
 
740
    if(defined $lastErrorMsg and $error eq $lastErrorMsg) {
 
741
        if($repeatErrorCount<$::errorThreshold) {
 
742
            $repeatErrorCount++;
 
743
            warn($error);
 
744
            return;
 
745
        }
 
746
 
 
747
    }
 
748
    $repeatErrorCount=0;
 
749
    if(defined $::errorEmailAddr) {
 
750
      my $mailPipe;
 
751
      open (mailPipe, "|/bin/mail -s DBMirror.pl $::errorEmailAddr");
 
752
      print mailPipe "=====================================================\n";
 
753
      print mailPipe "         DBMirror.pl                                 \n";
 
754
      print mailPipe "\n";
 
755
      print mailPipe " The DBMirror.pl script has encountred an error.     \n";
 
756
      print mailPipe " It might indicate that either the master database has\n";
 
757
      print mailPipe " gone down or that the connection to a slave database can\n";
 
758
      print mailPipe " not be made.                                         \n";
 
759
      print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n";
 
760
      print mailPipe  "\n";
 
761
      print mailPipe $error;
 
762
      print mailPipe "\n\n\n=================================================\n";
 
763
      close mailPipe;
 
764
    }
 
765
 
 
766
    if (defined($::syslog))
 
767
    {
 
768
        syslog('err', '%s (%m)', $error);
 
769
    }
 
770
 
 
771
    warn($error);    
 
772
    
 
773
    $lastErrorMsg = $error;
 
774
 
 
775
}
 
776
 
 
777
sub setupSlave($) {
 
778
    my $slavePtr = $_[0];
 
779
    
 
780
    
 
781
        $slavePtr->{"status"} = 0;
 
782
        #Determine the MirrorHostId for the slave from the master's database
 
783
        my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM '
 
784
                                          . ' dbmirror_MirrorHost WHERE SlaveName'
 
785
                                          . '=\'' . $slavePtr->{"slaveName"}
 
786
                                          . '\'');
 
787
        if($resultSet->ntuples !=1) {
 
788
            my $errorMessage .= $slavePtr->{"slaveName"} ."\n";
 
789
            $errorMessage .= "Has no MirrorHost entry on master\n";
 
790
            logErrorMessage($errorMessage);
 
791
            $slavePtr->{"status"}=-1;
 
792
            return;
 
793
            
 
794
        }
 
795
        $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0);
 
796
 
 
797
    if(defined($::slaveInfo->{'slaveDb'})) {
 
798
        # We talk directly to a slave database.
 
799
        #
 
800
        if($::slaveInfo->{"status"} ne 'DBOpen')
 
801
        {
 
802
            openSlaveConnection($::slaveInfo);
 
803
        }
 
804
        sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
 
805
        sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
 
806
    }
 
807
    else {
 
808
        $::slaveInfo->{"status"} = 'FileClosed';
 
809
    }
 
810
        
 
811
 
 
812
}
 
813
 
 
814
=item updateMirrorHostTable(lastTransId,lastSeqId)
 
815
 
 
816
Updates the MirroredTransaction table to reflect the fact that
 
817
this transaction has been sent to the current slave.
 
818
 
 
819
=over 4 
 
820
 
 
821
=item * lastTransId
 
822
 
 
823
The Transaction id for the last transaction that has been succesfully mirrored to
 
824
the currently open slaves.
 
825
 
 
826
=item * lastSeqId 
 
827
 
 
828
The Sequence Id of the last command that has been succefully mirrored
 
829
 
 
830
 
 
831
=back
 
832
 
 
833
 
 
834
=cut
 
835
 
 
836
sub updateMirrorHostTable($$) {
 
837
    my $lastTransId = shift;
 
838
    my $lastSeqId = shift;
 
839
 
 
840
 
 
841
    
 
842
    my $deleteTransactionQuery;
 
843
    my $deleteResult;
 
844
    my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction ";
 
845
    $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)";
 
846
    $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
 
847
    
 
848
    my $updateResult = $masterConn->exec($updateMasterQuery);
 
849
    unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
 
850
        my $errorMessage = $masterConn->errorMessage . "\n";
 
851
        $errorMessage .= $updateMasterQuery;
 
852
        logErrorMessage($errorMessage);
 
853
        die;
 
854
    }
 
855
#       print "Updated slaves to transaction $lastTransId\n" ;   
 
856
#        flush STDOUT;  
 
857
 
 
858
    #If this transaction has now been mirrored to all mirror hosts
 
859
    #then it can be deleted.
 
860
    $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID='
 
861
        . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction'
 
862
        . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
 
863
        . ' dbmirror_MirrorHost)';
 
864
    
 
865
    $deleteResult = $masterConn->exec($deleteTransactionQuery);
 
866
    if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { 
 
867
        logErrorMessage($masterConn->errorMessage . "\n" . 
 
868
                        $deleteTransactionQuery);
 
869
        die;
 
870
    }
 
871
    
 
872
  
 
873
 
 
874
}
 
875
 
 
876
 
 
877
sub extractData($$) {
 
878
  my $pendingResult = $_[0];
 
879
  my $currentTuple = $_[1];
 
880
  my $fnumber;
 
881
  my %valuesHash;
 
882
  $fnumber = 4;
 
883
  my $dataField = $pendingResult->getvalue($currentTuple,$fnumber);
 
884
 
 
885
  while(length($dataField)>0) {
 
886
    # Extract the field name that is surronded by double quotes
 
887
    $dataField =~ m/(\".*?\")/s;
 
888
    my $fieldName = $1;
 
889
    $dataField = substr $dataField ,length($fieldName);
 
890
    $fieldName =~ s/\"//g; #Remove the surronding " signs.
 
891
 
 
892
    if($dataField =~ m/(^= )/s) {
 
893
      #Matched null
 
894
        $dataField = substr $dataField , length($1);
 
895
      $valuesHash{$fieldName}=undef;
 
896
    }
 
897
    elsif ($dataField =~ m/(^=\')/s) {
 
898
      #Has data.
 
899
      my $value;
 
900
      $dataField = substr $dataField ,2; #Skip the ='
 
901
    LOOP: {  #This is to allow us to use last from a do loop.
 
902
             #Recommended in perlsyn manpage.
 
903
      do {
 
904
        my $matchString;
 
905
        #Find the substring ending with the first ' or first \
 
906
        $dataField =~ m/(.*?[\'\\])?/s; 
 
907
        $matchString = $1;
 
908
        $value .= substr $matchString,0,length($matchString)-1;
 
909
 
 
910
        if($matchString =~ m/(\'$)/s) {
 
911
          # $1 runs to the end of the field value.
 
912
            $dataField = substr $dataField,length($matchString)+1;
 
913
            last;
 
914
          
 
915
        }
 
916
        else {
 
917
          #deal with the escape character.
 
918
          #It The character following the escape gets appended.
 
919
            $dataField = substr $dataField,length($matchString);            
 
920
            $dataField =~ s/(^.)//s;        
 
921
            $value .=  $1;
 
922
 
 
923
 
 
924
          
 
925
        }
 
926
        
 
927
           
 
928
      } until(length($dataField)==0);
 
929
  }
 
930
      $valuesHash{$fieldName} = $value;
 
931
      
 
932
      
 
933
      }#else if 
 
934
          else {
 
935
            
 
936
            logErrorMessage "Error in PendingData Sequence Id " .
 
937
                $pendingResult->getvalue($currentTuple,0);
 
938
            die;
 
939
          }
 
940
    
 
941
    
 
942
    
 
943
  } #while
 
944
  return %valuesHash;
 
945
    
 
946
}
 
947
 
 
948
 
 
949
sub openTransactionFile($$)
 
950
{
 
951
    my $slaveInfo = shift;
 
952
    my $XID =shift;
 
953
#      my $now_str = localtime;
 
954
    my $nowsec;
 
955
    my $nowmin;
 
956
    my $nowhour;
 
957
    my $nowmday;
 
958
    my $nowmon;
 
959
    my $nowyear;
 
960
    my $nowwday;
 
961
    my $nowyday;
 
962
    my $nowisdst;
 
963
    ($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) =
 
964
        localtime;
 
965
    my $fileName=sprintf(">%s/%s_%02d-%02d-%02d_%02d:%02d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'},
 
966
                         $::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin,
 
967
                         $nowsec,$XID);
 
968
    
 
969
    my $xfile;
 
970
    open($xfile,$fileName) or die "Can't open $fileName : $!";
 
971
    
 
972
    $slaveInfo->{'TransactionFile'} = $xfile;
 
973
    $slaveInfo->{'status'} = 'FileOpen';
 
974
}
 
975
 
 
976
 
 
977
 
 
978
sub openSlaveConnection($) {
 
979
    my $slavePtr = $_[0];
 
980
    my $slaveConn;
 
981
    
 
982
    
 
983
    my $slaveConnString;
 
984
    if(defined($slavePtr->{"slaveHost"}))
 
985
    {
 
986
        $slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " ";    
 
987
    }
 
988
    if(defined($slavePtr->{"slavePort"}))
 
989
    {
 
990
        $slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " ";
 
991
    }
 
992
 
 
993
    $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
 
994
    $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
 
995
    $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
 
996
    
 
997
    $slaveConn = Pg::connectdb($slaveConnString);
 
998
    
 
999
    if($slaveConn->status != PGRES_CONNECTION_OK) {
 
1000
        my $errorMessage = "Can't connect to slave database " ;
 
1001
        $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
 
1002
        $errorMessage .= $slaveConn->errorMessage;
 
1003
        logErrorMessage($errorMessage);    
 
1004
        $slavePtr->{"status"} = 'DBFailed';
 
1005
    }
 
1006
    else {
 
1007
        $slavePtr->{"slaveConn"} = $slaveConn;
 
1008
        $slavePtr->{"status"} = 'DBOpen';       
 
1009
    }
 
1010
               
 
1011
 
 
1012
}