Skip to content

Instantly share code, notes, and snippets.

@kirillsulim
Created December 27, 2018 07:26
Show Gist options
  • Save kirillsulim/5f70fb427750a141e083bfbc0b39f7f3 to your computer and use it in GitHub Desktop.
Save kirillsulim/5f70fb427750a141e083bfbc0b39f7f3 to your computer and use it in GitHub Desktop.
(draft) MT now exactly right lock-free sliding window
package some;
import java.time.Clock;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
/**
* SlidingWindowCounter
*
* @author Kirill Sulim
*/
public class SlidingWindowCounter<Value> {
private final Clock clock;
private final BinaryOperator<Value> adder;
private final BinaryOperator<Value> substractor;
private final long width;
private final AtomicReference<Value> accumulated;
private final ConcurrentLinkedQueue<CountData<Value>> queue = new ConcurrentLinkedQueue<>();
/**
* Creates sliding widow counter
*
* @param width window width in milliseconds
* @param defaultValue starting sum value, usually zero-value
* @param adder add function
* @param substractor substract function
* @param clock clock
*/
public SlidingWindowCounter(
final long width,
final Value defaultValue,
final BinaryOperator<Value> adder,
final BinaryOperator<Value> substractor,
Clock clock
) {
this.clock = clock;
this.adder = adder;
this.substractor = substractor;
this.width = width;
this.accumulated = new AtomicReference<>(defaultValue);
}
/**
* Creates sliding window counter with default clock
*
* @param width window width in milliseconds
* @param defaultValue starting sum value, usually zero-value
* @param adder add function
* @param substractor substract function
*/
public SlidingWindowCounter(
final long width,
final Value defaultValue,
final BinaryOperator<Value> adder,
final BinaryOperator<Value> substractor
) {
this(
width,
defaultValue,
adder,
substractor,
Clock.systemUTC()
);
}
/**
* Increment value
*
* @param value
*/
public void inc(final Value value) {
final CountData<Value> data = new CountData<>(clock.millis(), value);
accumulated.accumulateAndGet(value, adder);
queue.add(data);
}
/**
* Get current accumulated value
*
* @return accumulated value
*/
public Value get() {
final long outdated = clock.millis() - width;
CountData<Value> peek = queue.peek();
Value lastResult = accumulated.get();
while (Objects.nonNull(peek) && peek.timestamp < outdated) {
if (queue.remove(peek)) {
lastResult = accumulated.accumulateAndGet(peek.value, substractor);
peek = queue.peek();
}
}
return lastResult;
}
/**
* Utility class to store value and timestamp
*/
private static class CountData<Value> {
private final long timestamp;
private final Value value;
public CountData(long timestamp, Value value) {
this.timestamp = timestamp;
this.value = value;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment