package com.philmcrew.utility.db; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; import java.util.LinkedHashMap; import gnu.getopt.LongOpt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.philmcrew.utility.GetOptExtended; /** * Generic JDBC Database Transfer. * * Given a source and target be able to duplicate the source into the target - given that the source * and target may be completely different DBMS's. * * Some capabilities: * find all tables in source and duplicate schema and contents to destination * duplicate source table in source connection to destination * duplicate layout of incoming result set into destination table on destination * check all tables in source for strange columns and other issues * * To support differences between DBMS requires an implementation of DbSpecific for each one. * * DbColumnInfo is used to hold column related meta data. 
* * In addition we should consider putting all values as strings and passing in a create table command: * dup(Connection sourceConnection, String sourceTableName, Connection destConnection, String destTableName, String destTableCreate); * copy(ResultSet resultSet, Connection destConnection, String destTableName, String destTableCreate); * * This work would make use of * java.sql http://java.sun.com/j2se/1.4.2/docs/api/java/sql/package-summary.html * DatabaseMetaData http://java.sun.com/j2se/1.4.2/docs/api/java/sql/DatabaseMetaData.html * ResultSetMetaData http://java.sun.com/j2se/1.4.2/docs/api/java/sql/ResultSetMetaData.html * Statement.addBatch http://java.sun.com/j2se/1.4.2/docs/api/java/sql/Statement.html#addBatch(java.lang.String) * Statement.executeBatch http://java.sun.com/j2se/1.4.2/docs/api/java/sql/Statement.html#executeBatch() * * DatabaseMetaData.getColumns http://java.sun.com/j2se/1.4.2/docs/api/java/sql/DatabaseMetaData.html#getColumns(java.lang.String,%20java.lang.String,%20java.lang.String,%20java.lang.String) * Provides quite a bit of information, which should prove helpful when working with a modern DBMS such as Oracle or Sybase. * If this metadata is missing, as it is in RedBrick, we would have to be more creative. * * There is a heavy reliance on DatabaseMetaData.getColumns; the structure of its result set is copied here * for clarity. * * Retrieves a description of table columns available in the specified catalog. * Only column descriptions matching the catalog, schema, table and column name criteria are returned. They are ordered by TABLE_SCHEM, TABLE_NAME, and ORDINAL_POSITION. * Each column description has the following columns: * 1. TABLE_CAT String => table catalog (may be null) * 2. TABLE_SCHEM String => table schema (may be null) * 3. TABLE_NAME String => table name * 4. COLUMN_NAME String => column name * 5. DATA_TYPE int => SQL type from java.sql.Types * 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified * 7.
COLUMN_SIZE int => column size. For char or date types this is the maximum number of characters; for numeric or decimal types this is the precision. * 8. BUFFER_LENGTH is not used. * 9. DECIMAL_DIGITS int => the number of fractional digits * 10. NUM_PREC_RADIX int => Radix (typically either 10 or 2) * 11. NULLABLE int => is NULL allowed. * o columnNoNulls - might not allow NULL values * o columnNullable - definitely allows NULL values * o columnNullableUnknown - nullability unknown * 12. REMARKS String => comment describing column (may be null) * 13. COLUMN_DEF String => default value (may be null) * 14. SQL_DATA_TYPE int => unused * 15. SQL_DATETIME_SUB int => unused * 16. CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column * 17. ORDINAL_POSITION int => index of column in table (starting at 1) * 18. IS_NULLABLE String => "NO" means column definitely does not allow NULL values; "YES" means the column might allow NULL values. An empty string means nobody knows. * 19. SCOPE_CATLOG String => catalog of table that is the scope of a reference attribute (null if DATA_TYPE isn't REF) * 20. SCOPE_SCHEMA String => schema of table that is the scope of a reference attribute (null if the DATA_TYPE isn't REF) * 21. SCOPE_TABLE String => table name that is the scope of a reference attribute (null if the DATA_TYPE isn't REF) * 22. SOURCE_DATA_TYPE short => source type of a distinct type or user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't DISTINCT or user-generated REF) * To handle anything complicated we will rely mostly on the 5th column, DATA_TYPE, which is the value from java.sql.Types * * We can always read through the whole data set to determine types and lengths and then read it again to build the batch. * * @author Mitchell J.
Friedman */ public class DbTransfer { private final static Log LOG = LogFactory.getLog( DbTransfer.class ); private final static String FQCN = DbTransfer.class.getName(); long numCommands = 0l; long numRowsInserted = 0l; long numColumnsInserted = 0l; long numTablesCreated = 0l; long numTablesDropped = 0l; private int batchSize = 1000; private int maxRowCount = 10000; private boolean recreateAlways = false; public DbTransfer() { } public String getStatistics() { StringBuffer buf = new StringBuffer(); buf.append( "TablesCreated=" ); buf.append( numTablesCreated ); buf.append( ";TablesDropped=" ); buf.append( numTablesDropped ); buf.append( ";Rows=" ); buf.append( numRowsInserted ); buf.append( ";Cols=" ); buf.append( numColumnsInserted ); return buf.toString(); } public void resetStatistics() { numCommands = 0l; numRowsInserted = 0l; numColumnsInserted = 01; numTablesCreated = 0l; numTablesDropped = 0l; } public Connection connect( String url, String userid, String password ) { Connection connection = null; try { connection = DriverManager.getConnection( url, userid, password ); } catch( SQLException e ) { LOG.error( "Could not connect to db url=" + url + ";userid=" + userid, e ); } return connection; } // need getopt here as well - source (url, user, password), dest (url, user, password), source table mask // -d load arbitrary drivers public DbSpecific getDbSpecific( Connection connection ) throws SQLException { // use DatabaseMetaData to get database type - use it to pick DbSpecificImpl DbSpecific dbSpecific = null; DatabaseMetaData metaData = connection.getMetaData(); String productName = metaData.getDatabaseProductName(); LOG.info( "Connected to " + metaData.getDatabaseProductName() ); if( productName.equals( "PostgreSQL" ) ) { dbSpecific = new DbSpecificPostgres( connection ); } else if( productName.equals( "Microsoft SQL Server" ) ) { dbSpecific = new DbSpecificMicrosoftSqlServer( connection ); } else if( productName.equals( "Oracle" ) ) { dbSpecific = new 
DbSpecificOracle( connection ); } else { dbSpecific = new DbSpecificBase( connection ); } LOG.info( "\t" + dbSpecific.connectMessage() ); return dbSpecific; } public void close( Connection connection ) { if( null != connection ) { try { connection.close(); } catch( SQLException e ) { LOG.error( "Error closing connection", e ); } } } public void close( Statement statement ) { if( null != statement ) { try { statement.close(); } catch( SQLException e ) { LOG.error( "Error closing statement", e ); } } } public void close( ResultSet resultSet ) { if( null != resultSet ) { try { resultSet.close(); } catch( SQLException e ) { LOG.error( "Error closing result set", e ); } } } public void dup( DbSpecific source, DbSpecific dest, String catalog, String schemaPattern, String tableNamePattern, String types[] ) throws SQLException { ResultSet resultSet = null; try { StringBuffer selectQuery = new StringBuffer(); // get list of tables in source, call dup with table name DatabaseMetaData metaData = source.getMetaData(); resultSet = metaData.getTables( catalog, schemaPattern, tableNamePattern, types ); if( null != resultSet ) { String tableName, schemaTableName; while( resultSet.next() ) { tableName = resultSet.getString( 3 ); schemaTableName = resultSet.getString( 2 ); HashMap destHashMap = dupCreate( source, resultSet.getString( 1 ), resultSet.getString( 2 ), resultSet.getString( 3 ), dest ); selectQuery.delete( 0, selectQuery.length() ); selectQuery.append( "select * from " ); if( null != schemaTableName ) { selectQuery.append( schemaTableName ); selectQuery.append( "." 
); } selectQuery.append( tableName ); copy( selectQuery.toString(), source, dest, tableName, destHashMap ); } } } finally { close( resultSet ); } } public HashMap dupCreate( DbSpecific source, String catTableName, String schemaTableName, String sourceTableName, DbSpecific dest ) throws SQLException { HashMap destHashMap = new LinkedHashMap(); ResultSet sourceColumns = null; ResultSet destColumns = null; StringBuffer insertQueryBuf = new StringBuffer(); insertQueryBuf.append( "insert into " ); insertQueryBuf.append( sourceTableName ); insertQueryBuf.append( "\n\t(" ); try { DatabaseMetaData sourceMetaData = source.getMetaData(); sourceColumns = sourceMetaData.getColumns( catTableName, schemaTableName, sourceTableName, null ); DatabaseMetaData destMetaData = dest.getMetaData(); destColumns = destMetaData.getColumns( null, null, sourceTableName.toLowerCase(), null ); StringBuffer sourceQueryBuf = new StringBuffer(); dest.createTableQuery( sourceColumns, sourceQueryBuf, null ); String sourceQuery = sourceQueryBuf.toString(); StringBuffer destQueryBuf = new StringBuffer(); dest.createTableQuery( destColumns, destQueryBuf, destHashMap ); String destQuery = destQueryBuf.toString(); if( isRecreateAlways() || !sourceQuery.contentEquals( destQuery ) ) { if( null != destQuery && destQuery.length() != 0 ) { LOG.info( "Dropping " + sourceTableName + ";sql=" + sourceQuery.length() + ";dql=" + destQuery.length() ); if( !isRecreateAlways() && sourceQuery.length() == destQuery.length() ) { for( int ii = 0; ii < destQuery.length(); ii++ ) { if( destQuery.charAt( ii ) != sourceQuery.charAt( ii ) ) { LOG.info( "table=" + sourceTableName + ";ii=" + ii + ";dest=" + destQuery.charAt( ii ) + ";source=" + sourceQuery.charAt( ii ) ); } } } dest.drop( catTableName, schemaTableName, sourceTableName ); numTablesDropped++; } else { LOG.info( "Don't need to drop " + sourceTableName + " not present" ); } LOG.info( "Creating " + sourceTableName ); LOG.info( sourceQuery ); Statement statement = 
null; ResultSet resultSet = null; try { statement = dest.getConnection().createStatement(); resultSet = dest.query( statement, sourceQuery ); numTablesCreated++; } catch( SQLException e ) { LOG.error( "Error on query " + sourceQuery, e ); } finally { close( resultSet ); close( statement ); } close( destColumns ); destColumns = destMetaData.getColumns( null, null, sourceTableName.toLowerCase(), null ); dest.createTableQuery( destColumns, destQueryBuf, destHashMap ); } else { LOG.info( "Deleting " + sourceTableName ); dest.delete( catTableName, schemaTableName, sourceTableName ); } } finally { close( sourceColumns ); close( destColumns ); } return destHashMap; // DatabaseMetaData.getColumns to verify source // DatabaseMetaData.getTables to verify not in dest // getDbSpecific // create dest table if necessary using dbSpecific // build source select using dbSpecific - select * from source // call copy } public void copy( String sourceQuery, DbSpecific source, DbSpecific dest, String destTableName, HashMap destHashMap ) throws SQLException { // ResultSetMetaData vs DatabaseMetaData.getColumns to verify destination // Create PreparedStatement (from dbSpecific) - insert foo values (a, b, c); // Start BatchStatement (from dbSpecific) // Add to batch, when batch is big enough insert // End batch, flush ending inserts if necessary // return number of rows inserted LOG.info( "copy " + destTableName + " from " + sourceQuery ); Statement selectStatement = null; ResultSet selectResultSet = null; PreparedStatement insertBatch = null; StringBuffer insertQuery = new StringBuffer(); int columnCount = -1; try { selectStatement = source.getConnection().createStatement(); selectStatement.execute( sourceQuery ); selectResultSet = selectStatement.getResultSet(); if( null != selectResultSet ) { ResultSetMetaData resultSetMetaData = selectResultSet.getMetaData(); columnCount = resultSetMetaData.getColumnCount(); insertQuery.delete( 0, insertQuery.length() ); insertQuery.append( "insert 
into " ); insertQuery.append( destTableName ); insertQuery.append( "\n\t(" ); insertQuery.append( resultSetMetaData.getColumnName( 1 ) ); for( int ii = 2; ii <= columnCount; ii++ ) { insertQuery.append( ", " ); insertQuery.append( dest.buildColumnName( dest.buildColumnName( resultSetMetaData.getColumnName( ii ) ) ) ); } insertQuery.append( ")\nvalues\n\t(?" ); for( int ii = 2; ii <= columnCount; ii++ ) { insertQuery.append( ", ?" ); } insertQuery.append( ")\n" ); int batchCount = 0; int rowCount = 0; insertBatch = dest.getConnection().prepareStatement( insertQuery.toString() ); DbColumnInfo dbColumnInfo; String columnName; LOG.info( "copy " + destTableName + " select completed, batch insert prepared " + insertQuery ); while( selectResultSet.next() ) { insertBatch.clearParameters(); for( int ii = 1; ii <= columnCount; ii++ ) { columnName = dest.buildColumnName( resultSetMetaData.getColumnName( ii ) ); dbColumnInfo = destHashMap.get( columnName ); if( null == dbColumnInfo ) { LOG.warn( "Couldn't find dest info for " + columnName ); } dest.setObject( insertBatch, ii, selectResultSet.getObject( ii ), resultSetMetaData.getColumnType( ii ), dbColumnInfo ); } if( getBatchSize() > 1 ) { insertBatch.addBatch(); batchCount++; } else { LOG.info( "Inserting row " + rowCount + ", for " + destTableName ); int insertCount = insertBatch.executeUpdate(); numRowsInserted += insertCount; numColumnsInserted += insertCount * columnCount; } rowCount++; if( batchCount >= this.getBatchSize() ) { LOG.info( "Inserting Batch " + batchCount + ", row " + rowCount + ", for " + destTableName ); if( getBatchSize() > 1 ) { int[] batchResult = insertBatch.executeBatch(); numRowsInserted += batchResult.length; numColumnsInserted += batchResult.length * columnCount; } batchCount = 0; } if( rowCount > getMaxRowCount() ) break; } if( batchCount > 0 ) { int[] batchResult = insertBatch.executeBatch(); } } } catch( SQLException e ) { LOG.error( "Error copying " + destTableName + ";errCode=" + 
e.getErrorCode() + ";sqlState=" + e.getSQLState() + ";insertQuery=" + insertQuery, e ); Exception e2 = e.getNextException(); if( null != e2 ) { LOG.error( "Batch Error: cause=" + e2.getCause() + ";msg=" + e2.getMessage() + ";e=" + e2, e2 ); } if( getBatchSize() < 2 ) { LOG.error( "table=" + destTableName + ";columnCount=" + columnCount + ";sql=" + insertBatch.toString() + ";insertQuery=" + insertQuery ); } throw e; } finally { close( insertBatch ); close( selectResultSet ); close( selectStatement ); } } public void addIndexes( Connection connection ) { } public void addIndexes( Connection connection, String tableName ) { } public void removeIndexes( Connection connection ) { } public void removeIndexes( Connection connection, String tableName ) { } public void loadDriver( String driver ) { try { Class.forName( driver ); LOG.info( "Loaded " + driver ); } catch( ClassNotFoundException e ) { LOG.error( "Could not load driver " + driver, e ); } } /** * */ public static void instructions() { System.out.println( "Usage:\n java " + FQCN + " -d driver -u url -i uid -p password \n" ); System.out.println( "Connect to Source database:" ); System.out.println( "-u eg jdbc:postgresql://localhost/nm -i userid -p password" ); System.out.println( "-i " ); System.out.println( "-p " ); System.out.println( "-d " ); System.out.println( "-t " ); System.out.println( "-s " ); System.out.println( "Connect to Destination database:" ); System.out.println( "-U eg jdbc:postgresql://localhost/nm -I userid -P password" ); System.out.println( "-I " ); System.out.println( "-P " ); System.out.println( "\nOther:" ); System.out.println( "-r Register additional jdbc driver" ); System.out.println( "-b Default is 100 rows" ); System.out.println( "-x Default is 1000 rows" ); System.out.println( "-R Always drop and recreate tables" ); System.out.println( "\nSome Drivers:" ); System.out.println( "db2, 1433, as400, , com.ibm.as400.access.AS400JDBCDriver" ); System.out.println( "microsoft sql server 1433, 
microsoft:sqlserver, master, com.microsoft.jdbc.sqlserver.SQLServerDriver" ); System.out.println( "mysql: 3306, mysql, mysql, org.gjt.mm.mysql.Driver" ); System.out.println( "oracle 1521, oracle:thin, ???, oracle.jdbc.driver.OracleDriver" ); System.out.println( "postgres 5432, postgresql, template1, org.postgresql.Driver" ); System.out.println( "red brick 5050, rbw, ???, redbrick.jdbc.RBWDriver" ); System.out.println( "sas 5010, sharenet, ???, com.sas.net.sharenet.ShareNetDriver" ); System.out.println( "sybase 523, freetds, master, com.internetcds.jdbc.tds.Driver" ); System.out.println( "\nExamples:" ); System.out.println( "java \" + FQCN + \" -u jdbc:oracle:thin:@edwdbp1:1521:edwp1 -r oracle.jdbc.driver.OracleDriver -i millerj -p millerj -U jdbc:postgresql://maod2.hiw.com/mjf2 -I postgres -r org.postgresql.Driver -s COMPET" ); System.out.println( "java \" + FQCN + \" -u jdbc:postgresql://maod2.hiw.com/mfriedma -i postgres -U jdbc:postgresql://maod2.hiw.com/mjfp -I postgres -r org.postgresql.Driver" ); System.out.println( "java \" + FQCN + \" -u jdbc:postgresql://capsela.hiw.com/capsela_prod -i postgres -U jdbc:postgresql://maod2.hiw.com/mjfp -I postgres -r org.postgresql.Driver" ); System.out.println( "java \" + FQCN + \" -u jdbc:postgresql://maod2.hiw.com/mfriedma -i postgres -U jdbc:microsoft:sqlserver://beowulf0;databaseName=mjf_capsela -I sa -r com.microsoft.jdbc.sqlserver.SQLServerDriver -r org.postgresql.Driver" ); System.exit( 0 ); } public static void main( String[] args ) { LongOpt[] longopts = new LongOpt[ 3 ]; longopts[ 0 ] = new LongOpt( "help", LongOpt.NO_ARGUMENT, null, 'h' ); GetOptExtended g = new GetOptExtended( FQCN, args, ":b:d:hi:I:p:P:r:Rs:t:u:U:x:", longopts ); g.setOpterr( false ); // We'll do our own error handling DbTransfer dbTransfer = new DbTransfer(); String sourceUrl = null, sourceUserid = null, sourcePassword = null; String destUrl = null, destUserid = null, destPassword = null; String databaseMask = null, schemaMask = null, 
tableMask = null; int c; while( (c = g.getopt()) != -1 ) { try { switch( c ) { case 'b': { dbTransfer.setBatchSize( g.getInt( "Batch Size" ) ); break; } case 'd': { databaseMask = g.getString( "Source Database Mask" ); break; } case 'i': { sourceUserid = g.getString( "Source Userid" ); break; } case 'I': { destUserid = g.getString( "Destination Userid" ); break; } case 's': { schemaMask = g.getString( "Schema Database Mask" ); break; } case 't': { tableMask = g.getString( "Table Database Mask" ); break; } case 'u': { sourceUrl = g.getString( "Source URL" ); break; } case 'U': { destUrl = g.getString( "Destination URL" ); break; } case 'p': { sourcePassword = g.getString( "Source Password" ); break; } case 'P': { destPassword = g.getString( "Destination Password" ); break; } case 'r': { dbTransfer.loadDriver( g.getString( "Source Password" ) ); break; } case 'R': { dbTransfer.setRecreateAlways( !dbTransfer.isRecreateAlways() ); break; } case 'x': { dbTransfer.setMaxRowCount( g.getInt( "Max Row Count" ) ); break; } case 'h': // help case '?': default: { // instructions(); break; } case ':': { System.err.println( "You need an argument for option " + (char) g.getOptopt() ); break; } } // End Switch } catch( Exception e ) { LOG.error( "DbTransfer.main", e ); } } Connection sourceConn = null; Connection destConn = null; try { sourceConn = dbTransfer.connect( sourceUrl, sourceUserid, sourcePassword ); destConn = dbTransfer.connect( destUrl, destUserid, destPassword ); if( null == sourceConn || null == destConn ) { instructions(); } DbSpecific source = dbTransfer.getDbSpecific( sourceConn ); source.getConnection().setReadOnly( true ); DbSpecific dest = dbTransfer.getDbSpecific( destConn ); dbTransfer.dup( source, dest, databaseMask, schemaMask, tableMask, new String[]{"TABLE"} ); } catch( SQLException e ) { LOG.error( "DbTransfer.main", e ); } catch( Throwable t ) { LOG.error( "DbTransfer.main", t ); } finally { dbTransfer.close( sourceConn ); dbTransfer.close( destConn ); 
} } public int getBatchSize() { return batchSize; } public void setBatchSize( int batchSize ) { this.batchSize = batchSize; } public int getMaxRowCount() { return maxRowCount; } public void setMaxRowCount( int maxRowCount ) { this.maxRowCount = maxRowCount; } public boolean isRecreateAlways() { return recreateAlways; } public void setRecreateAlways( boolean recreateAlways ) { this.recreateAlways = recreateAlways; } public String getSchema( String sourceUrl, String sourceUserid, String sourcePassword, String databaseMask, String schemaMask, String tableMask ) { StringBuffer buf = new StringBuffer(); Connection sourceConn = null; ResultSet resultSet = null; ResultSet sourceColumns = null; try { sourceConn = connect( sourceUrl, sourceUserid, sourcePassword ); DbSpecific source = getDbSpecific( sourceConn ); StringBuffer tmp = new StringBuffer(); // get list of tables in source, call dup with table name DatabaseMetaData sourceMetaData = source.getMetaData(); resultSet = sourceMetaData.getTables( databaseMask, schemaMask, tableMask, new String[]{"TABLE"} ); if( null != resultSet ) { String catTableName, tableName, schemaTableName; while( resultSet.next() ) { catTableName = resultSet.getString( 1 ); schemaTableName = resultSet.getString( 2 ); tableName = resultSet.getString( 3 ); sourceColumns = sourceMetaData.getColumns( catTableName, schemaTableName, tableName, null ); source.createTableQuery( sourceColumns, buf, null ); close( sourceColumns ); sourceColumns = null; } } } catch( SQLException e ) { LOG.error( "DbTransfer.main", e ); } catch( Throwable t ) { LOG.error( "DbTransfer.main", t ); } finally { close( sourceColumns ); close( resultSet ); close( sourceConn ); } return buf.toString(); } public void dup( String sourceUrl, String sourceUserid, String sourcePassword, String destUrl, String destUserid, String destPassword, String databaseMask, String schemaMask, String tableMask ) { Connection sourceConn = null; Connection destConn = null; try { sourceConn = connect( 
sourceUrl, sourceUserid, sourcePassword ); destConn = connect( destUrl, destUserid, destPassword ); DbSpecific source = getDbSpecific( sourceConn ); DbSpecific dest = getDbSpecific( destConn ); dup( source, dest, databaseMask, schemaMask, tableMask, new String[]{"TABLE"} ); } catch( SQLException e ) { LOG.error( "DbTransfer.main", e ); } catch( Throwable t ) { LOG.error( "DbTransfer.main", t ); } finally { close( sourceConn ); close( destConn ); } } }