Skip to content

Instantly share code, notes, and snippets.

@tharindukumara
Last active March 7, 2017 04:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tharindukumara/025c98ddbd406db34067a587c6d9533d to your computer and use it in GitHub Desktop.
Save tharindukumara/025c98ddbd406db34067a587c6d9533d to your computer and use it in GitHub Desktop.
#include "master.h"
#include <sys/syscall.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <iostream>
#include <fstream>
#include <zookeeper_log.h>
#include <sstream>
#include <fcntl.h>
/*
 * Logs, at debug level, the id obtained from syscall(SYS_gettid) for the
 * calling thread. Used by the other functions in this file to show which
 * thread runs each callback.
 */
void log_tid(){
    /*
     * syscall() returns long, so it must be printed with %ld; passing it to
     * %d is a format/argument mismatch. Formatting directly in the log call
     * also removes the fixed-size scratch buffer and avoids handing a
     * runtime-built string to the logger as a format string.
     */
    long tid = (long) syscall(SYS_gettid);
    LOG_DEBUG(("########################### Thread Id : %ld ###########################", tid));
}
/*
 * Returns the current value of the `connected` flag after logging the call
 * and the calling thread id.
 */
int is_connected() {
    LOG_DEBUG(("########################## Is Connected ##########################"));
    log_tid();

    const int connection_flag = connected;
    return connection_flag;
}
/*
 * Returns the current value of the `expired` flag after logging the call
 * and the calling thread id.
 */
int is_expired() {
    LOG_DEBUG(("########################## Is Expired ##########################"));
    log_tid();

    const int expiry_flag = expired;
    return expiry_flag;
}
/*
 * Invoked when this process has become the primary master.
 * Currently it only records the event and the calling thread id in the log.
 */
void take_leadership()
{
    LOG_DEBUG(("########################## Primary Selected ##########################"));
    log_tid();
}
/*
 * Completion callback for the asynchronous read of "/master" issued by
 * check_master().
 *
 * rc         result code of the read
 * value      znode payload; run_for_master() stores the server id as hex text
 * value_len  number of bytes available in value
 * stat, data unused
 *
 * On connection loss or timeout the read is retried. On success the stored
 * id is compared with server_id: a match means this process is the primary,
 * anything else makes it watch the existing master. If the znode is missing
 * this process runs for master.
 */
void master_check_completion (int rc, const char *value, int value_len, const struct Stat *stat, const void *data) {
    LOG_DEBUG(("########################## master_check_completion ##########################"));
    log_tid();

    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        check_master();
        break;
    case ZOK: {
        char id_text[32];
        unsigned int master_id = 0;   /* %x requires an unsigned int target */
        int parsed = 0;

        /*
         * Copy at most value_len bytes into a local, NUL-terminated buffer
         * so parsing never reads past the payload and never dereferences a
         * NULL value.
         */
        if (value != NULL && value_len > 0) {
            size_t copy_len = (size_t) value_len;
            if (copy_len >= sizeof id_text) {
                copy_len = sizeof id_text - 1;
            }
            memcpy(id_text, value, copy_len);
            id_text[copy_len] = '\0';
            parsed = (sscanf(id_text, "%x", &master_id) == 1);
        }

        if (!parsed) {
            LOG_WARN(("Could not parse the server id stored in /master"));
        }

        if (parsed && master_id == (unsigned int) server_id) {
            take_leadership();
            LOG_DEBUG(("Elected primary master"));
        } else {
            /* Unparseable data is treated as belonging to another process. */
            master_exists();
            LOG_DEBUG(("The primary is some other process"));
        }
        break;
    }
    case ZNONODE:
        run_for_master();
        break;
    default:
        LOG_ERROR(("Something went wrong when checking the master lock: %s", rc2string(rc)));
        break;
    }
}
void check_master () {
LOG_DEBUG(("########################## check_master ##########################"));
log_tid();
zoo_aget(zh, "/master", 0, master_check_completion, NULL);
}
void run_for_master();
/*
 * Watcher registered by master_exists() on "/master".
 *
 * When the znode is deleted this process runs for master again; any other
 * event type is only logged.
 *
 * zh, state, watcherCtx are unused; path is expected to be "/master".
 */
void master_exists_watcher (zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    LOG_DEBUG(("########################## master_exists_watcher ##########################"));
    log_tid();

    if (type == ZOO_DELETED_EVENT) {
        assert( !strcmp(path, "/master") );
        run_for_master();
    } else {
        /* The format previously lacked %s, so the event name was dropped. */
        LOG_DEBUG(("Watched event: %s", type2string(type)));
    }
}
/*
 * Completion callback for the exists request issued by master_exists().
 *
 * rc    result code of the request
 * stat  unused
 * data  unused
 *
 * - ZOK: nothing further to do here.
 * - connection loss / timeout: the exists request is issued again.
 * - ZNONODE: the master znode is gone, so this process runs for master.
 * - anything else: a warning is logged.
 */
void master_exists_completion (int rc, const struct Stat *stat, const void *data) {
    LOG_DEBUG(("########################## master_exists_completion ##########################"));
    log_tid();

    if (rc == ZOK) {
        return;
    }

    if (rc == ZCONNECTIONLOSS || rc == ZOPERATIONTIMEOUT) {
        master_exists();
        return;
    }

    if (rc == ZNONODE) {
        LOG_INFO(("previous master is gone, running for master"));
        run_for_master();
        return;
    }

    LOG_WARN(("something went wrong when executing exists: %s", rc2string(rc)));
}
void master_exists() {
LOG_DEBUG(("########################## master_exists ##########################"));
log_tid();
zoo_awexists(zh, "/master", master_exists_watcher, NULL, master_exists_completion, NULL);
}
void master_create_completion (int rc, const char *value, const void *data) {
LOG_DEBUG(("########################## master_create_completion ##########################"));
log_tid();
switch (rc) {
case ZCONNECTIONLOSS:
check_master();
break;
case ZOK:
take_leadership();
break;
case ZNODEEXISTS:
master_exists();
LOG_DEBUG(("########################## I am the Mirror ##########################"));
break;
default:
LOG_ERROR(("something went wrong when running for master."));
break;
}
}
void run_for_master() {
LOG_DEBUG(("########################## Run For Master ##########################"));
log_tid();
if(!connected) {
LOG_WARN(("client not connected to ZooKeeper"));
return;
}
char server_id_string[9];
snprintf(server_id_string, 9, "%x", server_id);
zoo_acreate(zh,
"/master",
(const char *) server_id_string,
strlen(server_id_string) + 1,
&ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL,
master_create_completion,
NULL);
}
/*
 * Session watcher passed to zookeeper_init() by init().
 *
 * For session events it maintains the `connected` and `expired` flags:
 *   - connected state:  connected = 1
 *   - connecting state: connected = 0 (a warning is logged if it was 1)
 *   - expired state:    expired = 1, connected = 0, and the handle is closed
 * Every event is logged at the end regardless of its type.
 *
 * The handle argument zkh is used rather than the global handle because
 * zookeeper_init might not have returned yet when this runs.
 */
void main_watcher (zhandle_t *zkh, int type, int state, const char *path, void* context)
{
    LOG_DEBUG(("########################## Main Watcher ##########################"));
    log_tid();

    const int session_event = (type == ZOO_SESSION_EVENT);

    if (session_event && state == ZOO_CONNECTED_STATE) {
        connected = 1;
        LOG_DEBUG(("Received a connected event."));
    } else if (session_event && state == ZOO_CONNECTING_STATE) {
        const int was_connected = (connected == 1);
        if (was_connected) {
            LOG_WARN(("Disconnected."));
        }
        connected = 0;
    } else if (session_event && state == ZOO_EXPIRED_SESSION_STATE) {
        expired = 1;
        connected = 0;
        zookeeper_close(zkh);
    }

    LOG_DEBUG(("Event: %s, %d", type2string(type), state));
}
/*
 * Picks a random server id, sets the ZooKeeper log level to debug and opens
 * a session, storing the handle in zh.
 *
 * hostPort  connection string handed to zookeeper_init()
 *
 * Returns 0 on success. On failure returns the errno left by
 * zookeeper_init(), or -1 if errno was not set.
 */
int init (char * hostPort) {
    LOG_DEBUG(("########################## INIT ##########################"));
    log_tid();

    /*
     * Mix the process id into the seed: with time() alone, two processes
     * started within the same second would draw the same server_id and could
     * both conclude that they own "/master".
     */
    srand((unsigned int) time(NULL) ^ (unsigned int) getpid());
    server_id = rand();

    zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);

    /*
     * Decide success from the returned handle, not from errno: errno may
     * hold a stale non-zero value from an earlier call even when
     * zookeeper_init() succeeds.
     */
    errno = 0;
    zh = zookeeper_init(hostPort, main_watcher, 45000, 0, 0, 0);
    if (zh == NULL) {
        return errno != 0 ? errno : -1;
    }
    return 0;
}
#define MAX_LEN 1024
/*
 * Entry point. Expects exactly one argument, the ZooKeeper host:port string.
 *
 * The process forks: the child redirects stdout/stderr into a pipe, opens
 * the ZooKeeper session, waits for the connection, runs for master and then
 * idles until the session expires. The parent reads the child's output line
 * by line and prints each line with a running counter prefix.
 */
int main (int argc, char * argv[]) {
    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);
    }

    int pipefd[2];
    if (pipe(pipefd) != 0) {
        perror("pipe");
        exit(1);
    }

    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        exit(1);
    }

    if (pid == 0)
    {
        close(pipefd[0]); // close reading end in the child
        // send stdout and stderr to the pipe
        if (dup2(pipefd[1], 1) < 0 || dup2(pipefd[1], 2) < 0) {
            perror("dup2");
            exit(1);
        }
        close(pipefd[1]); // this descriptor is no longer needed

        /*
         * Initialize ZooKeeper session. Without a session there is nothing
         * to wait for, so bail out instead of polling is_connected() forever.
         */
        int init_rc = init(argv[1]);
        if (init_rc) {
            LOG_ERROR(("Error while initializing the master: %d", init_rc));
            exit(1);
        }

        LOG_DEBUG(("########################################################"));

        while (!is_connected()) {
            sleep(1);
        }

        LOG_DEBUG(("connected, going to bootstrap and run for master"));

        /*
         * Run for master
         */
        run_for_master();

        while (!is_expired()) {
            sleep(1);
        }
    }
    else
    {
        char buffer[1024];
        close(pipefd[1]); // close the write end of the pipe in the parent

        FILE *fp = fdopen(pipefd[0], "r");
        if (fp == NULL) {
            perror("fdopen");
            exit(1);
        }

        int count = 0;
        while (fgets(buffer, sizeof buffer, fp)) /* read from pipe into buffer */
        {
            printf("%d : zookeeper_client: %s", count++, buffer);
        }
        fclose(fp);
    }
    return 0;
}
#pragma once
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <time.h>
#include <zookeeper.h>
#include <zookeeper_log.h>
#include <zookeeper.jute.h>
/*
 * File-scope state. Because these are `static` in a header, every
 * translation unit that includes this header gets its own copy.
 */
/* Not read or written by the visible code; init() takes its own hostPort parameter. */
static const char *hostPort;
/* ZooKeeper session handle; assigned from zookeeper_init() in init(). */
static zhandle_t *zh;
/* Set to 1 by main_watcher() on a connected session event, 0 otherwise. */
static int connected = 0;
/* Set to 1 by main_watcher() when the session expires. */
static int expired = 0;
/* Random id chosen in init(); written to "/master" as hex text by run_for_master(). */
static int server_id;
/*
 * Function declarations.
 *
 * NOTE(review): create_parent() is declared here but no definition is
 * visible in this source — confirm it is defined elsewhere or drop it.
 */
void create_parent();
void run_for_master();
void check_master();
void master_exists();
/*
 * Master states
 */
enum master_states {
    RUNNING,
    ELECTED,
    NOTELECTED
};

/* Current master state; zero-initialized, i.e. RUNNING, until assigned. */
static enum master_states state;

/*
 * Returns the current master state.
 *
 * Declared static inline because this definition lives in a header: with
 * external linkage, including the header from more than one translation
 * unit would produce duplicate-symbol link errors.
 */
static inline enum master_states get_state (void) {
    return state;
}
/*
 * Instances of this struct are
 * used when assigning a task to
 * a worker.
 *
 * NOTE(review): nothing in the visible source uses this struct; the field
 * descriptions below are inferred from the names — confirm against callers.
 */
struct task_info {
char * name;      /* presumably the task's name */
char * value;     /* presumably the task's data */
int value_len;    /* presumably the length of value in bytes */
char * worker;    /* presumably the worker the task is assigned to */
};
/*
 * Maps a ZooKeeper watcher event type to a printable name.
 *
 * type  event type as passed to a watcher callback
 *
 * Returns a string literal naming the event, or "UNKNOWN_EVENT_TYPE" when
 * the value matches none of the known event constants.
 */
static const char * type2string(int type){
    /*
     * A lookup table is used instead of a switch because the event constants
     * are compared as ordinary values here; entries are checked in order and
     * the first match wins.
     */
    const struct {
        int event;
        const char *label;
    } known_events[] = {
        { ZOO_CREATED_EVENT,     "CREATED_EVENT" },
        { ZOO_DELETED_EVENT,     "DELETED_EVENT" },
        { ZOO_CHANGED_EVENT,     "CHANGED_EVENT" },
        { ZOO_CHILD_EVENT,       "CHILD_EVENT" },
        { ZOO_SESSION_EVENT,     "SESSION_EVENT" },
        { ZOO_NOTWATCHING_EVENT, "NOTWATCHING_EVENT" },
    };
    const size_t event_count = sizeof known_events / sizeof known_events[0];

    for (size_t idx = 0; idx < event_count; idx++) {
        if (known_events[idx].event == type) {
            return known_events[idx].label;
        }
    }
    return "UNKNOWN_EVENT_TYPE";
}
/*
 * Maps a ZooKeeper return code to a human-readable description.
 *
 * rc  return code from a ZooKeeper API call or completion callback
 *
 * Returns a string literal describing the code, or "UNKNOWN_ERROR_CODE"
 * when the value is not one of the codes listed below. (The fallback used
 * to read "UNKNOWN_EVENT_TYPE", copied from type2string(), which was
 * misleading in error logs.)
 */
static const char * rc2string(int rc){
    switch (rc) {
    case ZOK:
        return "OK";
    case ZSYSTEMERROR:
        return "System error";
    case ZRUNTIMEINCONSISTENCY:
        return "Runtime inconsistency";
    case ZDATAINCONSISTENCY:
        return "Data inconsistency";
    case ZCONNECTIONLOSS:
        return "Connection to the server has been lost";
    case ZMARSHALLINGERROR:
        return "Error while marshalling or unmarshalling data ";
    case ZUNIMPLEMENTED:
        return "Operation not implemented";
    case ZOPERATIONTIMEOUT:
        return "Operation timeout";
    case ZBADARGUMENTS:
        return "Invalid argument";
    case ZINVALIDSTATE:
        return "Invalid zhandle state";
    case ZAPIERROR:
        return "API error";
    case ZNONODE:
        return "Znode does not exist";
    case ZNOAUTH:
        return "Not authenticated";
    case ZBADVERSION:
        return "Version conflict";
    case ZNOCHILDRENFOREPHEMERALS:
        return "Ephemeral nodes may not have children";
    case ZNODEEXISTS:
        return "Znode already exists";
    case ZNOTEMPTY:
        return "The znode has children";
    case ZSESSIONEXPIRED:
        return "The session has been expired by the server";
    case ZINVALIDCALLBACK:
        return "Invalid callback specified";
    case ZINVALIDACL:
        return "Invalid ACL specified";
    case ZAUTHFAILED:
        return "Client authentication failed";
    case ZCLOSING:
        return "ZooKeeper session is closing";
    case ZNOTHING:
        return "No response from server";
    case ZSESSIONMOVED:
        return "Session moved to a different server";
    default:
        return "UNKNOWN_ERROR_CODE";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment