Skip to content

Instantly share code, notes, and snippets.

@shikhar
Created September 1, 2015 16:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shikhar/d4305ec18333c6623502 to your computer and use it in GitHub Desktop.
Save shikhar/d4305ec18333c6623502 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
/**
 * Tracks values that are currently being processed and periodically persists a checkpoint
 * through a {@link Checkpointing} instance under a fixed key.
 *
 * <p>Callers register values with {@link #startProcessing(long)} and remove them with
 * {@link #doneProcessing(long)}. A running total is maintained via {@link #addTotal(long)} /
 * {@link #incrementTotal()}; each time the total crosses another multiple of {@code every},
 * {@link #maybeUpdate()} writes the smallest value still in process as the checkpoint.
 *
 * <p>State is held in a concurrent set, an atomic counter and a volatile generation field, and
 * the checkpoint write itself happens while holding this object's monitor.
 */
public class CheckpointTracker {

    private final Checkpointing checkpointing;
    private final String key;
    private final long every;

    /** Values registered via startProcessing and not yet removed via doneProcessing. */
    private final SortedSet<Long> inProcess = new ConcurrentSkipListSet<>();

    /** Running total accumulated through addTotal / incrementTotal. */
    private final AtomicLong total = new AtomicLong();

    /** {@code total / every} as of the last checkpoint written by maybeUpdate. */
    private volatile long updateGeneration;

    /**
     * @param checkpointing used to read and write the checkpoint value
     * @param key           key passed to every {@code checkpointing} call
     * @param every         a checkpoint is attempted each time the total crosses a multiple of
     *                      this value; must be positive
     * @throws IllegalArgumentException if {@code every} is not positive
     */
    public CheckpointTracker(Checkpointing checkpointing, String key, long every) {
        Preconditions.checkArgument(every > 0, "every must be positive, got %s", every);
        this.checkpointing = checkpointing;
        this.key = key;
        this.every = every;
    }

    /**
     * Reads the checkpoint currently stored for this tracker's key.
     *
     * @return whatever {@code checkpointing.get(key)} returns
     * @throws RuntimeException wrapping any {@link IOException} raised by the read
     */
    public Optional<Long> currentValue() {
        try {
            return checkpointing.get(key);
        } catch (IOException e) {
            throw new RuntimeException("Failed to read checkpoint for key " + key, e);
        }
    }

    /** Registers {@code value} as being in process. */
    public void startProcessing(long value) {
        inProcess.add(value);
    }

    /** Removes {@code value} from the in-process set; a no-op if it was not registered. */
    public void doneProcessing(long value) {
        inProcess.remove(value);
    }

    /**
     * Adds {@code count} to the running total and then attempts a checkpoint.
     *
     * @return the result of {@link #maybeUpdate()}
     */
    public boolean maybeUpdate(long count) {
        addTotal(count);
        return maybeUpdate();
    }

    /**
     * Adds one to the running total.
     *
     * @return the total after the increment
     */
    public long incrementTotal() {
        return total.incrementAndGet();
    }

    /**
     * Adds {@code n} to the running total.
     *
     * @return the total after the addition
     */
    public long addTotal(long n) {
        return total.addAndGet(n);
    }

    /**
     * Writes a checkpoint if the running total has crossed into a new multiple of {@code every}
     * since the last checkpoint written by this method.
     *
     * <p>The checkpoint value is the smallest value currently in process. If nothing is in
     * process there is no value to write: the method returns {@code false} and leaves the
     * generation untouched, so a later call will try again.
     *
     * @return {@code true} if a checkpoint was written, {@code false} otherwise
     * @throws RuntimeException wrapping any {@link IOException} raised by the write
     */
    public boolean maybeUpdate() {
        final long newUpdateGeneration = total.get() / every;
        // Cheap unsynchronized check first; re-checked under the lock below.
        if (newUpdateGeneration <= updateGeneration) {
            return false;
        }
        synchronized (this) {
            if (newUpdateGeneration <= updateGeneration) {
                return false;
            }
            final Long lowest = lowestInProcess();
            if (lowest == null) {
                // Nothing in flight: SortedSet.first() would have thrown NoSuchElementException.
                return false;
            }
            update(lowest);
            // Only advance after a successful write so a failed write is retried.
            updateGeneration = newUpdateGeneration;
            return true;
        }
    }

    /**
     * Unconditionally writes {@code value} as the checkpoint for this tracker's key.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised by the write
     */
    public void update(final long value) {
        try {
            checkpointing.set(key, value);
        } catch (IOException e) {
            throw new RuntimeException("Failed to write checkpoint for key " + key, e);
        }
    }

    /**
     * Returns the smallest in-process value, or {@code null} when the set is empty.
     *
     * <p>Uses the set's iterator rather than {@code isEmpty()} followed by {@code first()}:
     * doneProcessing does not take this object's monitor, so the set may be emptied between
     * those two calls.
     */
    private Long lowestInProcess() {
        final java.util.Iterator<Long> ascending = inProcess.iterator();
        return ascending.hasNext() ? ascending.next() : null;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment