~brian-thomason/+junk/ha-jdbc

« back to all changes in this revision

Viewing changes to src/net/sf/hajdbc/sync/SynchronizationSupport.java

  • Committer: Brian Thomason
  • Date: 2011-12-20 17:34:21 UTC
  • Revision ID: brian.thomason@canonical.com-20111220173421-p9jg95iq91jgdihh
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * HA-JDBC: High-Availability JDBC
 
3
 * Copyright (c) 2004-2007 Paul Ferraro
 
4
 * 
 
5
 * This library is free software; you can redistribute it and/or modify it 
 
6
 * under the terms of the GNU Lesser General Public License as published by the 
 
7
 * Free Software Foundation; either version 2.1 of the License, or (at your 
 
8
 * option) any later version.
 
9
 * 
 
10
 * This library is distributed in the hope that it will be useful, but WITHOUT
 
11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
 
12
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
 
13
 * for more details.
 
14
 * 
 
15
 * You should have received a copy of the GNU Lesser General Public License
 
16
 * along with this library; if not, write to the Free Software Foundation, 
 
17
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 * 
 
19
 * Contact: ferraro@users.sourceforge.net
 
20
 */
 
21
package net.sf.hajdbc.sync;
 
22
 
 
23
import java.sql.Connection;
 
24
import java.sql.ResultSet;
 
25
import java.sql.SQLException;
 
26
import java.sql.Statement;
 
27
import java.sql.Types;
 
28
import java.text.MessageFormat;
 
29
import java.util.Collection;
 
30
import java.util.HashMap;
 
31
import java.util.Map;
 
32
import java.util.Set;
 
33
import java.util.concurrent.Callable;
 
34
import java.util.concurrent.ExecutionException;
 
35
import java.util.concurrent.ExecutorService;
 
36
import java.util.concurrent.Future;
 
37
 
 
38
import net.sf.hajdbc.Database;
 
39
import net.sf.hajdbc.Dialect;
 
40
import net.sf.hajdbc.ForeignKeyConstraint;
 
41
import net.sf.hajdbc.Messages;
 
42
import net.sf.hajdbc.SequenceProperties;
 
43
import net.sf.hajdbc.SynchronizationContext;
 
44
import net.sf.hajdbc.TableProperties;
 
45
import net.sf.hajdbc.UniqueConstraint;
 
46
import net.sf.hajdbc.util.SQLExceptionFactory;
 
47
import net.sf.hajdbc.util.Strings;
 
48
 
 
49
import org.slf4j.Logger;
 
50
import org.slf4j.LoggerFactory;
 
51
 
 
52
/**
 
53
 * @author Paul Ferraro
 
54
 *
 
55
 */
 
56
public final class SynchronizationSupport
 
57
{
 
58
        private static Logger logger = LoggerFactory.getLogger(SynchronizationSupport.class);
 
59
        
 
60
        private SynchronizationSupport()
 
61
        {
 
62
                // Hide
 
63
        }
 
64
        
 
65
        /**
 
66
         * Drop all foreign key constraints on the target database
 
67
         * @param <D> 
 
68
         * @param context a synchronization context
 
69
         * @throws SQLException if database error occurs
 
70
         */
 
71
        public static <D> void dropForeignKeys(SynchronizationContext<D> context) throws SQLException
 
72
        {
 
73
                Dialect dialect = context.getDialect();
 
74
                
 
75
                Connection connection = context.getConnection(context.getTargetDatabase());
 
76
                
 
77
                Statement statement = connection.createStatement();
 
78
                
 
79
                for (TableProperties table: context.getTargetDatabaseProperties().getTables())
 
80
                {
 
81
                        for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints())
 
82
                        {
 
83
                                String sql = dialect.getDropForeignKeyConstraintSQL(constraint);
 
84
                                
 
85
                                logger.debug(sql);
 
86
                                
 
87
                                statement.addBatch(sql);
 
88
                        }
 
89
                }
 
90
                
 
91
                statement.executeBatch();
 
92
                statement.close();
 
93
        }
 
94
        
 
95
        /**
 
96
         * Restores all foreign key constraints on the target database
 
97
         * @param <D> 
 
98
         * @param context a synchronization context
 
99
         * @throws SQLException if database error occurs
 
100
         */
 
101
        public static <D> void restoreForeignKeys(SynchronizationContext<D> context) throws SQLException
 
102
        {
 
103
                Dialect dialect = context.getDialect();
 
104
                
 
105
                Connection connection = context.getConnection(context.getTargetDatabase());
 
106
                
 
107
                Statement statement = connection.createStatement();
 
108
                
 
109
                for (TableProperties table: context.getSourceDatabaseProperties().getTables())
 
110
                {
 
111
                        for (ForeignKeyConstraint constraint: table.getForeignKeyConstraints())
 
112
                        {
 
113
                                String sql = dialect.getCreateForeignKeyConstraintSQL(constraint);
 
114
                                
 
115
                                logger.debug(sql);
 
116
                                
 
117
                                statement.addBatch(sql);
 
118
                        }
 
119
                }
 
120
                
 
121
                statement.executeBatch();
 
122
                statement.close();
 
123
        }
 
124
        
 
125
        /**
 
126
         * Synchronizes the sequences on the target database with the source database.
 
127
         * @param <D> 
 
128
         * @param context a synchronization context
 
129
         * @throws SQLException if database error occurs
 
130
         */
 
131
        public static <D> void synchronizeSequences(final SynchronizationContext<D> context) throws SQLException
 
132
        {
 
133
                Collection<SequenceProperties> sequences = context.getSourceDatabaseProperties().getSequences();
 
134
 
 
135
                if (!sequences.isEmpty())
 
136
                {
 
137
                        Database<D> sourceDatabase = context.getSourceDatabase();
 
138
                        
 
139
                        Set<Database<D>> databases = context.getActiveDatabaseSet();
 
140
 
 
141
                        ExecutorService executor = context.getExecutor();
 
142
                        
 
143
                        Dialect dialect = context.getDialect();
 
144
                        
 
145
                        Map<SequenceProperties, Long> sequenceMap = new HashMap<SequenceProperties, Long>();
 
146
                        Map<Database<D>, Future<Long>> futureMap = new HashMap<Database<D>, Future<Long>>();
 
147
 
 
148
                        for (SequenceProperties sequence: sequences)
 
149
                        {
 
150
                                final String sql = dialect.getNextSequenceValueSQL(sequence);
 
151
                                
 
152
                                logger.debug(sql);
 
153
 
 
154
                                for (final Database<D> database: databases)
 
155
                                {
 
156
                                        Callable<Long> task = new Callable<Long>()
 
157
                                        {
 
158
                                                public Long call() throws SQLException
 
159
                                                {
 
160
                                                        Statement statement = context.getConnection(database).createStatement();
 
161
                                                        ResultSet resultSet = statement.executeQuery(sql);
 
162
                                                        
 
163
                                                        resultSet.next();
 
164
                                                        
 
165
                                                        long value = resultSet.getLong(1);
 
166
                                                        
 
167
                                                        statement.close();
 
168
                                                        
 
169
                                                        return value;
 
170
                                                }
 
171
                                        };
 
172
                                        
 
173
                                        futureMap.put(database, executor.submit(task));                         
 
174
                                }
 
175
 
 
176
                                try
 
177
                                {
 
178
                                        Long sourceValue = futureMap.get(sourceDatabase).get();
 
179
                                        
 
180
                                        sequenceMap.put(sequence, sourceValue);
 
181
                                        
 
182
                                        for (Database<D> database: databases)
 
183
                                        {
 
184
                                                if (!database.equals(sourceDatabase))
 
185
                                                {
 
186
                                                        Long value = futureMap.get(database).get();
 
187
                                                        
 
188
                                                        if (!value.equals(sourceValue))
 
189
                                                        {
 
190
                                                                throw new SQLException(Messages.getMessage(Messages.SEQUENCE_OUT_OF_SYNC, sequence, database, value, sourceDatabase, sourceValue));
 
191
                                                        }
 
192
                                                }
 
193
                                        }
 
194
                                }
 
195
                                catch (InterruptedException e)
 
196
                                {
 
197
                                        throw SQLExceptionFactory.createSQLException(e);
 
198
                                }
 
199
                                catch (ExecutionException e)
 
200
                                {
 
201
                                        throw SQLExceptionFactory.createSQLException(e.getCause());
 
202
                                }
 
203
                        }
 
204
                        
 
205
                        Connection targetConnection = context.getConnection(context.getTargetDatabase());
 
206
                        Statement targetStatement = targetConnection.createStatement();
 
207
 
 
208
                        for (SequenceProperties sequence: sequences)
 
209
                        {
 
210
                                String sql = dialect.getAlterSequenceSQL(sequence, sequenceMap.get(sequence) + 1);
 
211
                                
 
212
                                logger.debug(sql);
 
213
                                
 
214
                                targetStatement.addBatch(sql);
 
215
                        }
 
216
                        
 
217
                        targetStatement.executeBatch();         
 
218
                        targetStatement.close();
 
219
                }
 
220
        }
 
221
        
 
222
        /**
 
223
         * @param <D>
 
224
         * @param context
 
225
         * @throws SQLException
 
226
         */
 
227
        public static <D> void synchronizeIdentityColumns(SynchronizationContext<D> context) throws SQLException
 
228
        {
 
229
                Statement sourceStatement = context.getConnection(context.getSourceDatabase()).createStatement();
 
230
                Statement targetStatement = context.getConnection(context.getTargetDatabase()).createStatement();
 
231
                
 
232
                Dialect dialect = context.getDialect();
 
233
                
 
234
                for (TableProperties table: context.getSourceDatabaseProperties().getTables())
 
235
                {
 
236
                        Collection<String> columns = table.getIdentityColumns();
 
237
                        
 
238
                        if (!columns.isEmpty())
 
239
                        {
 
240
                                String selectSQL = MessageFormat.format("SELECT max({0}) FROM {1}", Strings.join(columns, "), max("), table.getName()); //$NON-NLS-1$ //$NON-NLS-2$
 
241
                                
 
242
                                logger.debug(selectSQL);
 
243
                                
 
244
                                Map<String, Long> map = new HashMap<String, Long>();
 
245
                                
 
246
                                ResultSet resultSet = sourceStatement.executeQuery(selectSQL);
 
247
                                
 
248
                                if (resultSet.next())
 
249
                                {
 
250
                                        int i = 0;
 
251
                                        
 
252
                                        for (String column: columns)
 
253
                                        {
 
254
                                                map.put(column, resultSet.getLong(++i));
 
255
                                        }
 
256
                                }
 
257
                                
 
258
                                resultSet.close();
 
259
                                
 
260
                                if (!map.isEmpty())
 
261
                                {
 
262
                                        for (Map.Entry<String, Long> mapEntry: map.entrySet())
 
263
                                        {
 
264
                                                String alterSQL = dialect.getAlterIdentityColumnSQL(table, table.getColumnProperties(mapEntry.getKey()), mapEntry.getValue() + 1);
 
265
                                                
 
266
                                                if (alterSQL != null)
 
267
                                                {
 
268
                                                        logger.debug(alterSQL);
 
269
                                                        
 
270
                                                        targetStatement.addBatch(alterSQL);
 
271
                                                }
 
272
                                        }
 
273
                                        
 
274
                                        targetStatement.executeBatch();
 
275
                                }
 
276
                        }
 
277
                }
 
278
                
 
279
                sourceStatement.close();
 
280
                targetStatement.close();
 
281
        }
 
282
 
 
283
        /**
 
284
         * @param <D>
 
285
         * @param context
 
286
         * @throws SQLException
 
287
         */
 
288
        public static <D> void dropUniqueConstraints(SynchronizationContext<D> context) throws SQLException
 
289
        {
 
290
                Dialect dialect = context.getDialect();
 
291
 
 
292
                Connection connection = context.getConnection(context.getTargetDatabase());
 
293
                
 
294
                Statement statement = connection.createStatement();
 
295
                
 
296
                for (TableProperties table: context.getTargetDatabaseProperties().getTables())
 
297
                {
 
298
                        for (UniqueConstraint constraint: table.getUniqueConstraints())
 
299
                        {
 
300
                                String sql = dialect.getDropUniqueConstraintSQL(constraint);
 
301
                                
 
302
                                logger.debug(sql);
 
303
                                
 
304
                                statement.addBatch(sql);
 
305
                        }
 
306
                }
 
307
                
 
308
                statement.executeBatch();
 
309
                statement.close();
 
310
        }
 
311
        
 
312
        /**
 
313
         * @param <D>
 
314
         * @param context
 
315
         * @throws SQLException
 
316
         */
 
317
        public static <D> void restoreUniqueConstraints(SynchronizationContext<D> context) throws SQLException
 
318
        {
 
319
                Dialect dialect = context.getDialect();
 
320
 
 
321
                Connection connection = context.getConnection(context.getTargetDatabase());
 
322
                
 
323
                Statement statement = connection.createStatement();
 
324
                
 
325
                for (TableProperties table: context.getSourceDatabaseProperties().getTables())
 
326
                {
 
327
                        // Drop unique constraints on the current table
 
328
                        for (UniqueConstraint constraint: table.getUniqueConstraints())
 
329
                        {
 
330
                                String sql = dialect.getCreateUniqueConstraintSQL(constraint);
 
331
                                
 
332
                                logger.debug(sql);
 
333
                                
 
334
                                statement.addBatch(sql);
 
335
                        }
 
336
                }
 
337
                
 
338
                statement.executeBatch();
 
339
                statement.close();
 
340
        }
 
341
        
 
342
        /**
 
343
         * @param connection
 
344
         */
 
345
        public static void rollback(Connection connection)
 
346
        {
 
347
                try
 
348
                {
 
349
                        connection.rollback();
 
350
                        connection.setAutoCommit(true);
 
351
                }
 
352
                catch (SQLException e)
 
353
                {
 
354
                        logger.warn(e.toString(), e);
 
355
                }
 
356
        }
 
357
        
 
358
        /**
 
359
         * Helper method for {@link java.sql.ResultSet#getObject(int)} with special handling for large objects.
 
360
         * @param resultSet
 
361
         * @param index
 
362
         * @param type
 
363
         * @return the object of the specified type at the specified index from the specified result set
 
364
         * @throws SQLException
 
365
         */
 
366
        public static Object getObject(ResultSet resultSet, int index, int type) throws SQLException
 
367
        {
 
368
                switch (type)
 
369
                {
 
370
                        case Types.BLOB:
 
371
                        {
 
372
                                return resultSet.getBlob(index);
 
373
                        }
 
374
                        case Types.CLOB:
 
375
                        {
 
376
                                return resultSet.getClob(index);
 
377
                        }
 
378
                        default:
 
379
                        {
 
380
                                return resultSet.getObject(index);
 
381
                        }
 
382
                }
 
383
        }
 
384
}