Skip to content

Instantly share code, notes, and snippets.

@stanio
Last active August 21, 2023 08:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stanio/a8c2114f5e39fd77173d7267e9a85ff0 to your computer and use it in GitHub Desktop.
Save stanio/a8c2114f5e39fd77173d7267e9a85ff0 to your computer and use it in GitHub Desktop.
Utility for working with Locks (replacing synchronized blocks)
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.concurrent;
import static java.util.Objects.requireNonNull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Encapsulates a {@code Lock} and provides convenience methods for using it.
* <pre>
* <code> Lock lock;
* ...
* lock.lock();
* try {
* // ...
* } finally {
* lock.unlock();
* }</code></pre>
* <p>
* The above:</p>
* <pre>
* <code> Sync sync = Sync.of(lock);
* ...
* sync.withLock(() -> {
* // ...
* });</code></pre>
*
* @see Lock
*/
public class Sync<L extends Lock> {
private final L lock;
Sync(L lock) {
this.lock = lock;
}
/**
* {@return a new <code>Sync</code> instance backed up by the given lock}
*
* @param <L> the type of lock
* @param lock the lock backing the {@code Sync} instance
*/
public static <L extends Lock> Sync<L> of(L lock) {
return new Sync<>(requireNonNull(lock, "null lock"));
}
/**
* {@return a new <code>Sync</code> instance backed up by a
* <code>ReentrantLock</code>}
* <p>
* <i>Example:</i></p>
* <pre>
* <code> private final Sync&lt;ReentranLock&gt; sync = Sync.reentrant();
*
* sync.withLock(() -> {
* // ...
* });
*
* assert !sync.getLock().isHeldByCurrentThread();</code></pre>
*/
public static Sync<ReentrantLock> reentrant() {
return new Sync<>(new ReentrantLock());
}
/**
* {@return the underlying <code>Lock</code>}
*/
public L getLock() {
return lock;
}
/**
* {@return a new <code>Condition</code> instance for the underlying
* <code>Lock</code> instance}
*
* @throws UnsupportedOperationException if the underlying {@code Lock}
* implementation does not support conditions
* @see Lock#newCondition()
*/
public Condition newCondition() {
return lock.newCondition();
}
/**
* Executes the given task in a lock-obtained block. The lock is finally
* released regardless of the runnable outcome.
*
* @param <E> the base type of exceptions thrown by he given runnable
* (normally resolved by the Java compiler)
* @param runnable a task to execute while the lock is held
* @throws E if the given runnable throws exception
* @see Lock#lock()
* @see #resultWithLock(ThrowingSupplier)
*/
public <E extends Throwable>
void withLock(ThrowingRunnable<E> runnable) throws E {
Lock l = getLock();
l.lock();
try {
runnable.run();
} finally {
l.unlock();
}
}
/**
* <i>withLockInterruptibly</i>
*
* @param <E> the base type of exceptions thrown by he given runnable
* (normally resolved by the Java compiler)
* @param runnable a task to execute while the lock is held
* @throws InterruptedException if the current thread is interrupted
* while acquiring the lock (and interruption of lock acquisition
* is supported)
* @throws E if the given runnable throws exception
* @see Lock#lockInterruptibly()
* @see #resultWithLockInterruptibly(ThrowingSupplier)
*/
public <E extends Throwable>
void withLockInterruptibly(ThrowingRunnable<E> runnable)
throws InterruptedException, E {
Lock l = getLock();
l.lockInterruptibly();
try {
runnable.run();
} finally {
l.unlock();
}
}
/**
* <i>tryLock</i>
*
* @param <E> the base type of exceptions thrown by he given runnable
* (normally resolved by the Java compiler)
* @param runnable a task to execute if the lock is obtained successfully
* @return {@code true} if the lock was acquired and {@code false} otherwise
* @throws E if the given runnable throws exception
* @see Lock#tryLock()
*/
public <E extends Throwable>
boolean tryLock(ThrowingRunnable<E> runnable)
throws E {
Lock l = getLock();
if (!l.tryLock())
return false;
try {
runnable.run();
} finally {
l.unlock();
}
return true;
}
/**
* <i>tryLock</i>
*
* @param <E> the base type of exceptions thrown by he given runnable
* (normally resolved by the Java compiler)
* @param runnable a task to execute if the lock is obtained successfully
* @param time the maximum time to wait for the lock
* @param unit the time unit of the time argument
* @return {@code true} if the lock was acquired and {@code false} if the
* waiting time elapsed before the lock was acquired
* @throws InterruptedException if the current thread is interrupted
* while acquiring the lock (and interruption of lock acquisition
* is supported)
* @throws E if the given runnable throws exception
* @see Lock#tryLock(long, TimeUnit)
*/
public <E extends Throwable>
boolean tryLock(ThrowingRunnable<E> runnable, long time, TimeUnit unit)
throws InterruptedException, E {
Lock l = getLock();
if (!l.tryLock(time, unit))
return false;
try {
runnable.run();
} finally {
l.unlock();
}
return true;
}
/**
* Computes a result with a lock-synchronized block.
* <pre>
* <code> Object value = sync.resultWithLock(() -> {
* // ...
* return ...;
* });</code></pre>
*
* @param <T> the type of the result
* @param <E> the base type of exceptions thrown by the given supplier
* (normally resolved by the Java compiler)
* @param supplier function computing the result
* @return a result of given type
* @throws E if computing exception occurs
* @see Lock#lock()
* @see #resultWithLockInterruptibly(ThrowingSupplier)
*/
public <T, E extends Throwable>
T resultWithLock(ThrowingSupplier<T, E> supplier) throws E {
Lock l = getLock();
l.lock();
try {
return supplier.get();
} finally {
l.unlock();
}
}
/**
* <i>resultWithLockInterruptibly</i>
*
* @param <T> the type of the result
* @param <E> the base type of exceptions thrown by the given supplier
* (normally resolved by the Java compiler)
* @param supplier function computing the result
* @return a result of given type
* @throws E if computing exception occurs
* @throws InterruptedException if the current thread is interrupted
* while acquiring the lock (and interruption of lock acquisition
* is supported)
* @see Lock#lockInterruptibly()
* @see #resultWithLock(ThrowingSupplier)
*/
public <T, E extends Throwable>
T resultWithLockInterruptibly(ThrowingSupplier<T, E> supplier)
throws E, InterruptedException {
Lock l = getLock();
l.lockInterruptibly();
try {
return supplier.get();
} finally {
l.unlock();
}
}
/**
* A runnable that may throw checked exception.
*
* @param <E> the base type of exceptions thrown by this runnable
* @see Runnable
*/
@FunctionalInterface
public static interface ThrowingRunnable<E extends Throwable> {
void run() throws E;
}
/**
* Supplier of results that may throw checked exception.
*
* @param <T> the type of results supplied by this supplier
* @param <E> the base type of exceptions thrown by this supplier
* @see java.util.function.Supplier
*/
@FunctionalInterface
public static interface ThrowingSupplier<T, E extends Throwable> {
T get() throws E;
}
}
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.concurrent;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.LongStream;
public class SyncTest {
private static final int COUNT = 10_000;
private final Sync<? extends Lock> sync = Sync.reentrant();
private long sum;
public static void main(String[] args) throws Exception {
SyncTest test = new SyncTest();
try (ParallelExecutor executor = test.newExecutor();
LongStream range = LongStream.range(1, COUNT)) {
//range.parallel().forEach(test::addWithLock);
range.sequential().forEach(executor::add);
}
long expected = LongStream.range(1, COUNT).sum();
// REVISIT: Ensure volatile test.sum here.
if (test.sum != expected) {
throw new AssertionError( "Sum\n"
+ "expected: " + expected
+ "\n but got: " + test.sum);
}
System.out.println("Ok");
}
void addWithLock(long n) {
sync.withLock(() -> add(n));
}
void add(long n) {
sum += n;
}
private ParallelExecutor newExecutor() {
return new ParallelExecutor();
}
class ParallelExecutor implements AutoCloseable {
private final ThreadPoolExecutor executor =
new ThreadPoolExecutor(10, 1_000, 1, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>());
void add(long n) {
executor.execute(() -> SyncTest.this.addWithLock(n));
}
@Override
public void close() throws Exception {
System.out.append("Max execution threads: ")
.println(executor.getLargestPoolSize());
executor.shutdown();
if (!executor.awaitTermination(15, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out after 1 minute");
}
}
} // class ParallelExecutor
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment