Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Last active October 30, 2015 16:28
Show Gist options
  • Save bassemZohdy/70b130f3f90230ce6fec to your computer and use it in GitHub Desktop.
Save bassemZohdy/70b130f3f90230ce6fec to your computer and use it in GitHub Desktop.
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class ConsumerStream<T> implements Consumer<T> {
private static final int LENGTH = 1000;
private final Stream<T> stream;
private final Queueing q;
private volatile boolean end = false;
public ConsumerStream(int length) {
this.q = new Queueing(length);
this.stream = StreamSupport.stream(q.spliterator(), false);
}
public ConsumerStream() {
this(LENGTH);
}
public Stream<T> stream() {
return this.stream;
}
@Override
public void accept(T t) {
if (end)
throw new IllegalStateException("This consumer has been stoped.");
if (t != null)
this.q.accept(t);
}
public void stop() {
end = true;
}
final private class Queueing implements Consumer<T>, Iterable<T> {
private final int TIMEOUT = 100;
private final int length;
private final BlockingQueue<T> q;
private Queueing(int length) {
this.length = length;
q = new LinkedBlockingQueue<T>(this.length);
}
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return !end || !q.isEmpty();
}
@Override
public T next() {
if (!hasNext())
throw new NoSuchElementException();
return get();
}
};
}
public T get() {
try {
T t;
do {
} while ((t = q.poll(TIMEOUT, TimeUnit.MILLISECONDS)) == null && !end);
return t;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public void accept(T t) {
try {
q.put(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment