Created
September 1, 2015 16:16
-
-
Save shikhar/d4305ec18333c6623502 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.SortedSet; | |
import java.util.concurrent.ConcurrentSkipListSet; | |
import java.util.concurrent.atomic.AtomicLong; | |
import com.google.common.base.Optional; | |
import com.google.common.base.Preconditions; | |
public class CheckpointTracker { | |
private final Checkpointing checkpointing; | |
private final String key; | |
private final long every; | |
private final SortedSet<Long> inProcess = new ConcurrentSkipListSet<>(); | |
private final AtomicLong total = new AtomicLong(); | |
private volatile long updateGeneration; | |
public CheckpointTracker(Checkpointing checkpointing, String key, long every) { | |
Preconditions.checkArgument(every > 0); | |
this.checkpointing = checkpointing; | |
this.key = key; | |
this.every = every; | |
} | |
public Optional<Long> currentValue() { | |
try { | |
return checkpointing.get(key); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void startProcessing(long value) { | |
inProcess.add(value); | |
} | |
public void doneProcessing(long value) { | |
inProcess.remove(value); | |
} | |
public boolean maybeUpdate(long count) { | |
addTotal(count); | |
return maybeUpdate(); | |
} | |
public long incrementTotal() { | |
return total.incrementAndGet(); | |
} | |
public long addTotal(long n) { | |
return total.addAndGet(n); | |
} | |
public boolean maybeUpdate() { | |
final long currentTotal = total.get(); | |
final long newUpdateGeneration = currentTotal / every; | |
if (newUpdateGeneration > updateGeneration) { | |
synchronized (this) { | |
if (newUpdateGeneration > updateGeneration) { | |
update(inProcess.first()); | |
updateGeneration = newUpdateGeneration; | |
return true; | |
} | |
} | |
} | |
return false; | |
} | |
public void update(final long value) { | |
try { | |
checkpointing.set(key, value); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment