Skip to content

Instantly share code, notes, and snippets.

@HappyCerberus
Created October 19, 2012 23:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HappyCerberus/3921334 to your computer and use it in GitHub Desktop.
Save HappyCerberus/3921334 to your computer and use it in GitHub Desktop.
main.cpp
g++ -std=c++11 -pedantic -Wall -Wextra -ggdb3 -D_GLIBCXX_USE_NANOSLEEP -D_GLIBCXX_USE_SCHED_YIELD -lpthread main.cpp node.cpp
#include "node.h"
#include <unistd.h>
#include <algorithm>
#include <iostream>
#include <random>
#include <stdexcept>
#include <thread>
using namespace std;
int main()
{
// initialization
reset_sent_messages();
srand(time(NULL));
// prepare the array
const unsigned total_nodes = 1000;
vector< shared_ptr<Node> > nodes;
nodes.reserve(total_nodes);
// create nodes, put them into array
for (unsigned i = 0; i < total_nodes; ++i)
nodes.push_back(make_shared<Node>(i+1));
// shuffle the nodes randomly
random_shuffle(nodes.begin(),nodes.end());
// connect the nodes
for (unsigned i = 0; i < total_nodes-1; ++i)
nodes[i]->connect(nodes[i+1]);
nodes[total_nodes-1]->connect(nodes[0]);
// prepare the futures to store the elected leader information
vector< future<int> > leaders;
for (auto i : nodes)
leaders.push_back(i->get_leader());
// run the threads with the main node logic
for (auto i : nodes)
thread([](shared_ptr<Node> node) { node->logic(); },i).detach();
/* NOTE:
* For demonstrational purposes we are using promise<->future for final
* synchronization. This is a very unnatural model. Normaly we wouldn't
* detach the threads and use join().
*
* Do note that if a thread isn't detached and the thread variable (returned
* by thread call) is destroyed, the program will be immediately terminated.
* This is due to the fact, that such thread would be effectively leaked.
* Not running in detached mode, but incapable of joining the spawn thread.
*/
// do the final synchronization (so that main doesn't end before the algorithm does)
for (unsigned i = 0; i < total_nodes; i++)
{
if (leaders[i].get() != (int)total_nodes)
throw runtime_error("Node didn't correctly detect it's leader.");
}
// final reports
cout << "Asynchronous run finished." << endl;
cout << "Total number of sent messages was : " << sent_messages() << endl;
return 0;
}
#include "node.h"
#include <thread>
#include <iostream>
#include <stdexcept>
using namespace std;

/** \brief Process-wide counter of sent messages (atomic, shared by all threads) */
std::atomic<unsigned> message_count;

/** \brief Return count of sent messages */
unsigned sent_messages()
{
    return message_count.load();
}

/** \brief Reinitialize the count of sent messages back to zero */
void reset_sent_messages()
{
    message_count.store(0);
}
// guards std::cout so that output from different threads is not interleaved
static std::mutex cout_lock;
// helper routine to output full lines from the algorithm
//
// The argument is a stream expression, e.g. atomic_log("id " << id << endl).
// It is evaluated and written while cout_lock is held. The lock is taken
// through a scoped guard, so it is released even when evaluating or writing
// the expression throws (a manual lock()/unlock() pair would leave the mutex
// locked forever in that case).
#define atomic_log(x) \
    do { \
        std::lock_guard<std::mutex> atomic_log_guard_(cout_lock); \
        std::cout << x; \
    } while (0)
/** \brief Create a node that is neither in transit mode nor done
 *
 * \param node_id identifier of the node, must not be 0
 * \throws std::range_error when node_id is 0
 */
Node::Node(unsigned node_id)
    : p_id(static_cast<int>(node_id)),
      p_transit(false),
      p_done(false)
{
    // 0 is reserved: Node::logic() uses it as the "no message stored" marker
    if (0 == node_id)
    {
        throw std::range_error("Node id has to be greater than 0.");
    }
}
/** \brief Put a new message into this node's message buffer
 *
 * Blocks until the buffer mutex is available, then appends the message.
 *
 * \param message pair of (node id, distance) to enqueue
 */
void Node::receive_message(const std::pair<int,int> &message)
{
    // scoped guard: the mutex is released even if push() throws
    // (e.g. std::bad_alloc), which a manual lock()/unlock() pair would not do
    std::lock_guard<std::mutex> guard(p_message_buffer_lock);
    p_message_buffer.push(message);
}
void Node::receive_message(int node_id, int distance)
{
receive_message(make_pair(node_id,distance));
}
/** \brief Try to pull one message from the buffer without blocking
 *
 * \param message receives the oldest buffered message; left untouched when
 *                the function returns false
 * \return true when a message was retrieved; false when the buffer was
 *         locked by someone else or empty (the thread yields in both cases)
 */
bool Node::checkout_message(std::pair<int,int>& message)
{
    // attempt to take the buffer mutex, never wait for it
    std::unique_lock<std::mutex> guard(p_message_buffer_lock, std::try_to_lock);
    if (!guard.owns_lock())
    {
        std::this_thread::yield();
        return false;
    }

    // nothing buffered: release the mutex first, then give up the time slice
    if (p_message_buffer.empty())
    {
        guard.unlock();
        std::this_thread::yield();
        return false;
    }

    // hand out the oldest message and drop it from the buffer
    message = p_message_buffer.front();
    p_message_buffer.pop();
    return true; // guard unlocks on scope exit
}
void Node::connect(shared_ptr<Node> next)
{
// simple blocking implementation
// only needed if connecting is done asynchronously
p_next_lock.lock();
p_next = next;
p_next_lock.unlock();
}
void Node::transmit_message(int node_id, int distance)
{
shared_ptr<Node> next;
// if the node isn't connected yet, do blocking wait
while(1)
{
p_next_lock.lock();
next = p_next.lock();
if (next) break;
p_next_lock.unlock();
this_thread::yield();
}
next->receive_message(node_id,distance);
p_next_lock.unlock();
++message_count;
}
/** \brief Decide this node's next state from two received node ids
 *
 * \param n_id1 first received node id
 * \param n_id2 second received node id
 */
void Node::process_message(int n_id1, int n_id2)
{
    /* our own id came back: we are the leader
     *
     * - transmit the notification message (distance -1) to the other nodes
     * - publish ourselves as the leader
     * - mark the algorithm as done
     */
    const bool saw_own_id = (n_id1 == p_id) || (n_id2 == p_id);
    if (saw_own_id)
    {
        atomic_log("Node [ " << p_id << " ] self-declared leader." << endl);
        transmit_message(p_id, -1);
        p_leader.set_value(p_id);
        p_done = true;
        return;
    }

    /* a larger id was seen: switch to transit mode */
    const bool saw_larger_id = (n_id1 > p_id) || (n_id2 > p_id);
    if (saw_larger_id)
    {
        p_transit = true;
        atomic_log("Node [ " << p_id << " ] switched into transit mode." << endl);
        return;
    }

    /* neither leader nor transit: re-send the base messages */
    transmit_message(p_id, 1);
    transmit_message(p_id, 2);
}
/** \brief Single step in transit mode
 *
 * Reads at most one message and re-transmits it unchanged to the next node.
 * A message whose distance is -1 is a leader notification: its node id is
 * published as the leader and the algorithm is marked as done.
 */
void Node::transit_loop_step()
{
    // value-initialised to (0,0); checkout_message() only overwrites it on
    // success, so a -1 distance can only come from a real message
    std::pair<int,int> incoming;
    if (checkout_message(incoming))
    {
        transmit_message(incoming.first, incoming.second);

        const bool is_leader_notice = (incoming.second == -1);
        if (is_leader_notice)
        {
            atomic_log("Node [ " << p_id << " ] recognized node [ " << incoming.first << " ] as the leader node." << endl);
            p_leader.set_value(incoming.first);
            p_done = true;
        }
    }
    std::this_thread::yield();
}
/** \brief Node logic: the body of the thread that drives one node
 *
 * Sends the node's own id with distances 1 and 2, then loops until the
 * algorithm is marked as done. Each iteration does exactly one of:
 *  - a transit step, when the node is in transit mode
 *  - processing of two collected ids, when both slots are filled
 *  - an attempt to receive one more message otherwise
 */
void Node::logic()
{
    transmit_message(p_id, 1);
    transmit_message(p_id, 2);

    // collected node ids; 0 marks an empty slot
    int first_id = 0;
    int second_id = 0;

    // once the algorithm is done, simply leave the thread
    while (!p_done)
    {
        if (p_transit)
        {
            // transit mode: run a single relay step
            transit_loop_step();
        }
        else if (first_id != 0 && second_id != 0)
        {
            // both slots are filled: evaluate them and start over
            process_message(first_id, second_id);
            first_id = 0;
            second_id = 0;
        }
        else
        {
            // still collecting: try to fetch one message
            std::pair<int,int> incoming;
            if (checkout_message(incoming))
            {
                // fill the first slot if it is empty, the second one otherwise
                int &slot = (first_id == 0) ? first_id : second_id;
                slot = incoming.first;

                // a distance-2 message with a larger id is passed on with distance 1
                if (incoming.second == 2 && incoming.first > p_id)
                    transmit_message(incoming.first, 1);
            }
        }
        std::this_thread::yield();
    }
}
future<int> Node::get_leader()
{
return p_leader.get_future();
}
#ifndef NODE_H
#define NODE_H
#include <queue>
#include <mutex>
#include <memory>
#include <future>
#include <atomic>
/** \brief One node of the ring used by the leader election
 *
 * A node owns a mutex-protected buffer of incoming messages, where each
 * message is a pair of (node id, distance). It also holds a weak reference
 * to the next node in the circle and a promise through which the detected
 * leader id is published.
 */
class Node
{
public:
    /** \brief Create a node with the given id
     *
     * Explicit, so that an integer is never silently converted into a Node.
     */
    explicit Node(unsigned node_id);

    /** \brief Put a new message into this node's message buffer */
    void receive_message(int node_id, int distance);

    /** \brief Put a new message into this node's message buffer */
    void receive_message(const std::pair<int,int>& message);

    /** \brief Connect this node to the next node in the circle */
    void connect(std::shared_ptr<Node> next);

    /** \brief Node logic */
    void logic();

    /** \brief Get the future that will hold the leader node id */
    std::future<int> get_leader();

private:
    /** \brief Try to pull one message from the buffer
     *
     * Non-blocking operation.
     */
    bool checkout_message(std::pair<int,int>& message);

    /** \brief Transmit one message to the connected node
     *
     * Blocking operation.
     */
    void transmit_message(int node_id, int distance);

    /** \brief Sub logic for processing the message
     *
     * Process the received message
     * - detection of leader
     * - detection of transiting state
     * - transmission of appropriate messages
     */
    void process_message(int n_id1, int n_id2);

    /** \brief Sub logic for transiting nodes
     *
     * Simple re-transmit routine.
     */
    void transit_loop_step();

private:
    /** \brief Incoming messages as (node id, distance), oldest first */
    std::queue<std::pair<int,int> > p_message_buffer;
    /** \brief Guards p_message_buffer */
    std::mutex p_message_buffer_lock;
    /** \brief Next node in the circle (non-owning) */
    std::weak_ptr<Node> p_next;
    /** \brief Guards p_next */
    std::mutex p_next_lock;
    /** \brief Id of this node */
    int p_id;
    /** \brief True once the node only relays messages */
    bool p_transit;
    /** \brief True once the leader is known and the logic may stop */
    bool p_done;
    /** \brief Receives the leader id; its future is returned by get_leader() */
    std::promise<int> p_leader;
};
/** \brief Global counter of sent messages
 *
 * Defined in the node implementation file; incremented once for every
 * message delivered by Node::transmit_message.
 */
extern std::atomic<unsigned> message_count;
/** \brief Return the current count of sent messages */
unsigned sent_messages();
/** \brief Reinitialize the count of sent messages back to zero */
void reset_sent_messages();
#endif // NODE_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment