@ruudud
Created June 18, 2016 21:13
Working with streams, futures, threads and executor service in Java
package com.github.ruudud.exectest;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class ExecTest {

    public static void main(String[] args) throws InterruptedException {
        final int threads = 4;
        final ExecutorService executorService = new ThreadPoolExecutor(
                threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("foo-%d").build());
        final String logTemplate = "%s method, Actual time spent: %d ms, Total time slept in threads: %d ms";
        final List<String> objects = ImmutableList.of("1-Foo", "2-Bar", "3-Baz", "4-Bif", "5-Bof", "6-Baf");

        // Approach 1: submit() and get() chained in a single stream pipeline.
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final Long totalSleepFirst = submitAndGet(executorService, objects);
        System.out.println(String.format(logTemplate, "Submit get", stopwatch.elapsed(TimeUnit.MILLISECONDS), totalSleepFirst));

        // Approach 2: build every Callable up front, then hand the list to invokeAll().
        stopwatch.reset().start();
        final List<Callable<Long>> callables = objects.stream()
                .map(DoAThing::new)
                .collect(Collectors.toList());
        final Long totalSleepSnd = invokeAllCallables(executorService, callables);
        System.out.println(String.format(logTemplate, "Callables invokeAll", stopwatch.elapsed(TimeUnit.MILLISECONDS), totalSleepSnd));

        // Shut the pool down; otherwise its idle worker threads keep the JVM from exiting.
        executorService.shutdown();
    }

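    private static Long invokeAllCallables(ExecutorService executorService, List<Callable<Long>> callables) throws InterruptedException {
        // invokeAll() submits every task and blocks until all of them have completed,
        // so each Future below is already done by the time get() is called.
        return executorService.invokeAll(callables).stream()
                .map(f -> {
                    try {
                        return f.get(8L, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        // Note: any failure, not only a timeout, ends up here.
                        System.out.println("Timed out after 8 seconds.");
                        return 1L;
                    }
                }).collect(Collectors.summingLong(Long::longValue));
    }
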
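    private static Long submitAndGet(ExecutorService executorService, List<String> customers) {
        // Streams are lazy and push one element through the whole pipeline at a time,
        // so each task is submitted and then waited on before the next one is submitted.
        // The tasks therefore run one after another even though the pool has 4 threads.
        return customers.stream()
                .map(c -> executorService.submit(() -> {
                    final long elapse = (long) (250 + Math.floor(5000 * Math.random()));
                    System.out.println(c + ": Doing in " + elapse + "ms...");
                    Thread.sleep(elapse);
                    System.out.println(c + ": Done sleeping for " + elapse + "ms...");
                    return elapse;
                }))
                .map(f -> {
                    try {
                        return f.get(8L, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        // Note: any failure, not only a timeout, ends up here.
                        System.out.println("Timed out after 8 seconds.");
                        return 1L;
                    }
                }).collect(Collectors.summingLong(Long::longValue));
    }
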
    private static class DoAThing implements Callable<Long> {
        private final String name;

        public DoAThing(final String name) {
            this.name = name;
        }

        @Override
        public Long call() throws Exception {
            final long elapse = (long) (250 + Math.floor(5000 * Math.random()));
            System.out.println(this.name + ": Doing in " + elapse + "ms...");
            Thread.sleep(elapse);
            System.out.println(this.name + ": Done sleeping for " + elapse + "ms...");
            return elapse;
        }
    }
}

ruudud commented Jun 18, 2016

Example run:

1-Foo: Doing in 3543ms...
1-Foo: Done sleeping for 3543ms...
2-Bar: Doing in 3854ms...
2-Bar: Done sleeping for 3854ms...
3-Baz: Doing in 2456ms...
3-Baz: Done sleeping for 2456ms...
4-Bif: Doing in 4664ms...
4-Bif: Done sleeping for 4664ms...
5-Bof: Doing in 406ms...
5-Bof: Done sleeping for 406ms...
6-Baf: Doing in 2099ms...
6-Baf: Done sleeping for 2099ms...
Submit get method, Actual time spent: 17120 ms, Total time slept in threads: 17022 ms
2-Bar: Doing in 3213ms...
1-Foo: Doing in 1132ms...
4-Bif: Doing in 4838ms...
3-Baz: Doing in 4398ms...
1-Foo: Done sleeping for 1132ms...
5-Bof: Doing in 3783ms...
2-Bar: Done sleeping for 3213ms...
6-Baf: Doing in 2497ms...
3-Baz: Done sleeping for 4398ms...
4-Bif: Done sleeping for 4838ms...
5-Bof: Done sleeping for 3783ms...
6-Baf: Done sleeping for 2497ms...
Callables invokeAll method, Actual time spent: 5719 ms, Total time slept in threads: 19861 ms
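
The two timings above can be roughly reproduced from the logged sleep durations alone. The sketch below is not part of the gist: `PoolMakespan` and `makespan` are hypothetical names, and it assumes each task starts on whichever worker becomes free first and that scheduling overhead is negligible. One worker models the serialized "Submit get" run; four workers model the `invokeAll` run.

```java
import java.util.PriorityQueue;

public class PoolMakespan {
    // Estimated wall-clock time when tasks are handed, in order, to a fixed pool:
    // each task starts on whichever worker becomes free first.
    static long makespan(int workers, long[] durationsMs) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int i = 0; i < workers; i++) {
            freeAt.add(0L); // every worker is idle at time 0
        }
        long finish = 0;
        for (long d : durationsMs) {
            long end = freeAt.poll() + d; // earliest-free worker takes the task
            freeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        // Sleep times logged in the "Submit get" run (1-Foo .. 6-Baf).
        long[] first = {3543, 3854, 2456, 4664, 406, 2099};
        // Sleep times logged in the "Callables invokeAll" run, in submission order 1-Foo .. 6-Baf.
        long[] second = {1132, 3213, 4398, 4838, 3783, 2497};
        System.out.println(makespan(1, first));  // 17022, close to the measured 17120 ms
        System.out.println(makespan(4, second)); // 5710, close to the measured 5719 ms
    }
}
```

The first figure is simply the sum of all sleeps, which is what the log reports as "Total time slept in threads"; that match suggests the first approach never ran two tasks at once.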


theawless commented Jan 6, 2022

I ended up here while searching for why my stream + executor service submit code was running so slowly.
The fix was to add a terminal operation to the stream so that all tasks are submitted first, and only then are their futures waited on.

    private static Long submitAndGet(ExecutorService executorService, List<String> customers) {
        return customers.stream()
                        .map(c -> executorService.submit(() -> {
                            final long elapse = (long) (250 + Math.floor(5000 * Math.random()));
                            System.out.println(c + ": Doing in " + elapse + "ms...");
                            Thread.sleep(elapse);
                            System.out.println(c + ": Done sleeping for " + elapse + "ms...");
                            return elapse;
                        }))
                        .collect(Collectors.toList()).stream() // !@# FIX RIGHT HERE #@!
                        .map(f -> {
                            try {
                                return f.get(8L, TimeUnit.SECONDS);
                            } catch (Exception e) {
                                System.out.println("Timed out after 8 seconds.");
                                return 1L;
                            }
                        }).mapToLong(Long::longValue).sum();
    }

3-Baz: Doing in 4665ms...
1-Foo: Doing in 1515ms...
4-Bif: Doing in 1958ms...
2-Bar: Doing in 1757ms...
1-Foo: Done sleeping for 1515ms...
5-Bof: Doing in 3705ms...
2-Bar: Done sleeping for 1757ms...
6-Baf: Doing in 2199ms...
4-Bif: Done sleeping for 1958ms...
6-Baf: Done sleeping for 2199ms...
3-Baz: Done sleeping for 4665ms...
5-Bof: Done sleeping for 3705ms...
Submit get method, Actual time spent: 5294 ms, Total time slept in threads: 15799 ms
1-Foo: Doing in 4849ms...
2-Bar: Doing in 4052ms...
4-Bif: Doing in 1690ms...
3-Baz: Doing in 4650ms...
4-Bif: Done sleeping for 1690ms...
5-Bof: Doing in 311ms...
5-Bof: Done sleeping for 311ms...
6-Baf: Doing in 1726ms...
6-Baf: Done sleeping for 1726ms...
2-Bar: Done sleeping for 4052ms...
3-Baz: Done sleeping for 4650ms...
1-Foo: Done sleeping for 4849ms...
Callables invokeAll method, Actual time spent: 4852 ms, Total time slept in threads: 17278 ms

More information on the execution order of streams:
https://stackoverflow.com/questions/29915591/java-8-stream-operations-execution-order
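
A minimal sketch of that ordering, separate from the gist code and with no executor involved: `StreamOrderDemo`, `trace`, and `collectBetweenStages` are hypothetical names, and the "submit"/"get" strings only stand in for the two `map` stages discussed above. It records when each stage sees each element of a sequential stream, with and without an intermediate `collect`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamOrderDemo {
    // Records the order in which two chained map stages see each element.
    static List<String> trace(boolean collectBetweenStages) {
        List<String> events = new ArrayList<>();
        Stream<String> submitted = Stream.of("a", "b", "c")
                .map(s -> { events.add("submit " + s); return s; });
        if (collectBetweenStages) {
            // Terminal operation: runs the first stage for every element
            // before the second stage sees any of them.
            submitted = submitted.collect(Collectors.toList()).stream();
        }
        submitted.map(s -> { events.add("get " + s); return s; })
                 .forEach(s -> { });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [submit a, get a, submit b, get b, submit c, get c]
        System.out.println(trace(true));  // [submit a, submit b, submit c, get a, get b, get c]
    }
}
```

Without the intermediate `collect`, each element passes through both stages before the next element enters the pipeline, which is why chaining `submit` and `get` in one pipeline behaves sequentially.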
