@dougnukem
Created September 25, 2011 23:32
Example Threadsafe BlockingQueue implementation in Java
public class BlockingQueue implements Queue {
    private final java.util.Queue<Object> queue = new java.util.LinkedList<>();

    /**
     * Blocking dequeue: returns only when the queue has something on it,
     * otherwise waits until something is put on it.
     *
     * @return the next item, or null if the wait() call is interrupted
     */
    public synchronized Object dequeue() {
        Object msg = null;
        while (queue.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Error: return the client a null item. Note that the thread's
                // interrupt status has been cleared by the time we get here.
                return msg;
            }
        }
        msg = queue.remove();
        return msg;
    }

    /**
     * Adds an object to this queue and notifies any waiting threads that an
     * object is available.
     */
    public synchronized void enqueue(Object o) {
        queue.add(o);
        // Wake up anyone waiting for something to be put on the queue.
        notifyAll();
    }
}
public class Consumer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;

    public void process(Object msg) {
        try {
            // Process the message non-trivially (i.e. it takes a while).
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        while (true) {
            doStuff();
        }
    }

    public void doStuff() {
        Object msg = queue.dequeue();
        process(msg);
    }
}
public class Producer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;

    public void run() {
        // Binds to a socket, reads messages in,
        // packages each message and calls doSomething():
        // doSomething(Object msg);
    }

    public void doSomething(Object msg) {
        queue.enqueue(msg);
    }
}
@jeremyshi

I think you also need to consider the case where the queue is full. In the real world, a queue cannot have unlimited space.

@kunkun-tang

@jeremyshi is correct, you need a limit attribute in the BlockingQueue class.
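
To make the suggestion above concrete, here is a minimal sketch of what a bounded variant of the gist's queue might look like. The class name `BoundedBlockingQueue`, the `limit` field, and the generic parameter are assumptions made for illustration and are not part of the original gist. It keeps the gist's `synchronized`/`wait()`/`notifyAll()` approach and simply adds a second waiting condition for producers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical bounded variant of the gist's BlockingQueue.
// `limit` is an assumed name for the capacity attribute.
class BoundedBlockingQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>(); // note: ArrayDeque rejects null items
    private final int limit;

    BoundedBlockingQueue(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    // Blocks while the queue is full.
    synchronized void enqueue(T item) throws InterruptedException {
        while (queue.size() == limit) {
            wait();
        }
        queue.add(item);
        notifyAll(); // wake consumers waiting for an item
    }

    // Blocks while the queue is empty.
    synchronized T dequeue() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        T item = queue.remove();
        notifyAll(); // wake producers waiting for free space
        return item;
    }

    synchronized int size() {
        return queue.size();
    }
}

class BoundedBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedBlockingQueue<Integer> q = new BoundedBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    q.enqueue(i); // blocks whenever 2 items are already pending
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println(q.dequeue()); // prints 0 through 4, one per line
        }
        producer.join();
    }
}
```

Unlike the gist's `dequeue()`, this sketch propagates `InterruptedException` instead of returning null; that is a design choice of the sketch, not something the thread prescribes.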

@vikranthpatoju

Is the while loop required? Wouldn't a plain if condition serve the purpose?
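
On the question above: the Javadoc for `Object.wait()` notes that a thread can wake up spuriously and recommends waiting in a loop, and with `notifyAll()` several consumers can wake up for a single item. The sketch below, using the hypothetical names `GuardedQueue` and `GuardedQueueDemo`, mirrors the gist's queue and marks in a comment what could go wrong with a plain `if`. It is an illustration under those assumptions, not a statement about the author's intent.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the gist's BlockingQueue.
class GuardedQueue {
    private final Queue<Object> queue = new LinkedList<>();

    synchronized Object dequeue() throws InterruptedException {
        // With `if (queue.isEmpty()) wait();` a woken thread would fall through
        // to remove() even when another consumer has already emptied the queue,
        // and LinkedList.remove() would throw NoSuchElementException.
        // The loop rechecks the condition after every wakeup.
        while (queue.isEmpty()) {
            wait();
        }
        return queue.remove();
    }

    synchronized void enqueue(Object o) {
        queue.add(o);
        notifyAll();
    }
}

class GuardedQueueDemo {
    // Starts two consumers, enqueues two items, and returns what was received.
    static List<Object> runTwoConsumers() throws InterruptedException {
        GuardedQueue q = new GuardedQueue();
        List<Object> received = new CopyOnWriteArrayList<>();
        Runnable consumer = () -> {
            try {
                received.add(q.dequeue());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(consumer);
        Thread b = new Thread(consumer);
        a.start();
        b.start();
        q.enqueue("first");  // may wake both consumers; only one finds an item
        q.enqueue("second"); // the other consumer gets this one
        a.join();
        b.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTwoConsumers().size()); // prints 2
    }
}
```

Which consumer receives which item is not deterministic; only the total count is.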

@ianomad

ianomad commented Dec 20, 2016

@vikranthpatoju is right. In the enqueue method it is enough to call notify instead of notifyAll. The while (isEmpty) loop is not safe: if 2 consumers simultaneously read the same element in the queue, you will have a race condition.

@shankyty

shankyty commented Sep 25, 2017

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class BoundedList<T> {
        private final int capacity;
        private final Queue<T> list;
        private final Object lock = new Object();

        public BoundedList(int capacity) {
            this.capacity = capacity;
            this.list = new ArrayDeque<>(capacity);
        }

        T poll() throws InterruptedException {
            synchronized (lock) {
                while (list.isEmpty()) {
                    lock.wait();
                }
                T e = list.poll();
                lock.notify();
                return e;
            }
        }

        void add(T e) throws InterruptedException {
            synchronized (lock) {
                while (list.size() == capacity) {
                    lock.wait();
                }
                list.add(e);
                lock.notify();
            }
        }
    }

Will this work?

@chrislzm

chrislzm commented Jul 6, 2018

@shankyty No, unfortunately that won't work, because it can result in the lost-wakeup problem. Check out this Stack Overflow answer: https://stackoverflow.com/a/3186336/7602403

One solution is to replace your calls to lock.notify() with lock.notifyAll().

@mtahiriqbaldeveloper

@chrislzm It depends on the situation. notifyAll can cause thread contention, e.g. if a lot of threads are reading data from the queue and only a few threads are writing data to it.
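
One way to address both concerns raised above (lost wakeups with `notify()` and contention with `notifyAll()`) is to give producers and consumers separate wait sets. The sketch below uses `java.util.concurrent.locks.ReentrantLock` with two `Condition` objects; the class name `TwoConditionQueue` and its field names are assumptions for illustration, and this is a swapped-in technique rather than anything proposed in the thread. Because only consumers ever wait on `notEmpty` and only producers on `notFull`, a single `signal()` always reaches a thread that can make use of it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded queue with separate conditions for "not empty" and "not full".
class TwoConditionQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here

    TwoConditionQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    void add(T e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.add(e);
            notEmpty.signal(); // wakes one waiting consumer, never a producer
        } finally {
            lock.unlock();
        }
    }

    T poll() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T e = items.remove();
            notFull.signal(); // wakes one waiting producer, never a consumer
            return e;
        } finally {
            lock.unlock();
        }
    }
}

class TwoConditionQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        TwoConditionQueue<String> q = new TwoConditionQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                q.add("a");
                q.add("b"); // blocks until "a" has been taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(q.poll()); // prints a
        System.out.println(q.poll()); // prints b
        producer.join();
    }
}
```

For production code, the JDK's `java.util.concurrent.ArrayBlockingQueue` and `LinkedBlockingQueue` already provide bounded blocking queues, so a hand-rolled version like this is mainly useful for learning.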
