Skip to content

Instantly share code, notes, and snippets.

@dotero-87
Last active July 20, 2021 17:24
Show Gist options
  • Save dotero-87/a89245e27cee3078dbc0322712276ae1 to your computer and use it in GitHub Desktop.
Save dotero-87/a89245e27cee3078dbc0322712276ae1 to your computer and use it in GitHub Desktop.
package com.mercadolibre.fbm.wms.routes.engine.service.dispatcher;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.ForkJoinPool.commonPool;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.IntStream.range;
import static org.junit.jupiter.params.provider.Arguments.arguments;
public class ConcurrencyTest {
private static final int NUMBER_OF_TASKS = 100;
private static final int TASK_PROCESSING_TIME_IN_MILLIS = 200;
private static final int THREAD_POOL_SIZE = 10;
private final Set<Integer> numbers = range(0, NUMBER_OF_TASKS).boxed().collect(toSet());
@DisplayName("stream()")
@Test
public void testSequentialStream() {
// Given
final Runnable runnable = () ->
numbers.stream().forEach(this::longRunningTask);
// When
final ElapsedTime elapsedTime = ElapsedTime.of(runnable);
// Then
System.out.println("stream() " + elapsedTime.get());
}
@DisplayName("forEach() using executor")
@MethodSource("executorProvider")
@ParameterizedTest(name = "{0}")
public void testForEachWithExecutor(final String message, final ExecutorService executor) {
// Given
// Aunque usa CompletableFuture.runAsync, en realidad NO ES CONCURRENTE
final Runnable runnable = () ->
numbers.stream()
.map(i -> runAsync(() -> longRunningTask(i), executor))
.forEach(CompletableFuture::join);
// When
final ElapsedTime elapsedTime = ElapsedTime.of(runnable);
// Then
System.out.println(message + " " + elapsedTime.get());
}
@DisplayName("CompletableFuture using executor")
@MethodSource("executorProvider")
@ParameterizedTest(name = "{0}")
public void testFutureWithExecutor(final String message, final ExecutorService executor) {
// Given
final Runnable runnable = () ->
numbers.stream()
.map(i -> runAsync(() -> longRunningTask(i), executor))
.collect(toList())
.stream()
.forEach(CompletableFuture::join);
// When
final ElapsedTime elapsedTime = ElapsedTime.of(runnable);
// Then
System.out.println(message + " " + elapsedTime.get());
}
@DisplayName("parallelStream() without executor")
@Test
public void testParallelStreamWithoutExecutor() {
// Given
final Runnable runnable = () ->
numbers.parallelStream().forEach(ConcurrencyTest.this::longRunningTask);
// When
final ElapsedTime elapsedTime = ElapsedTime.of(runnable);
// Then
System.out.println("parallelStream() " + elapsedTime.get());
}
@DisplayName("parallelStream() using executor")
@MethodSource("executorProvider")
@ParameterizedTest(name = "{0}")
public void testParallelStreamWithExecutor(final String message, final ExecutorService executor) {
// Given
final Runnable runnable = () -> {
try {
executor.submit(() ->
numbers.parallelStream().forEach(ConcurrencyTest.this::longRunningTask)
).get();
} catch (Exception e) {
e.printStackTrace();
}
};
// When
final ElapsedTime elapsedTime = ElapsedTime.of(runnable);
// Then
System.out.println(message + " " + elapsedTime.get());
}
private void longRunningTask(final int number) {
try {
System.out.println(format("%s - Processing %d", Thread.currentThread(), number));
Thread.sleep(TASK_PROCESSING_TIME_IN_MILLIS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private interface ElapsedTime extends Supplier<Long> {
static ElapsedTime of(final Runnable runnable) {
return () -> {
final LocalTime start = LocalTime.now();
runnable.run();
final LocalTime finish = LocalTime.now();
return ChronoUnit.MILLIS.between(start, finish);
};
}
}
private static Stream<Arguments> executorProvider() {
return Stream.of(
arguments("commonThreadPool", commonPool()),
arguments("customThreadPool", newFixedThreadPool(THREAD_POOL_SIZE)),
arguments("forkJoinThreadPool", new ForkJoinPool(THREAD_POOL_SIZE))
);
}
}
@dotero-87
Copy link
Author

Resultados:
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment