Last active
September 8, 2016 12:14
-
-
Save Fuud/f61aa79de93312e33ce7fef8b4275fec to your computer and use it in GitHub Desktop.
SimpleWindowCounter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.metricscore.hdr.counter; | |
import com.codahale.metrics.Clock; | |
import java.time.Duration; | |
import java.util.Arrays; | |
import java.util.Optional; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.concurrent.atomic.LongAdder; | |
import java.util.concurrent.locks.StampedLock; | |
public class SimpleWindowCounter implements WindowCounter { | |
private final Duration windowSize; | |
private final int chunkCount; | |
private final boolean smoothly; | |
private final Clock clock; | |
private final long chunkDurationMs; | |
private AtomicReference<Chunk> currentChunk; | |
public SimpleWindowCounter(Duration windowSize, int chunkCount, boolean smoothly, Clock clock) { | |
this.windowSize = windowSize; | |
this.chunkCount = chunkCount; | |
this.smoothly = smoothly; | |
this.chunkDurationMs = windowSize.toMillis() / chunkCount; | |
this.clock = clock; | |
currentChunk = new AtomicReference<>(new Chunk(new long[chunkCount - 1], new long[chunkCount], clock.getTime() + chunkDurationMs)); | |
} | |
@Override | |
public void add(long delta) { | |
final long currentTime = clock.getTime(); | |
getCurrentChunk(currentTime).add(delta); | |
} | |
@Override | |
public long getSum() { | |
final long currentTime = clock.getTime(); | |
if (smoothly) { | |
return getCurrentChunk(currentTime).getSmoothlySum(currentTime, chunkDurationMs); | |
} else { | |
return getCurrentChunk(currentTime).getSum(); | |
} | |
} | |
private Chunk getCurrentChunk(long currentTime) { | |
while (true) { | |
final Chunk currentChunk = this.currentChunk.get(); | |
if (!currentChunk.shouldExpire(currentTime)) { | |
return currentChunk; | |
} else { | |
Optional<Chunk> newChunk = currentChunk.tryExpireAndCreateNext(chunkDurationMs); | |
newChunk.ifPresent(chunk -> { | |
this.currentChunk.compareAndSet(currentChunk, chunk); | |
}); | |
} | |
} | |
} | |
private static class Chunk { | |
/** | |
* summedPrevSums[0] - contains sum of chunks sums for last (chunkCount - 1) chunks | |
* summedPrevSums[1] - contains sum of chunks sums for last (chunkCount - 2) chunks | |
* summedPrevSums[2] - contains sum of chunks sums for last (chunkCount - 3) chunks | |
* etc | |
*/ | |
private final long[] summedPrevSums; | |
/** | |
* byChunkPrevSums[0] - contains sum that was in last chunk in window | |
* byChunkPrevSums[1] - contains sum that was in the next to the last chunk in window | |
* byChunkPrevSums[2] - contains sum of chunks in the next to the next to the last chunk in window | |
* etc | |
*/ | |
private final long[] byChunkPrevSums; | |
private final LongAdder sumInThisChunk = new LongAdder(); | |
private final long expirationTime; | |
//non-null only if expired | |
private final AtomicReference<Chunk> nextChunk = new AtomicReference<>(); | |
private final AtomicBoolean expired = new AtomicBoolean(); | |
private final StampedLock stampedLock = new StampedLock(); | |
private Chunk(long[] summedPrevSums, long[] byChunkPrevSums, long expirationTime) { | |
this.summedPrevSums = summedPrevSums; | |
this.byChunkPrevSums = byChunkPrevSums; | |
this.expirationTime = expirationTime; | |
} | |
private long getSum() { | |
return summedPrevSums[0] + sumInThisChunk.longValue(); | |
} | |
private long getSmoothlySum(long currentTimeMs, long chunkDurationMs) { | |
return summedPrevSums[0] + sumInThisChunk.longValue() + (long) (byChunkPrevSums[0] * ((expirationTime - currentTimeMs) * 1.0 / chunkDurationMs)); | |
} | |
private void add(long delta) { | |
final long lock = stampedLock.readLock(); | |
try { | |
if (expired.get()) { | |
while (nextChunk.get() == null) { | |
} | |
nextChunk.get().add(delta); | |
} else { | |
sumInThisChunk.add(delta); | |
} | |
} finally { | |
stampedLock.unlock(lock); | |
} | |
} | |
private boolean shouldExpire(long currentTime) { | |
return expirationTime <= currentTime; | |
} | |
private Optional<Chunk> tryExpireAndCreateNext(long chunkDurationMs) { | |
final long lock = stampedLock.readLock(); | |
try { | |
if (expired.get()) { | |
//someone else will done job for us | |
return Optional.empty(); | |
} | |
expired.set(true); | |
final long sumInCurrentChunk = this.sumInThisChunk.longValue(); | |
long[] newSummedPrevSums = new long[summedPrevSums.length]; | |
for (int i = 0; i < newSummedPrevSums.length - 1; i++) { | |
newSummedPrevSums[i] = summedPrevSums[i + 1] + sumInCurrentChunk; | |
} | |
newSummedPrevSums[newSummedPrevSums.length - 1] = sumInCurrentChunk; | |
long[] newByChunkPrevSums = new long[byChunkPrevSums.length]; | |
for (int i = 0; i < newByChunkPrevSums.length - 1; i++) { | |
newByChunkPrevSums[i] = byChunkPrevSums[i + 1]; | |
} | |
newByChunkPrevSums[newByChunkPrevSums.length - 1] = sumInCurrentChunk; | |
final Chunk chunk = new Chunk(newSummedPrevSums, newByChunkPrevSums, this.expirationTime + chunkDurationMs); | |
// System.out.println("newChunk: " + chunk.toString()); | |
nextChunk.set(chunk); | |
return Optional.of(chunk); | |
} finally { | |
stampedLock.unlock(lock); | |
} | |
} | |
@Override | |
public String toString() { | |
return "Chunk{" + | |
"summedPrevSums=" + Arrays.toString(summedPrevSums) + | |
", byChunkPrevSums=" + Arrays.toString(byChunkPrevSums) + | |
", sumInThisChunk=" + sumInThisChunk + | |
", expirationTime=" + expirationTime + | |
'}'; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment