Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Last active December 19, 2015 16:59
Show Gist options
  • Save Randgalt/5988052 to your computer and use it in GitHub Desktop.
Save Randgalt/5988052 to your computer and use it in GitHub Desktop.
Alternative implementation of DispatchingWatcher
package org.apache.curator.framework.imps;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
class DispatchingWatcher implements Watcher, Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final LoadingCache<String, Set<NamespaceWatcher>> watchers = CacheBuilder
.newBuilder()
.build
(
new CacheLoader<String, Set<NamespaceWatcher>>()
{
@Override
public Set<NamespaceWatcher> load(String key) throws Exception
{
return Sets.newSetFromMap(Maps.<NamespaceWatcher, Boolean>newConcurrentMap());
}
}
);
@Override
public void process(WatchedEvent watchedEvent)
{
// NOTE: ZK watchers are single threaded so sycn is unnecessary here
String path = watchedEvent.getPath();
Collection<NamespaceWatcher> watchersForPath;
try
{
watchersForPath = watchers.get(path);
}
catch ( ExecutionException e )
{
log.error("Unexpected error", e);
throw new RuntimeException(e); // should never happen
}
// We don't want to remove Watchers on None events (e.g. disconnected, expired etc).
switch ( watchedEvent.getType() )
{
case None:
{
clearIfNeeded(watchedEvent.getState());
break;
}
default:
{
watchers.invalidate(path);
break;
}
}
for ( NamespaceWatcher watcher : watchersForPath )
{
try
{
watcher.process(watchedEvent);
}
catch ( Exception e )
{
log.error("Error while calling watcher.", e);
}
}
}
/**
* Registers a {@link NamespaceWatcher}.
*
* @param path The registration path.
* @param watcher The watcher.
* @return The global watcher instance.
*/
public Watcher addNamespaceWatcher(String path, NamespaceWatcher watcher) throws ExecutionException
{
watchers.get(path).add(watcher);
return this;
}
@Override
public void close()
{
watchers.invalidateAll();
}
/**
* Clears all {@link Watcher} objects if needed.
*
* @param state The keeper state.
*/
private void clearIfNeeded(Event.KeeperState state)
{
if ( ClientCnxn.getDisableAutoResetWatch() && (state != Event.KeeperState.SyncConnected) )
{
watchers.invalidateAll();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment