Skip to content

Instantly share code, notes, and snippets.

@rac021
Created July 15, 2021 15:09
Show Gist options
  • Save rac021/3b0d3a54537bf5174ff7e8bdfd5ef1d6 to your computer and use it in GitHub Desktop.
Save rac021/3b0d3a54537bf5174ff7e8bdfd5ef1d6 to your computer and use it in GitHub Desktop.
package main ;
import io.vertx.core.AsyncResult;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Cursor;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedStatement;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class VertxPgClient {
static CountDownLatch countDownLatch ;
public static void main( String[] args ) throws InterruptedException {
String sqlQuery = "SELECT ...." ;
PgConnectOptions connectOption = getOptions( "5432" ,
"localhost" ,
"MY_DB" ,
"admin" ,
"admin" ) ;
PoolOptions poolOptions = new PoolOptions().setMaxSize( 12 ) ;
VertxOptions options = new VertxOptions().setWorkerPoolSize( 50 )
.setEventLoopPoolSize( 3 * CpuCoreSensor.availableProcessors() )
.setWarningExceptionTime( 5 )
.setWarningExceptionTimeUnit( TimeUnit.SECONDS )
.setMaxEventLoopExecuteTime(10)
.setMaxEventLoopExecuteTimeUnit( TimeUnit.SECONDS )
.setMaxWorkerExecuteTime(10)
.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS ) ;
Vertx vertx = Vertx.vertx(options) ;
PgPool pgPoolclient = PgPool.pool( vertx, connectOption , poolOptions );
countDownLatch = new CountDownLatch(1) ;
runSqlQuery( pgPoolclient, sqlQuery );
countDownLatch.await() ;
pgPoolclient.close() ;
vertx.close() ;
}
private static void runSqlQuery( PgPool pgPoolclient ,
String sqlQuery ) {
pgPoolclient.getConnection( (AsyncResult<SqlConnection> ar0) -> {
if (ar0.succeeded()) {
SqlConnection connection = ar0.result();
connection.prepare(sqlQuery , (AsyncResult<PreparedStatement> ar1) -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
Cursor cursor = pq.cursor();
connection.begin( (AsyncResult<Transaction> ar2) -> {
if (ar2.succeeded()) {
Transaction tx = ar2.result() ;
Vertx.currentContext().executeBlocking( (Promise<Object> promise ) -> {
cursor.read( 5 , ar3 -> {
if (ar3.succeeded()) {
RowSet<Row> rows = ar3.result();
rows.iterator().forEachRemaining( row -> {
System.out.println( "row = " + row ) ;
});
while ( cursor.hasMore() ) {
cursor.read( 5 , ar4 -> {
if (ar4.succeeded()) {
RowSet<Row> rows1 = ar4.result();
rows1.iterator().forEachRemaining( row -> {
System.out.println( "row1 = " + row ) ;
});
} else if (ar4.failed()) {
ar4.cause().printStackTrace() ;
}
});
}
tx.commit() ;
if( ! cursor.isClosed() ) cursor.close() ;
connection.close() ;
countDownLatch.countDown() ;
}
});
}, false , hdlr -> {
tx.commit() ;
if( ! cursor.isClosed() ) cursor.close() ;
connection.close() ;
countDownLatch.countDown() ;
});
} else {
if ( ! cursor.isClosed() ) cursor.close() ;
connection.close() ;
countDownLatch.countDown() ;
}
});
}
}).exceptionHandler( hdlr -> {
hdlr.getCause().printStackTrace();
countDownLatch.countDown() ;
connection.close();
});
} else {
ar0.cause().printStackTrace();
countDownLatch.countDown() ;
}
});
}
private static PgConnectOptions getOptions( String db_port ,
String db_host ,
String db_name ,
String db_user ,
String db_password ) throws NumberFormatException {
return new PgConnectOptions().setPort ( Integer.parseInt(db_port) )
.setHost ( db_host )
.setDatabase( db_name )
.setUser ( db_user )
.setPassword( db_password )
.setConnectTimeout(10_000 ) ;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment