Skip to content

Instantly share code, notes, and snippets.

@Masterkatze
Created October 15, 2019 17:37
Show Gist options
  • Save Masterkatze/093267fc5f837f7fac3959727b6e0176 to your computer and use it in GitHub Desktop.
Save Masterkatze/093267fc5f837f7fac3959727b6e0176 to your computer and use it in GitHub Desktop.
#include <unistd.h>

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <thread>
#include <utility>
/**
 * @brief Bounded FIFO queue shared between producer and consumer threads.
 *
 * The capacity is given in bytes and translated into a number of elements:
 * the queue never holds more than maxSizeInBytes / sizeof(T) elements.
 * Every public member function takes the internal mutex, so they may be
 * called concurrently from several threads.
 *
 * @tparam T - The type of the elements stored in the queue
 */
template<typename T>
class ProducerConsumer
{
public:
    explicit ProducerConsumer(std::size_t maxSizeInBytes);
    void produce(const T &element);
    T consume();
    bool consumeNext(T &element);
    void producerIsOver();
    bool isFinished();
private:
    bool hasRoom() const;
    bool waitForElement(std::unique_lock<std::mutex> &lock);

    std::queue<T> queue;
    std::condition_variable synchronizer;
    std::mutex mutex;
    bool producerOver;
    std::size_t maxSizeInBytes;
};

/**
 * @brief Builds an empty queue
 * @tparam T - The type of the elements
 * @param maxSizeInBytes - Capacity of the queue, in bytes
 * @throws std::invalid_argument if the capacity cannot hold a single element
 *         (every call to produce() would otherwise block forever)
 */
template<typename T>
ProducerConsumer<T>::ProducerConsumer(std::size_t maxSizeInBytes): producerOver(false), maxSizeInBytes(maxSizeInBytes)
{
    if (maxSizeInBytes < sizeof(T))
    {
        throw std::invalid_argument("ProducerConsumer: capacity is smaller than one element");
    }
}

/**
 * @brief Tells whether one more element fits in the queue (mutex must be held)
 * @tparam T - The type of the elements
 * @return True if an element can be pushed without exceeding the capacity
 */
template<typename T> bool ProducerConsumer<T>::hasRoom() const
{
    // Same condition as sizeof(T) * (size + 1) <= maxSizeInBytes, written as a
    // division so that the computation cannot overflow.
    return queue.size() < maxSizeInBytes / sizeof(T);
}

/**
 * @brief Blocks until an element is available or the producer is over
 * @tparam T - The type of the elements
 * @param lock - Lock owning the internal mutex; released while waiting
 * @return True if the queue holds an element, false if the producer is over
 *         and the queue is empty
 */
template<typename T> bool ProducerConsumer<T>::waitForElement(std::unique_lock<std::mutex> &lock)
{
    synchronizer.wait(lock, [this]()
    {
        return !queue.empty() || producerOver;
    });
    return !queue.empty();
}

/**
 * @brief Producer - add an element to the queue, waiting for room if it is full
 * @tparam T - The type of the element
 * @param element - The element to add (copied into the queue)
 */
template<typename T> void ProducerConsumer<T>::produce(const T &element)
{
    std::unique_lock<std::mutex> lock(mutex);
    // Block the thread until the size of the queue allows one more element.
    // wait() tests the predicate before sleeping, so no separate check is needed.
    synchronizer.wait(lock, [this]()
    {
        return hasRoom();
    });
    queue.push(element);
    // Wake up a consumer that may be waiting for data
    synchronizer.notify_one();
}

/**
 * @brief Consumer - get the oldest element, waiting for it if the queue is empty
 * @tparam T - The type of the element
 * @return The element
 * @throws std::runtime_error if the producer is over and the queue is empty,
 *         i.e. there is nothing to return and nothing will ever arrive
 */
template<typename T> T ProducerConsumer<T>::consume()
{
    std::unique_lock<std::mutex> lock(mutex);
    if (!waitForElement(lock))
    {
        // Calling front() on an empty queue is undefined behaviour: fail loudly instead
        throw std::runtime_error("ProducerConsumer::consume: producer is over and the queue is empty");
    }
    T element = std::move(queue.front());
    queue.pop();
    // Wake up the producers waiting for room
    synchronizer.notify_all();
    return element;
}

/**
 * @brief Consumer - like consume(), but reports the end of the stream instead of throwing
 *
 * Waiting and testing for completion happen under a single lock, so a loop
 * such as `while (q.consumeNext(x)) { ... }` cannot miss the completion signal,
 * unlike testing isFinished() and then calling consume().
 *
 * @tparam T - The type of the element
 * @param element - Receives the oldest element; left untouched when false is returned
 * @return True if an element was stored in `element`, false if the producer
 *         is over and the queue is empty
 */
template<typename T> bool ProducerConsumer<T>::consumeNext(T &element)
{
    std::unique_lock<std::mutex> lock(mutex);
    if (!waitForElement(lock))
    {
        return false;
    }
    element = std::move(queue.front());
    queue.pop();
    // Wake up the producers waiting for room
    synchronizer.notify_all();
    return true;
}

/**
 * @brief Tells the producer has finished to fill the container
 * @tparam T - The type of element's container
 */
template<typename T> void ProducerConsumer<T>::producerIsOver()
{
    {
        // The lock is required: the flag is read by waiting threads under the same mutex
        std::lock_guard<std::mutex> lock(mutex);
        producerOver = true;
    }
    // Release every consumer blocked on an empty queue; without this they would
    // keep sleeping because no further produce() will notify them.
    synchronizer.notify_all();
}

/**
 * @brief Tells if the whole process is over (IE producer has finished to fill the container and the container is empty)
 *
 * The answer is a snapshot taken under the lock; it may change as soon as the
 * function returns if a producer is still running.
 *
 * @tparam T - The type of element's container
 * @return True if the whole process is over, false otherwise
 */
template<typename T> bool ProducerConsumer<T>::isFinished()
{
    std::lock_guard<std::mutex> lock(mutex);
    return producerOver && queue.empty();
}
/**
 * @brief Demo: one producer pushes the values 1..20 through a one-element
 *        queue while one consumer prints them, both pausing randomly.
 * @return 0 once both threads have been joined
 */
int main()
{
    constexpr int itemCount = 20;

    // Capacity of exactly one element: the producer must wait for the consumer
    ProducerConsumer<long double> container(sizeof(long double));

    // One seed per thread: a random engine must not be shared between threads
    // without synchronisation, so each thread builds its own generator.
    std::random_device randomDevice;
    const auto producerSeed = randomDevice();
    const auto consumerSeed = randomDevice();

    // Sleep for a random duration between 10 ms and 1 s
    const auto randomPause = [](std::mt19937 &generator)
    {
        std::uniform_int_distribution<std::uint32_t> distribution(10000, 1000000);
        std::this_thread::sleep_for(std::chrono::microseconds(distribution(generator)));
    };

    std::thread producer([&]()
    {
        std::mt19937 generator(producerSeed);
        for (int i = 1; i <= itemCount; ++i)
        {
            randomPause(generator);
            container.produce(i);
        }
        container.producerIsOver();
    });

    std::thread consumer([&]()
    {
        std::mt19937 generator(consumerSeed);
        // Consume exactly as many elements as the producer emits. Testing
        // isFinished() after each consume() can observe "not finished" just
        // before the producer marks completion, and the next consume() would
        // then be entered with nothing left to arrive.
        for (int consumed = 0; consumed < itemCount; ++consumed)
        {
            randomPause(generator);
            std::cout << "Consume " << container.consume() << std::endl;
        }
    });

    producer.join();
    consumer.join();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment