Skip to content

Instantly share code, notes, and snippets.

@bnorm
Last active January 7, 2016 15:27
Show Gist options
  • Save bnorm/4e5d7149ad32a929c309 to your computer and use it in GitHub Desktop.
Save bnorm/4e5d7149ad32a929c309 to your computer and use it in GitHub Desktop.
Utility for customizing the ForkJoinPool that a parallel stream uses.
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.IntSummaryStatistics;
import java.util.Iterator;
import java.util.LongSummaryStatistics;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleFunction;
import java.util.function.DoublePredicate;
import java.util.function.DoubleToIntFunction;
import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
import java.util.function.IntBinaryOperator;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntToDoubleFunction;
import java.util.function.IntToLongFunction;
import java.util.function.IntUnaryOperator;
import java.util.function.LongBinaryOperator;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import java.util.function.LongPredicate;
import java.util.function.LongToDoubleFunction;
import java.util.function.LongToIntFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.ObjDoubleConsumer;
import java.util.function.ObjIntConsumer;
import java.util.function.ObjLongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
public class StreamParallizer {
private final ForkJoinPool pool;
public StreamParallizer(ForkJoinPool pool) {
this.pool = pool;
}
// @formatter:off
public <T> Stream<T> parallel(Stream<T> stream) { return new DelegateStream<>(stream.parallel()); }
public IntStream parallel(IntStream stream) { return new DelegateIntStream(stream.parallel()); }
public LongStream parallel(LongStream stream) { return new DelegateLongStream(stream.parallel()); }
public DoubleStream parallel(DoubleStream stream) { return new DelegateDoubleStream(stream.parallel()); }
// @formatter:on
private class DelegateStream<T> implements Stream<T> {
// @formatter:off
private Stream<T> stream;
private DelegateStream(Stream<T> stream) { this.stream = stream; }
@Override public Stream<T> filter(Predicate<? super T> predicate) { stream = stream.filter(predicate); return this; }
@Override public Stream<T> distinct() { stream = stream.distinct(); return this; }
@Override public Stream<T> sorted() { stream = stream.sorted(); return this; }
@Override public Stream<T> sorted(Comparator<? super T> comparator) { stream = stream.sorted(comparator); return this; }
@Override public Stream<T> peek(Consumer<? super T> action) { stream = stream.peek(action); return this; }
@Override public Stream<T> limit(long maxSize) { stream = stream.limit(maxSize); return this; }
@Override public Stream<T> skip(long n) { stream = stream.skip(n); return this; }
@Override public Stream<T> sequential() { stream = stream.sequential(); return this; }
@Override public Stream<T> parallel() { stream = stream.parallel(); return this; }
@Override public Stream<T> unordered() { stream = stream.unordered(); return this; }
@Override public Stream<T> onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
@Override public boolean isParallel() { return stream.isParallel(); }
@Override public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { return new DelegateStream<>(stream.map(mapper)); }
@Override public IntStream mapToInt(ToIntFunction<? super T> mapper) { return new DelegateIntStream(stream.mapToInt(mapper)); }
@Override public LongStream mapToLong(ToLongFunction<? super T> mapper) { return new DelegateLongStream(stream.mapToLong(mapper)); }
@Override public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { return new DelegateDoubleStream(stream.mapToDouble(mapper)); }
@Override public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { return new DelegateStream<>(stream.flatMap(mapper)); }
@Override public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { return new DelegateIntStream(stream.flatMapToInt(mapper)); }
@Override public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { return new DelegateLongStream(stream.flatMapToLong(mapper)); }
@Override public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { return new DelegateDoubleStream(stream.flatMapToDouble(mapper)); }
@Override public void forEach(Consumer<? super T> action) { pool.submit(() -> stream.forEach(action)).join(); }
@Override public void forEachOrdered(Consumer<? super T> action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
@Override public Object[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
@Override public <A> A[] toArray(IntFunction<A[]> generator) { return pool.submit(() -> stream.toArray(generator)).join(); }
@Override public T reduce(T identity, BinaryOperator<T> accumulator) { return pool.submit(() -> stream.reduce(identity, accumulator)).join(); }
@Override public Optional<T> reduce(BinaryOperator<T> accumulator) { return pool.submit(() -> stream.reduce(accumulator)).join(); }
@Override public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { return pool.submit(() -> stream.reduce(identity, accumulator, combiner)).join(); }
@Override public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
@Override public <R, A> R collect(Collector<? super T, A, R> collector) { return pool.submit(() -> stream.collect(collector)).join(); }
@Override public Optional<T> min(Comparator<? super T> comparator) { return pool.submit(() -> stream.min(comparator)).join(); }
@Override public Optional<T> max(Comparator<? super T> comparator) { return pool.submit(() -> stream.min(comparator)).join(); }
@Override public long count() { return pool.submit(() -> stream.count()).join(); }
@Override public boolean anyMatch(Predicate<? super T> predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
@Override public boolean allMatch(Predicate<? super T> predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
@Override public boolean noneMatch(Predicate<? super T> predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
@Override public Optional<T> findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
@Override public Optional<T> findAny() { return pool.submit(() -> stream.findAny()).join(); }
@Override public Iterator<T> iterator() { return pool.submit(() -> stream.iterator()).join(); }
@Override public Spliterator<T> spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
@Override public void close() { pool.submit(() -> stream.close()).join(); }
// @formatter:on
}
private class DelegateIntStream implements IntStream {
// @formatter:off
private IntStream stream;
private DelegateIntStream(IntStream stream) { this.stream = stream; }
@Override public IntStream filter(IntPredicate predicate) { stream = stream.filter(predicate); return this; }
@Override public IntStream distinct() { stream = stream.distinct(); return this; }
@Override public IntStream sorted() { stream = stream.sorted(); return this; }
@Override public IntStream peek(IntConsumer action) { stream = stream.peek(action); return this; }
@Override public IntStream limit(long maxSize) { stream = stream.limit(maxSize); return this; }
@Override public IntStream skip(long n) { stream = stream.skip(n); return this; }
@Override public IntStream sequential() { stream = stream.sequential(); return this; }
@Override public IntStream parallel() { stream = stream.parallel(); return this; }
@Override public IntStream unordered() { stream = stream.unordered(); return this; }
@Override public IntStream onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
@Override public boolean isParallel() { return stream.isParallel(); }
@Override public IntStream map(IntUnaryOperator mapper) { return new DelegateIntStream(stream.map(mapper)); }
@Override public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { return new DelegateStream<>(stream.mapToObj(mapper)); }
@Override public LongStream mapToLong(IntToLongFunction mapper) { return new DelegateLongStream(stream.mapToLong(mapper)); }
@Override public DoubleStream mapToDouble(IntToDoubleFunction mapper) { return new DelegateDoubleStream(stream.mapToDouble(mapper)); }
@Override public IntStream flatMap(IntFunction<? extends IntStream> mapper) { return new DelegateIntStream(stream.flatMap(mapper)); }
@Override public LongStream asLongStream() { return new DelegateLongStream(stream.asLongStream()); }
@Override public DoubleStream asDoubleStream() { return new DelegateDoubleStream(stream.asDoubleStream()); }
@Override public Stream<Integer> boxed() { return new DelegateStream<>(stream.boxed()); }
@Override public void forEach(IntConsumer action) { pool.submit(() -> stream.forEach(action)).join(); }
@Override public void forEachOrdered(IntConsumer action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
@Override public int[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
@Override public int reduce(int identity, IntBinaryOperator op) { return pool.submit(() -> stream.reduce(identity, op)).join(); }
@Override public OptionalInt reduce(IntBinaryOperator op) { return pool.submit(() -> stream.reduce(op)).join(); }
@Override public <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
@Override public int sum() { return pool.submit(() -> stream.sum()).join(); }
@Override public OptionalInt min() { return pool.submit(() -> stream.min()).join(); }
@Override public OptionalInt max() { return pool.submit(() -> stream.min()).join(); }
@Override public long count() { return pool.submit(() -> stream.count()).join(); }
@Override public OptionalDouble average() { return pool.submit(() -> stream.average()).join(); }
@Override public IntSummaryStatistics summaryStatistics() { return pool.submit(() -> stream.summaryStatistics()).join(); }
@Override public boolean anyMatch(IntPredicate predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
@Override public boolean allMatch(IntPredicate predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
@Override public boolean noneMatch(IntPredicate predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
@Override public OptionalInt findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
@Override public OptionalInt findAny() { return pool.submit(() -> stream.findAny()).join(); }
@Override public PrimitiveIterator.OfInt iterator() { return pool.submit(() -> stream.iterator()).join(); }
@Override public Spliterator.OfInt spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
@Override public void close() { pool.submit(() -> stream.close()).join(); }
// @formatter:on
}
private class DelegateLongStream implements LongStream {
// @formatter:off
private LongStream stream;
private DelegateLongStream(LongStream stream) { this.stream = stream; }
@Override public LongStream filter(LongPredicate predicate) { stream = stream.filter(predicate); return this; }
@Override public LongStream distinct() { stream = stream.distinct(); return this; }
@Override public LongStream sorted() { stream = stream.sorted(); return this; }
@Override public LongStream peek(LongConsumer action) { stream = stream.peek(action); return this; }
@Override public LongStream limit(long maxSize) { stream = stream.limit(maxSize); return this; }
@Override public LongStream skip(long n) { stream = stream.skip(n); return this; }
@Override public LongStream sequential() { stream = stream.sequential(); return this; }
@Override public LongStream parallel() { stream = stream.parallel(); return this; }
@Override public LongStream unordered() { stream = stream.unordered(); return this; }
@Override public LongStream onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
@Override public boolean isParallel() { return stream.isParallel(); }
@Override public LongStream map(LongUnaryOperator mapper) { return new DelegateLongStream(stream.map(mapper)); }
@Override public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) { return new DelegateStream<>(stream.mapToObj(mapper)); }
@Override public IntStream mapToInt(LongToIntFunction mapper) { return new DelegateIntStream(stream.mapToInt(mapper)); }
@Override public DoubleStream mapToDouble(LongToDoubleFunction mapper) { return new DelegateDoubleStream(stream.mapToDouble(mapper)); }
@Override public LongStream flatMap(LongFunction<? extends LongStream> mapper) { return new DelegateLongStream(stream.flatMap(mapper)); }
@Override public DoubleStream asDoubleStream() { return new DelegateDoubleStream(stream.asDoubleStream()); }
@Override public Stream<Long> boxed() { return new DelegateStream<>(stream.boxed()); }
@Override public void forEach(LongConsumer action) { pool.submit(() -> stream.forEach(action)).join(); }
@Override public void forEachOrdered(LongConsumer action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
@Override public long[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
@Override public long reduce(long identity, LongBinaryOperator op) { return pool.submit(() -> stream.reduce(identity, op)).join(); }
@Override public OptionalLong reduce(LongBinaryOperator op) { return pool.submit(() -> stream.reduce(op)).join(); }
@Override public <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
@Override public long sum() { return pool.submit(() -> stream.sum()).join(); }
@Override public OptionalLong min() { return pool.submit(() -> stream.min()).join(); }
@Override public OptionalLong max() { return pool.submit(() -> stream.min()).join(); }
@Override public long count() { return pool.submit(() -> stream.count()).join(); }
@Override public OptionalDouble average() { return pool.submit(() -> stream.average()).join(); }
@Override public LongSummaryStatistics summaryStatistics() { return pool.submit(() -> stream.summaryStatistics()).join(); }
@Override public boolean anyMatch(LongPredicate predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
@Override public boolean allMatch(LongPredicate predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
@Override public boolean noneMatch(LongPredicate predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
@Override public OptionalLong findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
@Override public OptionalLong findAny() { return pool.submit(() -> stream.findAny()).join(); }
@Override public PrimitiveIterator.OfLong iterator() { return pool.submit(() -> stream.iterator()).join(); }
@Override public Spliterator.OfLong spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
@Override public void close() { pool.submit(() -> stream.close()).join(); }
// @formatter:on
}
private class DelegateDoubleStream implements DoubleStream {
// @formatter:off
private DoubleStream stream;
private DelegateDoubleStream(DoubleStream stream) { this.stream = stream; }
@Override public DoubleStream filter(DoublePredicate predicate) { stream = stream.filter(predicate); return this; }
@Override public DoubleStream distinct() { stream = stream.distinct(); return this; }
@Override public DoubleStream sorted() { stream = stream.sorted(); return this; }
@Override public DoubleStream peek(DoubleConsumer action) { stream = stream.peek(action); return this; }
@Override public DoubleStream limit(long maxSize) { stream = stream.limit(maxSize); return this; }
@Override public DoubleStream skip(long n) { stream = stream.skip(n); return this; }
@Override public DoubleStream sequential() { stream = stream.sequential(); return this; }
@Override public DoubleStream parallel() { stream = stream.parallel(); return this; }
@Override public DoubleStream unordered() { stream = stream.unordered(); return this; }
@Override public DoubleStream onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
@Override public boolean isParallel() { return stream.isParallel(); }
@Override public DoubleStream map(DoubleUnaryOperator mapper) { return new DelegateDoubleStream(stream.map(mapper)); }
@Override public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { return new DelegateStream<>(stream.mapToObj(mapper)); }
@Override public IntStream mapToInt(DoubleToIntFunction mapper) { return new DelegateIntStream(stream.mapToInt(mapper)); }
@Override public LongStream mapToLong(DoubleToLongFunction mapper) { return new DelegateLongStream(stream.mapToLong(mapper)); }
@Override public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { return new DelegateDoubleStream(stream.flatMap(mapper)); }
@Override public Stream<Double> boxed() { return new DelegateStream<>(stream.boxed()); }
@Override public void forEach(DoubleConsumer action) { pool.submit(() -> stream.forEach(action)).join(); }
@Override public void forEachOrdered(DoubleConsumer action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
@Override public double[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
@Override public double reduce(double identity, DoubleBinaryOperator op) { return pool.submit(() -> stream.reduce(identity, op)).join(); }
@Override public OptionalDouble reduce(DoubleBinaryOperator op) { return pool.submit(() -> stream.reduce(op)).join(); }
@Override public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
@Override public double sum() { return pool.submit(() -> stream.sum()).join(); }
@Override public OptionalDouble min() { return pool.submit(() -> stream.min()).join(); }
@Override public OptionalDouble max() { return pool.submit(() -> stream.min()).join(); }
@Override public long count() { return pool.submit(() -> stream.count()).join(); }
@Override public OptionalDouble average() { return pool.submit(() -> stream.average()).join(); }
@Override public DoubleSummaryStatistics summaryStatistics() { return pool.submit(() -> stream.summaryStatistics()).join(); }
@Override public boolean anyMatch(DoublePredicate predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
@Override public boolean allMatch(DoublePredicate predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
@Override public boolean noneMatch(DoublePredicate predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
@Override public OptionalDouble findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
@Override public OptionalDouble findAny() { return pool.submit(() -> stream.findAny()).join(); }
@Override public PrimitiveIterator.OfDouble iterator() { return pool.submit(() -> stream.iterator()).join(); }
@Override public Spliterator.OfDouble spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
@Override public void close() { pool.submit(() -> stream.close()).join(); }
// @formatter:on
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment