Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ThrottledBuffer for Rx
public static partial class ObsEx
{
    /// <summary>
    /// Collects values from <paramref name="source"/> into a list and emits that list once
    /// <paramref name="period"/> has elapsed without a further value arriving.
    /// Every incoming value replaces the pending timer, so a steady burst keeps extending
    /// the current buffer. Empty buffers are never emitted.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="source">The sequence to buffer.</param>
    /// <param name="period">Quiet time required after the latest value before the buffer is emitted.</param>
    /// <param name="scheduler">Scheduler the flush timer is scheduled on.</param>
    /// <returns>
    /// A sequence of non-empty buffers. When the source terminates, any buffered values are
    /// emitted first and then the error or completion is forwarded.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<IList<T>> ThrottledBuffer<T>(this IObservable<T> source, TimeSpan period, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        return Observable.Create<IList<T>>(obs =>
        {
            // The timer callback may run on a different thread from the source notifications.
            // One gate guards the (non thread-safe) List<T> and also serializes every call
            // made to the downstream observer.
            var gate = new object();
            var yieldTimer = new SerialDisposable();
            var buffer = new List<T>();

            Action yieldBuffer = () =>
            {
                lock (gate)
                {
                    if (buffer.Count == 0)
                    {
                        return;
                    }

                    // Hand the filled list downstream and start a fresh one, so the emitted
                    // list is never mutated afterwards.
                    var filled = buffer;
                    buffer = new List<T>();
                    obs.OnNext(filled);
                }
            };

            var subscription = source.Subscribe(
                i =>
                {
                    lock (gate)
                    {
                        buffer.Add(i);
                    }

                    // Assigning to the SerialDisposable cancels the previously scheduled flush.
                    yieldTimer.Disposable = scheduler.Schedule(period, yieldBuffer);
                },
                ex =>
                {
                    yieldTimer.Dispose();
                    // The lock is re-entrant, so yieldBuffer can take it again; holding it
                    // here keeps the final buffer and the terminal notification atomic.
                    lock (gate)
                    {
                        yieldBuffer();
                        obs.OnError(ex);
                    }
                },
                () =>
                {
                    yieldTimer.Dispose();
                    lock (gate)
                    {
                        yieldBuffer();
                        obs.OnCompleted();
                    }
                });

            return System.Reactive.Disposables.StableCompositeDisposable.Create(yieldTimer, subscription);
        });
    }
}
void Main()
{
    // Run the virtual-time tests first.
    var suite = new ThrottledBufferTests();
    suite.EmptySequenceProjectsToAnEmptySequence();
    suite.ValuesWithInPeriodAreBuffered();
    suite.ValuesOutsidePeriodAreInSeperateBuffers();
    // There is no dedicated "does not yield empty buffers" test: that behaviour is covered by
    // EmptySequenceProjectsToAnEmptySequence and ValuesOutsidePeriodAreInSeperateBuffers.

    // Live demo on the default scheduler: ten values 100 ms apart, then two values one
    // second apart, buffered with a 250 ms quiet period.
    var rapidValues = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
    var sparseValues = Observable.Interval(TimeSpan.FromSeconds(1)).Take(2);
    var quietPeriod = TimeSpan.FromMilliseconds(250);

    // Dump() is not defined in this file; presumably the LINQPad output extension.
    rapidValues
        .Concat(sparseValues)
        .ThrottledBuffer(quietPeriod, Scheduler.Default)
        .Dump();
}
/// <summary>
/// Virtual-time tests for <c>ThrottledBuffer</c>, driven by a <c>TestScheduler</c>.
/// The methods carry no test attributes; they are invoked directly by the caller.
/// </summary>
public class ThrottledBufferTests
{
    /// <summary>
    /// An empty source yields only a completion notification and no buffers.
    /// </summary>
    public void EmptySequenceProjectsToAnEmptySequence()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<IList<int>>();

        Observable.Empty<int>(testScheduler)
            .ThrottledBuffer(TimeSpan.FromSeconds(1), testScheduler)
            .Subscribe(observer);
        testScheduler.Start();

        ReactiveAssert.AssertEqual(observer.Messages, ReactiveTest.OnCompleted<IList<int>>(1));
    }

    /// <summary>
    /// Values arriving less than one period apart are emitted as a single buffer,
    /// one period after the last value.
    /// </summary>
    public void ValuesWithInPeriodAreBuffered()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<IList<int>>();
        var source = testScheduler.CreateColdObservable<int>(
            ReactiveTest.OnNext(250.Ms(), 1),
            ReactiveTest.OnNext(300.Ms(), 2),
            ReactiveTest.OnNext(350.Ms(), 3)
        );

        source
            .ThrottledBuffer(TimeSpan.FromMilliseconds(100), testScheduler)
            .Subscribe(observer);
        testScheduler.Start();

        // Expected value goes first so that failure messages label the values correctly.
        Assert.Equal(1, observer.Messages.Count);
        // Last value at 350 ms plus the 100 ms quiet period.
        Assert.Equal(450.Ms(), observer.Messages[0].Time);
        Assert.Equal(NotificationKind.OnNext, observer.Messages[0].Value.Kind);
        ReactiveAssert.AreElementsEqual(new List<int>() { 1, 2, 3 }, observer.Messages[0].Value.Value);
    }

    /// <summary>
    /// Two bursts separated by more than one period are emitted as two separate buffers.
    /// </summary>
    public void ValuesOutsidePeriodAreInSeperateBuffers()
    {
        var testScheduler = new TestScheduler();
        var observer = testScheduler.CreateObserver<IList<int>>();
        var source = testScheduler.CreateColdObservable<int>(
            ReactiveTest.OnNext(250.Ms(), 1),
            ReactiveTest.OnNext(300.Ms(), 2),
            ReactiveTest.OnNext(350.Ms(), 3),
            ReactiveTest.OnNext(1250.Ms(), 4),
            ReactiveTest.OnNext(1300.Ms(), 5),
            ReactiveTest.OnNext(1350.Ms(), 6)
        );

        source
            .ThrottledBuffer(TimeSpan.FromMilliseconds(100), testScheduler)
            .Subscribe(observer);
        testScheduler.Start();

        // Expected value goes first so that failure messages label the values correctly.
        Assert.Equal(2, observer.Messages.Count);

        // First burst: last value at 350 ms plus the 100 ms quiet period.
        Assert.Equal(450.Ms(), observer.Messages[0].Time);
        Assert.Equal(NotificationKind.OnNext, observer.Messages[0].Value.Kind);
        ReactiveAssert.AreElementsEqual(new List<int>() { 1, 2, 3 }, observer.Messages[0].Value.Value);

        // Second burst: last value at 1350 ms plus the 100 ms quiet period.
        Assert.Equal(1450.Ms(), observer.Messages[1].Time);
        Assert.Equal(NotificationKind.OnNext, observer.Messages[1].Value.Kind);
        ReactiveAssert.AreElementsEqual(new List<int>() { 4, 5, 6 }, observer.Messages[1].Value.Value);
    }
}
/// <summary>
/// Helpers for expressing test times in ticks.
/// </summary>
public static class TimeEx
{
    /// <summary>
    /// Converts a whole number of milliseconds into the equivalent number of <see cref="TimeSpan"/> ticks.
    /// </summary>
    /// <param name="milliseconds">Duration in milliseconds.</param>
    /// <returns>The same duration expressed in ticks.</returns>
    public static long Ms(this int milliseconds)
    {
        // The multiplication is done in long, so it cannot overflow for any int input.
        long ticks = milliseconds * TimeSpan.TicksPerMillisecond;
        return ticks;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.