Created
December 15, 2018 10:33
-
-
Save tlinkowski/dda4786147a9a111184f6cbcd1ee0d6b to your computer and use it in GitHub Desktop.
Problem with parallel streams: after flatMap, the work is parallelized per batch, not per item
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.Duration; | |
import java.time.Instant; | |
import java.util.List; | |
import java.util.stream.Collectors; | |
import java.util.stream.IntStream; | |
import java.util.stream.Stream; | |
/**
 * Minimal reproduction of a parallel-stream granularity problem: the items of
 * all batches are flat-mapped into a single stream that is then switched to
 * parallel mode, yet the work ends up distributed per batch rather than per item.
 *
 * <p>Every item takes one second to "process" (a {@link Thread#sleep(long)}),
 * and each log line is prefixed with the whole seconds elapsed since this
 * object was created, so the degree of parallelism can be read directly from
 * the output. In the run recorded alongside this file, all items of batch
 * {@code A} were handled by one worker thread and all items of batch {@code B}
 * by the {@code main} thread, so the run lasted about as long as the largest
 * batch.
 *
 * <p>NOTE(review): this is presumably caused by {@code flatMap} draining each
 * mapped inner stream sequentially, so only the outer stream of batches can be
 * split -- confirm against the JDK version in use. The pipeline is left as-is
 * on purpose, because demonstrating that behaviour is the point of this class.
 *
 * @author Tomasz Linkowski
 */
public class ParallelStreamProblem {

    /** Reference instant for the elapsed-seconds prefix of every log line. */
    private final Instant started = Instant.now();

    /**
     * Runs the demonstration once.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new ParallelStreamProblem().processItems();
    }

    /**
     * Flattens the items of every {@link Batch} into one stream, marks the
     * pipeline parallel and passes each item to {@link #processItem(String)}.
     */
    private void processItems() {
        Stream.of(Batch.values())
                .flatMap(Batch::itemStream)
                .parallel()
                .forEach(this::processItem);
    }

    /**
     * Logs the item together with the elapsed seconds and the name of the
     * executing thread, then simulates one second of work.
     *
     * @param item label of the item being processed
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         thread is interrupted while sleeping; the thread's interrupt flag
     *         is set again before the exception is thrown
     */
    private void processItem(String item) {
        long elapsedSeconds = Duration.between(started, Instant.now()).getSeconds();
        System.out.format("[%d] Processing %s on %s%n", elapsedSeconds, item, Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // Thread.sleep clears the interrupt flag when it throws; restore it so
            // that callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    /**
     * A named group of items. Item labels are the constant's name followed by
     * a zero-based index, e.g. {@code A0}, {@code A1}, ...
     */
    private enum Batch {
        A(3),
        B(10);

        /** Labels of the items in this batch, built once per constant. */
        private final List<String> items;

        /**
         * @param itemCount number of items to generate for this batch
         */
        Batch(int itemCount) {
            items = IntStream.range(0, itemCount).mapToObj(i -> name() + i).collect(Collectors.toList());
        }

        /**
         * @return a new sequential stream over this batch's item labels
         */
        public Stream<String> itemStream() {
            return items.stream();
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[0] Processing B0 on main | |
[0] Processing A0 on ForkJoinPool.commonPool-worker-1 | |
[1] Processing B1 on main | |
[1] Processing A1 on ForkJoinPool.commonPool-worker-1 | |
[2] Processing B2 on main | |
[2] Processing A2 on ForkJoinPool.commonPool-worker-1 | |
[3] Processing B3 on main | |
[4] Processing B4 on main | |
[5] Processing B5 on main | |
[6] Processing B6 on main | |
[7] Processing B7 on main | |
[8] Processing B8 on main | |
[9] Processing B9 on main |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment