Skip to content

Instantly share code, notes, and snippets.

@sguada
Created December 24, 2014 23:26
Show Gist options
  • Save sguada/1e1d474a25f4ddcc7ba8 to your computer and use it in GitHub Desktop.
Save sguada/1e1d474a25f4ddcc7ba8 to your computer and use it in GitHub Desktop.
BlockingQueue with a maximum capacity: blocks on Push (if full) and on Pop (if empty).
#ifndef CAFFE_UTIL_BLOCKING_QUEUE_H_
#define CAFFE_UTIL_BLOCKING_QUEUE_H_
#include <cstddef>
#include <ctime>
#include <queue>
#include <string>

#include "boost/thread.hpp"
namespace caffe {
template<typename T>
class BlockingQueue {
public:
void BlockingQueue(const int capacity)
: capacity_(capacity) { CHECK_GT(capacity, 0); }
inline int capacity() const { return capacity_; }
bool empty() const {
boost::mutex::scoped_lock lock(mutex_);
return queue_.empty();
}
bool full() const {
boost::mutex::scoped_lock lock(mutex_);
return (queue_.size() >= capacity_);
}
bool TryPush(const T& t) {
boost::mutex::scoped_lock lock(mutex_);
if (queue_.size() >= capacity_) {
return false;
}
queue_.push(t);
lock.unlock();
empty_condition_.notify_one();
return true;
}
void Push(const T& t, const string& log_on_wait = "") {
boost::mutex::scoped_lock lock(mutex_);
while (queue_.size() >= capacity_) {
if (!log_on_wait.empty()) {
time_t now = time(0);
if (now - last_wait_log_ > 5) {
last_wait_log_ = now;
LOG(INFO) << log_on_wait;
}
}
full_condition_.wait(lock);
}
queue_.push(t);
lock.unlock();
empty_condition_.notify_one();
}
bool TryPop(T& t) {
boost::mutex::scoped_lock lock(mutex_);
if (queue_.empty()) {
return false;
}
t = queue_.front();
queue_.pop();
lock.unlock();
full_condition_.notify_one();
return true;
}
T Pop(const string& log_on_wait = "") {
boost::mutex::scoped_lock lock(mutex_);
while (queue_.empty()) {
if (!log_on_wait.empty()) {
time_t now = time(0);
if (now - last_wait_log_ > 5) {
last_wait_log_ = now;
LOG(INFO) << log_on_wait;
}
}
empty_condition_.wait(lock);
}
T t = queue_.front();
queue_.pop();
lock.unlock();
full_condition_.notify_one();
return t;
}
// Return element without removing it
T Peek() {
boost::mutex::scoped_lock lock(mutex_);
while (queue_.empty())
empty_condition_.wait(lock);
return queue_.front();
}
private:
std::queue<T> queue_;
mutable boost::mutex mutex_;
boost::condition_variable empty_condition_;
boost::condition_variable full_condition_;
time_t last_wait_log_;
int capacity_;
};
} // namespace caffe
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment