Created
July 1, 2011 13:59
-
-
Save OmerMor/1058594 to your computer and use it in GitHub Desktop.
SampledSelect Rx Operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading; | |
using Microsoft.Reactive.Testing; | |
using NUnit.Framework; | |
using System.Linq; | |
namespace TestRx | |
{ | |
/// <summary>
/// Tests for the <c>SampledSelect</c> operators: one deterministic test driven by a
/// virtual-time <see cref="TestScheduler"/>, and one that runs against real timers.
/// </summary>
public class Tester : ReactiveTest
{
    /// <summary>
    /// Feeds a hot source through a computation that takes 15 ticks per element and
    /// checks which elements are processed and which are skipped.
    /// </summary>
    [Test]
    public void TestSampledSelect()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(110, "A"),
            OnNext(120, "B"),
            OnNext(130, "C"),
            OnNext(140, "D"),
            // ... delay ...
            OnNext(180, "E"),
            OnNext(190, "F"),
            OnNext(200, "G"), OnNext(201, "H"), OnNext(202, "I"), // burst
            // ... delay ...
            OnNext(250, "J"),
            // ... delay ...
            OnNext(300, "K"), OnNext(301, "L"), OnNext(302, "M"), // burst
            // complete
            OnCompleted<string>(302),
            // Recorded after completion; no "NN" is expected in the output below.
            OnNext(303, "N")
        );

        // The computation doubles the string and delivers it 15 ticks later on the
        // test scheduler, e.g. "A" at 110 is expected as "AA" at 125.
        var computationTime = TimeSpan.FromTicks(15);
        Func<IObservable<string>, IObservable<string>> computation =
            src => src
                .Select(str => str + str)
                .Delay(computationTime, scheduler);

        var sampledSource = source.SampledSelect(computation);
        var results = scheduler.Start(() => sampledSource, created: 0, subscribed: 0, disposed: 10000);

        var expectedResults = new[]
        {
            OnNext(125, "AA"),
            OnNext(140, "BB"),
            // skipping "CC"
            OnNext(155, "DD"),
            OnNext(195, "EE"),
            OnNext(210, "FF"),
            // skipping "GG"
            // skipping "HH"
            OnNext(225, "II"),
            OnNext(265, "JJ"),
            OnNext(315, "KK"),
            // skipping "LL"
            OnNext(330, "MM"),
            OnCompleted<string>(330)
        };
        ReactiveAssert.AreElementsEqual(expectedResults, results.Messages);
    }

    /// <summary>
    /// Builds a real-time source producing the letters "A" through "M".
    /// </summary>
    /// <returns>
    /// An observable created with <c>Observable.Generate</c> whose time selector
    /// returns the per-element value from the table below multiplied by 10 ms.
    /// </returns>
    private IObservable<string> GenerateSource()
    {
        // Indexed by element number 0..12; each entry is in units of 10 ms.
        var timeMap = new[]
        {
            10, // 0
            10, // 1
            10, // 2
            10, // 3
            40, // 4
            10, // 5
            10, // 6
            1,  // 7
            1,  // 8
            48, // 9
            50, // 10
            1,  // 11
            1,  // 12
        };

        return Observable.Generate(
            0,
            i => i <= 12,
            i => ++i,
            i => ((char)('A' + i)).ToString(),
            i => TimeSpan.FromMilliseconds(timeMap[i] * 10));
    }

    /// <summary>
    /// Runs the <c>Func&lt;T, TResult&gt;</c> overload of <c>SampledSelect</c> against real
    /// timers with a computation that blocks for 150 ms per element.
    /// </summary>
    [Test]
    public void TestSampledSelect_with_real_scheduler()
    {
        var source = GenerateSource();
        var computationTime = TimeSpan.FromMilliseconds(150);

        // Simulates slow work by blocking the calling thread, then logs and returns
        // the doubled string.
        Func<string, string> computation = str =>
        {
            Thread.Sleep(computationTime);
            Console.WriteLine(str + str);
            return str + str;
        };

        // No scheduler is passed, so the operator falls back to its default scheduler.
        var sampledSource = source.SampledSelect(computation);

        // Blocks until the sampled sequence completes.
        var results = sampledSource.ToEnumerable().ToArray();

        var expectedResults = new[]
        {
            "AA", "BB", "DD", "EE", "FF", "II", "JJ", "KK", "MM",
        };
        ReactiveAssert.AreElementsEqual(expectedResults, results);
    }
}
/// <summary>
/// Extension operators for projecting an observable sequence through a computation
/// that may be slower than the source, keeping only the most recent pending value.
/// </summary>
public static class RxOperators
{
    /// <summary>
    /// Projects <paramref name="source"/> through an observable computation, handing the
    /// computation one value at a time. Values arriving while a computation is pending
    /// overwrite each other, so the next computation starts from the latest one.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="TResult">Element type produced by the computation.</typeparam>
    /// <param name="source">The sequence to sample.</param>
    /// <param name="selector">
    /// Receives a sequence that yields a single pending value and returns the sequence
    /// of results computed from it.
    /// </param>
    /// <returns>The results of the successive computations.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="selector"/> is null.
    /// </exception>
    public static IObservable<TResult> SampledSelect<T, TResult>(
        this IObservable<T> source,
        Func<IObservable<T>, IObservable<TResult>> selector)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (selector == null)
        {
            throw new ArgumentNullException("selector");
        }

        return source.Publish(src =>
        {
            // The "latest pending value" slot. Values are wrapped in Tuple<T> so that a
            // null slot means "nothing pending" independently of what T is.
            var latest = new BehaviorSubject<Tuple<T>>(null);
            var subscription = src
                .Select(value => Tuple.Create(value))
                .Subscribe(latest);

            // Each round: wait for a pending value, take exactly one, clear the slot,
            // run the computation on it, then Repeat() starts the next round.
            // NOTE(review): termination after the source completes depends on how
            // BehaviorSubject treats subscriptions made after OnCompleted - confirm
            // against the Rx version in use.
            var sampledSource = selector(
                    latest
                        .Where(tuple => tuple != null)
                        .Select(tuple => tuple.Item1)
                        .Take(1)
                        .Do(_ => latest.OnNext(null)))
                .Repeat();

            // Stop feeding the slot once the result sequence terminates or is disposed.
            return sampledSource.Finally(subscription.Dispose);
        });
    }

    /// <summary>
    /// Projects <paramref name="source"/> through a synchronous function executed on
    /// <paramref name="scheduler"/>, one value at a time. Values arriving while the
    /// function is running overwrite each other, so the next run uses the latest one.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="TResult">Element type produced by the function.</typeparam>
    /// <param name="source">The sequence to sample.</param>
    /// <param name="selector">The function applied to each sampled value.</param>
    /// <param name="scheduler">
    /// Scheduler on which <paramref name="selector"/> runs; when null,
    /// <c>Scheduler.TaskPool</c> is used.
    /// </param>
    /// <returns>The results of the successive function calls.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="selector"/> is null.
    /// </exception>
    public static IObservable<TResult> SampledSelect<T, TResult>(
        this IObservable<T> source,
        Func<T, TResult> selector,
        IScheduler scheduler = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (selector == null)
        {
            throw new ArgumentNullException("selector");
        }

        scheduler = scheduler ?? Scheduler.TaskPool;

        return source.Publish(src =>
        {
            // The "latest pending value" slot. Values are wrapped in Tuple<T> so that a
            // null slot means "nothing pending" independently of what T is.
            var latest = new BehaviorSubject<Tuple<T>>(null);
            var subscription = src
                .Select(value => Tuple.Create(value))
                .Subscribe(latest);

            var sampledSource = latest
                .Where(tuple => tuple != null)
                .Select(tuple => tuple.Item1)
                .Take(1)
                // Clear the slot in the same call that takes the value, before hopping
                // to the scheduler. Clearing after the hop could overwrite a newer
                // value that arrived in between, silently dropping it.
                .Do(_ => latest.OnNext(null))
                .ObserveOn(scheduler)
                .Select(selector)
                .Repeat();

            // Stop feeding the slot once the result sequence terminates or is disposed.
            return sampledSource.Finally(subscription.Dispose);
        });
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment