Skip to content

Instantly share code, notes, and snippets.

@PolarNick239
Last active September 11, 2022 12:30
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save PolarNick239/f727c0cd923398dc397a05f515452123 to your computer and use it in GitHub Desktop.
Save PolarNick239/f727c0cd923398dc397a05f515452123 to your computer and use it in GitHub Desktop.
C++ concurrent blocking queue with limited size (based on boost condition_variable)
#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
// Based on https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
template<typename Data>
class BlockingQueue {
private:
std::queue<Data> queue;
mutable boost::mutex queue_mutex;
const size_t queue_limit;
bool is_closed = false;
boost::condition_variable new_item_or_closed_event;
boost::condition_variable item_removed_event;
#ifndef NDEBUG
size_t pushes_in_progress = 0;
#endif
public:
BlockingQueue(size_t size_limit=0) : queue_limit(size_limit)
{}
void push(const Data& data)
{
boost::mutex::scoped_lock lock(queue_mutex);
#ifndef NDEBUG
++pushes_in_progress;
#endif
if (queue_limit > 0) {
while (queue.size() >= queue_limit) {
item_removed_event.wait(lock);
}
}
assert (!is_closed);
queue.push(data);
#ifndef NDEBUG
--pushes_in_progress;
#endif
new_item_or_closed_event.notify_one();
}
bool try_push(const Data& data)
{
boost::mutex::scoped_lock lock(queue_mutex);
if (queue_limit > 0) {
if (queue.size() >= queue_limit) {
return false;
}
}
assert (!is_closed);
queue.push(data);
new_item_or_closed_event.notify_one();
return true;
}
void close()
{
boost::mutex::scoped_lock lock(queue_mutex);
assert (!is_closed);
#ifndef NDEBUG
assert (pushes_in_progress == 0);
#endif
is_closed = true;
new_item_or_closed_event.notify_all();
}
bool pop(Data &popped_value)
{
boost::mutex::scoped_lock lock(queue_mutex);
while (queue.empty()) {
if (is_closed) {
return false;
}
new_item_or_closed_event.wait(lock);
}
popped_value = queue.front();
queue.pop();
item_removed_event.notify_one();
return true;
}
bool try_pop(Data &popped_value)
{
boost::mutex::scoped_lock lock(queue_mutex);
if (queue.empty()) {
return false;
}
popped_value = queue.front();
queue.pop();
item_removed_event.notify_one();
return true;
}
bool empty() const
{
boost::mutex::scoped_lock lock(queue_mutex);
return queue.empty();
}
bool closed() const
{
boost::mutex::scoped_lock lock(queue_mutex);
return is_closed;
}
size_t limit() const
{
return queue_limit;
}
size_t size() const
{
boost::mutex::scoped_lock lock(queue_mutex);
return queue.size();
}
};
#include <gtest/gtest.h>
#include "blocking_queue.h"
#include <boost/thread/thread.hpp>
TEST(blocking_queue, simple_try_pop)
{
const int n = 1000;
BlockingQueue<int> queue;
ASSERT_TRUE(queue.empty());
for (int i = 0; i < n; ++i) {
int res;
ASSERT_FALSE(queue.try_pop(res));
queue.push(i);
ASSERT_TRUE(queue.try_pop(res));
ASSERT_EQ(i, res);
}
ASSERT_TRUE(queue.empty());
queue.close();
int res;
ASSERT_FALSE(queue.pop(res));
ASSERT_TRUE(queue.closed());
ASSERT_TRUE(queue.empty());
}
TEST(blocking_queue, simple_pop)
{
const int n = 1000;
BlockingQueue<int> queue;
ASSERT_TRUE(queue.empty());
for (int i = 0; i < n; ++i) {
int res;
queue.push(i);
ASSERT_TRUE(queue.pop(res));
ASSERT_EQ(i, res);
}
ASSERT_TRUE(queue.empty());
queue.close();
int res;
ASSERT_FALSE(queue.pop(res));
ASSERT_TRUE(queue.closed());
ASSERT_TRUE(queue.empty());
}
TEST(blocking_queue, push_and_close)
{
BlockingQueue<int> queue;
ASSERT_TRUE(queue.empty());
ASSERT_FALSE(queue.closed());
queue.push(239);
queue.close();
ASSERT_FALSE(queue.empty());
ASSERT_TRUE(queue.closed());
int res;
ASSERT_TRUE(queue.pop(res));
ASSERT_EQ(239, res);
ASSERT_FALSE(queue.pop(res));
}
class QueuePopWorker {
BlockingQueue<int>& queue;
public:
std::vector<int> results;
QueuePopWorker(BlockingQueue<int>& queue) : queue(queue) {}
void start() {
int result;
while(queue.pop(result)) {
results.push_back(result);
}
ASSERT_TRUE(queue.empty());
ASSERT_TRUE(queue.closed());
}
};
class QueuePushWorker {
BlockingQueue<int>& queue;
std::vector<int> data;
bool should_close;
public:
QueuePushWorker(BlockingQueue<int>& queue, std::vector<int> data, bool should_close) : queue(queue), data(data), should_close(should_close) {}
void start() {
int result;
for (auto value : data) {
if (queue.limit() != 0) {
ASSERT_LE(queue.size(), queue.limit());
}
queue.push(value);
if (queue.limit() != 0) {
ASSERT_LE(queue.size(), queue.limit());
}
}
if (should_close) {
queue.close();
}
}
};
TEST(blocking_queue, single_producer_single_consumer)
{
const int runs = 1000;
const int n = 1000;
const int limit = 100;
for (int run = 0; run < runs; ++run) {
BlockingQueue<int> queue(limit);
ASSERT_TRUE(queue.empty());
std::vector<int> data;
for (int i = 0; i < n; ++i) {
data.push_back(i);
}
QueuePushWorker producer(queue, data, true);
QueuePopWorker consumer(queue);
boost::thread consumer_thread(&QueuePopWorker::start, &consumer);
boost::thread producer_thread(&QueuePushWorker::start, &producer);
producer_thread.join();
consumer_thread.join();
ASSERT_TRUE(queue.empty());
ASSERT_TRUE(queue.closed());
ASSERT_EQ(consumer.results.size(), n);
for (int i = 0; i < n; ++i) {
ASSERT_EQ(consumer.results[i], i);
}
}
}
TEST(blocking_queue, multiple_producer_multilple_consumer)
{
const int runs = 10;
const int n = 10000;
const int limit = 100;
const int units = 10;
for (int run = 0; run < runs; ++run) {
BlockingQueue<int> queue(limit);
ASSERT_TRUE(queue.empty());
std::vector<QueuePushWorker> producers;
std::vector<QueuePopWorker> consumers;
for (int i = 0; i < units; ++i) {
std::vector<int> data;
for (int j = i * n; j < (i + 1) * n; ++j) {
data.push_back(j);
}
producers.push_back(QueuePushWorker(queue, data, false));
consumers.push_back(QueuePopWorker(queue));
}
std::vector<boost::thread> producers_threads(units);
std::vector<boost::thread> consumers_threads(units);
for (int i = 0; i < units; ++i) {
producers_threads[i] = boost::thread(&QueuePushWorker::start, &producers[i]);
consumers_threads[i] = boost::thread(&QueuePopWorker::start, &consumers[i]);
}
for (int i = 0; i < units; ++i) {
producers_threads[i].join();
}
queue.close();
for (int i = 0; i < units; ++i) {
consumers_threads[i].join();
}
ASSERT_TRUE(queue.empty());
ASSERT_TRUE(queue.closed());
std::vector<bool> is_found(n * units, false);
for (int i = 0; i < units; ++i) {
for (int value : consumers[i].results) {
ASSERT_FALSE(is_found[value]);
ASSERT_GE(value, 0);
ASSERT_LT(value, n * units);
is_found[value] = true;
}
}
for (int i = 0; i < n * units; ++i) {
ASSERT_TRUE(is_found[i]);
}
}
}
@dolmens
Copy link

dolmens commented Jul 30, 2020

it confused me that you unlock before notify in push but not in pop, why?

@PolarNick239
Copy link
Author

PolarNick239 commented Jul 30, 2020

This is just unclean code - order of each explicit unlock() call and its' following notify_one()/notify_all() does not matter, because the thread which will be notified - before continuing its execution will wait for acquiring the same queue_mutex lock.

Thanks, fixed - all explicit unlock() calls removed.

On the other hand, you may prefer to add explicit unlock before each notify() call for slightly better performance (so that notified thread will not always wait for acquiring the same queue_mutex lock because it is still hold by notifier's thread, see 'Be careful when you notify' in https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment