Skip to content

Instantly share code, notes, and snippets.

@Geal
Last active September 22, 2022 13:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Geal/31e7e11bba655d3c662e738043422d72 to your computer and use it in GitHub Desktop.
Save Geal/31e7e11bba655d3c662e738043422d72 to your computer and use it in GitHub Desktop.
bounded_producer.cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
/// Bounded, thread-safe FIFO queue of ints.
///
/// Producers block in push() while the queue holds kCapacity items;
/// consumers block in pop() while it is empty. Two separate condition
/// variables are used so that a "slot freed" notification can only wake a
/// producer and an "item added" notification can only wake a consumer.
/// Every access to the underlying container happens under the mutex.
class Queue {
public:
    /// Removes and returns the oldest item, blocking while the queue is empty.
    int pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks the condition after every wakeup,
        // so spurious wakeups are handled.
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        const int item = queue_.front();
        queue_.pop();
        // Release the lock before notifying so the woken producer does not
        // immediately block on the mutex.
        lock.unlock();
        not_full_.notify_one();
        return item;
    }

    /// Appends `item`, blocking while the queue already holds kCapacity items.
    /// Prints "queue full, waiting" each time it is about to wait.
    void push(const int item) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.size() >= kCapacity) {
            std::printf("queue full, waiting\n");
            not_full_.wait(lock);
        }
        queue_.push(item);
        lock.unlock();
        not_empty_.notify_one();
    }

    /// Non-blocking pop.
    /// @param res receives the oldest item when one is available; left
    ///            untouched otherwise.
    /// @return true if an item was removed, false if the queue was empty.
    bool try_pop(int& res) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        res = queue_.front();
        queue_.pop();
        lock.unlock();
        not_full_.notify_one();
        return true;
    }

    /// Returns the number of queued items at the moment of the call.
    /// The value may be stale by the time the caller uses it, but reading it
    /// is synchronized with concurrent push/pop calls.
    int size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return static_cast<int>(queue_.size());
    }

private:
    /// Maximum number of items held before push() blocks.
    static constexpr std::size_t kCapacity = 10;

    std::queue<int> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_;  // signalled after an item is added
    std::condition_variable not_full_;   // signalled after an item is removed
};
/// Demo of back-pressure between threads connected by bounded queues.
///
/// Topology (each arrow is a Queue):
///   t1 --q12--> t2 --q24--> t4
///   t1 --q13--> t3 --q34--> t4
///
/// Every worker loops forever, so the join() calls at the end never return;
/// the program runs until it is killed.
int main() {
    Queue q12;
    Queue q13;
    Queue q24;
    Queue q34;

    // Single-threaded smoke test of push/try_pop before any worker starts.
    q12.push(42);
    int first = 0;
    if (q12.try_pop(first)) {
        printf("the queue gave the value %d\n", first);
    } else {
        printf("there was no value\n");
    }

    // The lambdas capture the queues by reference; this is safe because the
    // threads are joined before the queues go out of scope.

    // t1: source. Alternates between feeding q12 and q13 with an increasing counter.
    std::thread t1([&]() {
        int counter = 0;
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            q12.push(counter);
            printf("t1 sending %d (q12 size: %d)\n", counter, q12.size());
            counter += 1;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            q13.push(counter);
            printf("t1 sending %d (q13 size: %d)\n", counter, q13.size());
            counter += 1;
        }
    });

    // t2: takes two values from q12 and forwards their sum on q24.
    std::thread t2([&]() {
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            int lhs = q12.pop();
            printf("t2 received a=%d\n", lhs);
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            int rhs = q12.pop();
            printf("t2 received b=%d\n", rhs);
            q24.push(lhs + rhs);
            // This message previously claimed to come from t1.
            printf("t2 sending %d (q24 size: %d)\n", lhs + rhs, q24.size());
        }
    });

    // t3: takes one value from q13 and forwards its double on q34.
    std::thread t3([&]() {
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            int x = q13.pop();
            printf("t3 received x=%d\n", x);
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            q34.push(x * 2);
            printf("t3 sending %d (q34 size: %d)\n", x * 2, q34.size());
        }
    });

    // t4: sink. Takes one value from each of q24 and q34 and prints their sum.
    std::thread t4([&]() {
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            int from_t2 = q24.pop();
            printf("t4 received a=%d\n", from_t2);
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            int from_t3 = q34.pop();
            printf("t4 received b=%d\n", from_t3);
            printf("a+b=%d\n", from_t2 + from_t3);
        }
    });

    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment