Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaauie/9bb59575e91403ceb51517219d445356 to your computer and use it in GitHub Desktop.
Save yaauie/9bb59575e91403ceb51517219d445356 to your computer and use it in GitHub Desktop.
ExecutionTimer for getting the cumulative execution time including in-progress execution.
package org.logstash.instrument.metrics.timer;
import org.logstash.instrument.metrics.AbstractMetric;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
/**
 * An {@link ExecutionTimer} is a concurrency-safe non-blocking timer whose value
 * tracks the <em>cumulative</em> execution time in milliseconds, including in-progress
 * execution by multiple concurrent threads when they use either:
 * <ul>
 *   <li>{@link ExecutionTimer#time}, or</li>
 *   <li>both {@link ExecutionTimer#enter} and {@link ExecutionTimer#exit}.</li>
 * </ul>
 *
 * <p>For legacy support, untracked execution time can be added with {@link ExecutionTimer#report}.
 *
 * <p>Internally, elapsed time is accumulated in nanoseconds inside an immutable
 * {@code State} snapshot that is swapped atomically; the total is converted to
 * milliseconds only when it is read, so units are never mixed and sub-millisecond
 * executions are not lost at each checkpoint.
 */
public class ExecutionTimer extends AbstractMetric<Long> implements TimerMetric<Long> {

    private static final long NANOS_PER_MILLI = 1_000_000L;

    // test-only dependency injection
    private final LongSupplier nanosSupplier;

    // Assigned in the constructor (not in a field initializer) because building the
    // initial State reads nanosSupplier, which must already be set at that point.
    private final AtomicReference<State> state;

    /**
     * Creates a timer backed by {@link System#nanoTime()}.
     *
     * @param name the metric name, passed to the superclass constructor
     */
    public ExecutionTimer(final String name) {
        this(name, System::nanoTime);
    }

    /**
     * Creates a timer with an injectable nanosecond clock.
     *
     * @param name          the metric name, passed to the superclass constructor
     * @param nanosSupplier source of the current time in nanoseconds
     */
    ExecutionTimer(final String name,
                   final LongSupplier nanosSupplier) {
        super(name);
        this.nanosSupplier = nanosSupplier;
        this.state = new AtomicReference<>(new State());
    }

    /**
     * Runs the given supplier, tracking its execution time; {@link #exit()} is
     * invoked even when the supplier throws.
     *
     * @param timeable the work to execute and time
     * @param <R>      the supplier's result type
     * @return whatever {@code timeable} returns
     */
    @Override
    public <R> R time(final Supplier<R> timeable) {
        enter();
        try {
            return timeable.get();
        } finally {
            exit();
        }
    }

    /**
     * Marks the beginning of one tracked execution.
     * Each call must be paired with exactly one later call to {@link #exit()}.
     */
    @Override
    public void enter() {
        state.getAndUpdate(State::incrementConcurrency);
    }

    /**
     * Marks the end of one tracked execution previously begun with {@link #enter()}.
     * An unmatched call drives the in-progress count negative, which makes the
     * reported value run backwards; balancing calls is the caller's responsibility.
     */
    @Override
    public void exit() {
        state.getAndUpdate(State::decrementConcurrency);
    }

    /**
     * Adds execution time that was not tracked via {@link #enter()}/{@link #exit()}.
     *
     * @param millis the additional execution time, in milliseconds
     * @throws ArithmeticException if the accumulated total overflows a {@code long}
     *                             count of nanoseconds
     */
    @Override
    public void report(final long millis) {
        state.getAndUpdate((s) -> s.addUntracked(millis));
    }

    @Override
    public MetricType getType() {
        return MetricType.COUNTER_LONG;
    }

    /**
     * @return the cumulative execution time in milliseconds, including the elapsed
     *         portion of executions that are still in progress
     */
    @Override
    public Long getValue() {
        return state.get().getValue();
    }

    /**
     * Immutable snapshot of the timer: the execution time accumulated up to a
     * checkpoint, plus the number of executions in progress since that checkpoint.
     * Deliberately an inner (non-static) class because it reads the enclosing
     * timer's {@code nanosSupplier}.
     */
    class State {
        // clock reading at which cumulativeNanos was last brought up to date
        private final long checkpointNanos;
        // execution time accumulated up to checkpointNanos, in nanoseconds
        private final long cumulativeNanos;
        // number of executions in progress since checkpointNanos
        private final int concurrency;

        /** Creates an empty state checkpointed at the current clock reading. */
        public State() {
            this(nanosSupplier.getAsLong(), 0L, 0);
        }

        private State(long checkpointNanos, long cumulativeNanos, int concurrency) {
            this.checkpointNanos = checkpointNanos;
            this.cumulativeNanos = cumulativeNanos;
            this.concurrency = concurrency;
        }

        /** @return a new state with one more execution in progress */
        State incrementConcurrency() {
            return adjustConcurrency(1);
        }

        /** @return a new state with one fewer execution in progress */
        State decrementConcurrency() {
            return adjustConcurrency(-1);
        }

        /**
         * @param additionalMillis untracked execution time to add, in milliseconds
         * @return a new state with the extra time added; checkpoint and concurrency unchanged
         * @throws ArithmeticException if the conversion to nanoseconds or the sum overflows
         */
        State addUntracked(final long additionalMillis) {
            final long additionalNanos = Math.multiplyExact(additionalMillis, NANOS_PER_MILLI);
            return new State(this.checkpointNanos, Math.addExact(cumulativeNanos, additionalNanos), concurrency);
        }

        // Folds the time elapsed since the last checkpoint into the cumulative total
        // (still in nanoseconds), then applies the concurrency change from "now" on.
        private State adjustConcurrency(final int vector) {
            final long newCheckpoint = nanosSupplier.getAsLong();
            return new State(newCheckpoint, cumulativeNanosAt(newCheckpoint), this.concurrency + vector);
        }

        /** @return the cumulative execution time as of now, in milliseconds */
        long getValue() {
            return this.getValue(nanosSupplier.getAsLong());
        }

        private long getValue(final long nanos) {
            return TimeUnit.MILLISECONDS.convert(cumulativeNanosAt(nanos), TimeUnit.NANOSECONDS);
        }

        // Every in-progress execution contributes the full interval since the checkpoint.
        private long cumulativeNanosAt(final long nanos) {
            return cumulativeNanos + (concurrency * (nanos - checkpointNanos));
        }
    }
}
package org.logstash.instrument.metrics.timer;
import org.logstash.instrument.metrics.Metric;
import java.util.function.Supplier;
/**
 * A {@link Metric} that tracks execution time.
 *
 * @param <T> the type of value reported by this metric (named {@code T} so that it
 *            does not shadow {@link java.lang.Long})
 */
public interface TimerMetric<T> extends Metric<T> {

    /**
     * Preferred convenience method for tracking execution.
     *
     * @param timeable the work to execute and time
     * @param <R>      the supplier's result type
     * @return the value supplied by {@code timeable}
     */
    <R> R time(Supplier<R> timeable);

    /**
     * Secondary method marking the start of an execution.
     * The caller is responsible for error handling and MUST send an
     * {@link #exit()} for each {@code enter}.
     */
    void enter();

    /**
     * Secondary method marking the end of an execution begun with {@link #enter()}.
     */
    void exit();

    /**
     * Legacy: report additional time whose beginning was not marked.
     *
     * @param millis the additional time, in milliseconds
     */
    void report(final long millis);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment