Created
October 15, 2019 17:37
-
-
Save Masterkatze/093267fc5f837f7fac3959727b6e0176 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <condition_variable> | |
#include <mutex> | |
#include <thread> | |
#include <queue> | |
#include <iostream> | |
#include <random> | |
#include <ctime> | |
#include <unistd.h> | |
template<typename T> | |
class ProducerConsumer | |
{ | |
public: | |
explicit ProducerConsumer(ulong maxSizeInBytes); | |
void produce(const T &element); | |
T consume(); | |
void producerIsOver(); | |
bool isFinished(); | |
private: | |
std::queue<T> queue; | |
std::condition_variable synchronizer; | |
std::mutex mutex; | |
bool producerOver; | |
ulong maxSizeInBytes; | |
}; | |
template<typename T> | |
ProducerConsumer<T>::ProducerConsumer(ulong maxSizeInBytes): producerOver(false), maxSizeInBytes(maxSizeInBytes) { } | |
/** | |
* @brief Producer - add element in the list | |
* @tparam T - The type of the element | |
* @param element - The element to add | |
*/ | |
template<typename T> void ProducerConsumer<T>::produce(const T &element) | |
{ | |
std::unique_lock<std::mutex> lock(mutex); | |
// Block the thread until the size of the queue allows to add more elements in it | |
if (sizeof(T) * (queue.size() + 1) > maxSizeInBytes) | |
{ | |
synchronizer.wait(lock, [&]() | |
{ | |
return sizeof(T) * (queue.size() + 1) <= maxSizeInBytes; | |
}); | |
} | |
queue.push(element); | |
synchronizer.notify_one(); | |
} | |
/** | |
* @brief Consumer - Get an element of the list and wait for it if the list is empty | |
* @tparam T - The type of the element | |
* @return The element | |
*/ | |
template<typename T> T ProducerConsumer<T>::consume() | |
{ | |
std::unique_lock<std::mutex> lock(mutex); | |
// Block the thread until the queue is empty and the producer has finished to fill the queue | |
if (queue.empty() && !producerOver) { | |
synchronizer.wait(lock, [&]() { | |
return !queue.empty() || producerOver; | |
}); | |
} | |
T element = queue.front(); | |
queue.pop(); | |
// Wake up the producer if he's waiting | |
synchronizer.notify_all(); | |
return element; | |
} | |
/** | |
* @brief Tells the producer has finished to fill the container | |
* @tparam T - The type of element's container | |
*/ | |
template<typename T> void ProducerConsumer<T>::producerIsOver() | |
{ | |
// @todo Useless ? | |
std::unique_lock<std::mutex> lock(mutex); | |
producerOver = true; | |
} | |
/** | |
* @brief Tells if the whole process is over (IE producer has finished to fill the container and the container is empty) | |
* @tparam T - The type of element's container | |
* @return True if the whole process is over, false otherwise | |
*/ | |
template<typename T> bool ProducerConsumer<T>::isFinished() | |
{ | |
std::unique_lock<std::mutex> lock(mutex); | |
return producerOver && queue.empty(); | |
} | |
int main() | |
{ | |
ProducerConsumer<long double> container(sizeof(long double)); | |
std::random_device randomDevice; | |
std::mt19937 randomGenerator(randomDevice()); | |
std::uniform_int_distribution<uint32_t> distribution(10000, 1000000); | |
randomGenerator.seed(static_cast<unsigned long int>(std::time(nullptr))); | |
std::thread producer([&]() | |
{ | |
for (int i = 1; i <= 20; ++i) | |
{ | |
usleep(distribution(randomGenerator)); | |
container.produce(i); | |
} | |
container.producerIsOver(); | |
}); | |
std::thread consumer([&]() | |
{ | |
do | |
{ | |
usleep(distribution(randomGenerator)); | |
std::cout << "Consume " << container.consume() << std::endl; | |
} | |
while (!container.isFinished()); | |
}); | |
producer.join(); | |
consumer.join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment