Skip to content

Instantly share code, notes, and snippets.

@kennycason
Created September 20, 2016 18:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kennycason/80084ddb0208d8ea2b0fdce819a69bc2 to your computer and use it in GitHub Desktop.
Save kennycason/80084ddb0208d8ea2b0fdce819a69bc2 to your computer and use it in GitHub Desktop.
Demonstrate Java Parallel streams fall back to main thread in the event that all workers in common fork join pool are saturated.
package com.simplymeasured.gus.controller;
import java.util.Arrays;
/**
* Created by kenny on 9/20/16.
*
* Demonstrate Java Parallel streams fall back to main thread in the event that all workers in common fork join pool
* are saturated.
*/
public class ParallelStreamSaturationTest {
public static void main(final String[] args) {
// start a thread that will execute a parallel stream AFTER the common fork join pool has been saturated.
new Thread(() -> {
try {
System.out.println("Sleeping until ForkJoinPool is saturated, Thread Name: " + Thread.currentThread().getName());
Thread.sleep(3_000L);
} catch (final InterruptedException e) {
e.printStackTrace();
}
System.out.println("Starting another parallel stream, Thread Name: " + Thread.currentThread().getName());
Arrays.asList(1, 2, 3, 4, 5)
.parallelStream()
.forEach(id -> System.out.println("Second stream: " + id + " Thread Name: " + Thread.currentThread().getName()));
}).start();
// saturate common fork join pool
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.parallelStream()
.forEach(ParallelStreamSaturationTest::run);
}
private static void run(final int id) {
if (Thread.currentThread().getName().toLowerCase().contains("fork")) {
try {
System.out.println("Blocking " + Thread.currentThread().getName());
Thread.sleep(10_000_000L);
} catch (final InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("Main thread: " + id + " Thread Name: " + Thread.currentThread().getName());
}
}
}
@kennycason
Copy link
Author

kennycason commented Sep 20, 2016

In code, ForkJoinPoo.ForkJoinTask this can also be seen:

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

@kennycason
Copy link
Author

Output of program:

Sleeping until ForkJoinPool is saturated, Thread Name: Thread-0
Blocking ForkJoinPool.commonPool-worker-3
Blocking ForkJoinPool.commonPool-worker-1
Blocking ForkJoinPool.commonPool-worker-2
Main thread: 7 Thread Name: main
Blocking ForkJoinPool.commonPool-worker-6
Main thread: 4 Thread Name: main
Main thread: 10 Thread Name: main
Blocking ForkJoinPool.commonPool-worker-4
Blocking ForkJoinPool.commonPool-worker-5
Blocking ForkJoinPool.commonPool-worker-7
Starting another parallel stream, Thread Name: Thread-0
Second stream: 3 Thread Name: Thread-0
Second stream: 5 Thread Name: Thread-0
Second stream: 4 Thread Name: Thread-0
Second stream: 2 Thread Name: Thread-0
Second stream: 1 Thread Name: Thread-0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment