Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package spullara.streams;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Decide at runtime whether this stream should be parallel.
* <p>
* User: sam
* Date: 2/9/14
* Time: 12:38 PM
*/
public class MaybeParallel {
static class SpliteratorException extends RuntimeException {
private final Spliterator spliterator;
SpliteratorException(Spliterator spliterator) {
this.spliterator = spliterator;
}
public Spliterator getSpliterator() {
return spliterator;
}
}
public static <T>Stream<T> maybeParallel(Stream<T> stream, Decider decider) {
Stream<T> sequential = stream.sequential();
Spliterator<T> s = sequential.spliterator();
Spliterator<T> spliterator = new Spliterator<T>() {
@Override
public boolean tryAdvance(Consumer<? super T> action) {
try {
return s.tryAdvance(action);
} catch (SpliteratorException e) {
throw e;
}
}
@Override
public Spliterator<T> trySplit() {
return s.trySplit();
}
@Override
public long estimateSize() {
return s.estimateSize();
}
@Override
public int characteristics() {
return s.characteristics();
}
};
return StreamSupport.stream(spliterator, false).peek(new Consumer<T>() {
volatile int elements = 0;
Long start;
@Override
public void accept(T t) {
if (start == null) {
start = System.nanoTime();
}
if (elements++ > 0) {
if (decider.switchToParallel(elements, System.nanoTime() - start)) {
throw new SpliteratorException(spliterator);
}
}
}
});
}
public static void main(String[] args) {
List<Integer> integers = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
integers.add(i);
}
Stream<Integer> integerStream = maybeParallel(integers.stream(), (e, t) -> {
double v = ((double) t) / e;
return v > 5_000_000;
});
try {
integerStream.forEach(i -> {
System.out.println("Sequential: " + i);
try {
Thread.sleep(i);
} catch (InterruptedException e) {
}
});
} catch (SpliteratorException e) {
Stream<Integer> stream = StreamSupport.stream(e.getSpliterator(), true);
stream.forEach(i -> {
System.out.println("Parallel: " + i);
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.