Skip to content

Instantly share code, notes, and snippets.

@yuba
Last active January 4, 2016 21:49
Show Gist options
  • Save yuba/8683119 to your computer and use it in GitHub Desktop.
Save yuba/8683119 to your computer and use it in GitHub Desktop.
ProducerConsumerIterable<T>
package net.miuras;
import com.sun.istack.internal.NotNull;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
/**
* When you iterate ProducerConsumerIterable object, the source Iterable object will be iterated in another thread, which you can call a producer thread.
*/
public class ProducerConsumerIterable<T> implements Iterable<T> {
@NotNull final Iterable<T> source;
final int capacity;
/**
* Creates a {@code ProducerConsumerIterable} with a queue capacity of
* {@link Integer#MAX_VALUE}.
*
* @param source the source producer iterable
* @throws IllegalArgumentException if {@code source} is null
*/
public ProducerConsumerIterable(Iterable<T> source) {
if (source == null) throw new IllegalArgumentException();
this.source = source;
this.capacity = Integer.MAX_VALUE;
}
/**
* Creates a {@code ProducerConsumerIterable} with the given (fixed) capacity.
*
* @param source the source producer iterable
* @param capacity the capacity of its queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero or {@code source} is null
*/
public ProducerConsumerIterable(Iterable<T> source, int capacity) throws IllegalArgumentException{
if (source == null || capacity <= 0) throw new IllegalArgumentException();
this.source = source;
this.capacity = capacity;
}
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@NotNull final private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(capacity);
@NotNull final private Object monitor = new Object();
volatile private boolean finished = false;
{
// producer thread
new Thread("Producer - " + source){
@Override
public void run() {
try {
for (final T element: source) {
synchronized (monitor) {
while (!queue.offer(element)) monitor.wait();
monitor.notifyAll();
}
}
} catch (InterruptedException ie) {
// nothing to do
} finally {
synchronized (monitor) {
finished = true;
monitor.notifyAll();
}
}
}
}.start();
}
@Override
public boolean hasNext() {
synchronized (monitor) {
while (true){
if (!queue.isEmpty()) return true;
if (finished) return false;
try {
monitor.wait();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
}
@Override
public T next() throws NoSuchElementException {
synchronized (monitor) {
while (true){
if (!queue.isEmpty()) {
final T element = queue.poll();
monitor.notifyAll();
return element;
}
if (finished) throw new NoSuchElementException();
try {
monitor.wait();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
}
@Override
public void remove() throws UnsupportedOperationException{
throw new UnsupportedOperationException();
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment