Skip to content

Instantly share code, notes, and snippets.

@doug65536
Last active December 20, 2015 10:31
Show Gist options
  • Save doug65536/6115840 to your computer and use it in GitHub Desktop.
Save doug65536/6115840 to your computer and use it in GitHub Desktop.
Producer consumer queue
/*
 * Bounded, blocking FIFO queue for passing items of type T between
 * producer and consumer threads. Items are stored in a std::deque.
 *
 * All public member functions take queue_lock for their whole duration.
 * Producers block while the queue holds max_items (or more) items;
 * consumers block while it is empty.
 *
 * NOTE(review): thread_mutex, thread_condition_variable and
 * thread_scoped_lock are declared elsewhere. This class assumes they
 * behave like std::mutex, std::condition_variable and std::unique_lock
 * (wait(lock) releases the lock while sleeping and re-acquires it before
 * returning, and may wake spuriously) - confirm against their definitions.
 *
 * The class is non-copyable.
 */
template<typename T>
class ProducerConsumer
{
    /* mutable so that the const observers (empty/size/capacity) can lock */
    mutable thread_mutex queue_lock;
    thread_condition_variable queue_not_empty;
    thread_condition_variable queue_not_full;

    typedef std::deque<T> Queue;
    Queue queue;

public:
    typedef typename Queue::size_type size_type;
    typedef typename Queue::difference_type difference_type;

private:
    /* maximum number of queued items; always >= 1 */
    size_type max_items;

    /* disable copy construct and assignment */
    ProducerConsumer(const ProducerConsumer&); /* = delete; */
    void operator=(const ProducerConsumer&); /* = delete; */

    /* True when producers must wait. Uses >= rather than == because
     * set_limit may shrink the limit below the current size.
     * Caller must hold queue_lock. */
    bool is_full() const
    {
        return queue.size() >= max_items;
    }

    /* Wake consumers after `added` items were appended.
     * Caller must hold queue_lock. */
    void announce_items(size_type added)
    {
        if (added > 1)
            queue_not_empty.notify_all();
        else if (added == 1)
            queue_not_empty.notify_one();
    }

    /* Wake producers after `freed` slots became available (items removed
     * or limit raised). Does nothing if the queue is still full.
     * Caller must hold queue_lock. */
    void announce_space(size_type freed)
    {
        if (freed == 0 || is_full())
            return;
        const size_type available = max_items - queue.size();
        if (freed > 1 && available > 1)
            queue_not_full.notify_all();
        else
            queue_not_full.notify_one();
    }

public:
    /* Creates an empty queue whose limit is the deque's max_size(),
     * i.e. effectively unbounded until set_limit is called. */
    ProducerConsumer()
        : max_items(queue.max_size())
    {
    }

    /* Changes the maximum number of queued items.
     * limit must be > 0 (asserted); it is clamped to [1, max_size()].
     * Shrinking below the current size does not discard items; producers
     * simply block until consumers drain the queue below the new limit.
     * Raising the limit wakes producers that can now proceed. */
    void set_limit(size_type limit)
    {
        thread_scoped_lock lock(queue_lock);
        assert(limit > 0);

        /* clamp limit to valid range */
        if (limit < 1)
            limit = 1;
        else if (limit > queue.max_size())
            limit = queue.max_size();

        if (limit == max_items)
            return;

        const size_type previous = max_items;
        max_items = limit;

        /* unsigned subtraction is safe: only evaluated when growing */
        if (limit > previous)
            announce_space(limit - previous);
    }

    /* Appends a copy of item, blocking while the queue is full. */
    void push(const T& item)
    {
        thread_scoped_lock lock(queue_lock);

        /* wait for queue to not be full */
        while (is_full())
            queue_not_full.wait(lock);

        queue.push_back(item);

        /* Always signal: notifying only on the empty->non-empty edge
         * loses wakeups when several consumers are waiting. */
        announce_items(1);
    }

    /* Removes the oldest item into `item`, blocking while the queue
     * is empty. */
    void pop(T& item)
    {
        thread_scoped_lock lock(queue_lock);

        /* wait for queue to not be empty */
        while (queue.empty())
            queue_not_empty.wait(lock);

        item = queue.front();
        queue.pop_front();

        /* Always signal: notifying only on the full->non-full edge
         * loses wakeups when several producers are waiting. */
        announce_space(1);
    }

    /* Appends copies of every element in [begin, end), in order.
     * Blocks whenever the queue is full, so the range may be larger than
     * the limit. The lock is released while blocked, so items from other
     * producers may be interleaved in that case. */
    template<typename I>
    void push_range(const I& begin, const I& end)
    {
        thread_scoped_lock lock(queue_lock);

        /* items pushed by this call that consumers have not been told about */
        size_type unannounced = 0;

        for (I iter = begin; iter != end; ++iter) {
            /* wait for queue to not be full */
            while (is_full()) {
                /* wake consumers before sleeping, otherwise nobody
                 * drains the queue and we would wait forever */
                announce_items(unannounced);
                unannounced = 0;
                queue_not_full.wait(lock);
            }
            queue.push_back(*iter);
            ++unannounced;
        }

        announce_items(unannounced);
    }

    /* atomically puts entire queue content into the passed container
     * (which requires push_back).
     * if wait parameter is true, wait for at least one item.
     * Returns false only when wait is false and the queue was empty;
     * otherwise returns true after moving at least one item. */
    template<typename C>
    bool pop_all_into(C &output_container, bool wait)
    {
        thread_scoped_lock lock(queue_lock);

        if (!wait && queue.empty())
            return false;

        /* wait for queue to not be empty */
        while (queue.empty())
            queue_not_empty.wait(lock);

        const size_type removed = queue.size();
        std::copy(queue.begin(), queue.end(), std::back_inserter(output_container));
        queue.clear();

        /* the whole queue was freed, let waiting producers continue */
        announce_space(removed);
        return true;
    }

    /* Pushes every element of input_container via push_range. */
    /* when we have C++11 use perfect forwarding */
    template<typename C>
    void push_all_from(const C &input_container)
    {
        push_range(input_container.begin(), input_container.end());
    }

    /* Non-blocking push. Returns false (queue unchanged) if the queue
     * is full, true if the item was appended. */
    bool try_push(const T& item)
    {
        thread_scoped_lock lock(queue_lock);

        if (is_full())
            return false;

        queue.push_back(item);
        announce_items(1);
        return true;
    }

    /* Non-blocking pop. Returns false (item untouched) if the queue
     * is empty, true if the oldest item was removed into `item`. */
    bool try_pop(T& item)
    {
        thread_scoped_lock lock(queue_lock);

        if (queue.empty())
            return false;

        item = queue.front();
        queue.pop_front();
        announce_space(1);
        return true;
    }

    /* Snapshot of whether the queue is empty; may be stale as soon as
     * the lock is released. */
    bool empty() const
    {
        thread_scoped_lock lock(queue_lock);
        return queue.empty();
    }

    /* Snapshot of the number of queued items; may be stale as soon as
     * the lock is released. */
    size_type size() const
    {
        thread_scoped_lock lock(queue_lock);
        return queue.size();
    }

    /* Current item limit. Read under the lock because set_limit may
     * modify it concurrently. */
    size_type capacity() const
    {
        thread_scoped_lock lock(queue_lock);
        return max_items;
    }

    /* Discards all queued items and releases the deque's memory. */
    void clear()
    {
        thread_scoped_lock lock(queue_lock);

        /* use swap instead of clear so it will actually deallocate
         * memory; member swap on the temporary is required because a
         * temporary cannot bind to std::swap's non-const reference */
        Queue().swap(queue);

        /* empty queue is not full by definition */
        queue_not_full.notify_all();
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment