Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import java.util.concurrent.*;
public class SynchronizedQueue {
// Keep track of the number of times the producer test iterates.
static volatile int mProducerCounter = 0;
// Keep track of the number of times the consumer test iterates.
static volatile int mConsumerCounter = 0;
// Maximum timeout.
static final int TIMEOUT_SECONDS = 5;
// Error value for a timeout.
static final int TIMEOUT_OCCURRED = -1;
// Error value for a failure.
static final int FAILURE_OCCURRED = -2;
public static class QueueAdapter<E> {
private BlockingQueue<E> mQueue;
public QueueAdapter(BlockingQueue<E> queue) {
mQueue = queue;
}
/**
* Insert msg at the tail of the queue.
*/
public void put(E msg) throws InterruptedException, TimeoutException {
// Keep track of how many times we're called.
mProducerCounter++;
boolean timeoutValue = mQueue.offer(msg,
TIMEOUT_SECONDS,
TimeUnit.SECONDS);
if (timeoutValue == false)
throw new TimeoutException();
}
/**
* Remove msg from the head of the queue.
*/
public E take() throws InterruptedException, TimeoutException {
// Keep track of how many times we're called.
mConsumerCounter++;
E rValue = mQueue.poll(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
if (rValue == null)
throw new TimeoutException();
return rValue;
}
}
private static QueueAdapter<Integer> mQueue = null;
static Runnable producerRunnable = new Runnable() {
public void run() {
for (int i = 0; i < mMaxIterations; i++)
try {
mQueue.put(i);
if (Thread.interrupted())
throw new InterruptedException();
} catch (InterruptedException e) {
System.out.println("Thread properly interrupted by "
+ e.toString() + " in producerRunnable");
// This isn't an error - it just means that
// we've been interrupted by the main Thread.
return;
} catch (TimeoutException e) {
System.out.println("Exception " + e.toString()
+ " occurred in producerRunnable");
// Indicate a timeout.
mProducerCounter = TIMEOUT_OCCURRED;
return;
} catch (Exception e) {
System.out.println("Exception " + e.toString()
+ " occurred in producerRunnable");
// Indicate a failure.
mProducerCounter = FAILURE_OCCURRED;
return;
}
}
};
static Runnable consumerRunnable = new Runnable() {
public void run() {
for (int i = 0; i < mMaxIterations; i++)
try {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Integer result = (Integer) mQueue.take();
System.out.println("iteration = " + result);
} catch (InterruptedException e) {
System.out.println("Thread properly interrupted by "
+ e.toString() + " in consumerRunnable");
// This isn't an error - it just means that
// we've been interrupted by the main Thread.
return;
} catch (TimeoutException e) {
System.out.println("Exception " + e.toString()
+ " occurred in consumerRunnable");
// Indicate a timeout.
mConsumerCounter = TIMEOUT_OCCURRED;
return;
} catch (Exception e) {
System.out.println("Exception " + e.toString()
+ " occurred in consumerRunnable");
// Indicate a failure.
mConsumerCounter = FAILURE_OCCURRED;
return;
}
}
};
// Number of iterations to test
public static int mMaxIterations = 1000000;
public static int mMaxQueueSize = (mMaxIterations / 10);
public static void main(String[] args) {
try {
mQueue = new QueueAdapter<Integer>(new ArrayBlockingQueue<Integer>(mMaxQueueSize));
// mQueue = new QueueAdapter<Integer>(new LinkedBlockingQueue());
// create threads
Thread consumer = new Thread(consumerRunnable);
Thread producer = new Thread(producerRunnable);
// start the threads.
consumer.start();
producer.start();
// Give the Threads a chance to run before interrupting them.
Thread.sleep(100);
// interrupt the threads.
consumer.interrupt();
producer.interrupt();
// wait for the threads to exit.
consumer.join();
producer.join();
} catch (Exception e) {
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment