Created
January 5, 2014 11:34
-
-
Save nblumhardt/8267240 to your computer and use it in GitHub Desktop.
Seq sliding window app example, from http://nblumhardt.com/2014/01/seq-apps/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Seq.Apps; | |
using Seq.Apps.LogEvents; | |
using System; | |
using System.Linq; | |
namespace Seq.Sample.SlidingWindows | |
{ | |
/// <summary> | |
/// Counts events in a sliding time window, writing a message back to the | |
/// stream when a set threshold is reached. | |
/// </summary> | |
public class SlidingWindowReactor : Reactor, ISubscribeTo<LogEventData> | |
{ | |
// Each bucket counts events in a one-second interval | |
int[] _buckets; | |
// A running sum of all buckets | |
int _sum; | |
// The index of the current ('latest') bucket | |
int _currentBucket; | |
// The date/time, rounded down to the second, corresponding | |
// with the latest bucket. | |
DateTime _currentBucketSecond; | |
// The number of events in the time window that will trigger the | |
// output message | |
int _eventsInWindowThreshold; | |
// The time to suppress messages after the threshold has been crossed | |
TimeSpan _suppressionTime; | |
// Tracks when suppression is due until | |
DateTime? _suppressUntilUtc; | |
// Output events will carry a ThresholdName property with this value | |
string _thresholdName; | |
// Called by Seq whenever an event is send to the app | |
public void On(Event<LogEventData> evt) | |
{ | |
EnsureInitialized(evt); | |
int eventBucket; | |
if (!TrySlideWindow(evt, out eventBucket)) | |
return; | |
_buckets[eventBucket]++; | |
_sum++; | |
if (IsSuppressed()) | |
return; | |
if (_sum < _eventsInWindowThreshold) | |
return; | |
_suppressUntilUtc = DateTime.UtcNow + _suppressionTime; | |
Log.Information("Threshold {ThresholdName} reached: {EventCount} events observed within {WindowSize} sec. (message suppressed for {SuppressionSeconds} sec.)", | |
_thresholdName, _sum, _buckets.Length, (int)_suppressionTime.TotalSeconds); | |
} | |
// Sets up the sliding buffer when the first event is sent | |
void EnsureInitialized(Event<LogEventData> evt) | |
{ | |
if (_buckets == null) | |
{ | |
var window = App.GetSetting<int>("WindowSeconds"); | |
_buckets = new int[window]; | |
_eventsInWindowThreshold = App.GetSetting<int>("EventsInWindowThreshold"); | |
_currentBucketSecond = SecondsFloor(evt.TimestampUtc); | |
_currentBucket = 0; | |
_suppressionTime = TimeSpan.FromSeconds(App.GetSetting<int>("SuppressionSeconds", 0)); | |
_thresholdName = App.GetSetting<string>("ThresholdName"); | |
} | |
} | |
// Adjusts the window to fit the event, providing the index into | |
// the buckets array that the event belongs to; if the event is | |
// too late to fit the window, returns false | |
bool TrySlideWindow(Event<LogEventData> evt, out int eventBucket) | |
{ | |
var eventSeconds = SecondsFloor(evt.TimestampUtc); | |
var distance = (int)(eventSeconds - _currentBucketSecond).TotalSeconds; | |
if (distance < 0) | |
{ | |
if (distance <= -_buckets.Length) | |
{ | |
eventBucket = 0; | |
return false; | |
} | |
eventBucket = (_currentBucket + distance) % _buckets.Length; | |
return true; | |
} | |
if (distance > 0) | |
{ | |
var newCurrent = (_currentBucket + distance) % _buckets.Length; | |
var firstReused = (_currentBucket + 1) % _buckets.Length; | |
if (distance >= _buckets.Length) | |
{ | |
_sum = 0; | |
for (var i = 0; i < _buckets.Length; i++) | |
{ | |
_buckets[i] = 0; | |
} | |
} | |
else if (newCurrent >= firstReused) | |
{ | |
for (var i = firstReused; i <= newCurrent; i++) | |
{ | |
_sum -= _buckets[i]; | |
_buckets[i] = 0; | |
} | |
} | |
else | |
{ | |
for (var i = firstReused; i < _buckets.Length; i++) | |
{ | |
_sum -= _buckets[i]; | |
_buckets[i] = 0; | |
} | |
for (var i = 0; i <= newCurrent; i++) | |
{ | |
_sum -= _buckets[i]; | |
_buckets[i] = 0; | |
} | |
} | |
_currentBucket = newCurrent; | |
_currentBucketSecond = eventSeconds; | |
} | |
eventBucket = _currentBucket; | |
return true; | |
} | |
// Check the current suppression time | |
bool IsSuppressed() | |
{ | |
return _suppressUntilUtc.HasValue && _suppressUntilUtc.Value > DateTime.UtcNow; | |
} | |
// Returns the provided date time rounded down to the nearest second | |
DateTime SecondsFloor(DateTime dateTime) | |
{ | |
return dateTime - TimeSpan.FromTicks(dateTime.Ticks % TimeSpan.TicksPerSecond); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment