Created
July 31, 2013 11:30
-
-
Save doug65536/6121263 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Threading abstraction layer.
 *
 * Selects either Boost.Thread (when WITH_BOOST is defined) or the C++11
 * standard library and exposes a common set of names:
 *   thread, thread_mutex, thread_scoped_lock, thread_condition_variable,
 *   function_bind(...) and sleep_ms(ms). */
#ifdef WITH_BOOST
#include <boost/thread.hpp>
#include <boost/bind.hpp>
typedef boost::thread thread;
typedef boost::mutex thread_mutex;
typedef boost::mutex::scoped_lock thread_scoped_lock;
typedef boost::condition_variable thread_condition_variable;
#define function_bind boost::bind
#define sleep_ms(ms) boost::this_thread::sleep_for(boost::chrono::milliseconds(ms))
/* MSVC keeps __cplusplus at 199711L unless /Zc:__cplusplus is given, so
 * _MSVC_LANG is consulted as well to detect C++11 support there. */
#elif __cplusplus >= 201103L || (defined(_MSVC_LANG) && _MSVC_LANG >= 201103L)
/* C++11 */
#include <chrono> /* std::chrono::milliseconds used by sleep_ms */
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
typedef std::thread thread;
typedef std::mutex thread_mutex;
typedef std::unique_lock<thread_mutex> thread_scoped_lock;
typedef std::condition_variable thread_condition_variable;
#define function_bind std::bind
#define sleep_ms(ms) std::this_thread::sleep_for(std::chrono::milliseconds(ms))
#else
#error Do not know how to use threads on this compiler, enable boost
#endif
#include <algorithm>
#include <cassert>
#include <deque>
#include <iterator>
template<typename T> | |
class ProducerConsumer | |
{ | |
thread_mutex queue_lock; | |
thread_condition_variable queue_not_empty; | |
thread_condition_variable queue_not_full; | |
typedef std::deque<T> Queue; | |
Queue queue; | |
/* provides a way for the consumer to know that no more items will be added */ | |
bool pushing_complete; | |
public: | |
typedef typename Queue::size_type size_type; | |
typedef typename Queue::difference_type difference_type; | |
private: | |
size_type max_items; | |
/* disable copy construct and assignment */ | |
ProducerConsumer(const ProducerConsumer&); /* = delete; */ | |
void operator=(const ProducerConsumer&); /* = delete; */ | |
public: | |
ProducerConsumer() | |
: max_items(queue.max_size()) | |
, pushing_complete(false) | |
{ | |
} | |
void set_limit(size_type limit) | |
{ | |
thread_scoped_lock lock(queue_lock); | |
assert(limit > 0); | |
bool was_full = (queue.size() == max_items); | |
/* clamp limit to valid range */ | |
if (limit == max_items) | |
return; | |
if (limit < 1) | |
limit = 1; | |
else if (limit > queue.max_size()) | |
limit = queue.max_size(); | |
int notify; | |
/* need to have two code paths to avoid signed overflow */ | |
if (limit > max_items) { | |
size_type added_space = limit - max_items; | |
max_items = limit; | |
notify = !was_full ? 0 : added_space > 1 ? 2 : 1; | |
} | |
else { | |
max_items = limit; | |
notify = 0; | |
} | |
if (notify > 1) | |
queue_not_full.notify_all(); | |
else if (notify > 0) | |
queue_not_full.notify_one(); | |
} | |
void push(const T& item) | |
{ | |
thread_scoped_lock lock(queue_lock); | |
/* wait for queue to not be full */ | |
while (queue.size() == max_items) | |
queue_not_full.wait(lock); | |
queue.push_back(item); | |
/* if queue was empty before, notify one consumer */ | |
if (queue.size() == 1) | |
queue_not_empty.notify_one(); | |
} | |
/* returns false if pushing is complete and queue is completely drained */ | |
bool pop(T& item) | |
{ | |
thread_scoped_lock lock(queue_lock); | |
/* wait for queue to not be empty */ | |
while (queue.empty()) { | |
if (pushing_complete) | |
return false; | |
queue_not_empty.wait(lock); | |
} | |
item = queue.front(); | |
queue.pop_front(); | |
/* if queue was full before, notify one producer */ | |
if (queue.size() == max_items - 1) | |
queue_not_full.notify_one(); | |
return true; | |
} | |
template<typename I> | |
void push_range(const I& begin, const I& end) | |
{ | |
thread_scoped_lock lock(queue_lock); | |
bool was_empty = queue.empty(); | |
for (I iter = begin; iter != end; ++iter) { | |
/* wait for queue to not be full */ | |
while (queue.size() == max_items) | |
queue_not_empty.wait(lock); | |
queue.push_back(*iter); | |
} | |
if (was_empty) { | |
if (queue.size() > 1) | |
queue_not_empty.notify_all(); | |
else | |
queue_not_empty.notify_one(); | |
} | |
} | |
/* atomically puts entire queue content into the passed container | |
* (which requires push_back). | |
* if wait parameter is true, wait for at least one item */ | |
template<typename C> | |
bool pop_all_into(C &output_container, bool wait) | |
{ | |
thread_scoped_lock lock(queue_lock); | |
if (!wait && queue.empty()) | |
return false; | |
/* wait for queue to not be empty */ | |
while (queue.empty()) { | |
if (pushing_complete) | |
return false; | |
queue_not_empty.wait(lock); | |
} | |
std::copy(queue.begin(), queue.end(), std::back_inserter(output_container)); | |
bool was_full = (queue.size() == max_items); | |
queue.clear(); | |
/* if queue was full before, notify all producers */ | |
if (was_full) { | |
if (max_items > 1) | |
queue_not_full.notify_all(); | |
else | |
queue_not_full.notify_one(); | |
} | |
return true; | |
} | |
/* when we have C++11 use perfect forwarding */ | |
template<typename C> | |
void push_all_from(const C &input_container) | |
{ | |
assert(!pushing_complete); | |
push_range(input_container.begin(), input_container.end()); | |
} | |
bool try_push(const T& item) | |
{ | |
assert(!pushing_complete); | |
thread_scoped_lock lock(queue_lock); | |
if (queue.size() == max_items) | |
return false; | |
queue.push_back(item); | |
/* if queue was empty before, notify one consumer */ | |
if (queue.size() == 1) | |
queue_not_empty.notify_one(); | |
return true; | |
} | |
bool try_pop(T& item) | |
{ | |
thread_scoped_lock lock(queue_lock); | |
if (queue.empty()) | |
return false; | |
item = queue.front(); | |
queue.pop_front(); | |
/* if queue was full before, notify one producer */ | |
if (queue.size() == max_items - 1) | |
queue_not_full.notify_one(); | |
return true; | |
} | |
/* tell the queue that you won't be pushing any more items. | |
* if the queue is empty, wake up all threads that are waiting | |
* for an item */ | |
void push_complete() | |
{ | |
thread_scoped_lock lock(queue_lock); | |
pushing_complete = true; | |
if (queue.empty()) | |
queue_not_empty.notify_all(); | |
} | |
/* returns true if pushing is completed, | |
* but the queue is not necessarily drained */ | |
bool is_push_complete() const | |
{ | |
thread_scoped_lock lock(queue_lock); | |
return pushing_complete; | |
} | |
/* returns true if pushing is completed and the queue is drained */ | |
bool is_completed() const | |
{ | |
thread_scoped_lock lock(queue_lock); | |
return queue.empty() && pushing_complete; | |
} | |
bool empty() const | |
{ | |
thread_scoped_lock lock(queue_lock); | |
return queue.empty(); | |
} | |
size_type size() const | |
{ | |
thread_scoped_lock lock(queue_lock); | |
return queue.size(); | |
} | |
size_type capacity() const | |
{ | |
return max_items; | |
} | |
void clear() | |
{ | |
thread_scoped_lock lock(queue_lock); | |
/* use swap instead of clear so it will | |
* actually deallocate memory */ | |
std::swap(queue, Queue()); | |
/* empty queue is not full by definition */ | |
queue_not_full.notify_all(); | |
} | |
/* clear the queue, allow adding, wake any threads that are blocked | |
* waiting to add an item */ | |
void reset() | |
{ | |
thread_scoped_lock lock(queue_lock); | |
queue.clear(); | |
pushing_complete = false; | |
queue_not_full.notify_all(); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment