Created
September 21, 2015 17:14
-
-
Save bassemZohdy/e5fdd56de44cea3cd8ff to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * A {@link Stream} whose elements are pushed into it through the
 * {@link Consumer} interface.
 *
 * <p>Elements handed to {@link #accept(Object)} are buffered in a bounded
 * blocking queue and handed out, in arrival order when traversed
 * sequentially, by the stream operations. A consumer that runs ahead of the
 * producer blocks until an element arrives; a producer that runs ahead of the
 * consumer blocks while the buffer is full.
 *
 * <p>The stream is unbounded until {@link #complete()} (or {@link #close()})
 * is called. After that, the elements still buffered are delivered and the
 * stream then ends normally, so terminal operations such as {@code collect}
 * or {@code count} return instead of blocking forever.
 *
 * <p>Like every stream, an instance can be consumed only once.
 *
 * @param <T> the element type; {@code null} elements are not supported
 */
public class InfiniteStream<T> implements Consumer<T>, Stream<T> {

    /** Buffer capacity used by the no-argument constructor. */
    private static final int LENGTH = 1000;

    private final Queueing<T> q;
    private final Stream<T> stream;

    /**
     * Creates a stream backed by a buffer of the given capacity.
     *
     * @param length maximum number of elements buffered between producer and
     *               consumer
     * @throws IllegalArgumentException if {@code length} is not positive
     *                                  (raised by the underlying queue)
     */
    public InfiniteStream(int length) {
        this.q = new Queueing<>(length);
        // Sequential by default, like the Stream.generate(...) based original.
        this.stream = StreamSupport.stream(
                new QueueSpliterator<>(q, Long.MAX_VALUE), false);
    }

    /** Creates a stream backed by a buffer of {@value #LENGTH} elements. */
    public InfiniteStream() {
        this(LENGTH);
    }

    /**
     * Pushes one element into the stream, blocking while the buffer is full.
     *
     * @param t the element to enqueue; must not be {@code null}
     * @throws IllegalStateException if the stream has been completed or closed
     * @throws RuntimeException      wrapping an {@link InterruptedException}
     *                               if the calling thread is interrupted while
     *                               waiting; the interrupt flag is restored
     */
    @Override
    public void accept(T t) {
        q.accept(t);
    }

    /**
     * Signals that no more elements will be pushed. Elements that are already
     * buffered are still delivered; afterwards the stream ends.
     *
     * <p>Call this after the last {@link #accept(Object)}; an {@code accept}
     * racing with this call may be rejected or its element may be dropped.
     * Calling it more than once has no further effect.
     */
    public void complete() {
        q.complete();
    }

    /**
     * Completes the stream (releasing consumers and producers blocked on the
     * buffer) and closes the underlying stream, which runs the handlers
     * registered through {@link #onClose(Runnable)}.
     *
     * <p>A closed stream pipeline can no longer start a terminal operation,
     * so use {@link #complete()} rather than this method when buffered
     * elements still have to be consumed.
     */
    @Override
    public void close() {
        q.complete();
        stream.close();
    }

    // ------------------------------------------------------------------
    // Everything below simply forwards to the queue-backed stream.
    // ------------------------------------------------------------------

    @Override
    public Iterator<T> iterator() {
        return stream.iterator();
    }

    @Override
    public Spliterator<T> spliterator() {
        return stream.spliterator();
    }

    @Override
    public boolean isParallel() {
        return stream.isParallel();
    }

    @Override
    public Stream<T> sequential() {
        return stream.sequential();
    }

    @Override
    public Stream<T> parallel() {
        return stream.parallel();
    }

    @Override
    public Stream<T> unordered() {
        return stream.unordered();
    }

    @Override
    public Stream<T> onClose(Runnable closeHandler) {
        return stream.onClose(closeHandler);
    }

    @Override
    public Stream<T> filter(Predicate<? super T> predicate) {
        return stream.filter(predicate);
    }

    @Override
    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return stream.map(mapper);
    }

    @Override
    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return stream.mapToInt(mapper);
    }

    @Override
    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return stream.mapToLong(mapper);
    }

    @Override
    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return stream.mapToDouble(mapper);
    }

    @Override
    public <R> Stream<R> flatMap(
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        return stream.flatMap(mapper);
    }

    @Override
    public IntStream flatMapToInt(
            Function<? super T, ? extends IntStream> mapper) {
        return stream.flatMapToInt(mapper);
    }

    @Override
    public LongStream flatMapToLong(
            Function<? super T, ? extends LongStream> mapper) {
        return stream.flatMapToLong(mapper);
    }

    @Override
    public DoubleStream flatMapToDouble(
            Function<? super T, ? extends DoubleStream> mapper) {
        return stream.flatMapToDouble(mapper);
    }

    @Override
    public Stream<T> distinct() {
        return stream.distinct();
    }

    @Override
    public Stream<T> sorted() {
        return stream.sorted();
    }

    @Override
    public Stream<T> sorted(Comparator<? super T> comparator) {
        return stream.sorted(comparator);
    }

    @Override
    public Stream<T> peek(Consumer<? super T> action) {
        return stream.peek(action);
    }

    @Override
    public Stream<T> limit(long maxSize) {
        return stream.limit(maxSize);
    }

    @Override
    public Stream<T> skip(long n) {
        return stream.skip(n);
    }

    @Override
    public void forEach(Consumer<? super T> action) {
        stream.forEach(action);
    }

    @Override
    public void forEachOrdered(Consumer<? super T> action) {
        stream.forEachOrdered(action);
    }

    @Override
    public Object[] toArray() {
        return stream.toArray();
    }

    @Override
    public <A> A[] toArray(IntFunction<A[]> generator) {
        return stream.toArray(generator);
    }

    @Override
    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return stream.reduce(identity, accumulator);
    }

    @Override
    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return stream.reduce(accumulator);
    }

    @Override
    public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
            BinaryOperator<U> combiner) {
        return stream.reduce(identity, accumulator, combiner);
    }

    @Override
    public <R> R collect(Supplier<R> supplier,
            BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return stream.collect(supplier, accumulator, combiner);
    }

    @Override
    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return stream.collect(collector);
    }

    @Override
    public Optional<T> min(Comparator<? super T> comparator) {
        return stream.min(comparator);
    }

    @Override
    public Optional<T> max(Comparator<? super T> comparator) {
        return stream.max(comparator);
    }

    @Override
    public long count() {
        return stream.count();
    }

    @Override
    public boolean anyMatch(Predicate<? super T> predicate) {
        return stream.anyMatch(predicate);
    }

    @Override
    public boolean allMatch(Predicate<? super T> predicate) {
        return stream.allMatch(predicate);
    }

    @Override
    public boolean noneMatch(Predicate<? super T> predicate) {
        return stream.noneMatch(predicate);
    }

    @Override
    public Optional<T> findFirst() {
        return stream.findFirst();
    }

    @Override
    public Optional<T> findAny() {
        return stream.findAny();
    }

    /**
     * Bounded hand-off buffer between the producer side ({@link #accept})
     * and the consumer side ({@link #get}), with an end-of-input flag.
     *
     * <p>Static on purpose: it needs nothing from the enclosing instance.
     *
     * @param <E> the element type
     */
    private static final class Queueing<E> implements Consumer<E>, Supplier<E> {

        /**
         * How long a blocked producer or consumer waits before re-checking
         * the completion flag. Only affects how quickly blocked threads
         * notice completion, not element latency.
         */
        private static final long WAIT_MILLIS = 50L;

        private final BlockingQueue<E> q;

        /** Set once by {@link #complete()}; never reset. */
        private volatile boolean completed;

        private Queueing(int length) {
            q = new LinkedBlockingQueue<>(length);
        }

        /** Marks the end of input; buffered elements remain retrievable. */
        void complete() {
            completed = true;
        }

        /**
         * Returns the next element, blocking while the buffer is empty and
         * input has not been completed.
         *
         * @return the next element, or {@code null} once input is completed
         *         and the buffer has been drained
         */
        @Override
        public E get() {
            try {
                for (;;) {
                    if (completed) {
                        // Reading the flag first guarantees that everything
                        // enqueued before complete() is visible to this poll.
                        return q.poll();
                    }
                    E item = q.poll(WAIT_MILLIS, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        return item;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Enqueues an element, blocking while the buffer is full and input
         * has not been completed.
         *
         * @throws IllegalStateException if input has been completed
         */
        @Override
        public void accept(E item) {
            try {
                do {
                    if (completed) {
                        throw new IllegalStateException(
                                "InfiniteStream has been completed; no more elements are accepted");
                    }
                } while (!q.offer(item, WAIT_MILLIS, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Spliterator that pulls elements from a shared supplier until the
     * supplier returns {@code null}, which marks the end of the stream.
     *
     * <p>Splitting hands out another spliterator over the same supplier and
     * halves the size estimate each time, so a parallel pipeline can pull
     * from several threads without elements being pre-fetched in batches.
     *
     * @param <E> the element type
     */
    private static final class QueueSpliterator<E> implements Spliterator<E> {

        private final Supplier<E> source;
        private long estimate;

        QueueSpliterator(Supplier<E> source, long estimate) {
            this.source = source;
            this.estimate = estimate;
        }

        @Override
        public boolean tryAdvance(Consumer<? super E> action) {
            E item = source.get();
            if (item == null) {
                return false;
            }
            action.accept(item);
            return true;
        }

        @Override
        public Spliterator<E> trySplit() {
            if (estimate == 0) {
                return null;
            }
            estimate >>>= 1;
            return new QueueSpliterator<>(source, estimate);
        }

        @Override
        public long estimateSize() {
            return estimate;
        }

        @Override
        public int characteristics() {
            // Deliberately neither ORDERED nor SIZED.
            return Spliterator.NONNULL | Spliterator.CONCURRENT;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How does this InfiniteStream shut down? What happens when whatever is pushing to the Consumer interface finishes? The call to take() on the blocking queue will then never return.