Last active
January 7, 2016 15:27
-
-
Save bnorm/4e5d7149ad32a929c309 to your computer and use it in GitHub Desktop.
Utility for customizing the ForkJoinPool that a parallel stream uses.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Comparator; | |
import java.util.DoubleSummaryStatistics; | |
import java.util.IntSummaryStatistics; | |
import java.util.Iterator; | |
import java.util.LongSummaryStatistics; | |
import java.util.Optional; | |
import java.util.OptionalDouble; | |
import java.util.OptionalInt; | |
import java.util.OptionalLong; | |
import java.util.PrimitiveIterator; | |
import java.util.Spliterator; | |
import java.util.concurrent.ForkJoinPool; | |
import java.util.function.BiConsumer; | |
import java.util.function.BiFunction; | |
import java.util.function.BinaryOperator; | |
import java.util.function.Consumer; | |
import java.util.function.DoubleBinaryOperator; | |
import java.util.function.DoubleConsumer; | |
import java.util.function.DoubleFunction; | |
import java.util.function.DoublePredicate; | |
import java.util.function.DoubleToIntFunction; | |
import java.util.function.DoubleToLongFunction; | |
import java.util.function.DoubleUnaryOperator; | |
import java.util.function.Function; | |
import java.util.function.IntBinaryOperator; | |
import java.util.function.IntConsumer; | |
import java.util.function.IntFunction; | |
import java.util.function.IntPredicate; | |
import java.util.function.IntToDoubleFunction; | |
import java.util.function.IntToLongFunction; | |
import java.util.function.IntUnaryOperator; | |
import java.util.function.LongBinaryOperator; | |
import java.util.function.LongConsumer; | |
import java.util.function.LongFunction; | |
import java.util.function.LongPredicate; | |
import java.util.function.LongToDoubleFunction; | |
import java.util.function.LongToIntFunction; | |
import java.util.function.LongUnaryOperator; | |
import java.util.function.ObjDoubleConsumer; | |
import java.util.function.ObjIntConsumer; | |
import java.util.function.ObjLongConsumer; | |
import java.util.function.Predicate; | |
import java.util.function.Supplier; | |
import java.util.function.ToDoubleFunction; | |
import java.util.function.ToIntFunction; | |
import java.util.function.ToLongFunction; | |
import java.util.stream.Collector; | |
import java.util.stream.DoubleStream; | |
import java.util.stream.IntStream; | |
import java.util.stream.LongStream; | |
import java.util.stream.Stream; | |
/**
 * Wraps streams so that their terminal operations are submitted to a caller-supplied
 * {@link ForkJoinPool} and joined by the calling thread.
 *
 * <p>Usage: {@code new StreamParallizer(pool).parallel(stream)} returns a delegate that can be
 * used like the original stream. Intermediate operations are forwarded directly to the wrapped
 * stream; every terminal operation is executed as {@code pool.submit(...).join()}.
 *
 * <p>NOTE(review): the intent is that the parallel pipeline's work then runs in {@code pool}
 * rather than the common pool. That depends on how the stream library forks its tasks, which is
 * an implementation detail of the JDK - confirm for the JDK in use.
 */
public class StreamParallizer {

    /** Pool that every terminal operation of a wrapped stream is submitted to. */
    private final ForkJoinPool pool;

    /**
     * Creates a parallelizer bound to the given pool.
     *
     * @param pool pool used to run the terminal operations of wrapped streams
     * @throws NullPointerException if {@code pool} is {@code null}
     */
    public StreamParallizer(ForkJoinPool pool) {
        // Fail fast here instead of with an NPE inside the first terminal operation.
        if (pool == null) {
            throw new NullPointerException("pool");
        }
        this.pool = pool;
    }

    // @formatter:off

    /**
     * Switches {@code stream} to parallel mode and wraps it in a pool-bound delegate.
     *
     * @param stream the stream to wrap
     * @param <T> element type
     * @return a delegate whose terminal operations run via this parallelizer's pool
     */
    public <T> Stream<T> parallel(Stream<T> stream) { return new DelegateStream<>(stream.parallel()); }

    /**
     * Switches {@code stream} to parallel mode and wraps it in a pool-bound delegate.
     *
     * @param stream the stream to wrap
     * @return a delegate whose terminal operations run via this parallelizer's pool
     */
    public IntStream parallel(IntStream stream) { return new DelegateIntStream(stream.parallel()); }

    /**
     * Switches {@code stream} to parallel mode and wraps it in a pool-bound delegate.
     *
     * @param stream the stream to wrap
     * @return a delegate whose terminal operations run via this parallelizer's pool
     */
    public LongStream parallel(LongStream stream) { return new DelegateLongStream(stream.parallel()); }

    /**
     * Switches {@code stream} to parallel mode and wraps it in a pool-bound delegate.
     *
     * @param stream the stream to wrap
     * @return a delegate whose terminal operations run via this parallelizer's pool
     */
    public DoubleStream parallel(DoubleStream stream) { return new DelegateDoubleStream(stream.parallel()); }

    // @formatter:on

    /**
     * {@link Stream} delegate. Non-static on purpose: it reads the enclosing instance's
     * {@code pool}.
     */
    private class DelegateStream<T> implements Stream<T> {
        // @formatter:off
        private Stream<T> stream;

        private DelegateStream(Stream<T> stream) { this.stream = stream; }

        // Intermediate operations that keep the element type: re-point the wrapped stream at the
        // new pipeline stage and hand back this same wrapper.
        @Override public Stream<T> filter(Predicate<? super T> predicate) { stream = stream.filter(predicate); return this; }
        @Override public Stream<T> distinct() { stream = stream.distinct(); return this; }
        @Override public Stream<T> sorted() { stream = stream.sorted(); return this; }
        @Override public Stream<T> sorted(Comparator<? super T> comparator) { stream = stream.sorted(comparator); return this; }
        @Override public Stream<T> peek(Consumer<? super T> action) { stream = stream.peek(action); return this; }
        @Override public Stream<T> limit(long maxSize) { stream = stream.limit(maxSize); return this; }
        @Override public Stream<T> skip(long n) { stream = stream.skip(n); return this; }
        @Override public Stream<T> sequential() { stream = stream.sequential(); return this; }
        @Override public Stream<T> parallel() { stream = stream.parallel(); return this; }
        @Override public Stream<T> unordered() { stream = stream.unordered(); return this; }
        @Override public Stream<T> onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
        @Override public boolean isParallel() { return stream.isParallel(); }

        // Intermediate operations that may change the element type: wrap the new stage in a
        // fresh delegate so its terminal operations still go through the pool.
        @Override public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { return new DelegateStream<>(stream.map(mapper)); }
        @Override public IntStream mapToInt(ToIntFunction<? super T> mapper) { return new DelegateIntStream(stream.mapToInt(mapper)); }
        @Override public LongStream mapToLong(ToLongFunction<? super T> mapper) { return new DelegateLongStream(stream.mapToLong(mapper)); }
        @Override public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { return new DelegateDoubleStream(stream.mapToDouble(mapper)); }
        @Override public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { return new DelegateStream<>(stream.flatMap(mapper)); }
        @Override public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { return new DelegateIntStream(stream.flatMapToInt(mapper)); }
        @Override public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { return new DelegateLongStream(stream.flatMapToLong(mapper)); }
        @Override public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { return new DelegateDoubleStream(stream.flatMapToDouble(mapper)); }

        // Terminal operations: submitted to the pool, then joined by the calling thread.
        @Override public void forEach(Consumer<? super T> action) { pool.submit(() -> stream.forEach(action)).join(); }
        @Override public void forEachOrdered(Consumer<? super T> action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
        @Override public Object[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
        @Override public <A> A[] toArray(IntFunction<A[]> generator) { return pool.submit(() -> stream.toArray(generator)).join(); }
        @Override public T reduce(T identity, BinaryOperator<T> accumulator) { return pool.submit(() -> stream.reduce(identity, accumulator)).join(); }
        @Override public Optional<T> reduce(BinaryOperator<T> accumulator) { return pool.submit(() -> stream.reduce(accumulator)).join(); }
        @Override public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { return pool.submit(() -> stream.reduce(identity, accumulator, combiner)).join(); }
        @Override public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
        @Override public <R, A> R collect(Collector<? super T, A, R> collector) { return pool.submit(() -> stream.collect(collector)).join(); }
        @Override public Optional<T> min(Comparator<? super T> comparator) { return pool.submit(() -> stream.min(comparator)).join(); }
        // Must delegate to max(); delegating to min() here would return the smallest element.
        @Override public Optional<T> max(Comparator<? super T> comparator) { return pool.submit(() -> stream.max(comparator)).join(); }
        @Override public long count() { return pool.submit(() -> stream.count()).join(); }
        @Override public boolean anyMatch(Predicate<? super T> predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
        @Override public boolean allMatch(Predicate<? super T> predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
        @Override public boolean noneMatch(Predicate<? super T> predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
        @Override public Optional<T> findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
        @Override public Optional<T> findAny() { return pool.submit(() -> stream.findAny()).join(); }

        // Only the creation of the iterator/spliterator is submitted to the pool; the returned
        // object is then used by whichever thread the caller traverses it on.
        @Override public Iterator<T> iterator() { return pool.submit(() -> stream.iterator()).join(); }
        @Override public Spliterator<T> spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
        @Override public void close() { pool.submit(() -> stream.close()).join(); }
        // @formatter:on
    }

    /**
     * {@link IntStream} delegate. Non-static on purpose: it reads the enclosing instance's
     * {@code pool}.
     */
    private class DelegateIntStream implements IntStream {
        // @formatter:off
        private IntStream stream;

        private DelegateIntStream(IntStream stream) { this.stream = stream; }

        // Intermediate operations that keep the element type: re-point the wrapped stream at the
        // new pipeline stage and hand back this same wrapper.
        @Override public IntStream filter(IntPredicate predicate) { stream = stream.filter(predicate); return this; }
        @Override public IntStream distinct() { stream = stream.distinct(); return this; }
        @Override public IntStream sorted() { stream = stream.sorted(); return this; }
        @Override public IntStream peek(IntConsumer action) { stream = stream.peek(action); return this; }
        @Override public IntStream limit(long maxSize) { stream = stream.limit(maxSize); return this; }
        @Override public IntStream skip(long n) { stream = stream.skip(n); return this; }
        @Override public IntStream sequential() { stream = stream.sequential(); return this; }
        @Override public IntStream parallel() { stream = stream.parallel(); return this; }
        @Override public IntStream unordered() { stream = stream.unordered(); return this; }
        @Override public IntStream onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
        @Override public boolean isParallel() { return stream.isParallel(); }

        // Mapping/conversion operations: wrap the new stage in a fresh delegate so its terminal
        // operations still go through the pool.
        @Override public IntStream map(IntUnaryOperator mapper) { return new DelegateIntStream(stream.map(mapper)); }
        @Override public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { return new DelegateStream<>(stream.mapToObj(mapper)); }
        @Override public LongStream mapToLong(IntToLongFunction mapper) { return new DelegateLongStream(stream.mapToLong(mapper)); }
        @Override public DoubleStream mapToDouble(IntToDoubleFunction mapper) { return new DelegateDoubleStream(stream.mapToDouble(mapper)); }
        @Override public IntStream flatMap(IntFunction<? extends IntStream> mapper) { return new DelegateIntStream(stream.flatMap(mapper)); }
        @Override public LongStream asLongStream() { return new DelegateLongStream(stream.asLongStream()); }
        @Override public DoubleStream asDoubleStream() { return new DelegateDoubleStream(stream.asDoubleStream()); }
        @Override public Stream<Integer> boxed() { return new DelegateStream<>(stream.boxed()); }

        // Terminal operations: submitted to the pool, then joined by the calling thread.
        @Override public void forEach(IntConsumer action) { pool.submit(() -> stream.forEach(action)).join(); }
        @Override public void forEachOrdered(IntConsumer action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
        @Override public int[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
        @Override public int reduce(int identity, IntBinaryOperator op) { return pool.submit(() -> stream.reduce(identity, op)).join(); }
        @Override public OptionalInt reduce(IntBinaryOperator op) { return pool.submit(() -> stream.reduce(op)).join(); }
        @Override public <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
        @Override public int sum() { return pool.submit(() -> stream.sum()).join(); }
        @Override public OptionalInt min() { return pool.submit(() -> stream.min()).join(); }
        // Must delegate to max(); delegating to min() here would return the smallest element.
        @Override public OptionalInt max() { return pool.submit(() -> stream.max()).join(); }
        @Override public long count() { return pool.submit(() -> stream.count()).join(); }
        @Override public OptionalDouble average() { return pool.submit(() -> stream.average()).join(); }
        @Override public IntSummaryStatistics summaryStatistics() { return pool.submit(() -> stream.summaryStatistics()).join(); }
        @Override public boolean anyMatch(IntPredicate predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
        @Override public boolean allMatch(IntPredicate predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
        @Override public boolean noneMatch(IntPredicate predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
        @Override public OptionalInt findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
        @Override public OptionalInt findAny() { return pool.submit(() -> stream.findAny()).join(); }

        // Only the creation of the iterator/spliterator is submitted to the pool; the returned
        // object is then used by whichever thread the caller traverses it on.
        @Override public PrimitiveIterator.OfInt iterator() { return pool.submit(() -> stream.iterator()).join(); }
        @Override public Spliterator.OfInt spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
        @Override public void close() { pool.submit(() -> stream.close()).join(); }
        // @formatter:on
    }

    /**
     * {@link LongStream} delegate. Non-static on purpose: it reads the enclosing instance's
     * {@code pool}.
     */
    private class DelegateLongStream implements LongStream {
        // @formatter:off
        private LongStream stream;

        private DelegateLongStream(LongStream stream) { this.stream = stream; }

        // Intermediate operations that keep the element type: re-point the wrapped stream at the
        // new pipeline stage and hand back this same wrapper.
        @Override public LongStream filter(LongPredicate predicate) { stream = stream.filter(predicate); return this; }
        @Override public LongStream distinct() { stream = stream.distinct(); return this; }
        @Override public LongStream sorted() { stream = stream.sorted(); return this; }
        @Override public LongStream peek(LongConsumer action) { stream = stream.peek(action); return this; }
        @Override public LongStream limit(long maxSize) { stream = stream.limit(maxSize); return this; }
        @Override public LongStream skip(long n) { stream = stream.skip(n); return this; }
        @Override public LongStream sequential() { stream = stream.sequential(); return this; }
        @Override public LongStream parallel() { stream = stream.parallel(); return this; }
        @Override public LongStream unordered() { stream = stream.unordered(); return this; }
        @Override public LongStream onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
        @Override public boolean isParallel() { return stream.isParallel(); }

        // Mapping/conversion operations: wrap the new stage in a fresh delegate so its terminal
        // operations still go through the pool.
        @Override public LongStream map(LongUnaryOperator mapper) { return new DelegateLongStream(stream.map(mapper)); }
        @Override public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) { return new DelegateStream<>(stream.mapToObj(mapper)); }
        @Override public IntStream mapToInt(LongToIntFunction mapper) { return new DelegateIntStream(stream.mapToInt(mapper)); }
        @Override public DoubleStream mapToDouble(LongToDoubleFunction mapper) { return new DelegateDoubleStream(stream.mapToDouble(mapper)); }
        @Override public LongStream flatMap(LongFunction<? extends LongStream> mapper) { return new DelegateLongStream(stream.flatMap(mapper)); }
        @Override public DoubleStream asDoubleStream() { return new DelegateDoubleStream(stream.asDoubleStream()); }
        @Override public Stream<Long> boxed() { return new DelegateStream<>(stream.boxed()); }

        // Terminal operations: submitted to the pool, then joined by the calling thread.
        @Override public void forEach(LongConsumer action) { pool.submit(() -> stream.forEach(action)).join(); }
        @Override public void forEachOrdered(LongConsumer action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
        @Override public long[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
        @Override public long reduce(long identity, LongBinaryOperator op) { return pool.submit(() -> stream.reduce(identity, op)).join(); }
        @Override public OptionalLong reduce(LongBinaryOperator op) { return pool.submit(() -> stream.reduce(op)).join(); }
        @Override public <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
        @Override public long sum() { return pool.submit(() -> stream.sum()).join(); }
        @Override public OptionalLong min() { return pool.submit(() -> stream.min()).join(); }
        // Must delegate to max(); delegating to min() here would return the smallest element.
        @Override public OptionalLong max() { return pool.submit(() -> stream.max()).join(); }
        @Override public long count() { return pool.submit(() -> stream.count()).join(); }
        @Override public OptionalDouble average() { return pool.submit(() -> stream.average()).join(); }
        @Override public LongSummaryStatistics summaryStatistics() { return pool.submit(() -> stream.summaryStatistics()).join(); }
        @Override public boolean anyMatch(LongPredicate predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
        @Override public boolean allMatch(LongPredicate predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
        @Override public boolean noneMatch(LongPredicate predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
        @Override public OptionalLong findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
        @Override public OptionalLong findAny() { return pool.submit(() -> stream.findAny()).join(); }

        // Only the creation of the iterator/spliterator is submitted to the pool; the returned
        // object is then used by whichever thread the caller traverses it on.
        @Override public PrimitiveIterator.OfLong iterator() { return pool.submit(() -> stream.iterator()).join(); }
        @Override public Spliterator.OfLong spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
        @Override public void close() { pool.submit(() -> stream.close()).join(); }
        // @formatter:on
    }

    /**
     * {@link DoubleStream} delegate. Non-static on purpose: it reads the enclosing instance's
     * {@code pool}.
     */
    private class DelegateDoubleStream implements DoubleStream {
        // @formatter:off
        private DoubleStream stream;

        private DelegateDoubleStream(DoubleStream stream) { this.stream = stream; }

        // Intermediate operations that keep the element type: re-point the wrapped stream at the
        // new pipeline stage and hand back this same wrapper.
        @Override public DoubleStream filter(DoublePredicate predicate) { stream = stream.filter(predicate); return this; }
        @Override public DoubleStream distinct() { stream = stream.distinct(); return this; }
        @Override public DoubleStream sorted() { stream = stream.sorted(); return this; }
        @Override public DoubleStream peek(DoubleConsumer action) { stream = stream.peek(action); return this; }
        @Override public DoubleStream limit(long maxSize) { stream = stream.limit(maxSize); return this; }
        @Override public DoubleStream skip(long n) { stream = stream.skip(n); return this; }
        @Override public DoubleStream sequential() { stream = stream.sequential(); return this; }
        @Override public DoubleStream parallel() { stream = stream.parallel(); return this; }
        @Override public DoubleStream unordered() { stream = stream.unordered(); return this; }
        @Override public DoubleStream onClose(Runnable closeHandler) { stream = stream.onClose(closeHandler); return this; }
        @Override public boolean isParallel() { return stream.isParallel(); }

        // Mapping/conversion operations: wrap the new stage in a fresh delegate so its terminal
        // operations still go through the pool.
        @Override public DoubleStream map(DoubleUnaryOperator mapper) { return new DelegateDoubleStream(stream.map(mapper)); }
        @Override public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { return new DelegateStream<>(stream.mapToObj(mapper)); }
        @Override public IntStream mapToInt(DoubleToIntFunction mapper) { return new DelegateIntStream(stream.mapToInt(mapper)); }
        @Override public LongStream mapToLong(DoubleToLongFunction mapper) { return new DelegateLongStream(stream.mapToLong(mapper)); }
        @Override public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { return new DelegateDoubleStream(stream.flatMap(mapper)); }
        @Override public Stream<Double> boxed() { return new DelegateStream<>(stream.boxed()); }

        // Terminal operations: submitted to the pool, then joined by the calling thread.
        @Override public void forEach(DoubleConsumer action) { pool.submit(() -> stream.forEach(action)).join(); }
        @Override public void forEachOrdered(DoubleConsumer action) { pool.submit(() -> stream.forEachOrdered(action)).join(); }
        @Override public double[] toArray() { return pool.submit(() -> stream.toArray()).join(); }
        @Override public double reduce(double identity, DoubleBinaryOperator op) { return pool.submit(() -> stream.reduce(identity, op)).join(); }
        @Override public OptionalDouble reduce(DoubleBinaryOperator op) { return pool.submit(() -> stream.reduce(op)).join(); }
        @Override public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) { return pool.submit(() -> stream.collect(supplier, accumulator, combiner)).join(); }
        @Override public double sum() { return pool.submit(() -> stream.sum()).join(); }
        @Override public OptionalDouble min() { return pool.submit(() -> stream.min()).join(); }
        // Must delegate to max(); delegating to min() here would return the smallest element.
        @Override public OptionalDouble max() { return pool.submit(() -> stream.max()).join(); }
        @Override public long count() { return pool.submit(() -> stream.count()).join(); }
        @Override public OptionalDouble average() { return pool.submit(() -> stream.average()).join(); }
        @Override public DoubleSummaryStatistics summaryStatistics() { return pool.submit(() -> stream.summaryStatistics()).join(); }
        @Override public boolean anyMatch(DoublePredicate predicate) { return pool.submit(() -> stream.anyMatch(predicate)).join(); }
        @Override public boolean allMatch(DoublePredicate predicate) { return pool.submit(() -> stream.allMatch(predicate)).join(); }
        @Override public boolean noneMatch(DoublePredicate predicate) { return pool.submit(() -> stream.noneMatch(predicate)).join(); }
        @Override public OptionalDouble findFirst() { return pool.submit(() -> stream.findFirst()).join(); }
        @Override public OptionalDouble findAny() { return pool.submit(() -> stream.findAny()).join(); }

        // Only the creation of the iterator/spliterator is submitted to the pool; the returned
        // object is then used by whichever thread the caller traverses it on.
        @Override public PrimitiveIterator.OfDouble iterator() { return pool.submit(() -> stream.iterator()).join(); }
        @Override public Spliterator.OfDouble spliterator() { return pool.submit(() -> stream.spliterator()).join(); }
        @Override public void close() { pool.submit(() -> stream.close()).join(); }
        // @formatter:on
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment