Skip to content

Instantly share code, notes, and snippets.

@joshrotenberg
Forked from creationix/node.c
Created April 26, 2011 22:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joshrotenberg/943323 to your computer and use it in GitHub Desktop.
Save joshrotenberg/943323 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <time.h>
#include <stdbool.h>
#include <ev.h>
#include <evcom.h>
#include <sys/types.h>
// Load the logic for the config parsing
#include "config.c"
// Capacity of `buffer`, the accumulator for one incoming line.
// 1M should be enough buffer for any reasonable message
#define MAX_MESSAGE_LENGTH 1048576
int active_messages; // Number left in this active group
// Streams to all the peers
// (allocated in connect_to_peers with numNodes entries, indexed by node id)
evcom_stream *streams;
// Output file for the vector clock log; opened in main, written by take_checkpoint.
FILE *vc_fd;
// Output file for the full checkpoint log; opened in main, written by take_checkpoint.
FILE *checkpoint_fd;
// Per-node arrays of numNodes ints, allocated and zeroed in main.
// vector_clock: merged element-wise (max) in on_message; this node's own
// entry is incremented in send_message.
int* vector_clock;
// last_label_rcvd[p]: set in on_message from the merged clock entry of sender
// p when a message carrying "to" arrives; reset to 0 by take_checkpoint.
int* last_label_rcvd;
// first_label_sent[p]: set in send_message when sending to p; reset to 0 by
// take_checkpoint.
int* first_label_sent;
// Number of checkpoints taken so far; incremented by take_checkpoint.
int sequenceNo = 0;
// Partial-line accumulator used by on_chunk; `offset` is the number of bytes
// currently held in `buffer`.
char buffer[MAX_MESSAGE_LENGTH];
int offset = 0;
// Forward declarations: on_message calls both before their definitions.
static void go_active();
static void take_checkpoint();
// Sends a json message and cleans up memory.
//
// The current vector clock is attached under "clock" and this node's id under
// "from", then the message is printed as a single line of JSON terminated by
// '\n' and handed to evcom_stream_write.
//
// `message` is consumed: it is deleted here and must not be used by the
// caller afterwards.
static void stream_write(evcom_stream* stream, cJSON* message) {
  cJSON_AddItemToObject(message, "clock", cJSON_CreateIntArray(vector_clock, numNodes));
  cJSON_AddNumberToObject(message, "from", nodeID);

  char* json = cJSON_PrintUnformatted(message);
  cJSON_Delete(message);
  if (json == NULL) {
    fprintf(stderr, "%02d: could not serialize outgoing message\n", nodeID);
    return;
  }

  size_t json_len = strlen(json);
  // Room for the JSON text, the trailing newline AND the NUL terminator.
  // (Allocating only json_len + 1 overflowed the buffer by one byte.)
  char* out = malloc(json_len + 2);
  if (out == NULL) {
    fprintf(stderr, "%02d: out of memory while sending a message\n", nodeID);
    free(json);
    return;
  }
  memcpy(out, json, json_len);
  out[json_len] = '\n';
  out[json_len + 1] = '\0';

  printf("%02d: >-. %s\n", nodeID, json);
  free(json);

  evcom_stream_write(stream, (void*)out, json_len + 1);
  // NOTE(review): `out` is intentionally not freed, matching the original
  // code (its free() was commented out). Whether evcom_stream_write copies
  // the bytes or keeps the pointer is not visible here - confirm against the
  // evcom version in use before adding free(out).
}
// Wrapper to make timeouts easy
static void set_timeout(void (*callback)(int), int milliseconds) {
ev_timer* timer;
timer = (ev_timer *)malloc(sizeof(ev_timer));
ev_timer_init(timer, callback, milliseconds / 1000.0, 0);
ev_timer_start(EV_DEFAULT_ timer);
}
// Handles one parsed incoming message.
//
//  - Merges the sender's "clock" array into vector_clock (element-wise max).
//  - If the message has a "to" field, records the sender's merged clock entry
//    in last_label_rcvd and goes active when this node is passive and still
//    has messages left to send.
//  - If the message has an "llr" field, takes a checkpoint when
//    first_label_sent for the sender is greater than that value.
//
// Messages without a usable "clock"/"from" are logged and ignored.
// `message` is only read; the caller keeps ownership of it.
static void on_message(evcom_stream* stream, cJSON* message) {
  int i;
  (void)stream;

  cJSON* vector = cJSON_GetObjectItem(message, "clock");
  cJSON* from_item = cJSON_GetObjectItem(message, "from");
  if (vector == NULL || from_item == NULL) {
    printf("%02d: ignoring message without clock/from\n", nodeID);
    return;
  }

  // Both values come off the network; validate them before indexing with them.
  int from = from_item->valueint;
  if (from < 0 || from >= numNodes || cJSON_GetArraySize(vector) < numNodes) {
    printf("%02d: ignoring message with invalid clock/from\n", nodeID);
    return;
  }

  // Merge vector clocks
  for (i = 0; i < numNodes; i++) {
    int v = cJSON_GetArrayItem(vector, i)->valueint;
    if (v > vector_clock[i]) {
      vector_clock[i] = v;
    }
  }

  if (cJSON_GetObjectItem(message, "to")) {
    last_label_rcvd[from] = vector_clock[from];
    if (!active && maxNumber > 0) {
      go_active();
    }
  }

  cJSON* llr_item = cJSON_GetObjectItem(message, "llr");
  if (llr_item) {
    int llr = llr_item->valueint;
    // The original compared the array pointer itself with llr, which was
    // effectively always true; compare the entry for the sending node.
    // NOTE(review): the direction of this comparison is kept from the
    // original - confirm it against the intended checkpointing rule.
    if (first_label_sent[from] > llr) {
      take_checkpoint();
    }
  }
}
// Grab the chunks and merge/split them into discrete message events.
//
// Bytes are accumulated in the global `buffer` until a '\n' arrives; '\r' is
// dropped. Each completed line is parsed as JSON and passed to on_message, or
// answered with an {"error":"Invalid JSON"} reply when it does not parse.
// Bytes beyond the buffer capacity are discarded with a warning.
//
// NOTE(review): `buffer`/`offset` are shared by every stream that uses this
// callback, so partial lines from different peers would be mixed together -
// confirm whether per-stream buffers are needed.
static void on_chunk (evcom_stream *stream, const void *base, size_t len) {
  const char* input = base;
  size_t i;

  for (i = 0; i < len; i++) {
    char c = input[i];

    // Ignore \r characters
    if (c == '\r') {
      continue;
    }

    // Capture all content up to the first newline as a message
    if (c == '\n') {
      buffer[offset] = 0;
      offset = 0;

      // The line is parsed and logged straight from `buffer`; both happen
      // before any handler runs, so no temporary copy is needed.
      cJSON* result = cJSON_Parse(buffer);
      printf("%02d: `-> %s\n", nodeID, buffer);

      if (result == NULL) {
        cJSON* error_message = cJSON_CreateObject();
        cJSON_AddItemToObject(error_message, "error", cJSON_CreateString("Invalid JSON"));
        stream_write(stream, error_message);
        continue;
      }

      on_message(stream, result);
      // on_message only reads the tree, so release it here.
      cJSON_Delete(result);
      continue;
    }

    // Keep the last slot free for the terminator written when '\n' arrives;
    // filling all MAX_MESSAGE_LENGTH bytes would put it one past the buffer.
    if (offset < MAX_MESSAGE_LENGTH - 1) {
      buffer[offset] = c;
      offset++;
    } else {
      printf("WARNING: Discarding %c\n", c);
    }
  }
}
// Connection callback installed on the listening server: creates and returns
// a new stream for the connecting peer, with on_chunk as its read handler.
// Defined at file scope rather than as a nested function, which is a GCC
// extension and not valid standard C.
static evcom_stream* on_server_connection(evcom_server* server, struct sockaddr* addr) {
  assert(server);
  assert(addr);
  // This is the stream from a peer.
  evcom_stream* stream = malloc(sizeof *stream);
  if (stream == NULL) {
    fprintf(stderr, "%02d: out of memory accepting a connection\n", nodeID);
    abort();
  }
  evcom_stream_init(stream);
  stream->on_read = on_chunk;
  return stream;
}

// Start up the tcp server.
//
// Allocates an evcom server, listens on this node's entry in `addresses`
// with a backlog of 10 and attaches it to the default event loop. The server
// object is never freed.
void start_tcp_server() {
  evcom_server* server = malloc(sizeof *server);
  if (server == NULL) {
    fprintf(stderr, "%02d: out of memory creating the server\n", nodeID);
    abort();
  }
  evcom_server_init(server);
  server->on_connection = on_server_connection;

  int r = evcom_server_listen(server, (struct sockaddr*)&addresses[nodeID], 10);
  assert(r == 0);
  evcom_server_attach(EV_DEFAULT_ server);
  printf("%02d: Server listening on port %d\n", nodeID, hostPort);
}
// Prints the expected command line to stdout, using argv[0] as the program
// name. Always returns 1, which main passes on as the exit status.
int usage(char const *argv[]) {
  char const *program = argv[0];
  fprintf(stdout, "Usage:\n\t%s config_file nodeID\n", program);
  return 1;
}
// Connect to our peer stations and call "callback" when done.
static void connect_to_peers() {
int i;
streams = (evcom_stream *)malloc(sizeof(evcom_stream) * numNodes);
for (i = 0; i < numNeighbors; i++) {
int j = neighbors[i];
evcom_stream_init(&streams[j]);
assert(EVCOM_INITIALIZED == evcom_stream_state(&streams[j]));
printf("%02d: Connecting to peer %02d.\n", nodeID, j);
int r = evcom_stream_connect(&streams[j], (struct sockaddr*)&addresses[j]);
assert(r == 0);
evcom_stream_attach(EV_DEFAULT_ &streams[j]);
}
}
// Ends the run: logs a final "Done" line for this node and exits the process
// with status 0. Scheduled by send_message once no messages remain.
static void done() {
  fprintf(stdout, "%02d: Done\n", nodeID);
  // closeLog();
  exit(0);
}
// Sends one application message to a randomly chosen neighbor and reschedules
// itself after minSendDelay milliseconds.
//
//  - When the overall budget (maxNumber) is exhausted, schedules done() one
//    second later instead of sending.
//  - When the current active burst is used up (active_messages == 0), or
//    there are no neighbors to send to, returns without rescheduling.
static void send_message() {
  if (maxNumber == 0) {
    set_timeout(&done, 1000);
    return;
  }
  if (active_messages == 0) { return; }
  // rand() % 0 is undefined behavior; with no neighbors there is nothing to do.
  if (numNeighbors <= 0) {
    printf("%02d: no neighbors to send to\n", nodeID);
    return;
  }

  int target = neighbors[rand() % numNeighbors];
  cJSON* message = cJSON_CreateObject();
  cJSON_AddNumberToObject(message, "to", target);

  vector_clock[nodeID]++;
  // NOTE(review): this is overwritten on every send, not only on the first
  // one since the last checkpoint - confirm that is the intended meaning of
  // "first label sent".
  first_label_sent[target] = vector_clock[nodeID] + 1;

  // stream_write takes ownership of `message`.
  stream_write(&streams[target], message);
  active_messages--;
  maxNumber--;
  set_timeout(&send_message, minSendDelay);
}
// Starts an active burst: draws how many messages to send, between
// minPerActive and maxPerActive inclusive, then sends the first one.
static void go_active() {
  active_messages = minPerActive + rand() % (maxPerActive - minPerActive + 1);
  send_message();
}
// Timer callback scheduled from main: starts sending only if this node is
// configured as active.
static void on_warm() {
  if (!active) {
    return;
  }
  go_active();
}
// Takes a checkpoint.
//
//  1. Bumps sequenceNo and logs the event.
//  2. Appends the current vector clock to vc_fd and the clock together with
//     the full checkpoint (clock, llr, fls) to checkpoint_fd.
//  3. Sends an "llr" message to every neighbor from which something was
//     received since the last checkpoint (last_label_rcvd > 0).
//  4. Resets last_label_rcvd and first_label_sent to 0.
//
// All temporary cJSON trees and printed strings are released before returning.
static void take_checkpoint() {
  int i;

  sequenceNo++;
  printf("%02d: Take Checkpoint!\n", nodeID);

  // Serialized vector clock, used in both log files.
  cJSON* clock_json = cJSON_CreateIntArray(vector_clock, numNodes);
  char* list = cJSON_PrintUnformatted(clock_json);
  cJSON_Delete(clock_json);

  // Serialized full checkpoint record.
  cJSON* checkpoint = cJSON_CreateObject();
  cJSON_AddItemToObject(checkpoint, "clock", cJSON_CreateIntArray(vector_clock, numNodes));
  cJSON_AddItemToObject(checkpoint, "llr", cJSON_CreateIntArray(last_label_rcvd, numNodes));
  cJSON_AddItemToObject(checkpoint, "fls", cJSON_CreateIntArray(first_label_sent, numNodes));
  char* checkpoint_text = cJSON_PrintUnformatted(checkpoint);
  cJSON_Delete(checkpoint);

  if (list == NULL || checkpoint_text == NULL) {
    fprintf(stderr, "%02d: could not serialize checkpoint %d\n", nodeID, sequenceNo);
  } else {
    // Either file may be NULL if it could not be opened at startup.
    if (vc_fd != NULL) {
      fprintf(vc_fd,"%d\t\t\t%s\n", sequenceNo, list);
    }
    if (checkpoint_fd != NULL) {
      fprintf(checkpoint_fd,"%d\t\t\t%s\t\t\t%s\n", sequenceNo, list, checkpoint_text);
    }
  }
  free(checkpoint_text);
  free(list);

  for (i = 0; i < numNeighbors; i++) {
    int target = neighbors[i];
    if (last_label_rcvd[target] > 0) {
      cJSON* message = cJSON_CreateObject();
      cJSON_AddNumberToObject(message, "llr", last_label_rcvd[target]);
      // stream_write takes ownership of `message`.
      stream_write(&streams[target], message);
    }
  }

  // Reset history
  for (i = 0; i < numNodes; i++) {
    last_label_rcvd[i] = 0;
    first_label_sent[i] = 0;
  }
}
// Entry point: `prog config_file nodeID`.
//
// Parses the node id, loads the configuration, opens the two per-node log
// files, allocates the clock/checkpoint arrays, starts the TCP server and
// schedules the startup timers (connect to peers, warm-up, first checkpoint)
// before running the event loop.
//
// Returns 1 on bad arguments or when startup resources cannot be obtained,
// 0 when the event loop returns.
int main (int argc, char const *argv[]) {
  // Ensure there are 2 arguments on the command line
  if (argc != 3) return usage(argv);

  // Ensure the nodeID is a non-negative integer that fits in an int
  char* end = NULL;
  long parsed_id = strtol(argv[2], &end, 10);
  if (end == argv[2] || *end != '\0' || parsed_id < 0 ||
      parsed_id != (long)(int)parsed_id) {
    return usage(argv);
  }
  nodeID = (int)parsed_id;

  // Set up system
  load_config(argv[1]);

  // nodeID indexes addresses[] and the per-node arrays below.
  if (numNodes <= 0 || nodeID >= numNodes) {
    fprintf(stderr, "nodeID %d is out of range for %d node(s)\n", nodeID, numNodes);
    return 1;
  }

  // Sized for any int, not just two-digit ids.
  char vc_filename[32];
  char checkpoint_filename[32];
  snprintf(vc_filename, sizeof vc_filename, "vc%02d.txt", nodeID);
  snprintf(checkpoint_filename, sizeof checkpoint_filename, "checkpoints%02d.txt", nodeID);

  vc_fd = fopen(vc_filename, "w");
  if (vc_fd == NULL) {
    perror(vc_filename);
    return 1;
  }
  checkpoint_fd = fopen(checkpoint_filename, "w");
  if (checkpoint_fd == NULL) {
    perror(checkpoint_filename);
    return 1;
  }

  // Initialize the clock and the checkpoint vars (calloc zero-fills them)
  vector_clock = calloc((size_t)numNodes, sizeof *vector_clock);
  last_label_rcvd = calloc((size_t)numNodes, sizeof *last_label_rcvd);
  first_label_sent = calloc((size_t)numNodes, sizeof *first_label_sent);
  if (vector_clock == NULL || last_label_rcvd == NULL || first_label_sent == NULL) {
    fprintf(stderr, "%02d: out of memory allocating clock arrays\n", nodeID);
    return 1;
  }

  // initLog();
  start_tcp_server();

  // Prime the random number generator (fixed seed, so every run draws the
  // same sequence)
  srand(10);
  srand(rand());

  set_timeout(&take_checkpoint, instDelay + 3000);
  set_timeout(&on_warm, 3000);
  set_timeout(&connect_to_peers, 1500);

  ev_loop(EV_DEFAULT_ 0);
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment