Last active
August 29, 2015 14:22
-
-
Save bman654/43d6876d1b0db905651f to your computer and use it in GitHub Desktop.
Requires the "Reactive Extensions - Testing Library" NuGet package.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using NUnit.Framework; | |
namespace ThrottleWithMax | |
{ | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive; | |
using Microsoft.Reactive.Testing; | |
/// <summary>
/// Virtual-time tests for the <c>ThrottleWithMax</c> operator. Each case is run
/// against the implementation selected by the <c>which</c> argument.
/// </summary>
[TestFixture]
public class Test
{
    // All values are scheduler ticks on the TestScheduler's virtual clock.
    private const int _THROTTLE = 50;
    private const int _TIMEOUT = 100;
    private const int _COMPLETE = 100000;

    /// <summary>
    /// Feeds a cold source that emits each tick value of <paramref name="pattern"/>
    /// at that same tick through <c>ThrottleWithMax</c> and compares the recorded
    /// output with the expected value/time pairs followed by the completion.
    /// </summary>
    /// <param name="which">Implementation selector passed straight to <c>ThrottleWithMax</c>.</param>
    /// <param name="pattern">Ticks at which the source emits; the emitted value equals the tick.</param>
    /// <param name="expectedPattern">Values expected from the throttled sequence, in order.</param>
    /// <param name="expectedTimes">Tick at which each expected value should be observed.</param>
    [TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "g1")]
    [TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "g2")]
    [TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "g3")]
    [TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "w1")]
    [TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "w2")]
    [TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "w3")]
    public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
    {
        // Arrange: the same completion record terminates both the input and the expectation.
        var clock = new TestScheduler();
        var completion = ReactiveTest.OnCompleted(_COMPLETE, _COMPLETE);

        var input = pattern.Select(tick => ReactiveTest.OnNext(tick, tick)).ToList();
        input.Add(completion);

        var sink = clock.CreateObserver<int>();
        clock.CreateColdObservable(input.ToArray())
            .ThrottleWithMax(which, TimeSpan.FromTicks(_THROTTLE), TimeSpan.FromTicks(_TIMEOUT), clock)
            .Subscribe(sink);

        // Act: run the virtual clock until no scheduled work remains.
        clock.Start();

        // Assert: pair each expected value with its expected time, then the completion.
        var expected = expectedPattern
            .Zip(expectedTimes, (value, time) => ReactiveTest.OnNext(time, value))
            .ToList();
        expected.Add(completion);

        CollectionAssert.AreEqual(expected, sink.Messages);
    }
}
/// <summary>
/// Two alternative implementations of a throttle that also closes after a maximum
/// time, plus a string-keyed dispatcher used by the tests to pick between them.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Routes to <see cref="ThrottleWithMax_GroupBy{T}"/> when <paramref name="which"/>
    /// is exactly <c>"groupby"</c>; any other value routes to
    /// <see cref="ThrottleWithMax_Window{T}"/>.
    /// </summary>
    /// <param name="source">Sequence to throttle.</param>
    /// <param name="which">Implementation selector.</param>
    /// <param name="throttle">Quiet period forwarded to the chosen implementation.</param>
    /// <param name="maxTime">Maximum time forwarded to the chosen implementation.</param>
    /// <param name="scheduler">Scheduler forwarded to the chosen implementation; may be null.</param>
    /// <returns>The sequence produced by the chosen implementation.</returns>
    public static IObservable<T> ThrottleWithMax<T>(this IObservable<T> source, string which, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        if (which == "groupby")
        {
            return source.ThrottleWithMax_GroupBy(throttle, maxTime, scheduler);
        }

        return source.ThrottleWithMax_Window(throttle, maxTime, scheduler);
    }

    /// <summary>
    /// GroupByUntil-based variant: every element goes into a single-key group, and
    /// the last element of each group is emitted once the group ends.
    /// </summary>
    /// <param name="source">Sequence to throttle.</param>
    /// <param name="throttle">Quiet period after which the current group is expired.</param>
    /// <param name="maxTime">Timeout applied to the throttled group; on timeout the group is expired.</param>
    /// <param name="scheduler">Scheduler for the timers; <c>Scheduler.Default</c> is used when null.</param>
    /// <returns>A sequence of the last element of each group.</returns>
    public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        var timerScheduler = scheduler ?? Scheduler.Default;

        var groups = source.GroupByUntil(
            item => 0,    // single shared key: all elements land in the same group
            item => item, // elements are passed through unchanged
            // The group expires when it goes quiet for `throttle`, or when the
            // throttled stream produces nothing within `maxTime` (the timeout then
            // falls back to an empty sequence).
            group => group
                .Throttle(throttle, timerScheduler)
                .Timeout(maxTime, Observable.Empty<T>(), timerScheduler));

        return groups.SelectMany(group => group.LastAsync());
    }

    /// <summary>
    /// Window-based variant: the published source is split into consecutive windows,
    /// and the last element of each window is emitted.
    /// </summary>
    /// <param name="source">Sequence to throttle.</param>
    /// <param name="throttle">Quiet period after which the current window is closed.</param>
    /// <param name="maxTime">Delay applied to the shared source to produce the max-time closing signal.</param>
    /// <param name="scheduler">Scheduler for the timers; <c>Scheduler.Default</c> is used when null.</param>
    /// <returns>A sequence of the last element of each window.</returns>
    public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    {
        var timerScheduler = scheduler ?? Scheduler.Default;

        return source.Publish(shared =>
        {
            var windows = shared.Window(() =>
            {
                // Both closing signals are derived from the shared source itself, so
                // neither timer starts until the first element of the new window arrives.
                var quietSignal = shared.Throttle(throttle, timerScheduler);
                var maxTimeSignal = shared.Delay(maxTime, timerScheduler);

                // Close the window on whichever signal reacts first.
                return quietSignal.Amb(maxTimeSignal);
            });

            return windows.SelectMany(window => window.TakeLast(1));
        });
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment