Skip to content

Instantly share code, notes, and snippets.

@toluju
Created January 20, 2010 20:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toluju/282211 to your computer and use it in GitHub Desktop.
Save toluju/282211 to your computer and use it in GitHub Desktop.
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * An {@link Iterator} whose elements are produced by a background thread.
 *
 * <p>Subclasses implement {@link #process()} and call {@code emit} once per element. The
 * producer thread is started lazily on the first call to {@link #hasNext()} or
 * {@link #next()} and hands elements to the consumer through a bounded queue, so the
 * producer blocks whenever it gets {@code QUEUE_SIZE} (or the configured capacity)
 * elements ahead of the consumer.
 *
 * <p>The iterator is meant to be consumed by a single thread. {@code null} elements are
 * not supported because the underlying {@link LinkedBlockingQueue} rejects them.
 *
 * @param <T> the element type
 * @author Toby
 */
public abstract class ThreadPipeIterator<T> implements Iterator<T>, Closeable {
  /** Default capacity of the hand-off queue between producer and consumer. */
  public static final int QUEUE_SIZE = 100;

  /** How long the consumer waits on an empty queue before re-checking for completion. */
  private static final long POLL_MILLIS = 100;

  private final BlockingQueue<T> queue;

  /** Set by the producer thread and read by the consumer thread, hence volatile. */
  private volatile boolean done = false;

  private boolean started = false;
  private Thread thread;

  /** Element already taken off the queue by hasNext() but not yet returned by next(). */
  private T lookahead;

  /** Creates an iterator whose hand-off queue holds up to {@link #QUEUE_SIZE} elements. */
  public ThreadPipeIterator() {
    this(QUEUE_SIZE);
  }

  /**
   * Creates an iterator with a custom hand-off queue capacity.
   *
   * @param queueSize maximum number of produced-but-unconsumed elements
   * @throws IllegalArgumentException if {@code queueSize} is not greater than zero
   */
  public ThreadPipeIterator(int queueSize) {
    queue = new LinkedBlockingQueue<T>(queueSize);
  }

  /**
   * Produces the elements of this iterator by calling {@code emit} for each one. Runs on
   * the background producer thread; the iteration ends when this method returns or throws.
   */
  protected abstract void process();

  /**
   * Hands one element to the consumer, blocking while the queue is full.
   *
   * @param t the element to publish; must not be {@code null}
   * @throws InterruptedException if the producer thread is interrupted while waiting,
   *     for example because {@link #close()} was called
   */
  protected void emit(T t) throws InterruptedException {
    queue.put(t);
  }

  /** Starts the producer thread that runs {@link #process()}. */
  private void startThread() {
    thread = new Thread() {
      @Override public void run() {
        try {
          process();
        }
        finally {
          // Must run even when process() throws; otherwise the consumer waits forever.
          done = true;
        }
      }
    };
    thread.start();
    started = true;
  }

  /**
   * Reports whether another element is available, blocking until the producer either
   * emits one or finishes.
   *
   * @return {@code true} if {@link #next()} will return an element; {@code false} if the
   *     producer has finished and the queue is drained, or if the calling thread was
   *     interrupted while waiting (its interrupt status is restored in that case)
   */
  public boolean hasNext() {
    if (lookahead != null) {
      return true;
    }
    if (!started) {
      startThread();
    }
    try {
      while (true) {
        // Read the flag BEFORE polling: once it is true every emitted element is already
        // in the queue, so an empty non-blocking poll really means "exhausted".
        boolean finished = done;
        T item = finished ? queue.poll() : queue.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
        if (item != null) {
          lookahead = item;
          return true;
        }
        if (finished) {
          return false;
        }
      }
    }
    catch (InterruptedException e) {
      // Keep the interrupt visible to the caller instead of silently dropping it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /**
   * Returns the next element, blocking until the producer provides one.
   *
   * @return the next emitted element, never {@code null}
   * @throws NoSuchElementException if the producer has finished and no elements remain,
   *     or if the calling thread was interrupted while waiting
   */
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T item = lookahead;
    lookahead = null;
    return item;
  }

  /**
   * Always fails: elements cannot be removed from the pipe.
   *
   * @throws UnsupportedOperationException always
   */
  public void remove() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  /**
   * Interrupts the producer thread, if it has been started, so that a producer blocked in
   * {@code emit} can stop early. Elements already queued remain available.
   *
   * @throws IOException never thrown by this implementation; declared for subclasses
   */
  public void close() throws IOException {
    if (thread != null) {
      thread.interrupt();
    }
  }

  /** Small demo: prints 200 random integers produced on a background thread. */
  public static void main(String[] args) throws Exception {
    ThreadPipeIterator<Integer> test = new ThreadPipeIterator<Integer>() {
      @Override protected void process() {
        Random random = new Random();
        for (int x = 0; x < 200; ++x) {
          try {
            emit(random.nextInt());
          }
          catch (InterruptedException e) {
            break;
          }
        }
      }
    };
    try {
      while (test.hasNext()) {
        System.out.println(test.next());
      }
    }
    finally {
      test.close();
    }
    System.out.println("Done!");
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment