Skip to content

Instantly share code, notes, and snippets.

@spullara
Last active December 19, 2023 22:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spullara/8906058 to your computer and use it in GitHub Desktop.
Save spullara/8906058 to your computer and use it in GitHub Desktop.
package com.sampullara;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Decide at runtime whether this stream should be parallel.
* <p>
* User: sam
* Date: 2/9/14
* Time: 12:38 PM
*/
public class MaybeParallel {
static class SpliteratorException extends RuntimeException {
private final Spliterator spliterator;
private final Object element;
SpliteratorException(Spliterator spliterator, Object element) {
this.spliterator = spliterator;
this.element = element;
}
public Spliterator getSpliterator() {
return spliterator;
}
Object getElement() {
return element;
}
}
public static <T>Stream<T> maybeParallel(Stream<T> stream, Decider decider) {
Stream<T> sequential = stream.sequential();
Spliterator<T> s = sequential.spliterator();
Spliterator<T> spliterator = new Spliterator<T>() {
private T caughtElement = null;
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (caughtElement != null) {
action.accept(caughtElement);
caughtElement = null;
return true;
}
try {
return s.tryAdvance(action);
} catch (SpliteratorException e) {
caughtElement = (T) e.getElement();
throw e;
}
}
@Override
public Spliterator<T> trySplit() {
return s.trySplit();
}
@Override
public long estimateSize() {
return s.estimateSize();
}
@Override
public int characteristics() {
return s.characteristics();
}
};
return StreamSupport.stream(spliterator, false).peek(new Consumer<T>() {
volatile int elements = 0;
Long start;
@Override
public void accept(T t) {
if (start == null) {
start = System.nanoTime();
}
if (elements++ > 0) {
if (decider.switchToParallel(elements, System.nanoTime() - start)) {
throw new SpliteratorException(spliterator, t);
}
}
}
});
}
public static void main(String[] args) {
AtomicInteger total = new AtomicInteger();
List<Integer> integers = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
integers.add(i);
}
Stream<Integer> integerStream = maybeParallel(integers.stream(), (e, t) -> {
double v = ((double) t) / e;
return v > 5_000_000;
});
try {
integerStream.forEach(i -> {
total.getAndIncrement();
System.out.println("Sequential: " + i);
try {
Thread.sleep(i);
} catch (InterruptedException e) {
}
});
} catch (SpliteratorException e) {
Stream<Integer> stream = StreamSupport.stream(e.getSpliterator(), true);
stream.forEach(i -> {
total.getAndIncrement();
System.out.println("Parallel: " + i);
});
}
System.out.println("Total: " + (total.get() == 100));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment