public
Last active

Sample Code for Guava Monitor Blog

  • Download Gist
MonitorExample.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
import com.google.common.util.concurrent.Monitor;
 
import java.util.concurrent.atomic.AtomicInteger;
 
/**
* Created by IntelliJ IDEA.
* User: bbejeck
* Date: 11/11/11
* Time: 10:01 PM
*/
public class MonitorExample {
 
private final Monitor monitor = new Monitor();
private volatile boolean condition = true;
private int taskDoneCounter;
private AtomicInteger taskSkippedCounter = new AtomicInteger(0);
private int stopTaskCount;
 
private Monitor.Guard conditionGuard = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return condition;
}
};
 
public void demoTryEnterIf() throws InterruptedException {
if (monitor.tryEnterIf(conditionGuard)) {
try {
simulatedWork();
taskDoneCounter++;
} finally {
monitor.leave();
}
} else {
taskSkippedCounter.incrementAndGet();
}
}
 
public void demoEnterIf() throws InterruptedException {
if (monitor.enterIf(conditionGuard)) {
try {
taskDoneCounter++;
if (taskDoneCounter == stopTaskCount) {
condition = false;
}
} finally {
monitor.leave();
}
} else {
taskSkippedCounter.incrementAndGet();
}
 
}
 
public void demoEnterWhen() throws InterruptedException {
monitor.enterWhen(conditionGuard);
try {
taskDoneCounter++;
if (taskDoneCounter == stopTaskCount) {
condition = false;
}
} finally {
monitor.leave();
}
}
 
private void simulatedWork() throws InterruptedException{
Thread.sleep(250);
}
 
public void reEvaluateGuardCondition() {
monitor.reevaluateGuards();
}
 
public int getStopTaskCount() {
return stopTaskCount;
}
 
public void setStopTaskCount(int stopTaskCount) {
this.stopTaskCount = stopTaskCount;
}
 
public void setCondition(boolean condition) {
this.condition = condition;
}
 
public int getTaskSkippedCounter() {
return taskSkippedCounter.get();
}
 
public int getTaskDoneCounter() {
return taskDoneCounter;
}
}
MonitorExampleTest.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
 
import java.lang.reflect.Method;
import java.util.concurrent.*;
 
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
 
/**
* Created by IntelliJ IDEA.
* User: bbejeck
* Date: 11/11/11
* Time: 11:48 PM
*/
public class MonitorExampleTest {
 
private MonitorExample monitorExample;
private ExecutorService executorService;
private int numberThreads = 10;
private CountDownLatch startSignal;
private CountDownLatch doneSignal;
 
@Before
public void setUp() throws Exception {
monitorExample = new MonitorExample();
executorService = Executors.newFixedThreadPool(numberThreads);
startSignal = new CountDownLatch(1);
doneSignal = new CountDownLatch(numberThreads);
}
 
@After
public void tearDown() {
executorService.shutdownNow();
}
 
/*
* First thread does some simulated work and the following
* 9 threads will move on.
*/
@Test
public void testDemoTryEnterIf() throws Exception {
setUpThreadsForTestingMethod("demoTryEnterIf");
startAllThreadsForTest();
waitForTestThreadsToFinish();
int expectedTaskCount = 1;
int expectedSkippedTasks = 9;
assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount));
assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks));
}
 
/*
The first 5 threads will wait for the monitor because
the guard condition is true, but once it turns false the
rest of the threads drop off
*/
@Test
public void testDemoEnterIfOnlyFiveTasksComplete() throws Exception {
monitorExample.setStopTaskCount(5);
setUpThreadsForTestingMethod("demoEnterIf");
 
startAllThreadsForTest();
waitForTestThreadsToFinish();
int expectedTaskCount = 5;
int expectedSkippedTasks = 5;
 
assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount));
assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks));
 
}
 
/*
All 10 threads enter the monitor as the guard condition
remains true the entire time.
*/
@Test
public void testDemoEnterIfAllTasksComplete() throws Exception {
monitorExample.setStopTaskCount(Integer.MAX_VALUE);
setUpThreadsForTestingMethod("demoEnterIf");
 
startAllThreadsForTest();
waitForTestThreadsToFinish();
int expectedTaskCount = 10;
int expectedSkippedTasks = 0;
 
assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount));
assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks));
 
}
 
/*
Guard condition is initially false, but all 10 threads
enter the monitor.
*/
@Test
public void testDemoEnterWhen() throws Exception {
monitorExample.setStopTaskCount(Integer.MAX_VALUE);
monitorExample.setCondition(false);
setUpThreadsForTestingMethod("demoEnterWhen");
startAllThreadsForTest();
int expectedCompletedCount = 0;
int completedCount = monitorExample.getTaskDoneCounter();
assertThat(completedCount, is(expectedCompletedCount));
 
monitorExample.setCondition(true);
 
waitForTestThreadsToFinish();
expectedCompletedCount = 10;
completedCount = monitorExample.getTaskDoneCounter();
assertThat(completedCount, is(expectedCompletedCount));
}
/*
Artificially setting the guard to false after 3 threads complete to demonstrate that
the remaining 7 threads will wait until the guard condition returns true again and will
enter the monitor.
*/
@Test
public void testDemoEnterWhenAllTasksCompleteEvenWhenConditionChanges() throws Exception {
monitorExample.setCondition(true);
monitorExample.setStopTaskCount(3);
setUpThreadsForTestingMethod("demoEnterWhen");
startAllThreadsForTest();
 
//verifying that only 3 threads have initially worked, re-set the guard to true
FutureTask<Integer> checkInitialTasksCompleted = new FutureTask<Integer>(
new Callable<Integer>() {
public Integer call() {
int initialCompletedTasks = monitorExample.getTaskDoneCounter();
monitorExample.setCondition(true);
monitorExample.reEvaluateGuardCondition();
return initialCompletedTasks;
 
}
});
 
new Thread(checkInitialTasksCompleted).start();
 
int expectedCompletedCount = 3;
int completedCount = checkInitialTasksCompleted.get();
assertThat(completedCount, is(expectedCompletedCount));
 
waitForTestThreadsToFinish();
assertThat(completedCount, is(expectedCompletedCount));
expectedCompletedCount = 10;
completedCount = monitorExample.getTaskDoneCounter();
assertThat(completedCount, is(expectedCompletedCount));
}
 
private void waitForTestThreadsToFinish() throws InterruptedException {
doneSignal.await(1000l, TimeUnit.MILLISECONDS);
}
 
private void startAllThreadsForTest() {
startSignal.countDown();
}
 
private Method getMethodUnderTest(String methodName) throws Exception {
return monitorExample.getClass().getDeclaredMethod(methodName);
}
 
 
private void setUpThreadsForTestingMethod(String methodName) throws Exception {
final Method testMethod = getMethodUnderTest(methodName);
for (int i = 0; i < numberThreads; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
startSignal.await();
testMethod.invoke(monitorExample);
} catch (Exception e) {
//Don't care
} finally {
doneSignal.countDown();
}
}
});
}
}
 
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.