Skip to content

Instantly share code, notes, and snippets.

@Randgalt
Created June 22, 2012 04:29
Show Gist options
  • Save Randgalt/2970233 to your computer and use it in GitHub Desktop.
Save Randgalt/2970233 to your computer and use it in GitHub Desktop.
package com.netflix.curator.framework.recipes.locks;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * Background utility that deletes ZooKeeper nodes once they no longer have children.
 *
 * <p>Paths registered through {@link #addPath(String)} are held in a delay queue for the
 * configured reaping threshold. When a path's delay elapses, the worker task checks the node:
 * if it still exists and has no children it is deleted; if it has children it is re-queued and
 * checked again after another threshold period; if it no longer exists it is forgotten.</p>
 *
 * <p>Call {@link #start()} to begin processing and {@link #close()} to stop it.</p>
 */
public class Reaper implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final ExecutorService executor;
    // true only when this instance created the executor itself and must therefore shut it down
    private final boolean ownsExecutor;
    private final int reapingThresholdMs;
    private final DelayQueue<PathHolder> queue = new DelayQueue<PathHolder>();
    // set by start(); remains null until then
    private volatile Future<Void> task;

    // default threshold: five minutes, expressed in milliseconds
    private static final int REAPING_THRESHOLD_MS = (int)TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);

    /**
     * Queue entry pairing a path with the wall-clock time at which it becomes eligible for reaping.
     */
    private static class PathHolder implements Delayed
    {
        private final String path;
        private final long expirationMs;

        private PathHolder(String path, int delayMs)
        {
            this.path = path;
            this.expirationMs = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit)
        {
            return unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o)
        {
            if ( o == this )
            {
                return 0;
            }
            if ( o instanceof PathHolder )
            {
                // Compare the fixed deadlines directly: computing two remaining delays would read
                // the clock twice and make the ordering of equal deadlines depend on timing.
                long other = ((PathHolder)o).expirationMs;
                return (expirationMs < other) ? -1 : ((expirationMs > other) ? 1 : 0);
            }
            long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
        }
    }

    /**
     * Creates a reaper with its own single-thread executor (threads named {@code Reaper-%d})
     * and the default five minute reaping threshold. The executor is shut down by {@link #close()}.
     *
     * @param client the Curator client used to inspect and delete nodes
     */
    public Reaper(CuratorFramework client)
    {
        this(client, Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Reaper-%d").build()), REAPING_THRESHOLD_MS, true);
    }

    /**
     * Creates a reaper that runs on a caller-supplied executor. The executor is NOT shut down by
     * {@link #close()}; its lifecycle stays with the caller.
     *
     * @param client the Curator client used to inspect and delete nodes
     * @param executor executor on which the reaping task is submitted
     * @param reapingThresholdMs delay, in milliseconds, between a path being added and being checked
     */
    public Reaper(CuratorFramework client, ExecutorService executor, int reapingThresholdMs)
    {
        this(client, executor, reapingThresholdMs, false);
    }

    private Reaper(CuratorFramework client, ExecutorService executor, int reapingThresholdMs, boolean ownsExecutor)
    {
        this.client = client;
        this.executor = executor;
        this.reapingThresholdMs = reapingThresholdMs;
        this.ownsExecutor = ownsExecutor;
    }

    /**
     * Registers a path to be checked (and possibly deleted) once the reaping threshold elapses.
     *
     * @param path the node path to check
     */
    public void addPath(String path)
    {
        queue.add(new PathHolder(path, reapingThresholdMs));
    }

    /**
     * Submits the reaping loop to the executor. The loop takes expired paths from the queue and
     * reaps them until the worker thread is interrupted.
     *
     * @throws Exception declared for interface compatibility with existing callers
     */
    public void start() throws Exception
    {
        task = executor.submit
        (
            new Callable<Void>()
            {
                @Override
                public Void call() throws Exception
                {
                    try
                    {
                        while ( !Thread.currentThread().isInterrupted() )
                        {
                            reap(queue.take().path);
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        // restore the flag so the executor's worker sees the interruption
                        Thread.currentThread().interrupt();
                    }
                    return null;
                }
            }
        );
    }

    /**
     * Discards all pending paths and cancels the reaping task, interrupting it if it is running.
     * Safe to call when {@link #start()} was never invoked. If this instance created its own
     * executor, that executor is shut down as well so its worker thread does not linger.
     */
    @Override
    public void close() throws IOException
    {
        try
        {
            queue.clear();
            Future<Void> localTask = task;
            if ( localTask != null )    // null when start() was never called
            {
                localTask.cancel(true);
            }
        }
        catch ( Exception e )
        {
            log.error("Canceling task", e);
        }
        finally
        {
            if ( ownsExecutor )
            {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Checks a single path and deletes it when it exists and has no children. A path that has
     * children (or gains one before the delete lands) is queued again for a later check.
     * Failures other than interruption are logged and the path is dropped.
     */
    private void reap(String path)
    {
        try
        {
            Stat stat = client.checkExists().forPath(path);
            if ( stat == null )
            {
                return; // already deleted
            }

            if ( stat.getNumChildren() != 0 )
            {
                addPath(path); // it has children, check again later
                return;
            }

            try
            {
                client.delete().forPath(path);
                log.info("Reaping path: " + path);
            }
            catch ( KeeperException.NoNodeException ignore )
            {
                // node was removed by someone else in the meantime; nothing left to do
            }
            catch ( KeeperException.NotEmptyException ignore )
            {
                // a child appeared between the exists check and the delete; check again later
                addPath(path);
            }
        }
        catch ( InterruptedException e )
        {
            // Do not swallow the interrupt: the run loop relies on the flag to stop after
            // close() cancels the task.
            Thread.currentThread().interrupt();
        }
        catch ( Exception e )
        {
            log.error("Trying to reap: " + path, e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment