Skip to content

Instantly share code, notes, and snippets.

@vitaly-t
Last active March 2, 2024 21:59
Show Gist options
  • Save vitaly-t/e4475271ad08ee6879da756b31c068a0 to your computer and use it in GitHub Desktop.
Save vitaly-t/e4475271ad08ee6879da756b31c068a0 to your computer and use it in GitHub Desktop.
Value spike detection
import {Observable, switchMap} from 'rxjs';
/**
* Spike detection result.
*/
export interface ISpike {
/**
* Detected spike change, in percent.
*/
change: number;
/**
* Current value, compared to the `origin` value.
*/
current: number;
/**
* The value, deviation from which triggerred the spike.
*/
origin: number;
/**
* Interval (in ms) between `origin` and `current` values.
* It is the time it took to produce the spike inside the interval.
*/
interval: number;
/**
* Current size of the spike buffer. This provides an indication as to the frequency
* of spikes, or how many values it took to trigger the last spike.
* It also helps to detect potential memory overuse, if interval is too big,
* or the sequence is too fast, the internal buffer size may grow too big.
*/
size: number;
}
/**
* SpikeDetector constructor options.
*/
export type SpikeParams = {
/**
* Value change, in percent, to verify for spikes.
*/
change: number;
/**
* Time interval (in ms) during which to check for spikes.
*/
interval: number;
/**
* Spikes verification strategy:
* - false (default) - check current value against the first value within interval.
* - true - check current value against all previous values within interval.
*
* Its use should depend on the business/strategy logic of your data.
*/
checkAll?: boolean;
};
/**
* Detects percent-based spikes in value changes.
* See: https://stackoverflow.com/questions/77989971/value-deviation-pressure-with-rxjs
*/
export class SpikeDetector {
private buffer: Array<{ ts: number, value: number }> = [];
constructor(private sp: SpikeParams) {
}
/**
* Takes a new value, and if a spike detected - returns it;
* otherwise it returns `undefined`.
*/
next(value: number): ISpike | void {
const ts = Date.now();
const b = this.buffer;
while (b.length && ts - b[0].ts > this.sp.interval) {
b.shift(); // remove expired values
}
b.push({ts, value});
if (b.length >= 2) {
if (this.sp.checkAll) {
return this.checkAll(value, ts);
}
return this.checkFirst(value, ts);
}
}
/**
* Removes all elements from the buffer,
* in case the client logic requires it.
*/
reset(): void {
this.buffer.length = 0;
}
/**
* Verifies current value for spikes against the first element.
*
* It expects at least 2 elements present in the buffer.
*/
private checkFirst(current: number, ts: number): ISpike | void {
const b = this.buffer;
const origin = b[0].value;
const change = 100 * current / origin - 100;
if (Math.abs(change) >= this.sp.change) {
const size = b.length;
const interval = ts - b[0].ts;
this.buffer = b.slice(-1); // leaving just the last element
return {change, origin, current, interval, size};
}
}
/**
* Verifies current value for spikes against all previous elements.
*
* It expects at least 2 elements present in the buffer.
*/
private checkAll(current: number, ts: number): ISpike | void {
const b = this.buffer;
let i = 0;
do {
const origin = b[i].value;
const change = 100 * current / origin - 100;
if (Math.abs(change) >= this.sp.change) {
const size = b.length;
const interval = ts - b[i].ts;
this.buffer = b.slice(-1); // leaving just the last element
return {change, origin, current, interval, size};
}
} while (++i < b.length - 1);
}
}
/**
* RXJS helper for use of SpikeDetector class.
*/
export function changeSpike(input: Observable<number>, sp: SpikeParams): Observable<ISpike> {
const sd = new SpikeDetector(sp);
return input.pipe(switchMap(value =>
new Observable<ISpike>(obs => {
const spike = sd.next(value);
if (spike) {
obs.next(spike);
}
})
));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment