Last active
January 4, 2016 21:49
-
-
Save yuba/8683119 to your computer and use it in GitHub Desktop.
ProducerConsumerIterable<T>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.miuras; | |
import com.sun.istack.internal.NotNull; | |
import java.util.Iterator; | |
import java.util.NoSuchElementException; | |
import java.util.concurrent.LinkedBlockingQueue; | |
/**
 * An {@link Iterable} that walks its source on a separate "producer" thread and
 * hands the elements to the iterating ("consumer") thread through a bounded queue.
 *
 * <p>Every call to {@link #iterator()} starts a new daemon producer thread that
 * iterates the source from the beginning. At most {@code capacity} items are
 * buffered ahead of the consumer; the producer blocks while the queue is full.
 *
 * <p>{@code null} elements of the source are passed through unchanged. If the
 * source throws while being iterated, the elements produced so far are delivered
 * first and the same throwable is then rethrown to the consumer from
 * {@code hasNext()} / {@code next()} instead of being mistaken for the end of
 * the sequence.
 */
public class ProducerConsumerIterable<T> implements Iterable<T> {
    /** Queued in place of a {@code null} element, which the queue itself rejects. */
    private static final Object NULL_ELEMENT = new Object();

    /** Queued after the last element to signal that the source is exhausted. */
    private static final Object END_OF_STREAM = new Object();

    final Iterable<T> source;
    final int capacity;

    /**
     * Creates a {@code ProducerConsumerIterable} with a queue capacity of
     * {@link Integer#MAX_VALUE}.
     *
     * @param source the source producer iterable
     * @throws IllegalArgumentException if {@code source} is null
     */
    public ProducerConsumerIterable(Iterable<T> source) {
        this(source, Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code ProducerConsumerIterable} with the given (fixed) capacity.
     *
     * @param source the source producer iterable
     * @param capacity the capacity of its queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero or {@code source} is null
     */
    public ProducerConsumerIterable(Iterable<T> source, int capacity) throws IllegalArgumentException {
        if (source == null) {
            throw new IllegalArgumentException("source must not be null");
        }
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be greater than zero: " + capacity);
        }
        this.source = source;
        this.capacity = capacity;
    }

    /**
     * Starts a new producer thread over the source and returns the iterator that
     * consumes what it produces.
     *
     * @return an iterator that blocks in {@code hasNext()} / {@code next()} until
     *         the producer has supplied an element, finished, or failed
     */
    @Override
    public Iterator<T> iterator() {
        return new HandOffIterator<T>(source, capacity);
    }

    /** Terminal queue item carrying the throwable that aborted the producer. */
    private static final class Failure {
        final Throwable cause;

        Failure(Throwable cause) {
            this.cause = cause;
        }
    }

    /**
     * Consumer-side iterator. Calls to {@code hasNext()} and {@code next()} are
     * serialized on the iterator, but a {@code hasNext()} / {@code next()} pair is
     * not atomic.
     */
    private static final class HandOffIterator<E> implements Iterator<E> {
        /** Holds elements (with {@code null} masked) followed by exactly one terminal item. */
        private final LinkedBlockingQueue<Object> queue;

        /** Item already removed from the queue but not yet returned; {@code null} if none. */
        private Object pending;

        HandOffIterator(final Iterable<E> source, int capacity) {
            this.queue = new LinkedBlockingQueue<Object>(capacity);
            // Name the thread after the source's class rather than source.toString():
            // toString() on a collection walks every element on the caller's thread.
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    produce(source);
                }
            }, "Producer - " + source.getClass().getName());
            // Daemon: a consumer that abandons the iterator leaves the producer parked
            // on a full queue, and that must not keep the JVM alive.
            producer.setDaemon(true);
            producer.start();
        }

        /** Producer loop: queues every source element, then exactly one terminal item. */
        private void produce(Iterable<E> source) {
            Object terminal = END_OF_STREAM;
            try {
                for (E element : source) {
                    queue.put(element == null ? NULL_ELEMENT : element);
                }
            } catch (InterruptedException stopRequested) {
                // An interrupted producer simply stops early; the consumer sees a
                // normal end of iteration. The thread is about to terminate, so the
                // interrupt status does not need to be restored here.
            } catch (Throwable failure) {
                // Thread boundary: hand the failure over to the consumer.
                terminal = new Failure(failure);
            } finally {
                putUninterruptibly(terminal);
            }
        }

        /** Queues {@code item}, retrying across interrupts so the terminal item is never lost. */
        private void putUninterruptibly(Object item) {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        queue.put(item);
                        return;
                    } catch (InterruptedException retry) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Returns the next queue item without consuming it, blocking until the
         * producer supplies one.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the
         *         calling thread is interrupted while waiting
         */
        private Object head() {
            if (pending == null) {
                // Non-blocking attempt first, so that an interrupt only matters when
                // the caller actually has to wait.
                Object item = queue.poll();
                if (item == null) {
                    try {
                        item = queue.take();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                }
                pending = item;
            }
            return pending;
        }

        /** Rethrows a producer failure on the consumer thread, keeping its type when unchecked. */
        private static void rethrow(Throwable failure) {
            if (failure instanceof RuntimeException) {
                throw (RuntimeException) failure;
            }
            if (failure instanceof Error) {
                throw (Error) failure;
            }
            throw new RuntimeException(failure);
        }

        /**
         * Blocks until the producer has supplied an element, finished, or failed.
         *
         * @return {@code true} if an element is available, {@code false} once the
         *         source is exhausted
         */
        @Override
        public synchronized boolean hasNext() {
            Object item = head();
            if (item instanceof Failure) {
                rethrow(((Failure) item).cause);
            }
            // A terminal item stays in 'pending', so the answer is stable from here on.
            return item != END_OF_STREAM;
        }

        /**
         * Returns the next element, blocking until the producer has supplied it.
         *
         * @throws NoSuchElementException if the source is exhausted
         */
        @Override
        public synchronized E next() throws NoSuchElementException {
            Object item = head();
            if (item instanceof Failure) {
                rethrow(((Failure) item).cause);
            }
            if (item == END_OF_STREAM) {
                throw new NoSuchElementException();
            }
            pending = null;
            if (item == NULL_ELEMENT) {
                return null;
            }
            @SuppressWarnings("unchecked") // besides the private markers handled above, only E instances are queued
            E element = (E) item;
            return element;
        }

        /**
         * Always fails: elements cannot be removed from the source through this iterator.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() throws UnsupportedOperationException {
            throw new UnsupportedOperationException();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment