Skip to content

Instantly share code, notes, and snippets.

@Fuud
Last active September 8, 2016 12:14
Show Gist options
  • Save Fuud/f61aa79de93312e33ce7fef8b4275fec to your computer and use it in GitHub Desktop.
Save Fuud/f61aa79de93312e33ce7fef8b4275fec to your computer and use it in GitHub Desktop.
SimpleWindowCounter
package com.github.metricscore.hdr.counter;
import com.codahale.metrics.Clock;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.StampedLock;
/**
 * Rolling-window counter that splits the window into {@code chunkCount} equally sized time chunks.
 *
 * <p>Only the chunk covering the current moment accepts increments. When the clock passes the
 * chunk's expiration time the chunk is sealed and an immutable snapshot of the preceding chunk
 * sums is handed over to a freshly created successor. Time values returned by
 * {@code clock.getTime()} are treated as milliseconds.
 *
 * <p>Instances are intended to be shared between threads: the pointer to the current chunk is
 * advanced with compare-and-set, and every chunk protects its own hand-over with a
 * {@link StampedLock}.
 */
public class SimpleWindowCounter implements WindowCounter {

    private final Duration windowSize;
    private final int chunkCount;
    private final boolean smoothly;
    private final Clock clock;
    private final long chunkDurationMs;
    private final AtomicReference<Chunk> currentChunk;

    /**
     * Creates a counter.
     *
     * @param windowSize total length of the rolling window
     * @param chunkCount number of chunks the window is divided into, at least 1
     * @param smoothly   if {@code true}, {@link #getSum()} additionally includes a share of the
     *                   oldest chunk proportional to the time left in the current chunk
     * @param clock      time source
     * @throws IllegalArgumentException if {@code chunkCount} is less than 1, or if the window is
     *                                  too short to give every chunk at least one millisecond
     */
    public SimpleWindowCounter(Duration windowSize, int chunkCount, boolean smoothly, Clock clock) {
        if (chunkCount < 1) {
            throw new IllegalArgumentException("chunkCount must be at least 1, but was " + chunkCount);
        }
        final long chunkDuration = windowSize.toMillis() / chunkCount;
        if (chunkDuration < 1) {
            // A zero-length chunk would never stop being "expired" and the rotation loop would never end.
            throw new IllegalArgumentException(
                    "windowSize " + windowSize + " is too short for " + chunkCount + " chunks of at least 1 ms");
        }
        this.windowSize = windowSize;
        this.chunkCount = chunkCount;
        this.smoothly = smoothly;
        this.clock = clock;
        this.chunkDurationMs = chunkDuration;
        this.currentChunk = new AtomicReference<>(
                new Chunk(new long[chunkCount - 1], new long[chunkCount], clock.getTime() + chunkDuration));
    }

    /**
     * Adds {@code delta} to the chunk that covers the current time.
     *
     * @param delta value to add
     */
    @Override
    public void add(long delta) {
        final long currentTime = clock.getTime();
        getCurrentChunk(currentTime).add(delta);
    }

    /**
     * Returns the sum over the window as of the current time.
     *
     * @return sum of the current chunk and the preceding {@code chunkCount - 1} chunks; in smooth
     *         mode a time-proportional share of the chunk before those is added as well
     */
    @Override
    public long getSum() {
        final long currentTime = clock.getTime();
        final Chunk chunk = getCurrentChunk(currentTime);
        return smoothly ? chunk.getSmoothlySum(currentTime, chunkDurationMs) : chunk.getSum();
    }

    /**
     * Returns a chunk that has not expired at {@code currentTime}, sealing expired chunks and
     * advancing the shared pointer one chunk at a time as needed.
     */
    private Chunk getCurrentChunk(long currentTime) {
        Chunk chunk = currentChunk.get();
        while (chunk.shouldExpire(currentTime)) {
            final Chunk successor = chunk.expireAndGetNext(chunkDurationMs);
            // Every thread that observes the expired chunk helps to advance the pointer, so nobody
            // has to spin waiting for one particular thread. A failed CAS only means that another
            // thread has already moved the pointer forward; re-reading picks that up.
            currentChunk.compareAndSet(chunk, successor);
            chunk = currentChunk.get();
        }
        return chunk;
    }

    /**
     * One time slice of the window: a live sum for the slice itself plus immutable snapshots
     * describing the slices that preceded it.
     */
    private static class Chunk {

        /**
         * Cumulative sums of the preceding chunks:
         * summedPrevSums[0] - sum of the last (chunkCount - 1) chunks,
         * summedPrevSums[1] - sum of the last (chunkCount - 2) chunks,
         * ...
         * summedPrevSums[length - 1] - sum of the immediately preceding chunk.
         * Empty when the window consists of a single chunk.
         */
        private final long[] summedPrevSums;

        /**
         * Individual sums of the preceding chunkCount chunks, oldest first:
         * byChunkPrevSums[0] - the oldest remembered chunk (used only for smoothing),
         * byChunkPrevSums[length - 1] - the immediately preceding chunk.
         */
        private final long[] byChunkPrevSums;

        private final LongAdder sumInThisChunk = new LongAdder();
        private final long expirationTime;

        /** Successor of this chunk; non-null if and only if this chunk has been sealed. */
        private final AtomicReference<Chunk> nextChunk = new AtomicReference<>();

        /** Read mode: adding to this chunk. Write mode: sealing it and creating the successor. */
        private final StampedLock stampedLock = new StampedLock();

        private Chunk(long[] summedPrevSums, long[] byChunkPrevSums, long expirationTime) {
            this.summedPrevSums = summedPrevSums;
            this.byChunkPrevSums = byChunkPrevSums;
            this.expirationTime = expirationTime;
        }

        /** Sum of the fully remembered preceding chunks; zero for a single-chunk window. */
        private long previousChunksSum() {
            return summedPrevSums.length == 0 ? 0L : summedPrevSums[0];
        }

        /** Sum of the preceding chunks plus whatever has been added to this chunk so far. */
        private long getSum() {
            return previousChunksSum() + sumInThisChunk.sum();
        }

        /**
         * Same as {@link #getSum()} plus a share of the oldest remembered chunk proportional to
         * the time remaining until this chunk expires.
         */
        private long getSmoothlySum(long currentTimeMs, long chunkDurationMs) {
            final double remaining = (expirationTime - currentTimeMs) / (double) chunkDurationMs;
            // The caller read the clock before it obtained the chunk, so under contention the
            // chunk may be newer than the timestamp; keep the weight of the oldest chunk in [0, 1].
            final double fraction = Math.max(0.0, Math.min(1.0, remaining));
            return getSum() + (long) (byChunkPrevSums[0] * fraction);
        }

        /**
         * Adds {@code delta} to this chunk, or to the first unsealed successor if this chunk has
         * been sealed in the meantime.
         */
        private void add(long delta) {
            Chunk target = this;
            while (true) {
                final Chunk successor;
                final long stamp = target.stampedLock.readLock();
                try {
                    successor = target.nextChunk.get();
                    if (successor == null) {
                        // Sealing needs the write lock, so this increment is guaranteed to be
                        // included in the value handed over to the successor.
                        target.sumInThisChunk.add(delta);
                        return;
                    }
                } finally {
                    target.stampedLock.unlockRead(stamp);
                }
                // Sealed: forward to the successor without holding this chunk's lock.
                target = successor;
            }
        }

        private boolean shouldExpire(long currentTime) {
            return expirationTime <= currentTime;
        }

        /**
         * Seals this chunk (if that has not happened yet) and returns its successor.
         *
         * <p>The successor is created exactly once. Sealing takes the write lock so that it is
         * mutually exclusive with concurrent {@link #add(long)} calls and with other sealing
         * attempts; later callers receive the same successor instance.
         *
         * @param chunkDurationMs length of one chunk in milliseconds
         * @return the successor chunk, never {@code null}
         */
        private Chunk expireAndGetNext(long chunkDurationMs) {
            final Chunk known = nextChunk.get();
            if (known != null) {
                return known;
            }
            final long stamp = stampedLock.writeLock();
            try {
                final Chunk createdMeanwhile = nextChunk.get();
                if (createdMeanwhile != null) {
                    return createdMeanwhile;
                }
                // No add() can be in flight while the write lock is held, so this is the final value.
                final long closedSum = sumInThisChunk.sum();

                final int summedLength = summedPrevSums.length;
                final long[] newSummedPrevSums = new long[summedLength];
                for (int i = 0; i < summedLength - 1; i++) {
                    newSummedPrevSums[i] = summedPrevSums[i + 1] + closedSum;
                }
                if (summedLength > 0) {
                    newSummedPrevSums[summedLength - 1] = closedSum;
                }

                // Drop the oldest per-chunk sum and append the sum of the chunk being sealed.
                final int byChunkLength = byChunkPrevSums.length;
                final long[] newByChunkPrevSums = new long[byChunkLength];
                System.arraycopy(byChunkPrevSums, 1, newByChunkPrevSums, 0, byChunkLength - 1);
                newByChunkPrevSums[byChunkLength - 1] = closedSum;

                final Chunk successor =
                        new Chunk(newSummedPrevSums, newByChunkPrevSums, expirationTime + chunkDurationMs);
                nextChunk.set(successor);
                return successor;
            } finally {
                stampedLock.unlockWrite(stamp);
            }
        }

        @Override
        public String toString() {
            return "Chunk{" +
                    "summedPrevSums=" + Arrays.toString(summedPrevSums) +
                    ", byChunkPrevSums=" + Arrays.toString(byChunkPrevSums) +
                    ", sumInThisChunk=" + sumInThisChunk +
                    ", expirationTime=" + expirationTime +
                    '}';
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment