Skip to content

Instantly share code, notes, and snippets.

@sfackler
Created May 25, 2017 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sfackler/2ea5c5b2ee0cc1447b9ae415292adaba to your computer and use it in GitHub Desktop.
Save sfackler/2ea5c5b2ee0cc1447b9ae415292adaba to your computer and use it in GitHub Desktop.
// This implementation and the test suite comes from Java metric-core's
// ExponentiallyDecayingReservoir.
use ordered_float::NotNaN;
use std::collections::BTreeMap;
use std::time::{Instant, Duration};
use rand::{self, Rng, Open01};
const DEFAULT_SIZE: usize = 1028;
const DEFAULT_ALPHA: f64 = 0.015;
const RESCALE_THRESHOLD_SECS: u64 = 60 * 60;
struct WeightedSample {
value: i64,
weight: f64,
}
pub struct Histogram {
values: BTreeMap<NotNaN<f64>, WeightedSample>,
alpha: f64,
size: usize,
count: u64,
start_time: Instant,
next_scale_time: Instant,
}
impl Histogram {
pub fn new() -> Histogram {
Histogram::from_size_and_alpha(DEFAULT_SIZE, DEFAULT_ALPHA)
}
pub fn from_size_and_alpha(size: usize, alpha: f64) -> Histogram {
let now = Instant::now();
Histogram {
values: BTreeMap::new(),
alpha: alpha,
size: size,
count: 0,
start_time: now,
next_scale_time: now + Duration::from_secs(RESCALE_THRESHOLD_SECS),
}
}
pub fn update(&mut self, value: i64) {
self.update_inner(value, Instant::now());
}
fn update_inner(&mut self, value: i64, time: Instant) {
self.rescale_if_needed(time);
let item_weight = self.weight(time - self.start_time);
let sample = WeightedSample {
value,
weight: item_weight,
};
// Open01 since we don't want to divide by 0
let priority = item_weight / rand::thread_rng().gen::<Open01<f64>>().0;
let priority = NotNaN::from(priority);
self.count += 1;
if self.values.len() < self.size {
self.values.insert(priority, sample);
} else {
let first = *self.values.keys().next().unwrap();
if first < priority && self.values.insert(priority, sample).is_none() {
self.values.remove(&first).unwrap();
}
}
}
fn rescale_if_needed(&mut self, now: Instant) {
if now >= self.next_scale_time {
self.rescale(now);
}
}
pub fn count(&self) -> u64 {
self.count
}
pub fn snapshot(&self) -> WeightedSnapshot {
let mut entries = self.values
.values()
.map(|s| {
SnapshotEntry {
value: s.value,
norm_weight: s.weight,
quantile: 0.,
}
})
.collect::<Vec<_>>();
entries.sort_by_key(|e| e.value);
let sum_weight = entries.iter().map(|e| e.norm_weight).sum::<f64>();
for entry in &mut entries {
entry.norm_weight /= sum_weight;
}
entries
.iter_mut()
.fold(0., |acc, e| {
e.quantile = acc;
acc + e.norm_weight
});
WeightedSnapshot(entries)
}
fn weight(&self, time: Duration) -> f64 {
(self.alpha * time.as_secs() as f64).exp()
}
fn rescale(&mut self, now: Instant) {
self.next_scale_time = now + Duration::from_secs(RESCALE_THRESHOLD_SECS);
let old_start_time = self.start_time;
self.start_time = now;
let scaling_factor = (-self.alpha * (now - old_start_time).as_secs() as f64).exp();
self.values = self.values
.iter()
.map(|(&k, v)| {
(k * scaling_factor,
WeightedSample {
value: v.value,
weight: v.weight * scaling_factor,
})
})
.collect();
}
}
struct SnapshotEntry {
value: i64,
norm_weight: f64,
quantile: f64,
}
pub struct WeightedSnapshot(Vec<SnapshotEntry>);
impl WeightedSnapshot {
pub fn value(&self, quantile: f64) -> i64 {
assert!(quantile >= 0. && quantile <= 1.);
if self.0.is_empty() {
return 0;
}
let quantile = NotNaN::from(quantile);
let idx = match self.0
.binary_search_by(|e| NotNaN::from(e.quantile).cmp(&quantile)) {
Ok(idx) => idx,
Err(idx) if idx >= self.0.len() => self.0.len() - 1,
Err(idx) => idx,
};
self.0[idx].value
}
pub fn max(&self) -> i64 {
self.0.last().map_or(0, |e| e.value)
}
pub fn min(&self) -> i64 {
self.0.first().map_or(0, |e| e.value)
}
pub fn mean(&self) -> f64 {
self.0
.iter()
.map(|e| e.value as f64 * e.norm_weight)
.sum::<f64>()
}
pub fn stddev(&self) -> f64 {
if self.0.len() <= 1 {
return 0.;
}
let mean = self.mean();
let variance = self.0
.iter()
.map(|e| {
let diff = e.value as f64 - mean;
e.norm_weight * diff * diff
})
.sum::<f64>();
variance.sqrt()
}
}
#[cfg(test)]
mod test {
use std::ops::Range;
use super::*;
#[test]
fn a_histogram_of_100_out_of_1000_elements() {
let mut histogram = Histogram::from_size_and_alpha(100, 0.99);
for i in 0..1000 {
histogram.update(i);
}
assert_eq!(histogram.values.len(), 100);
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 100);
assert_all_values_between(snapshot, 0..1000);
}
#[test]
fn a_histogram_of_100_out_of_10_elements() {
let mut histogram = Histogram::from_size_and_alpha(100, 0.99);
for i in 0..10 {
histogram.update(i);
}
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 10);
assert_all_values_between(snapshot, 0..10);
}
#[test]
fn a_heavily_biased_histogram_of_100_out_of_1000_elements() {
let mut histogram = Histogram::from_size_and_alpha(1000, 0.01);
for i in 0..100 {
histogram.update(i);
}
assert_eq!(histogram.values.len(), 100);
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 100);
assert_all_values_between(snapshot, 0..100);
}
#[test]
fn long_periods_of_inactivity_should_not_corrupt_sampling_state() {
let mut histogram = Histogram::from_size_and_alpha(10, 0.015);
let mut now = histogram.start_time;
// add 1000 values at a rate of 10 values/second
let delta = Duration::from_millis(100);
for i in 0..1000 {
now += delta;
histogram.update_inner(1000 + i, now);
}
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 10);
assert_all_values_between(snapshot, 1000..2000);
// wait for 15 hours and add another value.
// this should trigger a rescale. Note that the number of samples will
// be reduced because of the very small scaling factor that will make
// all existing priorities equal to zero after rescale.
now += Duration::from_secs(15 * 60 * 60);
histogram.update_inner(2000, now);
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 2);
assert_all_values_between(snapshot, 1000..3000);
// add 1000 values at a rate of 10 values/second
for i in 0..1000 {
now += delta;
histogram.update_inner(3000 + i, now);
}
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 10);
assert_all_values_between(snapshot, 3000..4000);
}
#[test]
fn spot_lift() {
let mut histogram = Histogram::from_size_and_alpha(1000, 0.015);
let mut now = histogram.start_time;
let values_per_minute = 10;
let values_interval = Duration::from_secs(60) / values_per_minute;
// mode 1: steady regime for 120 minutes
for _ in 0..120 * values_per_minute {
histogram.update_inner(177, now);
now += values_interval;
}
// switching to mode 2: 10 minutes with the same rate, but larger value
for _ in 0..10 * values_per_minute {
histogram.update_inner(9999, now);
now += values_interval;
}
// expect that the quantiles should be about mode 2 after 10 minutes
assert_eq!(histogram.snapshot().value(0.5), 9999);
}
#[test]
fn spot_fall() {
let mut histogram = Histogram::from_size_and_alpha(1000, 0.015);
let mut now = histogram.start_time;
let values_per_minute = 10;
let values_interval = Duration::from_secs(60) / values_per_minute;
// mode 1: steady regime for 120 minutes
for _ in 0..120 * values_per_minute {
histogram.update_inner(9998, now);
now += values_interval;
}
// switching to mode 2: 10 minutes with the same rate, but smaller value
for _ in 0..10 * values_per_minute {
histogram.update_inner(178, now);
now += values_interval;
}
// expect that the quantiles should be about mode 2 after 10 minutes
assert_eq!(histogram.snapshot().value(0.5), 178);
}
#[test]
fn quantiles_should_be_based_on_weights() {
let mut histogram = Histogram::from_size_and_alpha(1000, 0.015);
let mut now = histogram.start_time;
for _ in 0..40 {
histogram.update_inner(177, now);
}
now += Duration::from_secs(120);
for _ in 0..10 {
histogram.update_inner(9999, now);
}
let snapshot = histogram.snapshot();
assert_eq!(snapshot.0.len(), 50);
// the first added 40 items (177) have weights 1
// the next added 10 items (9999) have weights ~6
// so, it's 40 vs 60 distribution, not 40 vs 10
assert_eq!(snapshot.value(0.5), 9999);
assert_eq!(snapshot.value(0.75), 9999);
}
fn assert_all_values_between(snapshot: WeightedSnapshot, range: Range<i64>) {
for entry in &snapshot.0 {
assert!(entry.value >= range.start && entry.value < range.end,
"snapshot value {} was not in {:?}",
entry.value,
range);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment