Created
June 27, 2012 07:36
-
-
Save lexich/3002246 to your computer and use it in GitHub Desktop.
BlockQueue Java implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/**
 * Thread-safe FIFO queue whose {@link #pop()} never waits: it returns
 * {@code null} immediately when nothing is available.
 *
 * <p>The backing list is only modified while holding this object's monitor.
 * The element count is mirrored in an {@link AtomicInteger} so that
 * {@link #size()} and {@link #isEmpty()} can be answered without taking the
 * monitor; {@link #realSize()} reads the backing list under the monitor.
 *
 * <p>Because {@code pop()} uses {@code null} as its "nothing available"
 * result, a stored {@code null} element cannot be told apart from an empty
 * queue by the caller.
 *
 * @param <T> queue item type
 */
class BlockQueue<T> {
    /** Backing FIFO storage; accessed only while holding this object's monitor. */
    private final Queue<T> queue = new LinkedList<T>();

    /** Monitor-free mirror of the element count; updated while holding the monitor. */
    private final AtomicInteger queueSize = new AtomicInteger();

    /**
     * Appends an item to the tail of the queue.
     *
     * @param src item to enqueue
     */
    public synchronized void put(T src) {
        queue.add(src);
        queueSize.incrementAndGet();
    }

    /**
     * Removes and returns the item at the head of the queue without waiting.
     *
     * <p>A first emptiness check is made on the atomic counter without locking;
     * only if that check passes is the monitor taken and the backing list
     * re-checked, since another thread may have emptied it in between.
     *
     * @return the head item, or {@code null} if the queue was empty
     */
    public T pop() {
        // Fast path: the counter says empty, so skip the monitor entirely.
        if (isEmpty()) {
            System.out.println("BlockQueue::pop() - Queue is empty (don't locked). Thread:" + Thread.currentThread().getId());
            return null;
        }

        final boolean taken;
        final T head;
        synchronized (this) {
            // Re-check under the monitor: a concurrent pop() may have won the race.
            taken = !queue.isEmpty();
            if (taken) {
                queueSize.decrementAndGet();
                head = queue.poll();
            } else {
                head = null;
            }
        }

        // Console output happens after the monitor is released so that slow
        // I/O does not stall other producers and consumers.
        if (taken) {
            System.out.println("BlockQueue::pop() - take item from queue. Thread:" + Thread.currentThread().getId());
        } else {
            System.out.println("BlockQueue::pop() - Queue is empty (locked). Thread:" + Thread.currentThread().getId());
        }
        return head;
    }

    /**
     * Returns the element count as tracked by the atomic counter, without locking.
     *
     * @return size
     */
    public int size() {
        return queueSize.intValue();
    }

    /**
     * Returns the size of the backing list, read while holding the monitor.
     *
     * @return size
     */
    public synchronized int realSize() {
        return queue.size();
    }

    /**
     * Tells whether the atomic counter is currently zero, without locking.
     *
     * @return {@code true} if the counter reports no elements
     */
    public boolean isEmpty() {
        return queueSize.intValue() == 0;
    }
}
/** | |
* BlockQueue Reader | |
*/ | |
class Reader implements Runnable{ | |
private final BlockQueue<Integer> queue; | |
private AtomicBoolean stopFlag = new AtomicBoolean(false); | |
public Reader(BlockQueue<Integer> aQueue) { | |
queue = aQueue; | |
} | |
/** | |
* Stop process when queue is empty | |
*/ | |
public void safeStop() { | |
stopFlag.set(true); | |
} | |
public void run() { | |
stopFlag.set(false); | |
/** | |
* Cycle works until another thread to call safeStop() && queue to be empty | |
* isEmpty() is CAS function and when we call pop, another thread may clear queue | |
* and we get null result | |
*/ | |
while (!queue.isEmpty() | !stopFlag.get()) { | |
Integer obj = queue.pop(); | |
if (obj != null) { | |
System.out.println("Reader::run() - Read:" + obj + ". Thread:" + Thread.currentThread().getId()); | |
} else { | |
System.out.println("Reader::run() - Empty. Thread:" + Thread.currentThread().getId()); | |
} | |
} | |
} | |
} | |
/** | |
* BlockQueue Writer | |
*/ | |
class Writer implements Runnable{ | |
private final BlockQueue<Integer> queue; | |
private final int from; | |
private final int to; | |
public Writer(final BlockQueue<Integer> aqueue, final int afrom, final int ato) { | |
if (afrom > ato) { | |
throw new IllegalArgumentException("afrom > ato"); | |
} | |
queue = aqueue; | |
from = afrom; | |
to = ato; | |
} | |
public void run() { | |
for (int i = from; i < to; ++i) { | |
System.out.println("Writer::run() - Write:" + i + ". Thread:" + Thread.currentThread().getId()); | |
queue.put(i); | |
} | |
} | |
} | |
/** | |
* Executor class | |
*/ | |
class MainClass{ | |
private BlockQueue<Integer> queue = new BlockQueue<Integer>(); | |
/** | |
* Excecution | |
* | |
* @param aWriter number of writers | |
* @param aReader number of readers | |
* @param interval item numbers which reader threaf put in queue | |
* | |
* @throws InterruptedException | |
*/ | |
public void exec(int aWriter, int aReader, int interval) throws InterruptedException { | |
if (aReader <= 0) | |
throw new IllegalArgumentException("aReader < 0"); | |
if (aWriter <= 0) | |
throw new IllegalArgumentException("aWriter < 0"); | |
if (interval <= 0) | |
throw new IllegalArgumentException("interval < 0"); | |
Thread thWriters[] = new Thread[aWriter]; | |
Thread thReaders[] = new Thread[aReader]; | |
Reader readWorkers[] = new Reader[aReader]; | |
int from = 0; | |
for (int i = 0; i < aWriter; ++i) { | |
thWriters[i] = new Thread(new Writer(queue, from, from + interval)); | |
from += interval; | |
} | |
for (int i = 0; i < aReader; ++i) { | |
Reader reader = new Reader(queue); | |
readWorkers[i] = reader; | |
thReaders[i] = new Thread(reader); | |
} | |
for (Thread th : thReaders) | |
th.start(); | |
for (Thread th : thWriters) | |
th.start(); | |
for (Thread th : thWriters) | |
th.join(); | |
for (Reader reader : readWorkers) | |
reader.safeStop(); | |
for (Thread th : thReaders) | |
th.join(); | |
System.out.println("MainClass::exec() - Size queue:" + queue.size() + ". Real queue size:" + queue.realSize()); | |
} | |
} | |
public class Main{ | |
public static void main(String[] args) { | |
Integer writer = null, reader = null, interval = null; | |
for (String arg : args) { | |
String pair[] = arg.split("="); | |
if (pair.length == 2) { | |
if (pair[0].equals("w")) { | |
writer = Integer.parseInt(pair[1]); | |
} else if (pair[0].equals("r")) { | |
reader = Integer.parseInt(pair[1]); | |
} else if (pair[0].equals("i")) { | |
interval = Integer.parseInt(pair[1]); | |
} | |
} | |
} | |
Scanner scanner = new Scanner(System.in); | |
if (writer == null) { | |
System.out.print("Input number of writers:"); | |
writer = scanner.nextInt(); | |
} | |
if (reader == null) { | |
System.out.print("Input number of readers:"); | |
reader = scanner.nextInt(); | |
} | |
if (interval == null) { | |
System.out.print("Input interval:"); | |
interval = scanner.nextInt(); | |
} | |
System.out.println("Main::main() - Start"); | |
MainClass app = new MainClass(); | |
Date start = new Date(); | |
try { | |
app.exec(writer, reader, interval); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
Date stop = new Date(); | |
long time = stop.getTime() - start.getTime(); | |
System.out.println("Main::main() - Stop time:" + time); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment