Skip to content

Instantly share code, notes, and snippets.

@unix-junkie
Created March 14, 2017 11:31
Show Gist options
  • Save unix-junkie/4fb67786a9e58ab308b04239840c50d6 to your computer and use it in GitHub Desktop.
Save unix-junkie/4fb67786a9e58ab308b04239840c50d6 to your computer and use it in GitHub Desktop.
package com.example;
import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.atomic.AtomicBoolean;
import org.testng.annotations.Test;
public final class ProblemAwareExecutorTest {
@Test
@SuppressWarnings("static-method")
public void testNormalExecution() throws ExecutionException, InterruptedException {
final ExecutorService executorService = newSingleThreadExecutor();
try {
final AtomicBoolean runnableExecuted = new AtomicBoolean();
executorService.submit(() -> runnableExecuted.set(true)).get();
final AtomicBoolean runnableWithResultExecuted = new AtomicBoolean();
executorService.submit(() -> runnableWithResultExecuted.set(true), (Void) null);
final AtomicBoolean callableExecuted = new AtomicBoolean();
executorService.submit((Callable<Void>) () -> {
callableExecuted.set(true);
return null;
}).get();
final AtomicBoolean runnableFutureExecuted = new AtomicBoolean();
executorService.submit(new FutureTask<>((Callable<Void>) () -> {
runnableFutureExecuted.set(true);
return null;
})).get();
assertThat(runnableExecuted.get(), is(true));
assertThat(runnableWithResultExecuted.get(), is(true));
assertThat(callableExecuted.get(), is(true));
assertThat(runnableFutureExecuted.get(), is(true));
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testRunnableFailed() throws InterruptedException {
final String message = "Error message";
final ExecutorService executorService = newSingleThreadExecutor();
try {
final Future<?> future = executorService.submit((Runnable) () -> {
throw new UnsupportedOperationException(message);
});
try {
future.get();
fail("Future.get() should have thrown an ExecutionException");
} catch (final ExecutionException ee) {
final Throwable cause = ee.getCause();
assertThat(cause, is(instanceOf(UnsupportedOperationException.class)));
assertThat(cause.getMessage(), is(message));
}
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testRunnableWithResultFailed() throws InterruptedException {
final String message = "Error message";
final ExecutorService executorService = newSingleThreadExecutor();
try {
final Future<Void> future = executorService.submit(() -> {
throw new UnsupportedOperationException(message);
}, null);
try {
future.get();
fail("Future.get() should have thrown an ExecutionException");
} catch (final ExecutionException ee) {
final Throwable cause = ee.getCause();
assertThat(cause, is(instanceOf(UnsupportedOperationException.class)));
assertThat(cause.getMessage(), is(message));
}
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testCallableFailed() throws InterruptedException {
final String message = "Error message";
final ExecutorService executorService = newSingleThreadExecutor();
try {
final Future<Void> future = executorService.submit(() -> {
throw new IOException(message);
});
try {
future.get();
fail("Future.get() should have thrown an ExecutionException");
} catch (final ExecutionException ee) {
final Throwable cause = ee.getCause();
assertThat(cause, is(instanceOf(IOException.class)));
assertThat(cause.getMessage(), is(message));
}
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testRunnableFutureFailed() throws InterruptedException, ExecutionException {
final String message = "Error message";
final ExecutorService executorService = newSingleThreadExecutor();
try {
final Future<?> future = executorService.submit(new FutureTask<>((Callable<Void>) () -> {
throw new IOException(message);
}));
assertThat(future.get(), is(nullValue()));
assertThat(future.isDone(), is(true));
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testRunnableFutureCancelled() throws InterruptedException, ExecutionException {
final ExecutorService executorService = newSingleThreadExecutor();
try {
final FutureTask<Void> task = new FutureTask<>(() -> {
while (!interrupted()) {
try {
sleep(Long.MAX_VALUE);
} catch (@SuppressWarnings("unused") final InterruptedException ignored) {
currentThread().interrupt();
}
}
return null;
});
task.cancel(true);
final Future<?> future = executorService.submit(task);
assertThat(future.get(), is(nullValue()));
assertThat(future.isDone(), is(true));
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testRunnableFutureInterrupted() throws InterruptedException, ExecutionException {
final ExecutorService executorService = newSingleThreadExecutor();
try {
final Future<?> future = executorService.submit(new FutureTask<Void>(() -> {
currentThread().interrupt();
sleep(Long.MAX_VALUE);
return null;
}));
assertThat(future.get(), is(nullValue()));
assertThat(future.isDone(), is(true));
} finally {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testQueueFullSubmit() throws InterruptedException {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS,
new ArrayBlockingQueue<>(1));
executorService.setRejectedExecutionHandler(new AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
while (!interrupted()) {
try {
sleep(Long.MAX_VALUE);
} catch (@SuppressWarnings("unused") final InterruptedException ignored) {
currentThread().interrupt();
}
}
});
}
fail("A RejectedExecutionException should have been thrown");
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) {
assertThat(executorService.isShutdown(), is(false));
} finally {
executorService.shutdownNow();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testQueueFullExecute() throws InterruptedException {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS,
new ArrayBlockingQueue<>(1));
executorService.setRejectedExecutionHandler(new AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
while (!interrupted()) {
try {
sleep(Long.MAX_VALUE);
} catch (@SuppressWarnings("unused") final InterruptedException ignored) {
currentThread().interrupt();
}
}
});
}
fail("A RejectedExecutionException should have been thrown");
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) {
assertThat(executorService.isShutdown(), is(false));
} finally {
executorService.shutdownNow();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testQueueFullTpe20sSubmit() throws InterruptedException {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS,
new ArrayBlockingQueue<>(1));
executorService.setRejectedExecutionHandler(new AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
sleep(20_000L);
} catch (@SuppressWarnings("unused") final InterruptedException ignored) {
currentThread().interrupt();
}
});
}
fail("A RejectedExecutionException should have been thrown");
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) {
assertThat(executorService.isShutdown(), is(false));
} finally {
executorService.shutdownNow();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testQueueFullTpe20sExecute() throws InterruptedException {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS,
new ArrayBlockingQueue<>(1));
executorService.setRejectedExecutionHandler(new AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
sleep(20_000L);
} catch (@SuppressWarnings("unused") final InterruptedException ignored) {
currentThread().interrupt();
}
});
}
fail("A RejectedExecutionException should have been thrown");
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) {
assertThat(executorService.isShutdown(), is(false));
} finally {
executorService.shutdownNow();
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
@Test
@SuppressWarnings("static-method")
public void testSubmitAfterShutdown() throws InterruptedException {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS,
new LinkedBlockingQueue<>());
executorService.setRejectedExecutionHandler(new AbortPolicy());
executorService.shutdown();
assertThat(executorService.isShutdown(), is(true));
try {
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
// empty
});
}
fail("A RejectedExecutionException should have been thrown");
} catch (@SuppressWarnings("unused") final RejectedExecutionException ignored) {
assertThat(executorService.isShutdown(), is(true));
} finally {
executorService.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment