Skip to content

Instantly share code, notes, and snippets.

@OmerMor
Created July 1, 2011 13:59
Show Gist options
  • Save OmerMor/1058594 to your computer and use it in GitHub Desktop.
Save OmerMor/1058594 to your computer and use it in GitHub Desktop.
SampledSelect Rx Operator
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using Microsoft.Reactive.Testing;
using NUnit.Framework;
using System.Linq;
namespace TestRx
{
public class Tester : ReactiveTest
{
[Test]
public void TestSampledSelect()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(110, "A"),
OnNext(120, "B"),
OnNext(130, "C"),
OnNext(140, "D"),
// ... delay ...
OnNext(180, "E"),
OnNext(190, "F"),
OnNext(200, "G"), OnNext(201, "H"), OnNext(202, "I"), // burst
// ... delay ...
OnNext(250, "J"),
// ... delay ...
OnNext(300, "K"), OnNext(301, "L"), OnNext(302, "M"), // burst
// complete
OnCompleted<string>(302),
OnNext(303, "N")
);
var computationTime = TimeSpan.FromTicks(15);
Func<IObservable<string>, IObservable<string>> computation =
src => src
.Select(str => str + str)
.Delay(computationTime, scheduler);
var sampledSource = source.SampledSelect(computation);
var results = scheduler.Start(() => sampledSource, created: 0, subscribed: 0, disposed: 10000);
var expectedResults = new[]
{
OnNext(125, "AA"),
OnNext(140, "BB"),
// skipping "CC"
OnNext(155, "DD"),
OnNext(195, "EE"),
OnNext(210, "FF"),
// skipping "GG"
// skipping "HH"
OnNext(225, "II"),
OnNext(265, "JJ"),
OnNext(315, "KK"),
// skipping "LL"
OnNext(330, "MM"),
OnCompleted<string>(330)
};
ReactiveAssert.AreElementsEqual(expectedResults, results.Messages);
}
private IObservable<string> generateSource()
{
var timeMap = new Dictionary<int, int>
{
{0, 10},
{1, 10},
{2, 10},
{3, 10},
{4, 40},
{5, 10},
{6, 10},
{7, 1},
{8, 1},
{9, 48},
{10, 50},
{11, 1},
{12, 1},
};
var source = Observable.Generate(0, i => i <= 12, i => ++i,
i => ((char)('A' + i)).ToString(),
i => TimeSpan.FromMilliseconds(timeMap[i]*10));
return source;
}
[Test]
public void TestSampledSelect_with_real_scheduler()
{
var source = generateSource();
var computationTime = TimeSpan.FromMilliseconds(150);
Func<string, string> computation = str =>
{
Thread.Sleep(computationTime);
Console.WriteLine(str+str);
return str + str;
};
Func<IObservable<string>, IObservable<string>> observableComputation =
src => src.Select(computation);
var sampledSource = source.SampledSelect(computation)/*.Take(9)*/;
var results = sampledSource.ToEnumerable().ToArray();
var expectedResults = new[]
{
"AA", "BB", "DD", "EE", "FF", "II", "JJ", "KK", "MM",
};
ReactiveAssert.AreElementsEqual(expectedResults, results);
}
}
public static class RxOperators
{
public static IObservable<TResult> SampledSelect<T, TResult>(
this IObservable<T> source,
Func<IObservable<T>, IObservable<TResult>> selector)
{
var hotResult = source.Publish(src =>
{
var latest = new BehaviorSubject<Tuple<T>>(null);
var subscription = src
.Select(value => Tuple.Create(value))
.Subscribe(latest);
var sampledSource = selector(
latest
.Where(tuple => tuple != null)
.Select(tuple => tuple.Item1)
.Take(1)
.Do(_ => latest.OnNext(null)))
.Repeat();
var result = sampledSource.Finally(subscription.Dispose);
return result;
});
return hotResult;
}
public static IObservable<TResult> SampledSelect<T, TResult>(
this IObservable<T> source,
Func<T, TResult> selector,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.TaskPool;
var hotResult = source.Publish(src =>
{
var latest = new BehaviorSubject<Tuple<T>>(null);
var subscription = src
.Select(value => Tuple.Create(value))
.Subscribe(latest);
var sampledSource = latest
.Where(tuple => tuple != null)
.Select(tuple => tuple.Item1)
.Take(1)
.ObserveOn(scheduler)
.Do(_ => latest.OnNext(null))
.Select(selector)
.Repeat()
;
var result = sampledSource.Finally(subscription.Dispose);
return result
//.Materialize().Do(Console.WriteLine).Dematerialize()
;
});
return hotResult;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment