Skip to content

Instantly share code, notes, and snippets.

@Preetam
Last active October 3, 2015 04:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Preetam/7c5cadae7b3b7016eeb5 to your computer and use it in GitHub Desktop.
Save Preetam/7c5cadae7b3b7016eeb5 to your computer and use it in GitHub Desktop.
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
// Kinds of messages exchanged between nodes. Kept as an unscoped enum because
// Message::type is a plain int that is compared against these values directly.
enum MessageType {
    MSG_PING   = 0,  // receiver replies to the sender with a MSG_PONG
    MSG_PONG   = 1,  // reply to a ping; receivers take no further action
    MSG_RBCAST = 2   // broadcast message, relayed by a node the first time it sees the id
};
// A single message travelling between nodes.
//
// Every scalar field has a default so that a default-constructed Message never
// carries indeterminate values: a message whose `source` was never assigned is
// otherwise read (and copied) with an indeterminate value, which is undefined
// behaviour. -1 is used for the node-id fields as "not set"; it matches no
// node id that is an index into a container.
struct Message
{
    int id = 0;            // identifier used to recognise repeats of the same message
    int type = 0;          // message kind, stored as a plain int
    int source = -1;       // id of the sending node, -1 when unset
    int destination = -1;  // id of the receiving node, -1 when unset
    std::string content;   // free-form payload
};
// Shared "network": messages pushed here by nodes wait to be handed to their
// destination node by the delivery thread.
std::queue<Message> messages;

// Guards `messages`.
std::mutex mutex;

// Notified after something has been pushed onto `messages`.
std::condition_variable cv;

// Declared for serialising output; nothing in this file locks it yet.
std::mutex print_mutex;
struct Node
{
int id;
std::queue<Message> _messages;
std::shared_ptr<std::mutex> _mutex;
std::shared_ptr<std::condition_variable> _cv;
std::vector<int> peers;
std::unordered_map<int,bool> seen_messages;
Node()
{
_mutex = std::make_shared<std::mutex>();
_cv = std::make_shared<std::condition_variable>();
}
void add_peer(int peer_id) {
peers.push_back(peer_id);
}
void send(Message m)
{
std::unique_lock<std::mutex> _(mutex);
m.source = id;
messages.push(m);
cv.notify_one();
}
void receive(Message m)
{
std::unique_lock<std::mutex> _(*_mutex);
_messages.push(m);
_cv->notify_one();
}
void run()
{
while (true) {
std::unique_lock<std::mutex> _(*_mutex);
_cv->wait(_, [this]() {
return _messages.size() > 0;
});
while (_messages.size() > 0) {
auto m = _messages.front();
_messages.pop();
switch (m.type) {
case MSG_PING:
m.destination = m.source;
m.source = id;
m.type = MSG_PONG;
send(m);
break;
case MSG_PONG:
break;
case MSG_RBCAST:
if (seen_messages.find(m.id) == seen_messages.end()) {
seen_messages[m.id] = true;
int source = m.source;
std::unique_lock<std::mutex> _(mutex);
for (int i = 0; i < peers.size(); i++) {
if (i == id || i == source) {
continue;
}
m.destination = i;
m.source = id;
messages.push(m);
}
cv.notify_one();
}
}
}
}
}
};
int
main() {
std::cerr << "Starting" << std::endl;
std::vector<std::shared_ptr<Node>> nodes;
std::vector<std::unique_ptr<std::thread>> node_threads;
for (int i = 0; i < 10; i++) {
auto n = std::make_shared<Node>();
n->id = i;
for (int j = 0; j < 10; j++) {
if (i != j) {
n->add_peer(j);
}
}
nodes.push_back(n);
auto node_thread = std::make_unique<std::thread>([n]() {
n->run();
});
node_threads.push_back(std::move(node_thread));
}
std::thread message_delivery_thread([&nodes]() {
while (true) {
std::unique_lock<std::mutex> _(mutex);
cv.wait(_, []() {
return messages.size() > 0;
});
while (messages.size() > 0) {
auto m = messages.front();
std::cerr << "Delivery thread: delivering a message from " <<
m.source << " to " << m.destination << std::endl;
nodes[m.destination]->receive(m);
messages.pop();
}
}
});
Message m;
std::random_device rd;
m.id = rd();
m.type = MSG_RBCAST;
m.destination = 5;
nodes[5]->receive(m);
message_delivery_thread.join();
std::cerr << "Finished" << std::endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment