Skip to content

Instantly share code, notes, and snippets.

@bholt
Created July 30, 2015 16:00
Show Gist options
  • Save bholt/d85dfa634190829b217c to your computer and use it in GitHub Desktop.
Save bholt/d85dfa634190829b217c to your computer and use it in GitHub Desktop.
Concurrent queue built for single-producer/single-consumer that uses a condition variable to signal consumer.
#include <future>
using namespace std;
template< typename T >
class ConcurrentQueue {
Queue<T> q;
mutex m;
condition_variable cv;
bool closed;
void block_until_nonempty(unique_lock<mutex>& lk) {
cv.wait(lk, [this]{ return !q.empty() || closed; });
}
public:
ConcurrentQueue(): q(), m(), cv(), closed(false) {}
void close() { closed = true; cv.notify_one(); }
// template< typename... Args >
// void emplace(Args&&... args) {
// {
// lock_guard<mutex> _(m);
// q.push_back(*new T(std::forward<Args>(args)...));
// }
// cv.notify_one();
// }
void push(T *o) {
{
lock_guard<mutex> _(m);
q.push_back(*o);
}
cv.notify_one();
}
bool prepend(Queue<T>& o) {
bool newitems = false;
{
lock_guard<mutex> _(m);
newitems = !o.empty();
q.splice(q.begin(), o);
}
cv.notify_one();
return newitems;
}
friend bool operator<<(Queue<T>& o, ConcurrentQueue<T>& q) {
unique_lock<mutex> lk(q.m);
q.block_until_nonempty(lk);
// lock_guard<mutex> _(q.m);
bool newitems = !q.q.empty();
o.splice(o.end(), q.q);
lk.unlock();
return newitems;
}
friend bool operator<<(ConcurrentQueue<T>& q, Queue<T>& o) {
bool newitems = false;
{
lock_guard<mutex> _(q.m);
newitems = !o.empty();
q.q.splice(q.q.end(), o);
}
q.cv.notify_one();
return newitems;
}
friend void operator<<(ConcurrentQueue<T>& q, T *o) { q.push(o); }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment