Last active
October 30, 2015 16:28
-
-
Save bassemZohdy/70b130f3f90230ce6fec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Iterator; | |
import java.util.NoSuchElementException; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
import java.util.stream.Stream; | |
import java.util.stream.StreamSupport; | |
/**
 * Bridges push-style producers to a pull-style {@link Stream}.
 *
 * <p>Elements handed to {@link #accept(Object)} are buffered in a bounded
 * {@link BlockingQueue} and surface, in insertion order, as the elements of the
 * sequential stream returned by {@link #stream()}. The stream blocks while the
 * buffer is empty and terminates once {@link #stop()} has been called and the
 * buffer has been drained.
 *
 * <p>The stop flag is {@code volatile} and the buffer is a {@link BlockingQueue},
 * so producers and the stream consumer may run on different threads. An
 * {@code accept} call that races with a concurrent {@code stop} from another
 * thread may still have its element dropped; elements accepted before
 * {@code stop} on the same thread are always delivered.
 *
 * @param <T> the element type
 */
public class ConsumerStream<T> implements Consumer<T> {
    /** Buffer capacity used by the no-argument constructor. */
    private static final int LENGTH = 1000;
    /** Duration of one wait for an element before the stop flag is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 100;

    private final Stream<T> stream;
    private final Queueing q;
    private volatile boolean end = false;

    /**
     * Creates a consumer whose buffer holds at most {@code length} elements.
     *
     * @param length buffer capacity; passed to {@link LinkedBlockingQueue}, which
     *               rejects non-positive values with {@link IllegalArgumentException}
     */
    public ConsumerStream(int length) {
        this.q = new Queueing(length);
        this.stream = StreamSupport.stream(q.spliterator(), false);
    }

    /** Creates a consumer with the default buffer capacity. */
    public ConsumerStream() {
        this(LENGTH);
    }

    /**
     * Returns the stream fed by this consumer.
     *
     * <p>The same {@link Stream} instance is returned on every call, so it can be
     * consumed only once.
     *
     * @return the sequential stream of accepted elements
     */
    public Stream<T> stream() {
        return this.stream;
    }

    /**
     * Offers an element to the stream, blocking while the buffer is full.
     * {@code null} values are silently ignored.
     *
     * @param t the element to publish, or {@code null} for a no-op
     * @throws IllegalStateException if {@link #stop()} has already been called
     * @throws RuntimeException      wrapping {@link InterruptedException} if the
     *                               calling thread is interrupted while waiting
     */
    @Override
    public void accept(T t) {
        if (end) {
            throw new IllegalStateException("This consumer has been stoped.");
        }
        if (t != null) {
            this.q.accept(t);
        }
    }

    /**
     * Marks the end of input. The stream finishes once the elements already
     * buffered have been consumed; later {@link #accept(Object)} calls fail.
     */
    public void stop() {
        end = true;
    }

    /** Bounded buffer that is written through {@code accept} and read through an iterator. */
    private final class Queueing implements Consumer<T>, Iterable<T> {
        private final BlockingQueue<T> q;

        private Queueing(int length) {
            q = new LinkedBlockingQueue<T>(length);
        }

        /**
         * Returns a blocking iterator over the buffer.
         *
         * <p>{@code hasNext()} waits until either an element is available or the
         * consumer has been stopped and the buffer is empty. The fetched element is
         * held until {@code next()} hands it out, so the iterator never reports an
         * element it cannot deliver.
         */
        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                /** Element fetched by {@code hasNext()} but not yet returned; {@code null} if none. */
                private T buffered;

                @Override
                public boolean hasNext() {
                    if (buffered == null) {
                        // The queue never holds null, so null here means end of stream.
                        buffered = get();
                    }
                    return buffered != null;
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    T result = buffered;
                    buffered = null;
                    return result;
                }
            };
        }

        /**
         * Takes the next element, waiting for one if necessary.
         *
         * @return the next element, or {@code null} once the consumer has been
         *         stopped and the buffer is empty
         * @throws RuntimeException wrapping {@link InterruptedException} if the
         *                          calling thread is interrupted while waiting
         */
        public T get() {
            try {
                while (true) {
                    T t = q.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (t != null) {
                        return t;
                    }
                    if (end) {
                        // An element may have been enqueued between the timed-out poll
                        // and stop(); look once more before reporting end of stream.
                        return q.poll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Appends an element, blocking while the buffer is full.
         *
         * @throws RuntimeException wrapping {@link InterruptedException} if the
         *                          calling thread is interrupted while waiting
         */
        @Override
        public void accept(T t) {
            try {
                q.put(t);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment