Created
April 5, 2023 13:30
-
-
Save fabiog1901/73facfcb3d38f43fed2ea378976ba99d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.postgresql.ds.PGSimpleDataSource; | |
import java.sql.*; | |
import java.lang.Thread; | |
import java.util.Collections; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Consumer; | |
public class BasicExample { | |
public static <V> int runConcurrentlyAndWait(List<Callable<V>> tasks, long timeout, TimeUnit timeUnit) { | |
return runConcurrentlyAndWait(tasks, timeout, timeUnit, null); | |
} | |
public static <V> int runConcurrentlyAndWait(List<Callable<V>> tasks, long timeout, TimeUnit timeUnit, | |
Consumer<V> consumer) { | |
ScheduledExecutorService cancellationService = Executors.newSingleThreadScheduledExecutor(); | |
ExecutorService executorService = new ThreadPoolExecutor(ForkJoinPool.getCommonPoolParallelism(), | |
Integer.MAX_VALUE, 0, TimeUnit.MILLISECONDS, | |
new LinkedBlockingDeque<>(ForkJoinPool.getCommonPoolParallelism())); | |
List<CompletableFuture<Boolean>> allFutures = new ArrayList<>(); | |
long expirationTime = System.currentTimeMillis() + timeUnit.toMillis(timeout); | |
AtomicInteger completions = new AtomicInteger(); | |
tasks.forEach(callable -> { | |
CompletableFuture<Boolean> f = CompletableFuture.supplyAsync(() -> { | |
if (System.currentTimeMillis() > expirationTime) { | |
System.out.println("Task scheduled after expiration time: " + callable); | |
return false; | |
} | |
Future<V> future = executorService.submit(callable); | |
long delay = Math.abs(expirationTime - System.currentTimeMillis()); | |
cancellationService.schedule(() -> future.cancel(true), delay, TimeUnit.MILLISECONDS); | |
try { | |
V result = future.get(); | |
if (consumer != null) { | |
consumer.accept(result); | |
} | |
completions.incrementAndGet(); | |
return true; | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
System.out.println("Task interrupt: " + e); | |
} catch (CancellationException e) { | |
System.out.println("Task cancellation: " + e); | |
} catch (ExecutionException e) { | |
System.out.println("Task fail" + e); | |
} | |
return false; | |
}); | |
allFutures.add(f); | |
}); | |
try { | |
CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[] {})).join(); | |
} finally { | |
executorService.shutdown(); | |
cancellationService.shutdown(); | |
} | |
return completions.get(); | |
} | |
public static Callable<Boolean> fab(int locality) { | |
PGSimpleDataSource ds = new PGSimpleDataSource(); | |
ds.setApplicationName("pg-connect-test"); | |
// ds.setUrl(args[0]); | |
ds.setUrl("jdbc:postgresql://localhost:26257/defaultdb?sslmode=disable"); | |
ds.setUser("root"); | |
// ds.setPassword("fabio"); | |
String stmt = "select * from t where locality = (?);"; | |
try (Connection connection = ds.getConnection()) { | |
connection.setAutoCommit(true); | |
PreparedStatement pstmt = connection.prepareStatement(stmt); | |
pstmt.setInt(0, locality); | |
pstmt.execute(); | |
ResultSet rs = pstmt.getResultSet(); | |
ResultSetMetaData rsmeta = rs.getMetaData(); | |
int colCount = rsmeta.getColumnCount(); | |
} catch (SQLException e) { | |
System.out.printf("ERROR: { state => %s, cause => %s, message => %s }\n", | |
e.getSQLState(), e.getCause(), e.getMessage()); | |
} | |
return null; | |
} | |
public static void app() { | |
List<Callable<Boolean>> tasks = Collections.synchronizedList(new ArrayList<>()); | |
tasks.add(fab(0)); | |
tasks.add(fab(1)); | |
tasks.add(fab(2)); | |
int completions = runConcurrentlyAndWait(tasks, 10, TimeUnit.SECONDS); | |
} | |
public static void main(String[] args) { | |
app(); | |
} | |
} | |
// public static void main(String[] args) { | |
// | |
// PGSimpleDataSource ds = new PGSimpleDataSource(); | |
// ds.setApplicationName("pg-connect-test"); | |
//// ds.setUrl(args[0]); | |
// ds.setUrl("jdbc:postgresql://localhost:26257/defaultdb?sslmode=disable"); | |
// ds.setUser("root"); | |
//// ds.setPassword("fabio"); | |
// String stmt = "insert into t values (1), (2);"; | |
// | |
// try (Connection connection = ds.getConnection()) { | |
// connection.setAutoCommit(true); | |
// PreparedStatement pstmt = connection.prepareStatement(stmt); | |
//// long start_time = System.currentTimeMillis(); | |
// | |
//// Thread.sleep(5000); | |
// pstmt.execute(); | |
//// connection.commit(); | |
//// long duration = System.currentTimeMillis() - start_time; | |
//// System.out.println(duration); | |
//// ResultSet rs = pstmt.getResultSet(); | |
//// rs.next(); | |
//// System.out.println(rs.getString(0)); | |
// | |
// } catch (SQLException e) { | |
// System.out.printf("ERROR: { state => %s, cause => %s, message => %s }\n", | |
// e.getSQLState(), e.getCause(), e.getMessage()); | |
// } | |
// } | |
//} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment