Created
March 14, 2017 11:31
-
-
Save unix-junkie/4fb67786a9e58ab308b04239840c50d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import static java.lang.Thread.currentThread; | |
import static java.lang.Thread.interrupted; | |
import static java.lang.Thread.sleep; | |
import static java.util.concurrent.Executors.newSingleThreadExecutor; | |
import static java.util.concurrent.TimeUnit.MILLISECONDS; | |
import static org.hamcrest.CoreMatchers.instanceOf; | |
import static org.hamcrest.CoreMatchers.is; | |
import static org.hamcrest.CoreMatchers.nullValue; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
import static org.testng.Assert.fail; | |
import java.io.IOException; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.FutureTask; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.RejectedExecutionException; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import org.testng.annotations.Test; | |
public final class ProblemAwareExecutorTest { | |
@Test | |
@SuppressWarnings("static-method") | |
public void testNormalExecution() throws ExecutionException, InterruptedException { | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final AtomicBoolean runnableExecuted = new AtomicBoolean(); | |
executorService.submit(() -> runnableExecuted.set(true)).get(); | |
final AtomicBoolean runnableWithResultExecuted = new AtomicBoolean(); | |
executorService.submit(() -> runnableWithResultExecuted.set(true), (Void) null); | |
final AtomicBoolean callableExecuted = new AtomicBoolean(); | |
executorService.submit((Callable<Void>) () -> { | |
callableExecuted.set(true); | |
return null; | |
}).get(); | |
final AtomicBoolean runnableFutureExecuted = new AtomicBoolean(); | |
executorService.submit(new FutureTask<>((Callable<Void>) () -> { | |
runnableFutureExecuted.set(true); | |
return null; | |
})).get(); | |
assertThat(runnableExecuted.get(), is(true)); | |
assertThat(runnableWithResultExecuted.get(), is(true)); | |
assertThat(callableExecuted.get(), is(true)); | |
assertThat(runnableFutureExecuted.get(), is(true)); | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testRunnableFailed() throws InterruptedException { | |
final String message = "Error message"; | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final Future<?> future = executorService.submit((Runnable) () -> { | |
throw new UnsupportedOperationException(message); | |
}); | |
try { | |
future.get(); | |
fail("Future.get() should have thrown an ExecutionException"); | |
} catch (final ExecutionException ee) { | |
final Throwable cause = ee.getCause(); | |
assertThat(cause, is(instanceOf(UnsupportedOperationException.class))); | |
assertThat(cause.getMessage(), is(message)); | |
} | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testRunnableWithResultFailed() throws InterruptedException { | |
final String message = "Error message"; | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final Future<Void> future = executorService.submit(() -> { | |
throw new UnsupportedOperationException(message); | |
}, null); | |
try { | |
future.get(); | |
fail("Future.get() should have thrown an ExecutionException"); | |
} catch (final ExecutionException ee) { | |
final Throwable cause = ee.getCause(); | |
assertThat(cause, is(instanceOf(UnsupportedOperationException.class))); | |
assertThat(cause.getMessage(), is(message)); | |
} | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testCallableFailed() throws InterruptedException { | |
final String message = "Error message"; | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final Future<Void> future = executorService.submit(() -> { | |
throw new IOException(message); | |
}); | |
try { | |
future.get(); | |
fail("Future.get() should have thrown an ExecutionException"); | |
} catch (final ExecutionException ee) { | |
final Throwable cause = ee.getCause(); | |
assertThat(cause, is(instanceOf(IOException.class))); | |
assertThat(cause.getMessage(), is(message)); | |
} | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testRunnableFutureFailed() throws InterruptedException, ExecutionException { | |
final String message = "Error message"; | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final Future<?> future = executorService.submit(new FutureTask<>((Callable<Void>) () -> { | |
throw new IOException(message); | |
})); | |
assertThat(future.get(), is(nullValue())); | |
assertThat(future.isDone(), is(true)); | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testRunnableFutureCancelled() throws InterruptedException, ExecutionException { | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final FutureTask<Void> task = new FutureTask<>(() -> { | |
while (!interrupted()) { | |
try { | |
sleep(Long.MAX_VALUE); | |
} catch (@SuppressWarnings("unused") final InterruptedException ignored) { | |
currentThread().interrupt(); | |
} | |
} | |
return null; | |
}); | |
task.cancel(true); | |
final Future<?> future = executorService.submit(task); | |
assertThat(future.get(), is(nullValue())); | |
assertThat(future.isDone(), is(true)); | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testRunnableFutureInterrupted() throws InterruptedException, ExecutionException { | |
final ExecutorService executorService = newSingleThreadExecutor(); | |
try { | |
final Future<?> future = executorService.submit(new FutureTask<Void>(() -> { | |
currentThread().interrupt(); | |
sleep(Long.MAX_VALUE); | |
return null; | |
})); | |
assertThat(future.get(), is(nullValue())); | |
assertThat(future.isDone(), is(true)); | |
} finally { | |
executorService.shutdown(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testQueueFullSubmit() throws InterruptedException { | |
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, | |
new ArrayBlockingQueue<>(1)); | |
executorService.setRejectedExecutionHandler(new AbortPolicy()); | |
try { | |
for (int i = 0; i < 10; i++) { | |
executorService.submit(() -> { | |
while (!interrupted()) { | |
try { | |
sleep(Long.MAX_VALUE); | |
} catch (@SuppressWarnings("unused") final InterruptedException ignored) { | |
currentThread().interrupt(); | |
} | |
} | |
}); | |
} | |
fail("A RejectedExecutionException should have been thrown"); | |
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) { | |
assertThat(executorService.isShutdown(), is(false)); | |
} finally { | |
executorService.shutdownNow(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testQueueFullExecute() throws InterruptedException { | |
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, | |
new ArrayBlockingQueue<>(1)); | |
executorService.setRejectedExecutionHandler(new AbortPolicy()); | |
try { | |
for (int i = 0; i < 10; i++) { | |
executorService.execute(() -> { | |
while (!interrupted()) { | |
try { | |
sleep(Long.MAX_VALUE); | |
} catch (@SuppressWarnings("unused") final InterruptedException ignored) { | |
currentThread().interrupt(); | |
} | |
} | |
}); | |
} | |
fail("A RejectedExecutionException should have been thrown"); | |
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) { | |
assertThat(executorService.isShutdown(), is(false)); | |
} finally { | |
executorService.shutdownNow(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testQueueFullTpe20sSubmit() throws InterruptedException { | |
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, | |
new ArrayBlockingQueue<>(1)); | |
executorService.setRejectedExecutionHandler(new AbortPolicy()); | |
try { | |
for (int i = 0; i < 10; i++) { | |
executorService.submit(() -> { | |
try { | |
sleep(20_000L); | |
} catch (@SuppressWarnings("unused") final InterruptedException ignored) { | |
currentThread().interrupt(); | |
} | |
}); | |
} | |
fail("A RejectedExecutionException should have been thrown"); | |
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) { | |
assertThat(executorService.isShutdown(), is(false)); | |
} finally { | |
executorService.shutdownNow(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testQueueFullTpe20sExecute() throws InterruptedException { | |
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, | |
new ArrayBlockingQueue<>(1)); | |
executorService.setRejectedExecutionHandler(new AbortPolicy()); | |
try { | |
for (int i = 0; i < 10; i++) { | |
executorService.execute(() -> { | |
try { | |
sleep(20_000L); | |
} catch (@SuppressWarnings("unused") final InterruptedException ignored) { | |
currentThread().interrupt(); | |
} | |
}); | |
} | |
fail("A RejectedExecutionException should have been thrown"); | |
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) { | |
assertThat(executorService.isShutdown(), is(false)); | |
} finally { | |
executorService.shutdownNow(); | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
@Test | |
@SuppressWarnings("static-method") | |
public void testSubmitAfterShutdown() throws InterruptedException { | |
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, | |
new LinkedBlockingQueue<>()); | |
executorService.setRejectedExecutionHandler(new AbortPolicy()); | |
executorService.shutdown(); | |
assertThat(executorService.isShutdown(), is(true)); | |
try { | |
for (int i = 0; i < 10; i++) { | |
executorService.submit(() -> { | |
// empty | |
}); | |
} | |
fail("A RejectedExecutionException should have been thrown"); | |
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) { | |
assertThat(executorService.isShutdown(), is(true)); | |
} finally { | |
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment