Last active
October 3, 2015 04:11
-
-
Save Preetam/7c5cadae7b3b7016eeb5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
// Kinds of message exchanged between nodes. The value is stored in
// Message::type as a plain int, so the enumerators stay unscoped.
enum MessageType {
    MSG_PING = 0,   // answered by the receiver with a MSG_PONG to the sender
    MSG_PONG = 1,   // reply to a ping; receivers take no action on it
    MSG_RBCAST = 2  // broadcast; forwarded to peers the first time its id is seen
};
// A single message travelling between nodes.
//
// Every field has a default so that a default-constructed Message never
// carries indeterminate values: callers that set only some of the fields
// would otherwise cause undefined behaviour when another field is read.
// The struct remains an aggregate (C++14 and later), so brace
// initialisation such as Message{id, type, source, destination, content}
// keeps working.
struct Message
{
    int id = 0;            // identifier used to recognise a message already seen
    int type = 0;          // one of the MessageType values (0 is the first enumerator)
    int source = -1;       // id of the sending node; -1 means "no sender"
    int destination = -1;  // id of the receiving node; -1 means "not addressed yet"
    std::string content;   // free-form payload
};
std::mutex mutex; | |
std::mutex print_mutex; | |
std::condition_variable cv; | |
std::queue<Message> messages; | |
struct Node | |
{ | |
int id; | |
std::queue<Message> _messages; | |
std::shared_ptr<std::mutex> _mutex; | |
std::shared_ptr<std::condition_variable> _cv; | |
std::vector<int> peers; | |
std::unordered_map<int,bool> seen_messages; | |
Node() | |
{ | |
_mutex = std::make_shared<std::mutex>(); | |
_cv = std::make_shared<std::condition_variable>(); | |
} | |
void add_peer(int peer_id) { | |
peers.push_back(peer_id); | |
} | |
void send(Message m) | |
{ | |
std::unique_lock<std::mutex> _(mutex); | |
m.source = id; | |
messages.push(m); | |
cv.notify_one(); | |
} | |
void receive(Message m) | |
{ | |
std::unique_lock<std::mutex> _(*_mutex); | |
_messages.push(m); | |
_cv->notify_one(); | |
} | |
void run() | |
{ | |
while (true) { | |
std::unique_lock<std::mutex> _(*_mutex); | |
_cv->wait(_, [this]() { | |
return _messages.size() > 0; | |
}); | |
while (_messages.size() > 0) { | |
auto m = _messages.front(); | |
_messages.pop(); | |
switch (m.type) { | |
case MSG_PING: | |
m.destination = m.source; | |
m.source = id; | |
m.type = MSG_PONG; | |
send(m); | |
break; | |
case MSG_PONG: | |
break; | |
case MSG_RBCAST: | |
if (seen_messages.find(m.id) == seen_messages.end()) { | |
seen_messages[m.id] = true; | |
int source = m.source; | |
std::unique_lock<std::mutex> _(mutex); | |
for (int i = 0; i < peers.size(); i++) { | |
if (i == id || i == source) { | |
continue; | |
} | |
m.destination = i; | |
m.source = id; | |
messages.push(m); | |
} | |
cv.notify_one(); | |
} | |
} | |
} | |
} | |
} | |
}; | |
int | |
main() { | |
std::cerr << "Starting" << std::endl; | |
std::vector<std::shared_ptr<Node>> nodes; | |
std::vector<std::unique_ptr<std::thread>> node_threads; | |
for (int i = 0; i < 10; i++) { | |
auto n = std::make_shared<Node>(); | |
n->id = i; | |
for (int j = 0; j < 10; j++) { | |
if (i != j) { | |
n->add_peer(j); | |
} | |
} | |
nodes.push_back(n); | |
auto node_thread = std::make_unique<std::thread>([n]() { | |
n->run(); | |
}); | |
node_threads.push_back(std::move(node_thread)); | |
} | |
std::thread message_delivery_thread([&nodes]() { | |
while (true) { | |
std::unique_lock<std::mutex> _(mutex); | |
cv.wait(_, []() { | |
return messages.size() > 0; | |
}); | |
while (messages.size() > 0) { | |
auto m = messages.front(); | |
std::cerr << "Delivery thread: delivering a message from " << | |
m.source << " to " << m.destination << std::endl; | |
nodes[m.destination]->receive(m); | |
messages.pop(); | |
} | |
} | |
}); | |
Message m; | |
std::random_device rd; | |
m.id = rd(); | |
m.type = MSG_RBCAST; | |
m.destination = 5; | |
nodes[5]->receive(m); | |
message_delivery_thread.join(); | |
std::cerr << "Finished" << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment