-
-
Save tharindukumara/025c98ddbd406db34067a587c6d9533d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "master.h" | |
#include <sys/syscall.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <unistd.h> | |
#include <string.h> | |
#include <sys/types.h> | |
#include <sys/socket.h> | |
#include <netinet/in.h> | |
#include <netdb.h> | |
#include <iostream> | |
#include <fstream> | |
#include <zookeeper_log.h> | |
#include <sstream> | |
#include <fcntl.h> | |
/*
 * Logs, at debug level, the value returned by the gettid system call
 * for the calling thread. Used to see which thread runs each callback.
 */
void log_tid(){
    char buf[256];

    /*
     * syscall() returns a long, so it is printed with %ld; snprintf
     * bounds the write to the size of the buffer.
     */
    snprintf(buf, sizeof buf,
             "########################### Thread Id : %ld ###########################",
             (long) syscall(SYS_gettid));

    /* Pass the text as an argument, never as the format string itself. */
    LOG_DEBUG(("%s", buf));
}
int is_connected() { | |
LOG_DEBUG(("########################## Is Connected ##########################")); | |
log_tid(); | |
return connected; | |
} | |
int is_expired() { | |
LOG_DEBUG(("########################## Is Expired ##########################")); | |
log_tid(); | |
return expired; | |
} | |
/*
 * Hook invoked once this process has been chosen as the primary.
 * It currently only records the event in the debug log.
 */
void take_leadership()
{
    LOG_DEBUG(("########################## Primary Selected ##########################"));
    log_tid();
}
void master_check_completion (int rc, const char *value, int value_len, const struct Stat *stat, const void *data) { | |
LOG_DEBUG(("########################## master_check_completion ##########################")); | |
log_tid(); | |
int master_id; | |
switch (rc) { | |
case ZCONNECTIONLOSS: | |
case ZOPERATIONTIMEOUT: | |
check_master(); | |
break; | |
case ZOK: | |
sscanf(value, "%x", &master_id ); | |
if(master_id == server_id) { | |
take_leadership(); | |
LOG_DEBUG(("Elected primary master")); | |
} else { | |
master_exists(); | |
LOG_DEBUG(("The primary is some other process")); | |
} | |
break; | |
case ZNONODE: | |
run_for_master(); | |
break; | |
default: | |
LOG_ERROR(("Something went wrong when checking the master lock: %s", rc2string(rc))); | |
break; | |
} | |
} | |
void check_master () { | |
LOG_DEBUG(("########################## check_master ##########################")); | |
log_tid(); | |
zoo_aget(zh, "/master", 0, master_check_completion, NULL); | |
} | |
void run_for_master(); | |
void master_exists_watcher (zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) { | |
LOG_DEBUG(("########################## master_exists_watcher ##########################")); | |
log_tid(); | |
if( type == ZOO_DELETED_EVENT) { | |
assert( !strcmp(path, "/master") ); | |
run_for_master(); | |
} else { | |
LOG_DEBUG(("Watched event: ", type2string(type))); | |
} | |
} | |
void master_exists_completion (int rc, const struct Stat *stat, const void *data) { | |
LOG_DEBUG(("########################## master_exists_completion ##########################")); | |
log_tid(); | |
switch (rc) { | |
case ZCONNECTIONLOSS: | |
case ZOPERATIONTIMEOUT: | |
master_exists(); | |
break; | |
case ZOK: | |
break; | |
case ZNONODE: | |
LOG_INFO(("previous master is gone, running for master")); | |
run_for_master(); | |
break; | |
default: | |
LOG_WARN(("something went wrong when executing exists: %s", rc2string(rc))); | |
break; | |
} | |
} | |
void master_exists() { | |
LOG_DEBUG(("########################## master_exists ##########################")); | |
log_tid(); | |
zoo_awexists(zh, "/master", master_exists_watcher, NULL, master_exists_completion, NULL); | |
} | |
void master_create_completion (int rc, const char *value, const void *data) { | |
LOG_DEBUG(("########################## master_create_completion ##########################")); | |
log_tid(); | |
switch (rc) { | |
case ZCONNECTIONLOSS: | |
check_master(); | |
break; | |
case ZOK: | |
take_leadership(); | |
break; | |
case ZNODEEXISTS: | |
master_exists(); | |
LOG_DEBUG(("########################## I am the Mirror ##########################")); | |
break; | |
default: | |
LOG_ERROR(("something went wrong when running for master.")); | |
break; | |
} | |
} | |
void run_for_master() { | |
LOG_DEBUG(("########################## Run For Master ##########################")); | |
log_tid(); | |
if(!connected) { | |
LOG_WARN(("client not connected to ZooKeeper")); | |
return; | |
} | |
char server_id_string[9]; | |
snprintf(server_id_string, 9, "%x", server_id); | |
zoo_acreate(zh, | |
"/master", | |
(const char *) server_id_string, | |
strlen(server_id_string) + 1, | |
&ZOO_OPEN_ACL_UNSAFE, | |
ZOO_EPHEMERAL, | |
master_create_completion, | |
NULL); | |
} | |
/*
 * Session watcher passed to zookeeper_init().
 *
 * Keeps the `connected` and `expired` flags in step with session
 * events and closes the handle once the session has expired.
 * Every event, session-related or not, is logged at the end.
 */
void main_watcher (zhandle_t *zkh, int type, int state, const char *path, void* context)
{
    LOG_DEBUG(("########################## Main Watcher ##########################"));
    log_tid();

    const int is_session_event = (type == ZOO_SESSION_EVENT);

    /*
     * zookeeper_init might not have returned yet, so the handle passed
     * to the callback (zkh) is used rather than the global one.
     */
    if (is_session_event && state == ZOO_CONNECTED_STATE) {
        connected = 1;
        LOG_DEBUG(("Received a connected event."));
    } else if (is_session_event && state == ZOO_CONNECTING_STATE) {
        if (connected == 1) {
            LOG_WARN(("Disconnected."));
        }
        connected = 0;
    } else if (is_session_event && state == ZOO_EXPIRED_SESSION_STATE) {
        expired = 1;
        connected = 0;
        zookeeper_close(zkh);
    }

    LOG_DEBUG(("Event: %s, %d", type2string(type), state));
}
/*
 * Picks a random server id, raises the ZooKeeper log level to debug
 * and opens the session, storing the handle in the global `zh`.
 *
 *   hostPort  connection string handed to zookeeper_init()
 *
 * Returns 0 when a handle was obtained. Otherwise returns the errno
 * left by zookeeper_init(), or -1 if errno was not set.
 */
int init (char * hostPort) {
    LOG_DEBUG(("########################## INIT ##########################"));
    log_tid();

    srand(time(NULL));
    server_id = rand();

    zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);

    /*
     * Clear errno first and judge success by the returned handle:
     * a stale errno from an earlier call must not be reported as a
     * failure of zookeeper_init().
     */
    errno = 0;
    zh = zookeeper_init(hostPort, main_watcher, 45000, 0, 0, 0);
    if (zh == NULL) {
        return errno != 0 ? errno : -1;
    }

    return 0;
}
#define MAX_LEN 1024 | |
int main (int argc, char * argv[]) { | |
if (argc != 2) { | |
fprintf(stderr, "USAGE: %s host:port\n", argv[0]); | |
exit(1); | |
} | |
int pipefd[2]; | |
pipe(pipefd); | |
if (fork() == 0) | |
{ | |
close(pipefd[0]); // close reading end in the child | |
dup2(pipefd[1], 1); // send stdout to the pipe | |
dup2(pipefd[1], 2); // send stderr to the pipe | |
close(pipefd[1]); // this descriptor is no longer needed | |
/* | |
* Initialize ZooKeeper session | |
* | |
*/ | |
if(init(argv[1])){ | |
LOG_ERROR(("Error while initializing the master: ", errno)); | |
} | |
LOG_DEBUG(("########################################################")); | |
while(!is_connected()) { | |
sleep(1); | |
} | |
LOG_DEBUG(("connected, going to bootstrap and run for master")); | |
/* | |
* Run for master | |
*/ | |
run_for_master(); | |
while(!is_expired()) { | |
sleep(1); | |
} | |
} | |
else | |
{ | |
char buffer[1024]; | |
close(pipefd[1]); // close the write end of the pipe in the parent | |
FILE* fp = fdopen(pipefd[0], "r"); | |
int count = 0; | |
while(fgets(buffer, sizeof buffer, fp)) /* read from pipe into buffer */ | |
{ | |
printf("%d : zookeeper_client: %s", count++, buffer); | |
} | |
} | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <assert.h> | |
#include <errno.h> | |
#include <string.h> | |
#include <stdlib.h> | |
#include <stdarg.h> | |
#include <unistd.h> | |
#include <time.h> | |
#include <zookeeper.h> | |
#include <zookeeper_log.h> | |
#include <zookeeper.jute.h> | |
static const char *hostPort; | |
static zhandle_t *zh; | |
static int connected = 0; | |
static int expired = 0; | |
static int server_id; | |
/*
 * Forward declarations of the election steps, so the completion
 * callbacks can call each other regardless of definition order.
 * No definition of create_parent() appears alongside the others here.
 */
void check_master();
void master_exists();
void run_for_master();
void create_parent();
/*
 * Master states
 */
enum master_states {
    RUNNING,
    ELECTED,
    NOTELECTED
};

/* Current state; zero-initialized, i.e. RUNNING, until something assigns it. */
static enum master_states state;

/*
 * Returns the current master state.
 *
 * Declared static like the other helpers defined in this header, so
 * that including the header from more than one translation unit does
 * not produce duplicate external definitions.
 */
static enum master_states get_state () {
    return state;
}
/*
 * Bundle of data describing one task assignment; instances are used
 * when a task is handed to a worker.
 */
struct task_info {
    char *name;       /* task name */
    char *value;      /* task payload */
    int   value_len;  /* presumably the byte length of value -- confirm with users of the struct */
    char *worker;     /* worker the task is assigned to */
};
/* | |
* The following two methods convert | |
* event types and return codes, respectively, | |
* to strings. | |
*/ | |
static const char * type2string(int type){ | |
if (type == ZOO_CREATED_EVENT) | |
return "CREATED_EVENT"; | |
if (type == ZOO_DELETED_EVENT) | |
return "DELETED_EVENT"; | |
if (type == ZOO_CHANGED_EVENT) | |
return "CHANGED_EVENT"; | |
if (type == ZOO_CHILD_EVENT) | |
return "CHILD_EVENT"; | |
if (type == ZOO_SESSION_EVENT) | |
return "SESSION_EVENT"; | |
if (type == ZOO_NOTWATCHING_EVENT) | |
return "NOTWATCHING_EVENT"; | |
return "UNKNOWN_EVENT_TYPE"; | |
} | |
static const char * rc2string(int rc){ | |
if (rc == ZOK) { | |
return "OK"; | |
} | |
if (rc == ZSYSTEMERROR) { | |
return "System error"; | |
} | |
if (rc == ZRUNTIMEINCONSISTENCY) { | |
return "Runtime inconsistency"; | |
} | |
if (rc == ZDATAINCONSISTENCY) { | |
return "Data inconsistency"; | |
} | |
if (rc == ZCONNECTIONLOSS) { | |
return "Connection to the server has been lost"; | |
} | |
if (rc == ZMARSHALLINGERROR) { | |
return "Error while marshalling or unmarshalling data "; | |
} | |
if (rc == ZUNIMPLEMENTED) { | |
return "Operation not implemented"; | |
} | |
if (rc == ZOPERATIONTIMEOUT) { | |
return "Operation timeout"; | |
} | |
if (rc == ZBADARGUMENTS) { | |
return "Invalid argument"; | |
} | |
if (rc == ZINVALIDSTATE) { | |
return "Invalid zhandle state"; | |
} | |
if (rc == ZAPIERROR) { | |
return "API error"; | |
} | |
if (rc == ZNONODE) { | |
return "Znode does not exist"; | |
} | |
if (rc == ZNOAUTH) { | |
return "Not authenticated"; | |
} | |
if (rc == ZBADVERSION) { | |
return "Version conflict"; | |
} | |
if (rc == ZNOCHILDRENFOREPHEMERALS) { | |
return "Ephemeral nodes may not have children"; | |
} | |
if (rc == ZNODEEXISTS) { | |
return "Znode already exists"; | |
} | |
if (rc == ZNOTEMPTY) { | |
return "The znode has children"; | |
} | |
if (rc == ZSESSIONEXPIRED) { | |
return "The session has been expired by the server"; | |
} | |
if (rc == ZINVALIDCALLBACK) { | |
return "Invalid callback specified"; | |
} | |
if (rc == ZINVALIDACL) { | |
return "Invalid ACL specified"; | |
} | |
if (rc == ZAUTHFAILED) { | |
return "Client authentication failed"; | |
} | |
if (rc == ZCLOSING) { | |
return "ZooKeeper session is closing"; | |
} | |
if (rc == ZNOTHING) { | |
return "No response from server"; | |
} | |
if (rc == ZSESSIONMOVED) { | |
return "Session moved to a different server"; | |
} | |
return "UNKNOWN_EVENT_TYPE"; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment