Created
June 18, 2016 21:13
-
-
Save ruudud/b1b11459981d7615f6efd86562b60429 to your computer and use it in GitHub Desktop.
Working with streams, futures, threads and executor service in Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.ruudud.exectest; | |
import com.google.common.base.Stopwatch; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
import java.util.List; | |
import java.util.concurrent.*; | |
import java.util.stream.Collectors; | |
public class ExecTest { | |
public static void main(String[] args) throws InterruptedException { | |
final int threads = 4; | |
final ExecutorService executorService = new ThreadPoolExecutor( | |
threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), | |
new ThreadFactoryBuilder().setNameFormat("foo-%d").build()); | |
final String logTemplate = "%s method, Actual time spent: %d ms, Total time slept in threads: %d ms"; | |
final List<String> objects = ImmutableList.of("1-Foo", "2-Bar", "3-Baz", "4-Bif", "5-Bof", "6-Baf"); | |
final Stopwatch stopwatch = Stopwatch.createStarted(); | |
final Long totalSleepFirst = submitAndGet(executorService, objects); | |
System.out.println(String.format(logTemplate, "Submit get", stopwatch.elapsed(TimeUnit.MILLISECONDS), totalSleepFirst)); | |
stopwatch.reset().start(); | |
final List<Callable<Long>> callables = objects.stream() | |
.map(DoAThing::new) | |
.collect(Collectors.toList()); | |
final Long totalSleepSnd = invokeAllCallables(executorService, callables); | |
System.out.println(String.format(logTemplate, "Callables invokeAll", stopwatch.elapsed(TimeUnit.MILLISECONDS), totalSleepSnd)); | |
} | |
private static Long invokeAllCallables(ExecutorService executorService, List<Callable<Long>> callables) throws InterruptedException { | |
return executorService.invokeAll(callables).stream() | |
.map(f -> { | |
try { | |
return f.get(8L, TimeUnit.SECONDS); | |
} catch (Exception e) { | |
System.out.println("Timed out after 8 seconds."); | |
return 1L; | |
} | |
}).collect(Collectors.summingLong(Long::longValue)); | |
} | |
private static Long submitAndGet(ExecutorService executorService, List<String> customers) { | |
return customers.stream() | |
.map(c -> executorService.submit(() -> { | |
final long elapse = (long) (250 + Math.floor(5000 * Math.random())); | |
System.out.println(c + ": Doing in " + elapse + "ms..."); | |
Thread.sleep(elapse); | |
System.out.println(c + ": Done sleeping for " + elapse + "ms..."); | |
return elapse; | |
})) | |
.map(f -> { | |
try { | |
return f.get(8L, TimeUnit.SECONDS); | |
} catch (Exception e) { | |
System.out.println("Timed out after 8 seconds."); | |
return 1L; | |
} | |
}).collect(Collectors.summingLong(Long::longValue)); | |
} | |
private static class DoAThing implements Callable<Long> { | |
private final String name; | |
public DoAThing(final String name) { | |
this.name = name; | |
} | |
@Override | |
public Long call() throws Exception { | |
final long elapse = (long) (250 + Math.floor(5000 * Math.random())); | |
System.out.println(this.name + ": Doing in " + elapse + "ms..."); | |
Thread.sleep(elapse); | |
System.out.println(this.name + ": Done sleeping for " + elapse + "ms..."); | |
return elapse; | |
} | |
} | |
} |
I ended up here while searching for why my stream + executor service submit method was running so slow.
The fix was to introduce a stream terminal operation to ensure all operations are submit
ted first and then their futures are get
ted.
private static Long submitAndGet(ExecutorService executorService, List<String> customers) {
return customers.stream()
.map(c -> executorService.submit(() -> {
final long elapse = (long) (250 + Math.floor(5000 * Math.random()));
System.out.println(c + ": Doing in " + elapse + "ms...");
Thread.sleep(elapse);
System.out.println(c + ": Done sleeping for " + elapse + "ms...");
return elapse;
}))
.collect(Collectors.toList()).stream() // !@# FIX RIGHT HERE #@!
.map(f -> {
try {
return f.get(8L, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("Timed out after 8 seconds.");
return 1L;
}
}).mapToLong(Long::longValue).sum();
}
3-Baz: Doing in 4665ms...
1-Foo: Doing in 1515ms...
4-Bif: Doing in 1958ms...
2-Bar: Doing in 1757ms...
1-Foo: Done sleeping for 1515ms...
5-Bof: Doing in 3705ms...
2-Bar: Done sleeping for 1757ms...
6-Baf: Doing in 2199ms...
4-Bif: Done sleeping for 1958ms...
6-Baf: Done sleeping for 2199ms...
3-Baz: Done sleeping for 4665ms...
5-Bof: Done sleeping for 3705ms...
Submit get method, Actual time spent: 5294 ms, Total time slept in threads: 15799 ms
1-Foo: Doing in 4849ms...
2-Bar: Doing in 4052ms...
4-Bif: Doing in 1690ms...
3-Baz: Doing in 4650ms...
4-Bif: Done sleeping for 1690ms...
5-Bof: Doing in 311ms...
5-Bof: Done sleeping for 311ms...
6-Baf: Doing in 1726ms...
6-Baf: Done sleeping for 1726ms...
2-Bar: Done sleeping for 4052ms...
3-Baz: Done sleeping for 4650ms...
1-Foo: Done sleeping for 4849ms...
Callables invokeAll method, Actual time spent: 4852 ms, Total time slept in threads: 17278 ms
More information on the execution order of streams:
https://stackoverflow.com/questions/29915591/java-8-stream-operations-execution-order
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example run: