Created
September 23, 2016 11:24
-
-
Save danwalmsley/10abc4cc246d3edb35119d6ceb9082fb to your computer and use it in GitHub Desktop.
Rx Rolling Buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class AvaloniaObservableExtensions | |
{ | |
public static IObservable<IList<T>> RollingBuffer<T>(this IObservable<Timestamped<T>> @this, TimeSpan buffering) | |
{ | |
return Observable.Create<IList<T>>(o => | |
{ | |
var list = new LinkedList<Timestamped<T>>(); | |
return @this.Subscribe(tx => | |
{ | |
list.AddLast(tx); | |
while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering)) | |
{ | |
list.RemoveFirst(); | |
} | |
o.OnNext(list.Select(tx2 => tx2.Value).ToList()); | |
}, ex => o.OnError(ex), () => o.OnCompleted()); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment