Created
October 19, 2012 23:47
-
-
Save HappyCerberus/3921334 to your computer and use it in GitHub Desktop.
main.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
g++ -std=c++11 -pedantic -Wall -Wextra -ggdb3 -D_GLIBCXX_USE_NANOSLEEP -D_GLIBCXX_USE_SCHED_YIELD -lpthread main.cpp node.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "node.h"

#include <unistd.h>

#include <algorithm>
#include <future>
#include <iostream>
#include <memory>
#include <random>
#include <stdexcept>
#include <thread>
#include <vector>
using namespace std; | |
int main() | |
{ | |
// initialization | |
reset_sent_messages(); | |
srand(time(NULL)); | |
// prepare the array | |
const unsigned total_nodes = 1000; | |
vector< shared_ptr<Node> > nodes; | |
nodes.reserve(total_nodes); | |
// create nodes, put them into array | |
for (unsigned i = 0; i < total_nodes; ++i) | |
nodes.push_back(make_shared<Node>(i+1)); | |
// shuffle the nodes randomly | |
random_shuffle(nodes.begin(),nodes.end()); | |
// connect the nodes | |
for (unsigned i = 0; i < total_nodes-1; ++i) | |
nodes[i]->connect(nodes[i+1]); | |
nodes[total_nodes-1]->connect(nodes[0]); | |
// prepare the futures to store the elected leader information | |
vector< future<int> > leaders; | |
for (auto i : nodes) | |
leaders.push_back(i->get_leader()); | |
// run the threads with the main node logic | |
for (auto i : nodes) | |
thread([](shared_ptr<Node> node) { node->logic(); },i).detach(); | |
/* NOTE: | |
* For demonstrational purposes we are using promise<->future for final | |
* synchronization. This is a very unnatural model. Normaly we wouldn't | |
* detach the threads and use join(). | |
* | |
* Do note that if a thread isn't detached and the thread variable (returned | |
* by thread call) is destroyed, the program will be immediately terminated. | |
* This is due to the fact, that such thread would be effectively leaked. | |
* Not running in detached mode, but incapable of joining the spawn thread. | |
*/ | |
// do the final synchronization (so that main doesn't end before the algorithm does) | |
for (unsigned i = 0; i < total_nodes; i++) | |
{ | |
if (leaders[i].get() != (int)total_nodes) | |
throw runtime_error("Node didn't correctly detect it's leader."); | |
} | |
// final reports | |
cout << "Asynchronous run finished." << endl; | |
cout << "Total number of sent messages was : " << sent_messages() << endl; | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "node.h" | |
#include <thread> | |
#include <iostream> | |
#include <stdexcept> | |
using namespace std; | |
// Process-wide tally of transmitted messages; static storage, so it starts at 0.
std::atomic<unsigned> message_count;

// Read the current number of transmitted messages.
unsigned sent_messages()
{
    return message_count.load();
}

// Put the transmitted-message tally back to zero.
void reset_sent_messages()
{
    message_count.store(0);
}
// Serializes writes to std::cout so that log lines from different threads
// do not interleave.
static std::mutex cout_lock;

// helper routine to output full lines from the algorithm
// The lock is taken through a scoped guard so it is released even when
// evaluating or streaming `x` throws.
#define atomic_log(x) do { std::lock_guard<std::mutex> atomic_log_guard_(cout_lock); std::cout << x; } while(0)
// Build a node with the given id; the node starts neither in transit mode
// nor finished. An id of 0 is rejected with std::range_error.
Node::Node(unsigned node_id)
    : p_id(node_id),
      p_transit(false),
      p_done(false)
{
    const bool valid_id = (node_id != 0);
    if (!valid_id)
        throw std::range_error("Node id has to be greater than 0.");
}
void Node::receive_message(const pair<int,int> &message) | |
{ | |
// simple blocking implementation | |
p_message_buffer_lock.lock(); | |
p_message_buffer.push(message); | |
p_message_buffer_lock.unlock(); | |
} | |
void Node::receive_message(int node_id, int distance) | |
{ | |
receive_message(make_pair(node_id,distance)); | |
} | |
bool Node::checkout_message(std::pair<int,int>& message) | |
{ | |
// try to lock the message buffer for this thread | |
// if it's locked, return false and wait a bit using yield | |
if (!p_message_buffer_lock.try_lock()) | |
{ | |
this_thread::yield(); | |
return false; | |
} | |
// if we don't have any messages, yield and return false | |
if (p_message_buffer.size() == 0) | |
{ | |
p_message_buffer_lock.unlock(); | |
this_thread::yield(); | |
return false; | |
} | |
// we have a message, retrieve it, and remove from buffer | |
message = p_message_buffer.front(); | |
p_message_buffer.pop(); | |
// don't forget to unlock | |
p_message_buffer_lock.unlock(); | |
return true; | |
} | |
void Node::connect(shared_ptr<Node> next) | |
{ | |
// simple blocking implementation | |
// only needed if connecting is done asynchronously | |
p_next_lock.lock(); | |
p_next = next; | |
p_next_lock.unlock(); | |
} | |
void Node::transmit_message(int node_id, int distance) | |
{ | |
shared_ptr<Node> next; | |
// if the node isn't connected yet, do blocking wait | |
while(1) | |
{ | |
p_next_lock.lock(); | |
next = p_next.lock(); | |
if (next) break; | |
p_next_lock.unlock(); | |
this_thread::yield(); | |
} | |
next->receive_message(node_id,distance); | |
p_next_lock.unlock(); | |
++message_count; | |
} | |
void Node::process_message(int n_id1, int n_id2) | |
{ | |
/* detection of leader | |
* | |
* - trasmit notification message telling other nodes | |
* - set self as leader | |
* - mark algorithm as done | |
*/ | |
if (n_id1 == p_id || n_id2 == p_id) | |
{ | |
atomic_log("Node [ " << p_id << " ] self-declared leader." << endl); | |
transmit_message(p_id,-1); | |
p_leader.set_value(p_id); | |
p_done = true; | |
} | |
/* detection of transit mode | |
* | |
* - mark the state | |
*/ | |
else if (n_id1 > p_id || n_id2 > p_id) | |
{ | |
p_transit = true; | |
atomic_log("Node [ " << p_id << " ] switched into transit mode." << endl); | |
} | |
/* if the node is not yet a leader, or in transit mode re-send the base messages */ | |
else | |
{ | |
transmit_message(p_id,1); | |
transmit_message(p_id,2); | |
} | |
} | |
void Node::transit_loop_step() | |
{ | |
/* single step in transit mode | |
* | |
* - read a single message and re-transmit it to the next node | |
* - if the message is a leader notification, mark the leader node | |
*/ | |
pair<int,int> message; | |
if (checkout_message(message)) | |
transmit_message(message.first,message.second); | |
if (message.second == -1) | |
{ | |
atomic_log("Node [ " << p_id << " ] recognized node [ " << message.first << " ] as the leader node." << endl); | |
p_leader.set_value(message.first); | |
p_done = true; | |
} | |
this_thread::yield(); | |
} | |
void Node::logic() | |
{ | |
transmit_message(p_id,1); | |
transmit_message(p_id,2); | |
int message1 = 0, message2 = 0; | |
while (true) | |
{ | |
// if the algorithm is done, simply exit the thread | |
if (p_done) | |
return; | |
// if in transit mode, run a single step of transmit logic | |
if (p_transit) | |
{ | |
transit_loop_step(); | |
} | |
// if we have two messages, process them | |
else if (message1 != 0 && message2 != 0) | |
{ | |
process_message(message1,message2); | |
message1 = 0; message2 = 0; | |
} | |
// if we don't have two messages, receive new messages | |
else | |
{ | |
pair<int,int> message; | |
if (checkout_message(message)) | |
{ | |
if (message1 == 0) | |
{ | |
message1 = message.first; | |
if (message.second == 2 && message.first > p_id) | |
transmit_message(message.first,1); | |
} | |
else | |
{ | |
message2 = message.first; | |
if (message.second == 2 && message.first > p_id) | |
transmit_message(message.first,1); | |
} | |
} | |
} | |
this_thread::yield(); | |
} | |
} | |
future<int> Node::get_leader() | |
{ | |
return p_leader.get_future(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef NODE_H | |
#define NODE_H | |
#include <queue> | |
#include <mutex> | |
#include <memory> | |
#include <future> | |
#include <atomic> | |
/** \brief One participant of the ring used for leader election.
 *
 * A node owns a buffer of incoming (node id, distance) messages, a weak
 * reference to its successor in the ring, and a promise through which the
 * elected leader id is published.
 */
class Node
{
public:
    /** \brief Create a node with the given id
     *
     * Explicit, so that an unsigned value is never silently converted
     * into a Node.
     */
    explicit Node(unsigned node_id);

    /** \brief Put a new message into this node's message buffer */
    void receive_message(int node_id, int distance);

    /** \brief Put a new message into this node's message buffer */
    void receive_message(const std::pair<int,int>& message);

    /** \brief Connect this node to a next node in the circle */
    void connect(std::shared_ptr<Node> next);

    /** \brief Node logic */
    void logic();

    /** \brief Get the leader node id */
    std::future<int> get_leader();

private:
    /** \brief Try to pull one message from the buffer
     *
     * Non-blocking operation.
     */
    bool checkout_message(std::pair<int,int>& message);

    /** \brief Transmit one message to the connected node
     *
     * Blocking operation.
     */
    void transmit_message(int node_id, int distance);

    /** \brief Sub logic for processing the message
     *
     * Process the received message
     * - detection of leader
     * - detection of transiting state
     * - transmission of appropriate messages
     */
    void process_message(int n_id1, int n_id2);

    /** \brief Sub logic for transiting nodes
     *
     * Simple re-transmit routine.
     */
    void transit_loop_step();

private:
    std::queue<std::pair<int,int> > p_message_buffer;  ///< pending (node id, distance) messages
    std::mutex p_message_buffer_lock;                   ///< guards p_message_buffer
    std::weak_ptr<Node> p_next;                         ///< successor in the ring (non-owning)
    std::mutex p_next_lock;                             ///< guards p_next
    int p_id;                                           ///< this node's id
    bool p_transit;                                     ///< true once the node only relays messages
    bool p_done;                                        ///< true once the leader is known
    std::promise<int> p_leader;                         ///< receives the elected leader id
};
/** \brief Reinitialize the count of sent messages back to zero */
void reset_sent_messages();

/** \brief Return count of sent messages */
unsigned sent_messages();

/** \brief Shared counter of sent messages (defined in node.cpp) */
extern std::atomic<unsigned> message_count;
#endif // NODE_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment