Skip to content

Instantly share code, notes, and snippets.

@zjhmale
Forked from vmrob/channel.cpp
Created March 24, 2020 10:30
Show Gist options
  • Save zjhmale/4725caa2e5bc22905b37596fea53d7fc to your computer and use it in GitHub Desktop.
Save zjhmale/4725caa2e5bc22905b37596fea53d7fc to your computer and use it in GitHub Desktop.
Go channels in C++
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
// Minimal thread-safe FIFO queue: a std::queue whose every operation runs
// under one internal mutex, so all member functions may be called
// concurrently from several threads.
//
// empty() and size() return a snapshot only; another thread may change the
// queue immediately after they return, so use try_pop() rather than
// "check empty(), then pop".
template <typename T>
class concurrent_queue {
private:
    std::queue<T> _queue;
    // mutable so that the const observers below can still take the lock.
    mutable std::mutex _mutex;

public:
    // Returns true when the queue held no elements at the time of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    // Returns the number of queued elements at the time of the call.
    size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

    // Moves the oldest element into `out` and removes it from the queue.
    // Returns false, leaving `out` untouched, when the queue is empty.
    // Never blocks waiting for data.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_queue.empty()) {
            return false;
        }
        out = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    // Appends `val` at the back of the queue. Taken by value and moved into
    // place, so callers can pass either an lvalue (copied) or an rvalue.
    void push(T val) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(std::move(val));
    }
};
// Thrown by buffered_channel::send() when the channel has already been
// closed. Inherits all std::runtime_error constructors, so it is built from
// a message string.
class channel_closed : public std::runtime_error {
public:
    using std::runtime_error::runtime_error;
};

// Unbounded multi-producer / multi-consumer channel in the spirit of Go's
// buffered channels.
//
// The queue and the open flag are guarded by a single mutex, which makes
// "check open, then enqueue" in send() atomic with respect to drain(): once
// drain() has returned no further value can enter the channel, so a receiver
// that observes "closed and empty" can never miss an item. Receivers sleep on
// a condition variable instead of spinning.
//
// The channel is neither copyable nor movable. As with any object, it must
// outlive every thread that is still calling into it.
template <typename T>
class buffered_channel {
private:
    std::queue<T> _queue;
    mutable std::mutex _mutex;        // guards _queue and _open
    std::condition_variable _ready;   // signalled on send() and drain()
    bool _open = true;

public:
    buffered_channel() = default;

    buffered_channel(const buffered_channel&) = delete;
    buffered_channel& operator=(const buffered_channel&) = delete;

    // Closes the channel on destruction.
    ~buffered_channel() {
        drain();
    }

    // Closes the channel: later send() calls throw, while values that are
    // already buffered remain available to recv(). Wakes every blocked
    // receiver so it can re-evaluate its exit condition. Calling drain()
    // more than once is harmless.
    void drain() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _open = false;
        }
        _ready.notify_all();
    }

    // Returns true while a recv() could still yield a value, i.e. the channel
    // has not been closed or it still holds buffered values.
    bool is_open() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _open || !_queue.empty();
    }

    // Enqueues `val`.
    // Throws channel_closed if drain() has already been called.
    void send(T val) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (!_open) {
                throw channel_closed("send attempt while closed");
            }
            _queue.push(std::move(val));
        }
        // Notify after releasing the lock so the woken receiver does not
        // immediately block on the mutex we are still holding.
        _ready.notify_one();
    }

    // Blocks until a value is available or the channel is closed and empty.
    // On success moves the oldest value into `val` and returns true.
    // Returns false, leaving `val` untouched, once the channel is closed and
    // fully drained.
    bool recv(T& val) {
        std::unique_lock<std::mutex> lock(_mutex);
        _ready.wait(lock, [this] { return !_open || !_queue.empty(); });
        if (_queue.empty()) {
            return false;  // woken because the channel was closed
        }
        val = std::move(_queue.front());
        _queue.pop();
        return true;
    }
};
// Demo driver: two producers each push 1000 ones into `input`; two consumers
// sum whatever they receive and publish their partial sums on `output`; the
// main thread then adds the partial sums and prints the grand total.
int main() {
    constexpr int worker_pairs = 2;
    constexpr size_t messages_per_producer = 1000;

    buffered_channel<size_t> input;
    buffered_channel<size_t> output;

    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    for (int pair_index = 0; pair_index != worker_pairs; ++pair_index) {
        // Producer: emit a fixed number of unit values.
        producers.emplace_back([&] {
            size_t remaining = messages_per_producer;
            while (remaining-- > 0) {
                input.send(1);
            }
        });

        // Consumer: accumulate until `input` is closed and empty, then
        // report the partial sum.
        consumers.emplace_back([&] {
            size_t partial_sum = 0;
            for (size_t item = 0; input.recv(item);) {
                partial_sum += item;
            }
            output.send(partial_sum);
        });
    }

    // Close `input` only after every producer has finished sending.
    for (std::thread& producer : producers) {
        producer.join();
    }
    input.drain();

    // Close `output` only after every consumer has reported its sum.
    for (std::thread& consumer : consumers) {
        consumer.join();
    }
    output.drain();

    size_t grand_total = 0;
    for (size_t partial = 0; output.recv(partial);) {
        grand_total += partial;
    }

    std::cout << "total: " << grand_total << std::endl;
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment