Skip to content

Instantly share code, notes, and snippets.

@hencjo
Created August 16, 2018 09:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hencjo/699b406d8ebeaaf4aa247eea626aa7f0 to your computer and use it in GitHub Desktop.
Save hencjo/699b406d8ebeaaf4aa247eea626aa7f0 to your computer and use it in GitHub Desktop.
Key-based Debouncer for Java
package debouncer;
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* Utility class guarding a task to be execute by a single thread synchronized on some key.
*
* Other threads can try to schedule execution of task while the task is executing on the original thread.
* The first thread to do so will block and subsequently get unblocked to execute the task again as soon
* as the original thread has finished the task. If even more threads try to execute the task for the
* same key we will return immediately (the first waiting thread will make sure the task is executed anew).
*
* @param <A> The type of the key to synchronize on.
*/
public class Debouncer<A> {
// package private for test
static final class Locks {
public final ReentrantLock running = new ReentrantLock();
public final ReentrantLock waiting = new ReentrantLock();
}
private final HashMap<A, Locks> state = new HashMap<>();
private final DebouncedTask<A> dt;
private Debouncer(DebouncedTask<A> dt) {
this.dt = dt;
}
/**
* Trigger execution of the task for key <code>a</code>.
*
* If the task is already executing by some other thread when the calling thread will either:
* <ul>
* <li>Enter wait state, i.e. block and wait until the original has finished and then execute the task again on the calling thread.</li>
* <li>Return immediately if some other thread is already in the wait state.</li>
* </ul>
*
* @param a The key to synchronize on, will be passed as parameter to the task on execution.
* @throws Exception if execution of the task throws an exception.
*/
public void debounce(A a) throws Exception {
Locks locks;
synchronized (state) {
locks = state.computeIfAbsent(a, __ -> new Locks());
}
debounce(a, locks, dt);
synchronized (state) {
if (locks.running.tryLock()) {
state.remove(a);
locks.running.unlock();
}
}
}
// package private for test
static <A> void debounce(A a, Locks s, DebouncedTask dt) throws Exception {
if (!s.waiting.tryLock()) return;
try {
try {
s.running.lock();
} finally {
s.waiting.unlock();
}
dt.run(a);
} finally {
s.running.unlock();
}
}
/**
* Create a new instance.
*
* @param <A> The type of the key to synchronize on.
* @param da Task to execute when the key is "debounced". Should be idempotent.
* @return the created instance.
*/
public static <A> Debouncer<A> debouncer(DebouncedTask<A> da) {
return new Debouncer(da);
}
@FunctionalInterface
public interface DebouncedTask<A> {
void run(A a) throws Exception;
}
}
package debouncer;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class DebouncerTest {
@Test
public void testNeverMoreThan_1_simultaneousExcecution() throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(3);
AtomicInteger ai = new AtomicInteger(0);
Debouncer.Locks s = new Debouncer.Locks();
for (int i = 0; i < 100; i++) {
es.submit(() -> {
try {
Debouncer.debounce("", s, a -> {
int i1 = ai.incrementAndGet();
Thread.sleep(10);
assertEquals(1, i1);
ai.decrementAndGet();
});
} catch (Exception e) {
e.printStackTrace();
}
});
}
es.shutdown();
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("awaitTermination timed out.");
}
}
@Test
public void burts_and_executions() throws InterruptedException {
// with bursts of 0-2, # run tasks should equal # submitted tasks.
// with bursts of > 2, # run tasks < # submitted tasks.
assertEquals(100, burstSubroutine(1, 100));
assertEquals(200, burstSubroutine(2, 100));
assertTrue(burstSubroutine(3, 100) < 300);
assertTrue(burstSubroutine(4, 100) < 400);
assertTrue(burstSubroutine(100, 100) < 10000);
}
private int burstSubroutine(int burstSize, int bursts) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(3);
AtomicInteger ai = new AtomicInteger(0);
Debouncer.Locks s = new Debouncer.Locks();
for (int i = 0; i < bursts; i++) {
CountDownLatch firstTaskIsRunning = new CountDownLatch(1);
CountDownLatch allTasksInBurstSubmitted = new CountDownLatch(1);
CountDownLatch allTasksInBurstFinished = new CountDownLatch(burstSize);
for (int j = 0; j < burstSize; j++) {
es.execute(() -> {
try {
Debouncer.debounce("", s, a -> {
firstTaskIsRunning.countDown();
ai.incrementAndGet();
//Thread.sleep(10);
allTasksInBurstSubmitted.await();
});
} catch (Exception e) {
e.printStackTrace();
} finally {
allTasksInBurstFinished.countDown();
}
});
firstTaskIsRunning.await();
}
allTasksInBurstSubmitted.countDown();
allTasksInBurstFinished.await();
}
es.shutdown();
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("awaitTermination timed out.");
}
return ai.get();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment