2
* HA-JDBC: High-Availability JDBC
3
* Copyright (c) 2004-2007 Paul Ferraro
5
* This library is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU Lesser General Public License as published by the
7
* Free Software Foundation; either version 2.1 of the License, or (at your
8
* option) any later version.
10
* This library is distributed in the hope that it will be useful, but WITHOUT
11
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15
* You should have received a copy of the GNU Lesser General Public License
16
* along with this library; if not, write to the Free Software Foundation,
17
* Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Contact: ferraro@users.sourceforge.net
21
package net.sf.hajdbc.sync;
23
import java.sql.Connection;
24
import java.sql.ResultSet;
25
import java.sql.SQLException;
26
import java.sql.Statement;
27
import java.sql.Types;
28
import java.text.MessageFormat;
29
import java.util.Collection;
30
import java.util.HashMap;
33
import java.util.concurrent.Callable;
34
import java.util.concurrent.ExecutionException;
35
import java.util.concurrent.ExecutorService;
36
import java.util.concurrent.Future;
38
import net.sf.hajdbc.Database;
39
import net.sf.hajdbc.Dialect;
40
import net.sf.hajdbc.ForeignKeyConstraint;
41
import net.sf.hajdbc.Messages;
42
import net.sf.hajdbc.SequenceProperties;
43
import net.sf.hajdbc.SynchronizationContext;
44
import net.sf.hajdbc.TableProperties;
45
import net.sf.hajdbc.UniqueConstraint;
46
import net.sf.hajdbc.util.SQLExceptionFactory;
47
import net.sf.hajdbc.util.Strings;
49
import org.slf4j.Logger;
50
import org.slf4j.LoggerFactory;
53
* @author Paul Ferraro
56
public final class SynchronizationSupport
58
private static Logger logger = LoggerFactory.getLogger(SynchronizationSupport.class);
60
private SynchronizationSupport()
66
* Drop all foreign key constraints on the target database
68
* @param context a synchronization context
69
* @throws SQLException if database error occurs
71
public static <D> void dropForeignKeys(SynchronizationContext<D> context) throws SQLException
73
Dialect dialect = context.getDialect();
75
Connection connection = context.getConnection(context.getTargetDatabase());
77
Statement statement = connection.createStatement();
79
for (TableProperties table: context.getTargetDatabaseProperties().getTables())
81
for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints())
83
String sql = dialect.getDropForeignKeyConstraintSQL(constraint);
87
statement.addBatch(sql);
91
statement.executeBatch();
96
* Restores all foreign key constraints on the target database
98
* @param context a synchronization context
99
* @throws SQLException if database error occurs
101
public static <D> void restoreForeignKeys(SynchronizationContext<D> context) throws SQLException
103
Dialect dialect = context.getDialect();
105
Connection connection = context.getConnection(context.getTargetDatabase());
107
Statement statement = connection.createStatement();
109
for (TableProperties table: context.getSourceDatabaseProperties().getTables())
111
for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints())
113
String sql = dialect.getCreateForeignKeyConstraintSQL(constraint);
117
statement.addBatch(sql);
121
statement.executeBatch();
126
* Synchronizes the sequences on the target database with the source database.
128
* @param context a synchronization context
129
* @throws SQLException if database error occurs
131
public static <D> void synchronizeSequences(final SynchronizationContext<D> context) throws SQLException
133
Collection<SequenceProperties> sequences = context.getSourceDatabaseProperties().getSequences();
135
if (!sequences.isEmpty())
137
Database<D> sourceDatabase = context.getSourceDatabase();
139
Set<Database<D>> databases = context.getActiveDatabaseSet();
141
ExecutorService executor = context.getExecutor();
143
Dialect dialect = context.getDialect();
145
Map<SequenceProperties, Long> sequenceMap = new HashMap<SequenceProperties, Long>();
146
Map<Database<D>, Future<Long>> futureMap = new HashMap<Database<D>, Future<Long>>();
148
for (SequenceProperties sequence: sequences)
150
final String sql = dialect.getNextSequenceValueSQL(sequence);
154
for (final Database<D> database: databases)
156
Callable<Long> task = new Callable<Long>()
158
public Long call() throws SQLException
160
Statement statement = context.getConnection(database).createStatement();
161
ResultSet resultSet = statement.executeQuery(sql);
165
long value = resultSet.getLong(1);
173
futureMap.put(database, executor.submit(task));
178
Long sourceValue = futureMap.get(sourceDatabase).get();
180
sequenceMap.put(sequence, sourceValue);
182
for (Database<D> database: databases)
184
if (!database.equals(sourceDatabase))
186
Long value = futureMap.get(database).get();
188
if (!value.equals(sourceValue))
190
throw new SQLException(Messages.getMessage(Messages.SEQUENCE_OUT_OF_SYNC, sequence, database, value, sourceDatabase, sourceValue));
195
catch (InterruptedException e)
197
throw SQLExceptionFactory.createSQLException(e);
199
catch (ExecutionException e)
201
throw SQLExceptionFactory.createSQLException(e.getCause());
205
Connection targetConnection = context.getConnection(context.getTargetDatabase());
206
Statement targetStatement = targetConnection.createStatement();
208
for (SequenceProperties sequence: sequences)
210
String sql = dialect.getAlterSequenceSQL(sequence, sequenceMap.get(sequence) + 1);
214
targetStatement.addBatch(sql);
217
targetStatement.executeBatch();
218
targetStatement.close();
225
* @throws SQLException
227
public static <D> void synchronizeIdentityColumns(SynchronizationContext<D> context) throws SQLException
229
Statement sourceStatement = context.getConnection(context.getSourceDatabase()).createStatement();
230
Statement targetStatement = context.getConnection(context.getTargetDatabase()).createStatement();
232
Dialect dialect = context.getDialect();
234
for (TableProperties table: context.getSourceDatabaseProperties().getTables())
236
Collection<String> columns = table.getIdentityColumns();
238
if (!columns.isEmpty())
240
String selectSQL = MessageFormat.format("SELECT max({0}) FROM {1}", Strings.join(columns, "), max("), table.getName()); //$NON-NLS-1$ //$NON-NLS-2$
242
logger.debug(selectSQL);
244
Map<String, Long> map = new HashMap<String, Long>();
246
ResultSet resultSet = sourceStatement.executeQuery(selectSQL);
248
if (resultSet.next())
252
for (String column: columns)
254
map.put(column, resultSet.getLong(++i));
262
for (Map.Entry<String, Long> mapEntry: map.entrySet())
264
String alterSQL = dialect.getAlterIdentityColumnSQL(table, table.getColumnProperties(mapEntry.getKey()), mapEntry.getValue() + 1);
266
if (alterSQL != null)
268
logger.debug(alterSQL);
270
targetStatement.addBatch(alterSQL);
274
targetStatement.executeBatch();
279
sourceStatement.close();
280
targetStatement.close();
286
* @throws SQLException
288
public static <D> void dropUniqueConstraints(SynchronizationContext<D> context) throws SQLException
290
Dialect dialect = context.getDialect();
292
Connection connection = context.getConnection(context.getTargetDatabase());
294
Statement statement = connection.createStatement();
296
for (TableProperties table: context.getTargetDatabaseProperties().getTables())
298
for (UniqueConstraint constraint: table.getUniqueConstraints())
300
String sql = dialect.getDropUniqueConstraintSQL(constraint);
304
statement.addBatch(sql);
308
statement.executeBatch();
315
* @throws SQLException
317
public static <D> void restoreUniqueConstraints(SynchronizationContext<D> context) throws SQLException
319
Dialect dialect = context.getDialect();
321
Connection connection = context.getConnection(context.getTargetDatabase());
323
Statement statement = connection.createStatement();
325
for (TableProperties table: context.getSourceDatabaseProperties().getTables())
327
// Drop unique constraints on the current table
328
for (UniqueConstraint constraint: table.getUniqueConstraints())
330
String sql = dialect.getCreateUniqueConstraintSQL(constraint);
334
statement.addBatch(sql);
338
statement.executeBatch();
345
public static void rollback(Connection connection)
349
connection.rollback();
350
connection.setAutoCommit(true);
352
catch (SQLException e)
354
logger.warn(e.toString(), e);
359
* Helper method for {@link java.sql.ResultSet#getObject(int)} with special handling for large objects.
363
* @return the object of the specified type at the specified index from the specified result set
364
* @throws SQLException
366
public static Object getObject(ResultSet resultSet, int index, int type) throws SQLException
372
return resultSet.getBlob(index);
376
return resultSet.getClob(index);
380
return resultSet.getObject(index);