Last active
December 15, 2015 13:29
-
-
Save arthurtsang/5268023 to your computer and use it in GitHub Desktop.
ZooKeeper watcher without losing events
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void getConfigsFromZookeeper() throws IOException { | |
logger.debug("Connected to Zookeeper"); | |
synchronized(lock) { | |
try { | |
configs = trackChildren(configs); | |
} catch (Exception e) { | |
logger.error("Error tracking Zookeeper config", e); | |
} | |
} | |
} | |
/*
 * Tracks any new configs that have been added to ZooKeeper since the last start of this bundle.
 * The returned Map will contain these new configs in addition to the configs that are already being tracked.
 */
private Map<String,String> trackChildren(Map<String,String> oldConfigs) throws KeeperException, InterruptedException, IOException { | |
Map<String,String> newConfigs = Collections.synchronizedMap( new HashMap<String,String>(oldConfigs) ); | |
List<String> children = children = zk.getChildren("/configs", true); | |
for( String child : children ) { | |
// if it is a new child node | |
if( !newConfigs.containsKey(child) ) { | |
trackChild(newConfigs, "/configs/"+child ); | |
} | |
} | |
return newConfigs; | |
} | |
/*
 * upon a data-change event, get the new data & set the watcher again
 * upon a node-delete event, process has already deleted it from state, but watch it again in case it has been recreated before trackChildren set the watch again
 * if the child node has not been processed yet, trackChildren will call this method to process it and watch it
 */
private void trackChild(Map<String,String> newConfigs, String path) throws KeeperException, InterruptedException, IOException { | |
if( path == null ) return; | |
try { | |
byte[] data = zk.getData(path, true, null); | |
// do something | |
logger.debug("watching " + path ); | |
}catch(NoNodeException e){ | |
} | |
} | |
public void process(WatchedEvent event) { | |
logger.debug( "ZK process event: " + event.getType() + " " + event.getPath() ); | |
if( event.getType() == Event.EventType.None ) { | |
if( event.getState().ordinal() == 3 ) { | |
try { | |
getConfigsFromZookeeper(); | |
} catch (IOException e) { | |
logger.error( "Failed download configurations from zookeeper", e ); | |
} | |
} | |
} | |
if( event.getType() == Event.EventType.NodeChildrenChanged ) { | |
synchronized(lock) { | |
try { | |
logger.debug("about to track children"); | |
configs = trackChildren(configs); | |
logger.debug("processed all children"); | |
} catch (Exception e) { } | |
} | |
} | |
if( event.getType() == Event.EventType.NodeDataChanged ){ | |
synchronized(lock) { | |
try { | |
logger.debug("data change detected for " + event.getPath()); | |
trackChild(configs,event.getPath()); | |
} catch (Exception e) { | |
} | |
} | |
} | |
if( event.getType() == Event.EventType.NodeDeleted ) { | |
String child = (event.getPath()==null) ? null : (new File(event.getPath())).getName(); | |
synchronized(lock) { | |
try { | |
logger.debug("node " + event.getPath() + " deleted"); | |
String pid = configs.remove(child); | |
Configuration config = configAdmin.getConfiguration(pid); | |
config.delete(); | |
trackChild(configs,event.getPath()); | |
} catch (Exception e) { | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment