Skip to content

Instantly share code, notes, and snippets.

@arthurtsang
Last active December 15, 2015 13:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arthurtsang/5268023 to your computer and use it in GitHub Desktop.
Save arthurtsang/5268023 to your computer and use it in GitHub Desktop.
ZooKeeper watcher without loosing events
private void getConfigsFromZookeeper() throws IOException {
logger.debug("Connected to Zookeeper");
synchronized(lock) {
try {
configs = trackChildren(configs);
} catch (Exception e) {
logger.error("Error tracking Zookeeper config", e);
}
}
}
/*
* Tracks any new configs that has been added to ZooKeeper since the last start of this bundle.
* The returned Map will contain these new configs in addition to the configs that are already being tracked.
*/
private Map<String,String> trackChildren(Map<String,String> oldConfigs) throws KeeperException, InterruptedException, IOException {
Map<String,String> newConfigs = Collections.synchronizedMap( new HashMap<String,String>(oldConfigs) );
List<String> children = children = zk.getChildren("/configs", true);
for( String child : children ) {
// if it is a new child node
if( !newConfigs.containsKey(child) ) {
trackChild(newConfigs, "/configs/"+child );
}
}
return newConfigs;
}
/*
* upon datachange event, get the new data & set watcher again
* upon nodedelete event, process has already deleted it from state, but watch it again in case it has been recreated before trackChildren set the watch again
* if the child node had been been processed, trackChildren will call this method to process it and watch it
*/
private void trackChild(Map<String,String> newConfigs, String path) throws KeeperException, InterruptedException, IOException {
if( path == null ) return;
try {
byte[] data = zk.getData(path, true, null);
// do something
logger.debug("watching " + path );
}catch(NoNodeException e){
}
}
public void process(WatchedEvent event) {
logger.debug( "ZK process event: " + event.getType() + " " + event.getPath() );
if( event.getType() == Event.EventType.None ) {
if( event.getState().ordinal() == 3 ) {
try {
getConfigsFromZookeeper();
} catch (IOException e) {
logger.error( "Failed download configurations from zookeeper", e );
}
}
}
if( event.getType() == Event.EventType.NodeChildrenChanged ) {
synchronized(lock) {
try {
logger.debug("about to track children");
configs = trackChildren(configs);
logger.debug("processed all children");
} catch (Exception e) { }
}
}
if( event.getType() == Event.EventType.NodeDataChanged ){
synchronized(lock) {
try {
logger.debug("data change detected for " + event.getPath());
trackChild(configs,event.getPath());
} catch (Exception e) {
}
}
}
if( event.getType() == Event.EventType.NodeDeleted ) {
String child = (event.getPath()==null) ? null : (new File(event.getPath())).getName();
synchronized(lock) {
try {
logger.debug("node " + event.getPath() + " deleted");
String pid = configs.remove(child);
Configuration config = configAdmin.getConfiguration(pid);
config.delete();
trackChild(configs,event.getPath());
} catch (Exception e) {
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment