Skip to content

Instantly share code, notes, and snippets.

@simonharrer
Last active May 18, 2022 14:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simonharrer/d80565c414861657f6616fea7c042be7 to your computer and use it in GitHub Desktop.
Save simonharrer/d80565c414861657f6616fea7c042be7 to your computer and use it in GitHub Desktop.
Java Concurrency Best Practices in Code
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
class ConcurrencyHelper {
public static void main(String[] args) {
// THIS IS JUST FOR TESTING PURPOSES, DO NOT EXECUTE THIS CODE AS IS
Future<String> future = null;
guardedWaitUntilReturn(future::get);
final String result = guardedWaitUntilResult(future::get);
Thread thread = null;
guardedWaitUntilReturn(thread::join);
BlockingQueue<String> blockingQueue = null;
guardedWaitUntilReturn(() -> blockingQueue.put("element"));
final String element = guardedWaitUntilResult(blockingQueue::take);
Semaphore semaphore = new Semaphore(1, true);
// first improvement
guardedWaitUntilReturn(semaphore::acquire);
// but even better:
try (Resource resource = Resources.open(semaphore)) {
// critical section
} catch (Exception e) {
// handle it
}
ExecutorService executorService = null;
guardedWaitUntil(executorService::isTerminated,
() -> executorService.awaitTermination(100, TimeUnit.MILLISECONDS));
}
@ThreadSafe
private static class Queue<E> {
private final Object lock = new Object();
@GuardedBy(value = "lock")
private E entry;
public E take() {
synchronized (lock) {
guardedWaitWhile(this::isEmpty, lock::wait);
lock.notifyAll();
E result = this.entry;
this.entry = null;
return result;
}
}
private boolean isEmpty() {
return this.entry == null;
}
private boolean isFull() {
return !isEmpty();
}
public void put(E entry) {
synchronized (lock) {
guardedWaitWhile(this::isFull, lock::wait);
this.entry = entry;
lock.notifyAll();
}
}
}
public static interface WaitRunnable {
void waitUntil() throws InterruptedException, Exception;
}
/**
* Ensures that it always waits until the wait has finished correctly.
* <p>
* Use this for typical guarded waits, e.g., <code>semaphore.acquire()</code>,
* <code>blockingQueue.put(elem)</code>, <code>thread.join()</code>, ...
* <p>
* Example:
* <code>
* waitUntil(thread::join);
* </code>
*
* @param runnable the runnable which contains a wait of which it can be woken up through an {@link InterruptedException}
*/
public static void guardedWaitUntilReturn(WaitRunnable runnable) {
while (true) {
try {
runnable.waitUntil();
break;
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public static void guardedWaitUntil(Supplier<Boolean> guard, WaitRunnable runnable) {
while (!guard.get()) {
try {
runnable.waitUntil();
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public static void guardedWaitWhile(Supplier<Boolean> guard, WaitRunnable runnable) {
while (guard.get()) {
try {
runnable.waitUntil();
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public static interface WaitCallable<T> {
T waitUntilResult() throws Exception;
}
/**
* USe this for handling guarded waits like <code>blockingQueue.take()</code> or <code>future.get()</code>
* <p>
* Example:
* <code>
* waitForResult(future::get);
* </code>
*
* @param interruptableCallable
* @param <T>
* @return the value the wait will return after it finished successfully.
*/
public static <T> T guardedWaitUntilResult(WaitCallable<T> interruptableCallable) {
while (true) {
try {
return interruptableCallable.waitUntilResult();
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/**
* Collection of methods to work with typical {@link Resource} in a convenient way.
* This includes {@link Semaphore}.
*/
public static class Resources {
/**
* Acquires a {@link Semaphore} and provides an {@link AutoCloseable} which releases the semaphore after use.
* <p>
* <code>
* Semaphore semaphore = ...<br />
* try(Resource resource = Resource.open(semaphore)) {<br />
* // after use, resource is released automatically<br />
* }
* </code>
*
* @param semaphore the semaphore to be acquired and released
* @return the {@link AutoCloseable} resource
*/
public static Resource open(Semaphore semaphore) {
return Resource.open(new Resource(
semaphore::acquireUninterruptibly,
semaphore::release
));
}
}
public static class Resource implements AutoCloseable {
private static Resource open(Resource resource) {
resource.openResource.run();
return resource;
}
private final Runnable openResource;
private final Runnable freeResource;
public Resource(Runnable openResource, Runnable freeResource) {
this.openResource = Objects.requireNonNull(openResource);
this.freeResource = Objects.requireNonNull(freeResource);
}
@Override
public void close() throws Exception {
freeResource.run();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment