Created
August 16, 2018 09:11
-
-
Save hencjo/699b406d8ebeaaf4aa247eea626aa7f0 to your computer and use it in GitHub Desktop.
Key-based Debouncer for Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package debouncer; | |
import java.util.HashMap; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* Utility class guarding a task to be execute by a single thread synchronized on some key. | |
* | |
* Other threads can try to schedule execution of task while the task is executing on the original thread. | |
* The first thread to do so will block and subsequently get unblocked to execute the task again as soon | |
* as the original thread has finished the task. If even more threads try to execute the task for the | |
* same key we will return immediately (the first waiting thread will make sure the task is executed anew). | |
* | |
* @param <A> The type of the key to synchronize on. | |
*/ | |
public class Debouncer<A> { | |
// package private for test | |
static final class Locks { | |
public final ReentrantLock running = new ReentrantLock(); | |
public final ReentrantLock waiting = new ReentrantLock(); | |
} | |
private final HashMap<A, Locks> state = new HashMap<>(); | |
private final DebouncedTask<A> dt; | |
private Debouncer(DebouncedTask<A> dt) { | |
this.dt = dt; | |
} | |
/** | |
* Trigger execution of the task for key <code>a</code>. | |
* | |
* If the task is already executing by some other thread when the calling thread will either: | |
* <ul> | |
* <li>Enter wait state, i.e. block and wait until the original has finished and then execute the task again on the calling thread.</li> | |
* <li>Return immediately if some other thread is already in the wait state.</li> | |
* </ul> | |
* | |
* @param a The key to synchronize on, will be passed as parameter to the task on execution. | |
* @throws Exception if execution of the task throws an exception. | |
*/ | |
public void debounce(A a) throws Exception { | |
Locks locks; | |
synchronized (state) { | |
locks = state.computeIfAbsent(a, __ -> new Locks()); | |
} | |
debounce(a, locks, dt); | |
synchronized (state) { | |
if (locks.running.tryLock()) { | |
state.remove(a); | |
locks.running.unlock(); | |
} | |
} | |
} | |
// package private for test | |
static <A> void debounce(A a, Locks s, DebouncedTask dt) throws Exception { | |
if (!s.waiting.tryLock()) return; | |
try { | |
try { | |
s.running.lock(); | |
} finally { | |
s.waiting.unlock(); | |
} | |
dt.run(a); | |
} finally { | |
s.running.unlock(); | |
} | |
} | |
/** | |
* Create a new instance. | |
* | |
* @param <A> The type of the key to synchronize on. | |
* @param da Task to execute when the key is "debounced". Should be idempotent. | |
* @return the created instance. | |
*/ | |
public static <A> Debouncer<A> debouncer(DebouncedTask<A> da) { | |
return new Debouncer(da); | |
} | |
@FunctionalInterface | |
public interface DebouncedTask<A> { | |
void run(A a) throws Exception; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package debouncer; | |
import org.junit.Test; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertTrue; | |
public class DebouncerTest { | |
@Test | |
public void testNeverMoreThan_1_simultaneousExcecution() throws InterruptedException { | |
ExecutorService es = Executors.newFixedThreadPool(3); | |
AtomicInteger ai = new AtomicInteger(0); | |
Debouncer.Locks s = new Debouncer.Locks(); | |
for (int i = 0; i < 100; i++) { | |
es.submit(() -> { | |
try { | |
Debouncer.debounce("", s, a -> { | |
int i1 = ai.incrementAndGet(); | |
Thread.sleep(10); | |
assertEquals(1, i1); | |
ai.decrementAndGet(); | |
}); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
} | |
es.shutdown(); | |
if (!es.awaitTermination(10, TimeUnit.SECONDS)) { | |
System.err.println("awaitTermination timed out."); | |
} | |
} | |
@Test | |
public void burts_and_executions() throws InterruptedException { | |
// with bursts of 0-2, # run tasks should equal # submitted tasks. | |
// with bursts of > 2, # run tasks < # submitted tasks. | |
assertEquals(100, burstSubroutine(1, 100)); | |
assertEquals(200, burstSubroutine(2, 100)); | |
assertTrue(burstSubroutine(3, 100) < 300); | |
assertTrue(burstSubroutine(4, 100) < 400); | |
assertTrue(burstSubroutine(100, 100) < 10000); | |
} | |
private int burstSubroutine(int burstSize, int bursts) throws InterruptedException { | |
ExecutorService es = Executors.newFixedThreadPool(3); | |
AtomicInteger ai = new AtomicInteger(0); | |
Debouncer.Locks s = new Debouncer.Locks(); | |
for (int i = 0; i < bursts; i++) { | |
CountDownLatch firstTaskIsRunning = new CountDownLatch(1); | |
CountDownLatch allTasksInBurstSubmitted = new CountDownLatch(1); | |
CountDownLatch allTasksInBurstFinished = new CountDownLatch(burstSize); | |
for (int j = 0; j < burstSize; j++) { | |
es.execute(() -> { | |
try { | |
Debouncer.debounce("", s, a -> { | |
firstTaskIsRunning.countDown(); | |
ai.incrementAndGet(); | |
//Thread.sleep(10); | |
allTasksInBurstSubmitted.await(); | |
}); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} finally { | |
allTasksInBurstFinished.countDown(); | |
} | |
}); | |
firstTaskIsRunning.await(); | |
} | |
allTasksInBurstSubmitted.countDown(); | |
allTasksInBurstFinished.await(); | |
} | |
es.shutdown(); | |
if (!es.awaitTermination(10, TimeUnit.SECONDS)) { | |
System.err.println("awaitTermination timed out."); | |
} | |
return ai.get(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment