Skip to content

Instantly share code, notes, and snippets.

@SemenMartynov
Created September 25, 2020 10:37
Show Gist options
  • Save SemenMartynov/b0d8a80df9a1a88688edb5a8fed8a823 to your computer and use it in GitHub Desktop.
Save SemenMartynov/b0d8a80df9a1a88688edb5a8fed8a823 to your computer and use it in GitHub Desktop.
[Multiple producers - one consumer] with modern C++
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>
// Shared state for the producer/consumer demo below.

// Protects `queue`; producers and the consumer also print to std::cout while
// holding it, which keeps their log lines from interleaving.
std::mutex mtx;
// Notified by producers after every push and once more when a producer
// finishes; the consumer blocks on it while there is nothing to do.
std::condition_variable condvar;
// Workers in progress: the number of producers that have not finished yet.
// main() raises it for the producers it starts and each producer decrements it
// when done. As an object with static storage duration it starts out as 0.
std::atomic<int> wip;
// FIFO of produced values waiting to be consumed (guarded by `mtx`).
std::queue<int> queue;
/**
 * Producer thread body.
 *
 * Pushes the integers 0 .. n-1 onto the shared `queue`, sleeping a random
 * 100-1099 ms after each push, and wakes the consumer after every push. When
 * all values are pushed it decrements `wip` and notifies once more so that the
 * consumer can detect that this producer is done.
 *
 * @param n         number of values to produce
 * @param generator random engine used to pick the sleep time; it may be shared
 *                  with other producer threads, so it is only used under `mtx`
 */
void producer(int n, std::mt19937 &generator)
{
    for (int i = 0; i != n; ++i)
    {
        std::mt19937::result_type sleep_time = 0;
        {
            std::lock_guard<std::mutex> lg(mtx);
            // std::mt19937 is not thread-safe and the same engine is handed to
            // every producer, so draw from it only while holding the mutex.
            sleep_time = 100 + generator() % 1000;
            queue.push(i);
            std::cout << "[tid=" << std::this_thread::get_id() << "] pushing " << i << " add sleep for " << sleep_time << "ms" << std::endl;
        }
        // Notify after releasing the lock so the woken consumer can take it immediately.
        condvar.notify_all();
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time));
    }
    {
        // Even though `wip` is atomic it must change under the mutex: otherwise
        // the consumer could evaluate its wait predicate, then miss both this
        // decrement and the notification below, and block forever.
        std::lock_guard<std::mutex> lg(mtx);
        --wip;
    }
    condvar.notify_all();
}
/**
 * Consumer thread body.
 *
 * Repeatedly waits until there is something in `queue` or no producer is left,
 * drains and prints everything currently queued, and returns once `wip` has
 * dropped to zero. The mutex is held for the whole drain, including printing.
 */
void consumer()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lock(mtx);
        // The predicate overload guards against spurious wakeups; it is
        // shorthand for:
        //   while (wip && queue.empty())
        //       condvar.wait(lock);
        condvar.wait(lock, [] { return wip == 0 || !queue.empty(); });

        for (; !queue.empty(); queue.pop())
        {
            std::cout << "[tid=" << std::this_thread::get_id() << "] consuming " << queue.front() << std::endl;
        }

        // Nothing left to wait for once every producer has signed off.
        if (!wip)
            return;
    }
}
/**
 * Entry point: starts one producer per hardware thread plus a single consumer,
 * waits for all of them to finish and prints "Completed!".
 */
int main()
{
    const int max_tasks = 10; // how many ints each producer has to push

    // Random generator, seeded from the wall clock; shared by all producers.
    std::mt19937 generator(static_cast<unsigned int>(std::chrono::system_clock::now().time_since_epoch().count()));

    // hardware_concurrency() is only a hint and may return 0 when the value
    // cannot be determined; always run at least one producer.
    const unsigned int producer_count = std::max(1u, std::thread::hardware_concurrency());

    // Publish the full producer count before any thread starts, so the counter
    // can never drop to zero while producers are still being launched.
    wip = static_cast<int>(producer_count);

    // start all
    std::vector<std::thread> threads;
    threads.reserve(producer_count);
    for (unsigned int i = 0; i != producer_count; ++i)
    {
        threads.emplace_back(producer, max_tasks, std::ref(generator));
    }
    std::thread cons(consumer);

    // join all
    for (auto &thr : threads)
    {
        thr.join();
    }
    cons.join();

    std::cout << "Completed!" << std::endl;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment