Skip to content

Instantly share code, notes, and snippets.

@nblumhardt
Created January 5, 2014 11:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nblumhardt/8267240 to your computer and use it in GitHub Desktop.
Save nblumhardt/8267240 to your computer and use it in GitHub Desktop.
Seq sliding window app example, from http://nblumhardt.com/2014/01/seq-apps/
using Seq.Apps;
using Seq.Apps.LogEvents;
using System;
using System.Linq;
namespace Seq.Sample.SlidingWindows
{
    /// <summary>
    /// Counts events in a sliding time window, writing a message back to the
    /// stream when a set threshold is reached.
    /// </summary>
    /// <remarks>
    /// The window is kept as a circular buffer of one-second buckets. The
    /// position of the window is driven by event timestamps, while message
    /// suppression is measured against the wall clock (<see cref="DateTime.UtcNow"/>).
    /// </remarks>
    public class SlidingWindowReactor : Reactor, ISubscribeTo<LogEventData>
    {
        // Each bucket counts events in a one-second interval
        int[] _buckets;

        // A running sum of all buckets
        int _sum;

        // The index of the current ('latest') bucket
        int _currentBucket;

        // The date/time, rounded down to the second, corresponding
        // with the latest bucket.
        DateTime _currentBucketSecond;

        // The number of events in the time window that will trigger the
        // output message
        int _eventsInWindowThreshold;

        // The time to suppress messages after the threshold has been crossed
        TimeSpan _suppressionTime;

        // Tracks when suppression is due until
        DateTime? _suppressUntilUtc;

        // Output events will carry a ThresholdName property with this value
        string _thresholdName;

        /// <summary>
        /// Called by Seq whenever an event is sent to the app. Counts the event
        /// in its one-second bucket and writes the threshold message when the
        /// window total reaches the configured threshold and no suppression
        /// period is active.
        /// </summary>
        /// <param name="evt">The incoming event; only its UTC timestamp is used.</param>
        public void On(Event<LogEventData> evt)
        {
            EnsureInitialized(evt);

            // Events that are older than the whole window are not counted at all
            int eventBucket;
            if (!TrySlideWindow(evt, out eventBucket))
            {
                return;
            }

            _buckets[eventBucket]++;
            _sum++;

            // The event is still counted while suppressed; only the message is withheld
            if (IsSuppressed())
            {
                return;
            }

            if (_sum < _eventsInWindowThreshold)
            {
                return;
            }

            _suppressUntilUtc = DateTime.UtcNow + _suppressionTime;
            Log.Information("Threshold {ThresholdName} reached: {EventCount} events observed within {WindowSize} sec. (message suppressed for {SuppressionSeconds} sec.)",
                _thresholdName, _sum, _buckets.Length, (int)_suppressionTime.TotalSeconds);
        }

        /// <summary>
        /// Sets up the sliding buffer when the first event is sent; does nothing
        /// once the buffer exists.
        /// </summary>
        /// <param name="evt">The first event; its timestamp anchors the latest bucket.</param>
        /// <exception cref="InvalidOperationException">
        /// The WindowSeconds setting is less than one.
        /// </exception>
        void EnsureInitialized(Event<LogEventData> evt)
        {
            if (_buckets != null)
            {
                return;
            }

            // Read every setting into a local before touching any field, so that
            // an exception part-way through leaves _buckets null and the whole
            // initialization is attempted again on the next event.
            var window = App.GetSetting<int>("WindowSeconds");
            if (window < 1)
            {
                // A zero-length buffer cannot be indexed or used as a modulus
                throw new InvalidOperationException("The WindowSeconds setting must be at least 1.");
            }

            var threshold = App.GetSetting<int>("EventsInWindowThreshold");
            var suppression = TimeSpan.FromSeconds(App.GetSetting<int>("SuppressionSeconds", 0));
            var thresholdName = App.GetSetting<string>("ThresholdName");

            _eventsInWindowThreshold = threshold;
            _suppressionTime = suppression;
            _thresholdName = thresholdName;
            _currentBucketSecond = SecondsFloor(evt.TimestampUtc);
            _currentBucket = 0;

            // Assigned last: a non-null buffer is what marks the reactor as initialized
            _buckets = new int[window];
        }

        /// <summary>
        /// Adjusts the window to fit the event, providing the index into the
        /// buckets array that the event belongs to.
        /// </summary>
        /// <param name="evt">The event to place in the window.</param>
        /// <param name="eventBucket">
        /// On success, the index of the bucket for the event's second; zero when
        /// the method returns false.
        /// </param>
        /// <returns>
        /// False if the event is too late to fit the window; otherwise true.
        /// </returns>
        bool TrySlideWindow(Event<LogEventData> evt, out int eventBucket)
        {
            var eventSeconds = SecondsFloor(evt.TimestampUtc);
            var length = _buckets.Length;

            // Both timestamps are whole seconds, so the tick division is exact.
            // Kept as a long so that very large gaps cannot overflow.
            long distance = (eventSeconds - _currentBucketSecond).Ticks / TimeSpan.TicksPerSecond;

            if (distance < 0)
            {
                if (distance <= -length)
                {
                    eventBucket = 0;
                    return false;
                }

                // Here -length < distance < 0. The % operator keeps the sign of
                // its left operand, so length is added first to guarantee a
                // non-negative index when stepping back past bucket zero.
                eventBucket = (int)((_currentBucket + distance + length) % length);
                return true;
            }

            if (distance > 0)
            {
                if (distance >= length)
                {
                    // The whole window has expired
                    Array.Clear(_buckets, 0, length);
                    _sum = 0;
                }
                else
                {
                    // Recycle every bucket after the current one, up to and
                    // including the new current one, wrapping around the end
                    for (var step = 1; step <= distance; step++)
                    {
                        var reused = (_currentBucket + step) % length;
                        _sum -= _buckets[reused];
                        _buckets[reused] = 0;
                    }
                }

                _currentBucket = (int)((_currentBucket + distance) % length);
                _currentBucketSecond = eventSeconds;
            }

            eventBucket = _currentBucket;
            return true;
        }

        /// <summary>
        /// Checks whether a suppression period is currently in effect.
        /// </summary>
        /// <returns>True if a suppression deadline is set and still in the future.</returns>
        bool IsSuppressed()
        {
            return _suppressUntilUtc.HasValue && _suppressUntilUtc.Value > DateTime.UtcNow;
        }

        /// <summary>
        /// Returns the provided date time rounded down to the nearest second.
        /// </summary>
        /// <param name="dateTime">The value to truncate.</param>
        /// <returns>The value with its sub-second ticks removed.</returns>
        DateTime SecondsFloor(DateTime dateTime)
        {
            return dateTime - TimeSpan.FromTicks(dateTime.Ticks % TimeSpan.TicksPerSecond);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment