Skip to content

Instantly share code, notes, and snippets.

@danwalmsley
Created September 23, 2016 11:24
Show Gist options
  • Save danwalmsley/10abc4cc246d3edb35119d6ceb9082fb to your computer and use it in GitHub Desktop.
Save danwalmsley/10abc4cc246d3edb35119d6ceb9082fb to your computer and use it in GitHub Desktop.
Rx Rolling Buffer
public static class AvaloniaObservableExtensions
{
/// <summary>
/// Turns a timestamped sequence into a sequence of rolling snapshots: for every
/// incoming element, emits a list of the values whose timestamps fall within the
/// last <paramref name="buffering"/> interval, measured against the current clock.
/// </summary>
/// <typeparam name="T">The type of the values carried by the timestamped elements.</typeparam>
/// <param name="this">The timestamped source sequence.</param>
/// <param name="buffering">The length of the rolling time window.</param>
/// <returns>
/// A sequence that emits one freshly allocated list per source element, containing
/// the values still inside the window in arrival order. The list is empty when even
/// the newest element is already older than the window.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="this"/> is <c>null</c>.</exception>
/// <remarks>
/// Expired elements are only pruned when a new element arrives; no timer is involved,
/// so nothing is emitted while the source is silent. Each subscription keeps its own
/// independent window.
/// </remarks>
public static IObservable<IList<T>> RollingBuffer<T>(this IObservable<Timestamped<T>> @this, TimeSpan buffering)
{
    if (@this == null)
    {
        throw new ArgumentNullException(nameof(@this));
    }

    return Observable.Create<IList<T>>(o =>
    {
        // Per-subscription window, oldest element first.
        var window = new LinkedList<Timestamped<T>>();

        return @this.Subscribe(tx =>
        {
            window.AddLast(tx);

            // Compute the cutoff once per notification. DateTimeOffset comparisons are
            // made on the UTC instant, so UtcNow avoids any local-time conversion.
            var cutoff = DateTimeOffset.UtcNow.Subtract(buffering);

            // Guard against an empty window: when every element (including the one just
            // added) is older than the cutoff, First becomes null and must not be dereferenced.
            while (window.First != null && window.First.Value.Timestamp < cutoff)
            {
                window.RemoveFirst();
            }

            o.OnNext(window.Select(item => item.Value).ToList());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment