Skip to content

Instantly share code, notes, and snippets.

@kudrevatykh
Created September 20, 2014 09:22
Show Gist options
  • Save kudrevatykh/b958abe729489dd5a7f4 to your computer and use it in GitHub Desktop.
Save kudrevatykh/b958abe729489dd5a7f4 to your computer and use it in GitHub Desktop.
Simple program for copying tables between two databases over JDBC
package tst;
import java.sql.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Copies the rows produced by a query on one JDBC connection into another
 * JDBC connection, using a reader thread, a bounded in-memory queue and a
 * writer thread that inserts in batches.
 *
 * <p>Command-line arguments, in order: {@code batchSize sqlInput sqlOutput
 * connectionInput connectionOutput colCount}. {@code sqlOutput} is a
 * parameterized statement; its first {@code colCount} parameters are bound to
 * the first {@code colCount} columns of each row returned by {@code sqlInput}.
 *
 * <p>Created by kudrale on 20.09.14.
 */
public class Main {
    /** The queue holds at most this many batches' worth of rows before the reader waits. */
    private static final int QUEUED_BATCHES = 10;

    private static final String USAGE =
            "invalid arguments: expected <batchSize> <sqlInput> <sqlOutput>"
            + " <connectionInput> <connectionOutput> <colCount>";

    private final int batchSize;
    private final String sqlInput;
    private final String sqlOutput;
    private final String connectionInput;
    private final String connectionOutput;
    /** Set once either worker is done (normally or not); tells the other side to wind down. */
    private volatile boolean finished = false;
    private final int colCount;
    private final BlockingQueue<Object[]> q;
    private final Thread reader;
    private final Thread writer;
    /** First failure raised by a worker, later ones attached as suppressed; guarded by {@code this}. */
    private Throwable failure;

    /**
     * Parses the six positional arguments and prepares (but does not start)
     * the reader and writer threads.
     *
     * @param args batchSize, sqlInput, sqlOutput, connectionInput, connectionOutput, colCount
     * @throws NumberFormatException if batchSize or colCount is not an integer
     * @throws IllegalArgumentException if batchSize is not positive or is too large for the queue
     */
    public Main(String... args) {
        batchSize = Integer.parseInt(args[0]);
        sqlInput = args[1];
        sqlOutput = args[2];
        connectionInput = args[3];
        connectionOutput = args[4];
        colCount = Integer.parseInt(args[5]);
        // Reject values that would break the queue capacity or the "cnt % batchSize" flush test.
        if (batchSize <= 0 || batchSize > Integer.MAX_VALUE / QUEUED_BATCHES) {
            throw new IllegalArgumentException(
                    "batchSize must be between 1 and " + (Integer.MAX_VALUE / QUEUED_BATCHES)
                    + ": " + batchSize);
        }
        q = new ArrayBlockingQueue<Object[]>(batchSize * QUEUED_BATCHES);
        reader = new Thread(new Worker() {
            @Override
            void work() throws Exception {
                readRows();
            }
        }, "table-copy-reader");
        writer = new Thread(new Worker() {
            @Override
            void work() throws Exception {
                writeRows();
            }
        }, "table-copy-writer");
    }

    /**
     * Shared wrapper for both threads: records any failure instead of
     * swallowing it and always raises {@code finished} so the peer stops.
     */
    private abstract class Worker implements Runnable {
        abstract void work() throws Exception;

        @Override
        public final void run() {
            try {
                work();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still see the interrupt.
                Thread.currentThread().interrupt();
                recordFailure(e);
            } catch (Exception e) {
                recordFailure(e);
            } finally {
                finished = true;
            }
        }
    }

    /** Remembers the first worker failure and attaches later ones to it. */
    private synchronized void recordFailure(Throwable t) {
        if (failure == null) {
            failure = t;
        } else {
            failure.addSuppressed(t);
        }
    }

    /** Runs the input query and pushes every row onto the queue until done or told to stop. */
    private void readRows() throws SQLException, InterruptedException {
        try (Connection conn = DriverManager.getConnection(connectionInput);
             PreparedStatement ps = conn.prepareStatement(sqlInput)) {
            // Hint the fetch size before executing so it applies from the first fetch.
            ps.setFetchSize(batchSize);
            try (ResultSet rs = ps.executeQuery()) {
                // Ask the result set: PreparedStatement.getMetaData() may return null on some drivers.
                int columns = rs.getMetaData().getColumnCount();
                if (columns < colCount) {
                    throw new SQLException("input query returns " + columns
                            + " column(s) but colCount is " + colCount);
                }
                while (!finished && rs.next()) {
                    Object[] row = new Object[columns];
                    for (int i = 0; i < columns; ++i) {
                        row[i] = rs.getObject(i + 1);
                    }
                    // Short timed offers so a writer failure (finished == true) is noticed promptly.
                    while (!finished && !q.offer(row, 1, TimeUnit.MILLISECONDS)) {
                        // retry until the row is accepted or the copy is abandoned
                    }
                }
            }
        }
    }

    /** Drains the queue into the output statement, flushing and committing every {@code batchSize} rows. */
    private void writeRows() throws SQLException, InterruptedException {
        try (Connection conn = DriverManager.getConnection(connectionOutput)) {
            // commit() is only valid with auto-commit off.
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(sqlOutput)) {
                long cnt = 0; // long: large tables can exceed Integer.MAX_VALUE rows
                while (!finished || !q.isEmpty()) {
                    Object[] row = q.poll(1, TimeUnit.MILLISECONDS);
                    if (row == null) {
                        continue;
                    }
                    for (int i = 0; i < colCount; ++i) {
                        ps.setObject(i + 1, row[i]);
                    }
                    ps.addBatch();
                    cnt++;
                    if (cnt % batchSize == 0) {
                        System.out.println(cnt);
                        ps.executeBatch();
                        // Commit per batch to keep each transaction small.
                        conn.commit();
                    }
                }
                ps.executeBatch();
                conn.commit();
                System.out.println("Inserted " + cnt + " rows");
            } catch (SQLException | InterruptedException | RuntimeException e) {
                // What close() does with an open transaction is driver-specific; discard it explicitly.
                try {
                    conn.rollback();
                } catch (SQLException rollbackError) {
                    e.addSuppressed(rollbackError);
                }
                throw e;
            }
        }
    }

    /**
     * Command-line entry point.
     *
     * @param args exactly six arguments, see the class description
     * @throws IllegalArgumentException if the argument count is wrong
     * @throws SQLException if the reader or the writer failed
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String... args) throws InterruptedException, SQLException {
        if (args.length != 6) {
            usage();
        }
        Main m = new Main(args);
        m.run();
    }

    /** Starts both workers, waits for them, and reports elapsed time or the recorded failure. */
    private void run() throws InterruptedException, SQLException {
        long startedAt = System.currentTimeMillis();
        reader.start();
        writer.start();
        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            // Ask both workers to stop before giving up on them.
            finished = true;
            throw e;
        }
        Throwable cause;
        synchronized (this) {
            cause = failure;
        }
        if (cause != null) {
            throw new SQLException("table copy failed: " + cause, cause);
        }
        System.out.println("finished in " + (System.currentTimeMillis() - startedAt) / 1000 + " seconds");
    }

    private static void usage() {
        throw new IllegalArgumentException(USAGE);
    }
}
@kudrevatykh
Copy link
Author

Inserted 5355218 rows
finished in 78 seconds

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment