Skip to content

Instantly share code, notes, and snippets.

@doug65536
Created July 31, 2013 11:30
Show Gist options
  • Save doug65536/6121263 to your computer and use it in GitHub Desktop.
Save doug65536/6121263 to your computer and use it in GitHub Desktop.
/* Thread-support shim.
 *
 * Exposes one set of names that the rest of the file uses regardless of
 * which threading backend is available:
 *   thread                     - thread handle type
 *   thread_mutex               - mutual exclusion lock
 *   thread_scoped_lock         - scoped lock usable with condition variables
 *   thread_condition_variable  - condition variable
 *   function_bind              - argument binder (bind)
 *   sleep_ms(ms)               - put the calling thread to sleep for ms milliseconds
 *
 * Define WITH_BOOST to force the Boost.Thread backend; otherwise a C++11
 * (or newer) standard library is required. */
#ifdef WITH_BOOST
#include <boost/thread.hpp>
#include <boost/bind.hpp>
typedef boost::thread thread;
typedef boost::mutex thread_mutex;
typedef boost::mutex::scoped_lock thread_scoped_lock;
typedef boost::condition_variable thread_condition_variable;
#define function_bind boost::bind
#define sleep_ms(ms) boost::this_thread::sleep_for(boost::chrono::milliseconds(ms))
/* MSVC keeps __cplusplus at 199711L unless /Zc:__cplusplus is given, so the
 * real language level is also checked through _MSVC_LANG */
#elif __cplusplus >= 201103L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201103L)
/* C++11 */
#include <chrono> /* std::chrono::milliseconds, used by sleep_ms */
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
typedef std::thread thread;
typedef std::mutex thread_mutex;
/* unique_lock (not lock_guard) because condition variable waits need it */
typedef std::unique_lock<thread_mutex> thread_scoped_lock;
typedef std::condition_variable thread_condition_variable;
#define function_bind std::bind
#define sleep_ms(ms) std::this_thread::sleep_for(std::chrono::milliseconds(ms))
#else
#error Do not know how to use threads on this compiler, enable boost
#endif
#include <algorithm>
#include <cassert>
#include <deque>
#include <iterator>
/* Bounded FIFO queue for handing items of type T from producer threads to
 * consumer threads.
 *
 * Every public member takes queue_lock, so the object can be shared freely
 * between threads. Producers block (push, push_range, push_all_from) or
 * fail (try_push) while the queue holds max_items items; consumers block
 * (pop, pop_all_into with wait) or fail (try_pop) while it is empty.
 * push_complete() marks the end of the stream so that blocked consumers
 * return false once the queue has been drained.
 *
 * Notifications are sent on every state change rather than only on the
 * empty/full edges: an edge-only notify loses wakeups when several
 * pushes (or pops) happen before the first woken thread gets to run. */
template<typename T>
class ProducerConsumer
{
    /* mutable so that the const observers can lock it */
    mutable thread_mutex queue_lock;
    thread_condition_variable queue_not_empty;
    thread_condition_variable queue_not_full;

    typedef std::deque<T> Queue;
    Queue queue;

    /* provides a way for the consumer to know that no more items will be added */
    bool pushing_complete;

public:
    typedef typename Queue::size_type size_type;
    typedef typename Queue::difference_type difference_type;

private:
    size_type max_items;

    /* disable copy construct and assignment */
    ProducerConsumer(const ProducerConsumer&); /* = delete; */
    void operator=(const ProducerConsumer&); /* = delete; */

    /* caller must hold queue_lock.
     * uses >= because set_limit may shrink the limit below the current size */
    bool is_full() const
    {
        return queue.size() >= max_items;
    }

public:
    /* creates an empty queue whose limit is the largest size the
     * underlying container supports */
    ProducerConsumer()
        : pushing_complete(false)
        , max_items(queue.max_size())
    {
    }

    /* changes the maximum number of queued items.
     * limit is clamped to [1, queue.max_size()]. items already queued are
     * kept even if they exceed the new limit; producers simply stay blocked
     * until consumers bring the size back under it */
    void set_limit(size_type limit)
    {
        assert(limit > 0);

        thread_scoped_lock lock(queue_lock);

        /* clamp limit to valid range */
        if (limit < 1)
            limit = 1;
        else if (limit > queue.max_size())
            limit = queue.max_size();

        if (limit == max_items)
            return;

        const bool grew = limit > max_items;
        max_items = limit;

        /* more room may let any number of blocked producers proceed */
        if (grew)
            queue_not_full.notify_all();
    }

    /* appends a copy of item, blocking while the queue is full */
    void push(const T& item)
    {
        thread_scoped_lock lock(queue_lock);

        /* wait for queue to not be full */
        while (is_full())
            queue_not_full.wait(lock);

        queue.push_back(item);

        /* one new item can satisfy one consumer */
        queue_not_empty.notify_one();
    }

    /* removes the oldest item into item, blocking while the queue is empty.
     * returns false if pushing is complete and queue is completely drained */
    bool pop(T& item)
    {
        thread_scoped_lock lock(queue_lock);

        /* wait for queue to not be empty */
        while (queue.empty()) {
            if (pushing_complete)
                return false;
            queue_not_empty.wait(lock);
        }

        item = queue.front();
        queue.pop_front();

        /* a slot was freed; let one producer use it (unless the limit was
         * lowered and the queue is still over it) */
        if (!is_full())
            queue_not_full.notify_one();

        return true;
    }

    /* appends copies of [begin, end) in order, blocking whenever the queue
     * is full. the lock is released only while blocked, so items from other
     * producers can interleave only at those points */
    template<typename I>
    void push_range(const I& begin, const I& end)
    {
        thread_scoped_lock lock(queue_lock);

        /* true while items were added that consumers were not told about */
        bool unannounced = false;

        for (I iter = begin; iter != end; ++iter) {
            /* wait for queue to not be full */
            while (is_full()) {
                /* consumers must be woken before blocking here, otherwise
                 * nobody drains the queue and this wait never ends */
                if (unannounced) {
                    queue_not_empty.notify_all();
                    unannounced = false;
                }
                queue_not_full.wait(lock);
            }

            queue.push_back(*iter);
            unannounced = true;
        }

        if (unannounced)
            queue_not_empty.notify_all();
    }

    /* atomically puts entire queue content into the passed container
     * (which requires push_back).
     * if wait parameter is true, wait for at least one item.
     * returns false if nothing was transferred: either wait was false and
     * the queue was empty, or pushing is complete and the queue is drained */
    template<typename C>
    bool pop_all_into(C &output_container, bool wait)
    {
        thread_scoped_lock lock(queue_lock);

        if (!wait && queue.empty())
            return false;

        /* wait for queue to not be empty */
        while (queue.empty()) {
            if (pushing_complete)
                return false;
            queue_not_empty.wait(lock);
        }

        std::copy(queue.begin(), queue.end(), std::back_inserter(output_container));
        queue.clear();

        /* the whole queue is free now, so every blocked producer may retry */
        queue_not_full.notify_all();

        return true;
    }

    /* appends copies of every element of input_container, see push_range */
    /* when we have C++11 use perfect forwarding */
    template<typename C>
    void push_all_from(const C &input_container)
    {
        /* read the flag through the locking accessor, not directly */
        assert(!is_push_complete());
        push_range(input_container.begin(), input_container.end());
    }

    /* appends a copy of item without blocking.
     * returns false (and adds nothing) if the queue is full */
    bool try_push(const T& item)
    {
        thread_scoped_lock lock(queue_lock);
        assert(!pushing_complete);

        if (is_full())
            return false;

        queue.push_back(item);
        queue_not_empty.notify_one();

        return true;
    }

    /* removes the oldest item into item without blocking.
     * returns false (and leaves item untouched) if the queue is empty */
    bool try_pop(T& item)
    {
        thread_scoped_lock lock(queue_lock);

        if (queue.empty())
            return false;

        item = queue.front();
        queue.pop_front();

        if (!is_full())
            queue_not_full.notify_one();

        return true;
    }

    /* tell the queue that you won't be pushing any more items.
     * wakes up all threads that are waiting for an item; they re-check
     * the queue and return false once it is drained */
    void push_complete()
    {
        thread_scoped_lock lock(queue_lock);
        pushing_complete = true;

        /* notify even if the queue is not empty: a consumer may still be
         * blocked while another, already notified one takes the last item */
        queue_not_empty.notify_all();
    }

    /* returns true if pushing is completed,
     * but the queue is not necessarily drained */
    bool is_push_complete() const
    {
        thread_scoped_lock lock(queue_lock);
        return pushing_complete;
    }

    /* returns true if pushing is completed and the queue is drained */
    bool is_completed() const
    {
        thread_scoped_lock lock(queue_lock);
        return queue.empty() && pushing_complete;
    }

    /* returns true if no items are queued at the time of the call */
    bool empty() const
    {
        thread_scoped_lock lock(queue_lock);
        return queue.empty();
    }

    /* returns the number of queued items at the time of the call */
    size_type size() const
    {
        thread_scoped_lock lock(queue_lock);
        return queue.size();
    }

    /* returns the current item limit (see set_limit) */
    size_type capacity() const
    {
        thread_scoped_lock lock(queue_lock);
        return max_items;
    }

    /* discards all queued items and wakes blocked producers */
    void clear()
    {
        thread_scoped_lock lock(queue_lock);

        /* use swap instead of clear so it will
         * actually deallocate memory.
         * member swap is called on the temporary because std::swap
         * cannot bind a temporary to its non-const reference parameter */
        Queue().swap(queue);

        /* empty queue is not full by definition */
        queue_not_full.notify_all();
    }

    /* clear the queue, allow adding, wake any threads that are blocked
     * waiting to add an item */
    void reset()
    {
        thread_scoped_lock lock(queue_lock);
        queue.clear();
        pushing_complete = false;
        queue_not_full.notify_all();
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment