Created
April 23, 2018 13:37
-
-
Save laymain/9ce194bb2924f78582e71e350672bbef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Spliterator; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.function.Consumer; | |
import java.util.stream.Stream; | |
import java.util.stream.StreamSupport; | |
/**
 * A FIFO blocking queue that can be closed and consumed as a sequential {@link Stream}.
 *
 * <p>Producers call {@link #put(Object)}; consumers either call {@link #take()} directly or
 * iterate over {@link #stream()}. Calling {@link #close()} enqueues an internal end-of-stream
 * marker behind the elements already queued. Once a consumer reaches that marker the queue is
 * permanently drained: every subsequent {@link #take()} returns {@code null} immediately and
 * every stream terminates, instead of blocking forever.
 *
 * <p>Elements added after {@link #close()} sit behind the marker and are not delivered.
 * {@code null} elements are not supported (the backing JDK queues reject them).
 *
 * @param <T> the element type
 */
public class InfiniteBlockingQueue<T> {

    /** Sentinel enqueued by {@link #close()}; compared by identity, never handed to callers. */
    private static final Object POISON_PILL = new Object();

    /**
     * Backing queue. Typed as {@code Object} because it holds both user elements and the
     * sentinel; elements are cast back to {@code T} in {@link #take()}.
     */
    private final BlockingQueue<Object> queue;

    /** Set by the first successful {@link #close()} so that later calls are no-ops. */
    private final java.util.concurrent.atomic.AtomicBoolean closeRequested =
            new java.util.concurrent.atomic.AtomicBoolean();

    /** Becomes {@code true} once any consumer has seen the sentinel; never reset. */
    private volatile boolean drained;

    /** Creates a queue backed by an unbounded {@link LinkedBlockingQueue}. */
    public InfiniteBlockingQueue() {
        queue = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a queue backed by an {@link ArrayBlockingQueue} of the given capacity.
     *
     * @param capacity maximum number of queued entries (the end-of-stream marker occupies one
     *                 slot while it is queued)
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public InfiniteBlockingQueue(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Appends an element, waiting for space if the queue is bounded and full.
     *
     * @param item the element to add; must not be {@code null}
     * @throws InterruptedException if interrupted while waiting for space
     * @throws NullPointerException if {@code item} is {@code null}
     */
    public void put(T item) throws InterruptedException {
        queue.put(item);
    }

    /**
     * Removes and returns the next element, waiting until one is available.
     *
     * @return the next element, or {@code null} once the end-of-stream marker has been reached
     * @throws InterruptedException if interrupted while waiting
     */
    public T take() throws InterruptedException {
        if (drained) {
            // The marker was already seen: never block again.
            return null;
        }
        final Object item = queue.take();
        if (item == POISON_PILL) {
            drained = true;
            // Hand the marker back so consumers already blocked in queue.take() wake up too.
            // offer() rather than put(): this must not block, and if it fails the queue is
            // non-empty, so nobody is stuck waiting and the drained flag covers later calls.
            queue.offer(POISON_PILL);
            return null;
        }
        @SuppressWarnings("unchecked") // only T instances (or the sentinel) are ever enqueued
        final T element = (T) item;
        return element;
    }

    /**
     * Signals end-of-stream by enqueueing the marker behind all currently queued elements.
     * Only the first call has an effect; later calls return immediately.
     *
     * @throws InterruptedException if interrupted while waiting for space in a bounded queue;
     *                              in that case the queue is not closed and the call may be
     *                              retried
     */
    public void close() throws InterruptedException {
        if (!closeRequested.compareAndSet(false, true)) {
            return;
        }
        try {
            queue.put(POISON_PILL);
        } catch (InterruptedException e) {
            // The marker was not enqueued; allow a later close() to try again.
            closeRequested.set(false);
            throw e;
        }
    }

    /**
     * Returns a sequential stream that pulls elements with {@link #take()} until the queue is
     * drained. The stream blocks while the queue is empty and not yet closed.
     *
     * @return a sequential, ordered stream over the queued elements
     */
    public Stream<T> stream() {
        return StreamSupport.stream(new QueueSpliterator<>(this), false);
    }

    /** Non-splittable spliterator that reads from the owning queue one element at a time. */
    private static class QueueSpliterator<T> implements Spliterator<T> {

        private final InfiniteBlockingQueue<T> queue;

        private QueueSpliterator(final InfiniteBlockingQueue<T> queue) {
            this.queue = queue;
        }

        @Override
        public int characteristics() {
            return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
        }

        /** The number of elements is unknown up front, so report "unbounded". */
        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        /**
         * Blocks for the next element and passes it to {@code action}.
         *
         * @return {@code false} when the queue is drained or the waiting thread was
         *         interrupted (the interrupt flag is restored), {@code true} otherwise
         * @throws NullPointerException if {@code action} is {@code null}
         */
        @Override
        public boolean tryAdvance(final Consumer<? super T> action) {
            if (action == null) {
                // Fail before removing anything so no element is lost to a bad call.
                throw new NullPointerException("action");
            }
            final T next;
            try {
                next = this.queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        }

        /** Splitting is not supported; the stream is always consumed sequentially. */
        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment