Skip to content

Instantly share code, notes, and snippets.

@bobpace
Last active January 3, 2016 05:19
Show Gist options
  • Save bobpace/8414593 to your computer and use it in GitHub Desktop.
Save bobpace/8414593 to your computer and use it in GitHub Desktop.
Testing Rx Observables
[TestFixture]
public class ExampleTests : InteractionContext<Example>
{
private List<string> _inputs;
private TestSchedulers _schedulerProvider;
[SetUp]
public void Setup()
{
_inputs = new List<string>
{
"one",
"two",
"three"
};
_schedulerProvider = new TestSchedulers();
Container.Inject<ISchedulerProvider>(_schedulerProvider);
}
[Test]
public void can_get_data()
{
var data = ClassUnderTest.GetData(_inputs);
data.Subscribe(
results =>
{
results.ShouldNotBeEmpty();
results.ShouldHaveTheSameElementsAs(
"one-processed", "two-processed", "three-processed"
);
},
error => Assert.Fail(error.ToString()));
AdvanceTo(TimeSpan.FromSeconds(10));
}
private void AdvanceTo(TimeSpan interval)
{
var schedulers = new[] {_schedulerProvider.Default, _schedulerProvider.Dispatcher};
schedulers.Each(x => x.AdvanceTo(interval.Ticks));
}
}
public class Example
{
private readonly ISchedulerProvider _schedulerProvider;
public Example(ISchedulerProvider schedulerProvider)
{
_schedulerProvider = schedulerProvider;
Results = Enumerable.Empty<string>();
}
public IEnumerable<string> Results { get; private set; }
public IObservable<IList<string>> GetData(IEnumerable<string> inputs)
{
return inputs.ToObservable()
.SelectMany(MakeRequest)
.ToList()
.SubscribeOn(_schedulerProvider.Default)
.ObserveOn(_schedulerProvider.Dispatcher)
.Timeout(TimeSpan.FromSeconds(5), _schedulerProvider.Dispatcher);
}
private IObservable<string> MakeRequest(string item)
{
return Observable.Return(item + "-processed");
}
}
public interface ISchedulerProvider
{
IScheduler CurrentThread { get; }
IScheduler Dispatcher { get; }
IScheduler Immediate { get; }
IScheduler Default { get; }
}
public sealed class SchedulerProvider : ISchedulerProvider
{
public IScheduler CurrentThread
{
get { return Scheduler.CurrentThread; }
}
public IScheduler Dispatcher
{
get { return DispatcherScheduler.Current; }
}
public IScheduler Immediate
{
get { return Scheduler.Immediate; }
}
public IScheduler Default
{
get { return Scheduler.Default; }
}
}
public sealed class TestSchedulers : ISchedulerProvider
{
private readonly TestScheduler _currentThread = new TestScheduler();
private readonly TestScheduler _dispatcher = new TestScheduler();
private readonly TestScheduler _immediate = new TestScheduler();
private readonly TestScheduler _default = new TestScheduler();
IScheduler ISchedulerProvider.CurrentThread { get { return CurrentThread; }}
IScheduler ISchedulerProvider.Dispatcher { get { return Dispatcher; }}
IScheduler ISchedulerProvider.Immediate { get { return Immediate; }}
IScheduler ISchedulerProvider.Default { get { return Default; }}
public TestScheduler CurrentThread
{
get { return _currentThread; }
}
public TestScheduler Dispatcher
{
get { return _dispatcher; }
}
public TestScheduler Immediate
{
get { return _immediate; }
}
public TestScheduler Default
{
get { return _default; }
}
}
public sealed class ImmediateSchedulers : ISchedulerProvider
{
public IScheduler CurrentThread { get { return Scheduler.Immediate; } }
public IScheduler Dispatcher { get { return Scheduler.Immediate; } }
public IScheduler Immediate { get { return Scheduler.Immediate; } }
public IScheduler Default { get { return Scheduler.Immediate; } }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment