Skip to content

Instantly share code, notes, and snippets.

@laymain
Created April 23, 2018 13:37
Show Gist options
  • Save laymain/9ce194bb2924f78582e71e350672bbef to your computer and use it in GitHub Desktop.
Save laymain/9ce194bb2924f78582e71e350672bbef to your computer and use it in GitHub Desktop.
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * A blocking hand-off queue that can be consumed as a sequential {@link Stream}.
 *
 * <p>Producers call {@link #put(Object)}; consumers call {@link #take()} or iterate
 * {@link #stream()}. The stream has no known size and keeps blocking for new elements
 * until {@link #close()} is called, which enqueues an end-of-stream marker (a "poison
 * pill") behind the elements already queued.
 *
 * <p>Once any consumer has observed the end-of-stream marker the queue is permanently
 * drained: every later {@link #take()} returns {@code null} immediately and every later
 * {@link #stream()} is empty. Elements put after {@link #close()} are not delivered.
 *
 * <p>All state lives in a {@code java.util.concurrent} {@link BlockingQueue} plus one
 * {@code volatile} flag, so instances may be shared between producer and consumer threads.
 *
 * @param <T> the element type; {@code null} elements are rejected by the backing queue
 */
public class InfiniteBlockingQueue<T> {

    /** Sentinel enqueued by {@link #close()}; compared by identity, never handed to callers. */
    private static final Object POISON_PILL = new Object();

    /**
     * Backing queue. Typed as {@code Object} because it legitimately holds both
     * elements of type {@code T} and the {@link #POISON_PILL} sentinel.
     */
    private final BlockingQueue<Object> queue;

    /** Set once the end-of-stream marker has been taken; makes end-of-stream sticky. */
    private volatile boolean drained;

    /** Creates a queue backed by an optionally-bounded {@link LinkedBlockingQueue} with default capacity. */
    public InfiniteBlockingQueue() {
        queue = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a queue backed by an {@link ArrayBlockingQueue} of fixed capacity.
     *
     * @param capacity maximum number of queued entries (the end-of-stream marker occupies a slot too)
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public InfiniteBlockingQueue(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Appends an element, waiting for space if the queue is bounded and full.
     *
     * @param item the element to enqueue; must not be {@code null}
     * @throws InterruptedException if interrupted while waiting for space
     * @throws NullPointerException if {@code item} is {@code null}
     */
    public void put(T item) throws InterruptedException {
        queue.put(item);
    }

    /**
     * Removes and returns the next element, waiting until one is available.
     *
     * @return the next element, or {@code null} once the queue has been closed and
     *         every element queued before the close has been consumed
     * @throws InterruptedException if interrupted while waiting
     */
    public T take() throws InterruptedException {
        if (drained) {
            // End-of-stream was already observed; never block again.
            return null;
        }
        final Object item = queue.take();
        if (item == POISON_PILL) {
            drained = true;
            // Hand the marker on so consumers that are already blocked in take() wake
            // up as well. offer() instead of put(): this must not block on a full
            // bounded queue; in that case blocked consumers are woken by the queued
            // items and see the drained flag on their next call.
            queue.offer(POISON_PILL);
            return null;
        }
        @SuppressWarnings("unchecked") // only put(T) inserts non-sentinel entries
        final T element = (T) item;
        return element;
    }

    /**
     * Signals end-of-stream by enqueueing the end-of-stream marker behind the elements
     * already queued. Waits for space if the queue is bounded and full.
     *
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void close() throws InterruptedException {
        queue.put(POISON_PILL);
    }

    /**
     * Returns a sequential, ordered stream that blocks for elements and terminates when
     * the queue is closed or the consuming thread is interrupted.
     *
     * @return a stream over the elements of this queue
     */
    public Stream<T> stream() {
        return StreamSupport.stream(new QueueSpliterator<>(this), false);
    }

    /** Non-splittable spliterator that pulls elements with {@link InfiniteBlockingQueue#take()}. */
    private static final class QueueSpliterator<T> implements Spliterator<T> {

        private final InfiniteBlockingQueue<T> queue;

        private QueueSpliterator(final InfiniteBlockingQueue<T> queue) {
            this.queue = queue;
        }

        @Override
        public int characteristics() {
            return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
        }

        /** The number of elements is unknown, so report the "unknown/infinite" estimate. */
        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public boolean tryAdvance(final Consumer<? super T> action) {
            // Validate before taking: otherwise a null action would lose one element.
            java.util.Objects.requireNonNull(action, "action");
            final T next;
            try {
                next = this.queue.take();
            } catch (InterruptedException e) {
                // Restore the interrupt status for the caller and end the stream.
                Thread.currentThread().interrupt();
                return false;
            }
            if (next == null) {
                // take() returns null only at end-of-stream.
                return false;
            }
            action.accept(next);
            return true;
        }

        /** Splitting is not supported; the stream is consumed sequentially. */
        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment