Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Created September 21, 2015 17:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bassemZohdy/e5fdd56de44cea3cd8ff to your computer and use it in GitHub Desktop.
Save bassemZohdy/e5fdd56de44cea3cd8ff to your computer and use it in GitHub Desktop.
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
public class InfiniteStream<T> implements Consumer<T>, Stream<T> {
private static final int LENGTH = 1000;
private final Stream<T> stream;
private final Queueing q;
private final int length;
public InfiniteStream(int length) {
this.length = length;
this.q = new Queueing(this.length);
this.stream = Stream.generate(q);
}
public InfiniteStream() {
this(LENGTH);
}
@Override
public void accept(T t) {
q.accept(t);
}
@Override
public Iterator<T> iterator() {
return stream.iterator();
}
@Override
public Spliterator<T> spliterator() {
return stream.spliterator();
}
@Override
public boolean isParallel() {
return stream.isParallel();
}
@Override
public Stream<T> sequential() {
return stream.sequential();
}
@Override
public Stream<T> parallel() {
return stream.parallel();
}
@Override
public Stream<T> unordered() {
return stream.unordered();
}
@Override
public Stream<T> onClose(Runnable closeHandler) {
return stream.onClose(closeHandler);
}
@Override
public void close() {
stream.close();
}
@Override
public Stream<T> filter(Predicate<? super T> predicate) {
return stream.filter(predicate);
}
@Override
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
return stream.map(mapper);
}
@Override
public IntStream mapToInt(ToIntFunction<? super T> mapper) {
return stream.mapToInt(mapper);
}
@Override
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
return stream.mapToLong(mapper);
}
@Override
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
return stream.mapToDouble(mapper);
}
@Override
public <R> Stream<R> flatMap(
Function<? super T, ? extends Stream<? extends R>> mapper) {
return stream.flatMap(mapper);
}
@Override
public IntStream flatMapToInt(
Function<? super T, ? extends IntStream> mapper) {
return stream.flatMapToInt(mapper);
}
@Override
public LongStream flatMapToLong(
Function<? super T, ? extends LongStream> mapper) {
return stream.flatMapToLong(mapper);
}
@Override
public DoubleStream flatMapToDouble(
Function<? super T, ? extends DoubleStream> mapper) {
return stream.flatMapToDouble(mapper);
}
@Override
public Stream<T> distinct() {
return stream.distinct();
}
@Override
public Stream<T> sorted() {
return stream.sorted();
}
@Override
public Stream<T> sorted(Comparator<? super T> comparator) {
return stream.sorted(comparator);
}
@Override
public Stream<T> peek(Consumer<? super T> action) {
return stream.peek(action);
}
@Override
public Stream<T> limit(long maxSize) {
return stream.limit(maxSize);
}
@Override
public Stream<T> skip(long n) {
return stream.skip(n);
}
@Override
public void forEach(Consumer<? super T> action) {
stream.forEach(action);
}
@Override
public void forEachOrdered(Consumer<? super T> action) {
stream.forEachOrdered(action);
}
@Override
public Object[] toArray() {
return stream.toArray();
}
@Override
public <A> A[] toArray(IntFunction<A[]> generator) {
return stream.toArray(generator);
}
@Override
public T reduce(T identity, BinaryOperator<T> accumulator) {
return stream.reduce(identity, accumulator);
}
@Override
public Optional<T> reduce(BinaryOperator<T> accumulator) {
return stream.reduce(accumulator);
}
@Override
public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner) {
return stream.reduce(identity, accumulator, combiner);
}
@Override
public <R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
return stream.collect(supplier, accumulator, combiner);
}
@Override
public <R, A> R collect(Collector<? super T, A, R> collector) {
return stream.collect(collector);
}
@Override
public Optional<T> min(Comparator<? super T> comparator) {
return stream.min(comparator);
}
@Override
public Optional<T> max(Comparator<? super T> comparator) {
return stream.max(comparator);
}
@Override
public long count() {
return stream.count();
}
@Override
public boolean anyMatch(Predicate<? super T> predicate) {
return stream.anyMatch(predicate);
}
@Override
public boolean allMatch(Predicate<? super T> predicate) {
return stream.allMatch(predicate);
}
@Override
public boolean noneMatch(Predicate<? super T> predicate) {
return stream.noneMatch(predicate);
}
@Override
public Optional<T> findFirst() {
return stream.findFirst();
}
@Override
public Optional<T> findAny() {
return stream.findAny();
}
final private class Queueing implements Consumer<T>, Supplier<T> {
private final int length;
private final BlockingQueue<T> q;
private Queueing(int length) {
this.length = length;
q = new LinkedBlockingQueue<T>(this.length);
}
@Override
public T get() {
try {
return q.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public void accept(T t) {
try {
q.put(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
}
@jeffreyschultz
Copy link

How does this InfiniteStream shutdown? What if whatever is pushing to the Consumer interface finishes? What then? The call to take() of the blocking queue will never return.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment