Skip to content

Instantly share code, notes, and snippets.

@innerverse
Created July 28, 2017 16:20
Show Gist options
  • Save innerverse/fe8a8c7c49ae74bcf94a4e03d802e768 to your computer and use it in GitHub Desktop.
Save innerverse/fe8a8c7c49ae74bcf94a4e03d802e768 to your computer and use it in GitHub Desktop.
package com.innerverse.completionq;
import com.google.common.util.concurrent.Striped;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class StripedExecutor {
protected final Striped<Semaphore> stripes;
private final ExecutorService executor;
public StripedExecutor(int stripes, int threadsPerStripe, int maxThreads) {
this.stripes = Striped.lazyWeakSemaphore(stripes, threadsPerStripe);
this.executor = Executors.newWorkStealingPool(maxThreads);
}
public <T> CompletableFuture<T> supplyAsync(final Object stripe, final Supplier<T> supplier) {
final Supplier<T> stripedSupplier = new StripedSupplier<>(stripe, supplier);
return CompletableFuture.supplyAsync(stripedSupplier, this.executor);
}
private class StripedSupplier<T> implements Supplier<T> {
private final Object stripe;
private final Supplier<T> supplier;
StripedSupplier(final Object stripe, Supplier<T> supplier) {
this.stripe = stripe;
this.supplier = supplier;
}
volatile T result;
@Override
public T get() {
try {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override
public boolean block() {
final Semaphore stripeLock = stripes.get(stripe);
try {
stripeLock.acquireUninterruptibly();
result = supplier.get();
return true;
} finally {
stripeLock.release();
}
}
@Override
public boolean isReleasable() {
return result != null;
}
});
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment