Skip to content

Instantly share code, notes, and snippets.

@vmrob
Last active March 29, 2021 23:40
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save vmrob/e4fde208302ae8979b57 to your computer and use it in GitHub Desktop.
Save vmrob/e4fde208302ae8979b57 to your computer and use it in GitHub Desktop.
Go channels in C++
#include <future>
#include <iostream>
#include <thread>
#include <queue>
template <typename T>
class concurrent_queue {
private:
std::queue<T> _queue;
std::mutex _mutex;
public:
bool empty() {
std::lock_guard<std::mutex> l(_mutex);
return _queue.empty();
}
size_t size() {
std::lock_guard<std::mutex> l(_mutex);
return _queue.size();
}
bool try_pop(T& out) {
std::lock_guard<std::mutex> l(_mutex);
if (_queue.empty()) {
return false;
}
out = std::move(_queue.front());
_queue.pop();
return true;
}
void push(T val) {
std::lock_guard<std::mutex> l(_mutex);
_queue.push(std::move(val));
}
};
class channel_closed : public std::runtime_error {
using std::runtime_error::runtime_error;
};
template <typename T>
class buffered_channel {
private:
concurrent_queue<T> _queue;
std::atomic_bool _open;
public:
buffered_channel() {
_open.store(true);
}
~buffered_channel() {
drain();
}
void drain() {
_open.store(false);
}
bool is_open() {
return _open || !_queue.empty();
}
void send(T val) {
if (!_open) {
throw channel_closed("send attempt while closed");
}
_queue.push(std::move(val));
}
bool recv(T& val) {
while (is_open()) {
if (_queue.try_pop(val)) {
return true;
}
std::this_thread::yield();
}
return false;
}
};
int main() {
buffered_channel<size_t> input;
buffered_channel<size_t> output;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < 2; ++i) {
producers.emplace_back([&]{
for (size_t i = 0; i < 1000; ++i) {
input.send(1);
}
});
consumers.emplace_back([&]{
size_t total = 0;
size_t next = 0;
while (input.recv(next)) {
total += next;
}
output.send(total);
});
}
for (auto&& t : producers) {
t.join();
}
input.drain();
for (auto&& t : consumers) {
t.join();
}
output.drain();
size_t total = 0;
size_t next = 0;
while (output.recv(next)) {
total += next;
}
std::cout << "total: " << total << std::endl;
return 0;
}
@zjhmale
Copy link

zjhmale commented Mar 29, 2020

Hi, I just encountered with

libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument

if I run the following code

#include <iostream>
#include <string>
#include <functional>
#include <thread>
#include <chrono>

using namespace std;
using namespace std::chrono;

void subscribe() {
    buffered_channel<int> chan;
    thread t([&] () {
        int a;
        while (chan.recv(a)) {
            cout << "callback!!!" << a << endl;
        }
    });
    t.detach();
    chan.send(1);
}

int main(int argc, char** argv) {
    subscribe();
    this_thread::sleep_for(seconds(1));
    return 0;
}

but if I execute the body of subscribe function directly inside main, everything will be fine which is very confusing, any idea about this issue? many thanks in advance.

@keraba
Copy link

keraba commented Aug 21, 2020

The reason for your crash is that 'chan' is destroyed as soon as the main thread leaves 'subscribe()'. When the other thread accesses it, it crashes. By moving it to 'main()', the channel lives until the end of the program.

@oldbane
Copy link

oldbane commented Mar 5, 2021

Nice.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment