Skip to content

Instantly share code, notes, and snippets.

@athlan
Created July 11, 2019 14:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save athlan/b357b718a28a9ea83973ed9ff25eb77d to your computer and use it in GitHub Desktop.
Save athlan/b357b718a28a9ea83973ed9ff25eb77d to your computer and use it in GitHub Desktop.
FlyweightConcurrentSupplier Java
package pl.athlan.common.concurrent;
import static java.util.Objects.requireNonNull;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
/**
 * Thread-safe {@link Supplier} decorator that lets only one thread at a time invoke the
 * decorated {@link Supplier} and shares the value it obtained with the other threads that are
 * concurrently inside {@link #get()}.
 *
 * <p>The shared value is dropped as soon as the last caller leaves {@link #get()}, so nothing
 * is retained between non-overlapping calls.
 *
 * <p>Use case: computing the value is memory consuming and it does not matter whether a caller
 * receives the result computed at time t or at time t+1 (it can be shared), but it is crucial
 * to free the memory after execution.
 *
 * @param <T> the type of results supplied by this supplier
 */
public final class FlyweightConcurrentSupplier<T> implements Supplier<T> {

  private final Supplier<T> delegate;
  private final ReadWriteLock rwl = new ReentrantReadWriteLock();

  /** Number of threads that have entered {@link #get()} and not left it yet. */
  private final AtomicInteger queueLength = new AtomicInteger();

  /** Value shared between overlapping callers; read and written only while holding {@code rwl}. */
  private T sharedValue;

  /**
   * Whether {@link #sharedValue} currently holds a result of the delegate. Kept separately so
   * that a {@code null} result can be shared as well.
   */
  private volatile boolean sharedValueAvailable;

  /**
   * Creates a supplier decorating the given delegate.
   *
   * @param delegate the supplier computing the value
   * @throws NullPointerException if {@code delegate} is {@code null}
   */
  public FlyweightConcurrentSupplier(Supplier<T> delegate) {
    this.delegate = requireNonNull(delegate, "delegate cannot be null");
  }

  /**
   * Returns the value shared by the callers currently inside this method, invoking the delegate
   * when no shared value is available. Any exception thrown by the delegate propagates to the
   * caller that invoked it and no value is stored.
   *
   * @return the value obtained from the delegate, possibly by another overlapping caller
   */
  @Override
  public T get() {
    queueLength.incrementAndGet();
    try {
      return readOrCompute();
    } finally {
      // Always leave the queue, also when the delegate throws. Otherwise the counter would
      // never drop back to zero and every later value would stay cached indefinitely.
      leaveQueue();
    }
  }

  /** Returns the shared value, computing and publishing it first when it is not available. */
  private T readOrCompute() {
    rwl.readLock().lock();
    try {
      if (sharedValueAvailable) {
        return sharedValue;
      }
    } finally {
      // A read lock cannot be upgraded, so it must be released before taking the write lock.
      rwl.readLock().unlock();
    }

    rwl.writeLock().lock();
    try {
      // Recheck: another thread may have published a value while we waited for the write lock.
      if (!sharedValueAvailable) {
        sharedValue = delegate.get();
        sharedValueAvailable = true;
      }
      // The result is captured by the return while the lock is still held, so a later
      // cleanup cannot affect what this caller receives.
      return sharedValue;
    } finally {
      rwl.writeLock().unlock();
    }
  }

  /** Deregisters the calling thread and drops the shared value when it was the last caller. */
  private void leaveQueue() {
    if (queueLength.decrementAndGet() != 0) {
      return;
    }
    rwl.writeLock().lock();
    try {
      // Recheck: a thread that joined in the meantime still expects to find the shared value.
      if (queueLength.get() == 0) {
        sharedValue = null;
        sharedValueAvailable = false;
      }
    } finally {
      rwl.writeLock().unlock();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment