~drizzle-replicator/replicator/trunk

« back to all changes in this revision

Viewing changes to replicator/src/java/com/continuent/tungsten/replicator/thl/CommitSeqnoTable.java

  • Committer: Marcus Eriksson
  • Date: 2011-01-20 21:27:11 UTC
  • Revision ID: marcuse@marcuse-laptop-20110120212711-ff0854gb8gzwsz1x
work-in-progress - won't build, work or look good

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
package com.continuent.tungsten.replicator.thl;
24
24
 
 
25
import java.sql.Connection;
25
26
import java.sql.PreparedStatement;
26
27
import java.sql.ResultSet;
27
28
import java.sql.SQLException;
123
124
                .prepareStatement("SELECT seqno, fragno, last_frag, source_id, epoch_number, eventid from "
124
125
                        + schema + "." + TABLE_NAME);
125
126
 
126
 
        commitSeqnoUpdate = database.prepareStatement("UPDATE "
127
 
                + commitSeqnoTable.getSchema() + "."
128
 
                + commitSeqnoTable.getName() + " SET "
129
 
                + commitSeqnoTableSeqno.getName() + "=?, "
130
 
                + commitSeqnoTableFragno.getName() + "=?, "
131
 
                + commitSeqnoTableLastFrag.getName() + "=?, "
132
 
                + commitSeqnoTableSourceId.getName() + "=?, "
133
 
                + commitSeqnoTableEpochNumber.getName() + "=?, "
134
 
                + commitSeqnoTableEventId.getName() + "=?, "
135
 
                + commitSeqnoTableAppliedLatency.getName() + "=?");
136
 
 
137
127
        // Create the table if it does not exist.
138
128
        if (logger.isDebugEnabled())
139
129
            logger.debug("Initializing " + TABLE_NAME + " table");
248
238
    /**
249
239
     * Updates the last commit seqno value.
250
240
     */
251
 
    public void updateLastCommitSeqno(ReplDBMSHeader header, long appliedLatency)
 
241
    public void updateLastCommitSeqno(Connection connection, ReplDBMSHeader header, long appliedLatency)
252
242
            throws SQLException
253
243
    {
254
244
        if (logger.isDebugEnabled())
255
245
            logger.debug("Updating last committed event header: "
256
246
                    + header.getSeqno());
257
247
 
 
248
        commitSeqnoUpdate = connection.prepareStatement("UPDATE "
 
249
                + commitSeqnoTable.getSchema() + "."
 
250
                + commitSeqnoTable.getName() + " SET "
 
251
                + commitSeqnoTableSeqno.getName() + "=?, "
 
252
                + commitSeqnoTableFragno.getName() + "=?, "
 
253
                + commitSeqnoTableLastFrag.getName() + "=?, "
 
254
                + commitSeqnoTableSourceId.getName() + "=?, "
 
255
                + commitSeqnoTableEpochNumber.getName() + "=?, "
 
256
                + commitSeqnoTableEventId.getName() + "=?, "
 
257
                + commitSeqnoTableAppliedLatency.getName() + "=?");
 
258
 
258
259
        commitSeqnoUpdate.setLong(1, header.getSeqno());
259
260
        commitSeqnoUpdate.setShort(2, header.getFragno());
260
261
        commitSeqnoUpdate.setBoolean(3, header.getLastFrag());