Skip to content

Instantly share code, notes, and snippets.

@fabiog1901
Created April 5, 2023 13:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabiog1901/73facfcb3d38f43fed2ea378976ba99d to your computer and use it in GitHub Desktop.
Save fabiog1901/73facfcb3d38f43fed2ea378976ba99d to your computer and use it in GitHub Desktop.
import org.postgresql.ds.PGSimpleDataSource;
import java.sql.*;
import java.lang.Thread;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
 * Small demo that runs a few parameterized queries concurrently, bounded by an overall timeout.
 *
 * <p>{@link #fab(int)} builds one query task per locality value, and
 * {@link #runConcurrentlyAndWait(List, long, TimeUnit, Consumer)} runs a list of such tasks and
 * reports how many of them finished before the deadline.
 */
public class BasicExample {

    /**
     * Runs all tasks concurrently and waits for them, ignoring the task results.
     *
     * @param tasks    tasks to run
     * @param timeout  overall time budget shared by all tasks
     * @param timeUnit unit of {@code timeout}
     * @param <V>      result type of the tasks
     * @return number of tasks that returned normally before the deadline
     */
    public static <V> int runConcurrentlyAndWait(List<Callable<V>> tasks, long timeout, TimeUnit timeUnit) {
        return runConcurrentlyAndWait(tasks, timeout, timeUnit, null);
    }

    /**
     * Runs all tasks concurrently, cancelling every task that is still running once the shared
     * deadline ({@code now + timeout}) has passed, and blocks until each task has finished, failed
     * or been cancelled.
     *
     * <p>Each task is supervised by a wrapper started with
     * {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}; the wrapper submits the
     * task to a dedicated thread pool and blocks on its result. The {@code consumer}, when given,
     * is therefore invoked on the wrapper's thread, possibly concurrently for different tasks.
     *
     * @param tasks    tasks to run; {@code null} entries are reported and skipped
     * @param timeout  overall time budget shared by all tasks
     * @param timeUnit unit of {@code timeout}
     * @param consumer optional callback receiving the result of each task that returned normally;
     *                 may be {@code null}
     * @param <V>      result type of the tasks
     * @return number of tasks that returned normally before the deadline
     * @throws CompletionException if a wrapper fails unexpectedly, e.g. because {@code consumer}
     *                             threw an exception
     */
    public static <V> int runConcurrentlyAndWait(List<Callable<V>> tasks, long timeout, TimeUnit timeUnit,
            Consumer<V> consumer) {
        ScheduledExecutorService cancellationService = Executors.newSingleThreadScheduledExecutor();
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        ExecutorService executorService = new ThreadPoolExecutor(parallelism, Integer.MAX_VALUE,
                0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(parallelism));
        List<CompletableFuture<Boolean>> allFutures = new ArrayList<>();
        long expirationTime = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        AtomicInteger completions = new AtomicInteger();
        try {
            for (Callable<V> callable : tasks) {
                CompletableFuture<Boolean> wrapper = CompletableFuture.supplyAsync(
                        () -> runWithDeadline(callable, expirationTime, executorService,
                                cancellationService, consumer, completions));
                allFutures.add(wrapper);
            }
            CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])).join();
        } finally {
            executorService.shutdown();
            // shutdownNow (not shutdown): pending delayed cancellation tasks would otherwise keep
            // the timer thread, and with it the JVM, alive until the full timeout has elapsed.
            cancellationService.shutdownNow();
        }
        return completions.get();
    }

    /** Submits one task, arms its deadline cancellation and waits for the outcome. */
    private static <V> boolean runWithDeadline(Callable<V> callable, long expirationTime,
            ExecutorService executorService, ScheduledExecutorService cancellationService,
            Consumer<V> consumer, AtomicInteger completions) {
        if (callable == null) {
            // A null entry must not abort the whole run via an NPE from submit().
            System.out.println("Skipping null task");
            return false;
        }
        if (System.currentTimeMillis() > expirationTime) {
            System.out.println("Task scheduled after expiration time: " + callable);
            return false;
        }
        Future<V> future = executorService.submit(callable);
        // Clamp at zero: if the deadline passed since the check above, cancel immediately
        // instead of granting extra time.
        long delay = Math.max(0L, expirationTime - System.currentTimeMillis());
        ScheduledFuture<?> timeoutTask =
                cancellationService.schedule(() -> future.cancel(true), delay, TimeUnit.MILLISECONDS);
        try {
            V result = future.get();
            if (consumer != null) {
                consumer.accept(result);
            }
            completions.incrementAndGet();
            return true;
        } catch (InterruptedException e) {
            // Nobody is waiting for the task any more, so stop it as well.
            future.cancel(true);
            Thread.currentThread().interrupt();
            System.out.println("Task interrupt: " + e);
        } catch (CancellationException e) {
            System.out.println("Task cancellation: " + e);
        } catch (ExecutionException e) {
            System.out.println("Task fail" + e);
        } finally {
            // The outcome is known; the pending timeout is no longer needed.
            timeoutTask.cancel(false);
        }
        return false;
    }

    /**
     * Creates a task that selects the rows of table {@code t} with the given locality.
     *
     * <p>The query is executed when the returned task is called, not when this method is called.
     * Each call of the task opens its own connection and closes it again.
     *
     * @param locality value bound to the {@code locality} parameter of the query
     * @return a task yielding {@code true} if the query executed, or {@code false} if a
     *         {@link SQLException} occurred (the error is printed to standard output)
     */
    public static Callable<Boolean> fab(int locality) {
        return () -> {
            PGSimpleDataSource ds = new PGSimpleDataSource();
            ds.setApplicationName("pg-connect-test");
            ds.setUrl("jdbc:postgresql://localhost:26257/defaultdb?sslmode=disable");
            ds.setUser("root");
            String stmt = "select * from t where locality = (?);";
            try (Connection connection = ds.getConnection();
                    PreparedStatement pstmt = connection.prepareStatement(stmt)) {
                connection.setAutoCommit(true);
                // JDBC parameter indexes are 1-based.
                pstmt.setInt(1, locality);
                try (ResultSet rs = pstmt.executeQuery()) {
                    // The rows themselves are not needed; executing the query is the point.
                    return true;
                }
            } catch (SQLException e) {
                System.out.printf("ERROR: { state => %s, cause => %s, message => %s }\n",
                        e.getSQLState(), e.getCause(), e.getMessage());
                return false;
            }
        };
    }

    /** Runs the query for localities 0, 1 and 2 concurrently with a 10 second overall timeout. */
    public static void app() {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        tasks.add(fab(0));
        tasks.add(fab(1));
        tasks.add(fab(2));
        int completions = runConcurrentlyAndWait(tasks, 10, TimeUnit.SECONDS);
        System.out.println("Tasks completed before timeout: " + completions);
    }

    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        app();
    }
}
// public static void main(String[] args) {
//
// PGSimpleDataSource ds = new PGSimpleDataSource();
// ds.setApplicationName("pg-connect-test");
//// ds.setUrl(args[0]);
// ds.setUrl("jdbc:postgresql://localhost:26257/defaultdb?sslmode=disable");
// ds.setUser("root");
//// ds.setPassword("fabio");
// String stmt = "insert into t values (1), (2);";
//
// try (Connection connection = ds.getConnection()) {
// connection.setAutoCommit(true);
// PreparedStatement pstmt = connection.prepareStatement(stmt);
//// long start_time = System.currentTimeMillis();
//
//// Thread.sleep(5000);
// pstmt.execute();
//// connection.commit();
//// long duration = System.currentTimeMillis() - start_time;
//// System.out.println(duration);
//// ResultSet rs = pstmt.getResultSet();
//// rs.next();
//// System.out.println(rs.getString(0));
//
// } catch (SQLException e) {
// System.out.printf("ERROR: { state => %s, cause => %s, message => %s }\n",
// e.getSQLState(), e.getCause(), e.getMessage());
// }
// }
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment