Last active
May 18, 2022 14:18
-
-
Save simonharrer/d80565c414861657f6616fea7c042be7 to your computer and use it in GitHub Desktop.
Java Concurrency Best Practices in Code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Objects; | |
import java.util.Random; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.Semaphore; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Supplier; | |
class ConcurrencyHelper { | |
public static void main(String[] args) { | |
// THIS IS JUST FOR TESTING PURPOSES, DO NOT EXECUTE THIS CODE AS IS | |
Future<String> future = null; | |
guardedWaitUntilReturn(future::get); | |
final String result = guardedWaitUntilResult(future::get); | |
Thread thread = null; | |
guardedWaitUntilReturn(thread::join); | |
BlockingQueue<String> blockingQueue = null; | |
guardedWaitUntilReturn(() -> blockingQueue.put("element")); | |
final String element = guardedWaitUntilResult(blockingQueue::take); | |
Semaphore semaphore = new Semaphore(1, true); | |
// first improvement | |
guardedWaitUntilReturn(semaphore::acquire); | |
// but even better: | |
try (Resource resource = Resources.open(semaphore)) { | |
// critical section | |
} catch (Exception e) { | |
// handle it | |
} | |
ExecutorService executorService = null; | |
guardedWaitUntil(executorService::isTerminated, | |
() -> executorService.awaitTermination(100, TimeUnit.MILLISECONDS)); | |
} | |
@ThreadSafe | |
private static class Queue<E> { | |
private final Object lock = new Object(); | |
@GuardedBy(value = "lock") | |
private E entry; | |
public E take() { | |
synchronized (lock) { | |
guardedWaitWhile(this::isEmpty, lock::wait); | |
lock.notifyAll(); | |
E result = this.entry; | |
this.entry = null; | |
return result; | |
} | |
} | |
private boolean isEmpty() { | |
return this.entry == null; | |
} | |
private boolean isFull() { | |
return !isEmpty(); | |
} | |
public void put(E entry) { | |
synchronized (lock) { | |
guardedWaitWhile(this::isFull, lock::wait); | |
this.entry = entry; | |
lock.notifyAll(); | |
} | |
} | |
} | |
public static interface WaitRunnable { | |
void waitUntil() throws InterruptedException, Exception; | |
} | |
/** | |
* Ensures that it always waits until the wait has finished correctly. | |
* <p> | |
* Use this for typical guarded waits, e.g., <code>semaphore.acquire()</code>, | |
* <code>blockingQueue.put(elem)</code>, <code>thread.join()</code>, ... | |
* <p> | |
* Example: | |
* <code> | |
* waitUntil(thread::join); | |
* </code> | |
* | |
* @param runnable the runnable which contains a wait of which it can be woken up through an {@link InterruptedException} | |
*/ | |
public static void guardedWaitUntilReturn(WaitRunnable runnable) { | |
while (true) { | |
try { | |
runnable.waitUntil(); | |
break; | |
} catch (InterruptedException ignored) { | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
public static void guardedWaitUntil(Supplier<Boolean> guard, WaitRunnable runnable) { | |
while (!guard.get()) { | |
try { | |
runnable.waitUntil(); | |
} catch (InterruptedException ignored) { | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
public static void guardedWaitWhile(Supplier<Boolean> guard, WaitRunnable runnable) { | |
while (guard.get()) { | |
try { | |
runnable.waitUntil(); | |
} catch (InterruptedException ignored) { | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
public static interface WaitCallable<T> { | |
T waitUntilResult() throws Exception; | |
} | |
/** | |
* USe this for handling guarded waits like <code>blockingQueue.take()</code> or <code>future.get()</code> | |
* <p> | |
* Example: | |
* <code> | |
* waitForResult(future::get); | |
* </code> | |
* | |
* @param interruptableCallable | |
* @param <T> | |
* @return the value the wait will return after it finished successfully. | |
*/ | |
public static <T> T guardedWaitUntilResult(WaitCallable<T> interruptableCallable) { | |
while (true) { | |
try { | |
return interruptableCallable.waitUntilResult(); | |
} catch (InterruptedException ignored) { | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
/** | |
* Collection of methods to work with typical {@link Resource} in a convenient way. | |
* This includes {@link Semaphore}. | |
*/ | |
public static class Resources { | |
/** | |
* Acquires a {@link Semaphore} and provides an {@link AutoCloseable} which releases the semaphore after use. | |
* <p> | |
* <code> | |
* Semaphore semaphore = ...<br /> | |
* try(Resource resource = Resource.open(semaphore)) {<br /> | |
* // after use, resource is released automatically<br /> | |
* } | |
* </code> | |
* | |
* @param semaphore the semaphore to be acquired and released | |
* @return the {@link AutoCloseable} resource | |
*/ | |
public static Resource open(Semaphore semaphore) { | |
return Resource.open(new Resource( | |
semaphore::acquireUninterruptibly, | |
semaphore::release | |
)); | |
} | |
} | |
public static class Resource implements AutoCloseable { | |
private static Resource open(Resource resource) { | |
resource.openResource.run(); | |
return resource; | |
} | |
private final Runnable openResource; | |
private final Runnable freeResource; | |
public Resource(Runnable openResource, Runnable freeResource) { | |
this.openResource = Objects.requireNonNull(openResource); | |
this.freeResource = Objects.requireNonNull(freeResource); | |
} | |
@Override | |
public void close() throws Exception { | |
freeResource.run(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment