Skip to content

Instantly share code, notes, and snippets.

@lexich
Created June 27, 2012 07:36
Show Gist options
  • Save lexich/3002246 to your computer and use it in GitHub Desktop.
Save lexich/3002246 to your computer and use it in GitHub Desktop.
BlockQueue java realization
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Blocking queue
*
* @param <T> queue item type
*/
class BlockQueue<T>{
private final Queue<T> queue = new LinkedList<T>();
private AtomicInteger queueSize = new AtomicInteger();
/**
* Put object in queue
*
* @param src input object
*/
public synchronized void put(T src) {
queue.add(src);
queueSize.incrementAndGet();
}
/**
* Get object from the head of the queue
*
* @return object
*/
public T pop() {
//if queue is empty in this moment, we don't use blocking
if (isEmpty()) {
System.out.println("BlockQueue::pop() - Queue is empty (don't locked). Thread:" + Thread.currentThread().getId());
return null;
} else {
synchronized (this) {
if (queue.size() > 0) {
queueSize.decrementAndGet();
System.out.println("BlockQueue::pop() - take item from queue. Thread:" + Thread.currentThread().getId());
return queue.poll();
}
System.out.println("BlockQueue::pop() - Queue is empty (locked). Thread:" + Thread.currentThread().getId());
return null;
}
}
}
/**
* Size of the queue
*
* @return size
*/
public int size() {
return queueSize.intValue();
}
/**
* Real size of the queue block function
*
* @return size
*/
public synchronized int realSize() {
return queue.size();
}
/**
* Empty flag of queue state
*
* @return empty flag
*/
public boolean isEmpty() {
return queueSize.intValue() == 0;
}
}
/**
* BlockQueue Reader
*/
class Reader implements Runnable{
private final BlockQueue<Integer> queue;
private AtomicBoolean stopFlag = new AtomicBoolean(false);
public Reader(BlockQueue<Integer> aQueue) {
queue = aQueue;
}
/**
* Stop process when queue is empty
*/
public void safeStop() {
stopFlag.set(true);
}
public void run() {
stopFlag.set(false);
/**
* Cycle works until another thread to call safeStop() && queue to be empty
* isEmpty() is CAS function and when we call pop, another thread may clear queue
* and we get null result
*/
while (!queue.isEmpty() | !stopFlag.get()) {
Integer obj = queue.pop();
if (obj != null) {
System.out.println("Reader::run() - Read:" + obj + ". Thread:" + Thread.currentThread().getId());
} else {
System.out.println("Reader::run() - Empty. Thread:" + Thread.currentThread().getId());
}
}
}
}
/**
* BlockQueue Writer
*/
class Writer implements Runnable{
private final BlockQueue<Integer> queue;
private final int from;
private final int to;
public Writer(final BlockQueue<Integer> aqueue, final int afrom, final int ato) {
if (afrom > ato) {
throw new IllegalArgumentException("afrom > ato");
}
queue = aqueue;
from = afrom;
to = ato;
}
public void run() {
for (int i = from; i < to; ++i) {
System.out.println("Writer::run() - Write:" + i + ". Thread:" + Thread.currentThread().getId());
queue.put(i);
}
}
}
/**
* Executor class
*/
class MainClass{
private BlockQueue<Integer> queue = new BlockQueue<Integer>();
/**
* Excecution
*
* @param aWriter number of writers
* @param aReader number of readers
* @param interval item numbers which reader threaf put in queue
*
* @throws InterruptedException
*/
public void exec(int aWriter, int aReader, int interval) throws InterruptedException {
if (aReader <= 0)
throw new IllegalArgumentException("aReader < 0");
if (aWriter <= 0)
throw new IllegalArgumentException("aWriter < 0");
if (interval <= 0)
throw new IllegalArgumentException("interval < 0");
Thread thWriters[] = new Thread[aWriter];
Thread thReaders[] = new Thread[aReader];
Reader readWorkers[] = new Reader[aReader];
int from = 0;
for (int i = 0; i < aWriter; ++i) {
thWriters[i] = new Thread(new Writer(queue, from, from + interval));
from += interval;
}
for (int i = 0; i < aReader; ++i) {
Reader reader = new Reader(queue);
readWorkers[i] = reader;
thReaders[i] = new Thread(reader);
}
for (Thread th : thReaders)
th.start();
for (Thread th : thWriters)
th.start();
for (Thread th : thWriters)
th.join();
for (Reader reader : readWorkers)
reader.safeStop();
for (Thread th : thReaders)
th.join();
System.out.println("MainClass::exec() - Size queue:" + queue.size() + ". Real queue size:" + queue.realSize());
}
}
public class Main{
public static void main(String[] args) {
Integer writer = null, reader = null, interval = null;
for (String arg : args) {
String pair[] = arg.split("=");
if (pair.length == 2) {
if (pair[0].equals("w")) {
writer = Integer.parseInt(pair[1]);
} else if (pair[0].equals("r")) {
reader = Integer.parseInt(pair[1]);
} else if (pair[0].equals("i")) {
interval = Integer.parseInt(pair[1]);
}
}
}
Scanner scanner = new Scanner(System.in);
if (writer == null) {
System.out.print("Input number of writers:");
writer = scanner.nextInt();
}
if (reader == null) {
System.out.print("Input number of readers:");
reader = scanner.nextInt();
}
if (interval == null) {
System.out.print("Input interval:");
interval = scanner.nextInt();
}
System.out.println("Main::main() - Start");
MainClass app = new MainClass();
Date start = new Date();
try {
app.exec(writer, reader, interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
Date stop = new Date();
long time = stop.getTime() - start.getTime();
System.out.println("Main::main() - Stop time:" + time);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment