Skip to content

Instantly share code, notes, and snippets.

@loudej
Created December 28, 2010 07:59
Show Gist options
  • Save loudej/757031 to your computer and use it in GitHub Desktop.
Save loudej/757031 to your computer and use it in GitHub Desktop.
observable + task
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
namespace FuncGist {
public static class ObservableExtensions {
public static Task Subscribe<T>(this IObservable<T> observable, Action<T> next) {
var source = new TaskCompletionSource<object>();
try {
observable.Subscribe(new Shim<T>(next, source.SetException, () => source.SetResult(null)));
}
catch (Exception ex) {
source.SetException(ex);
}
return source.Task;
}
public static Task<T2> Aggregate<T, T2>(this IObservable<T> observable, T2 initialValue, Func<T2, T, T2> next) {
var source = new TaskCompletionSource<T2>();
try {
var currentValue = initialValue;
observable.Subscribe(new Shim<T>(t => currentValue = next(currentValue, t), source.SetException, () => source.SetResult(currentValue)));
}
catch (Exception ex) {
source.SetException(ex);
}
return source.Task;
}
class Shim<T> : IObserver<T> {
private readonly Action<T> _next;
private readonly Action<Exception> _error;
private readonly Action _completed;
public Shim(Action<T> next, Action<Exception> error, Action completed) {
_next = next;
_error = error;
_completed = completed;
}
public void OnNext(T value) { _next(value); }
public void OnError(Exception error) { _error(error); }
public void OnCompleted() { _completed(); }
}
}
[TestFixture]
public class ObservableTests {
private readonly IObservable<string>[] _sources = new IObservable<string>[] { new SyncSource(), new AsyncSource() };
[Test]
public void CallingSubscribe() {
foreach (var source in _sources) {
var sb = new StringBuilder();
var task = source.Subscribe(data => sb.Append(data))
.ContinueWith(t => Assert.That(sb.ToString(), Is.EqualTo("<p>hello world</p>")));
task.Wait();
}
}
[Test]
public void CallingSubscribeLikeAnAggregateMethod() {
foreach (var source in _sources) {
var task1 = source.Aggregate("", (agg, data) => agg + data)
.ContinueWith(t => Assert.That(t.Result, Is.EqualTo("<p>hello world</p>")));
var task2 = source.Aggregate(0, (agg, data) => agg + data.Length)
.ContinueWith(t => Assert.That(t.Result, Is.EqualTo(18)));
Task.WaitAll(task1, task2);
}
}
[Test]
public void InlineBlockingAggregateCall()
{
var source = new AsyncSource();
// this is so wrong, but looks so right :)
var text = source.Aggregate("", (agg, data) => agg + data).Result;
var length = source.Aggregate(0, (agg, data) => agg + data.Length).Result;
Assert.That(text, Is.EqualTo("<p>hello world</p>"));
Assert.That(length, Is.EqualTo(18));
}
/// <summary>
/// test data source
/// </summary>
private class SyncSource : IObservable<string> {
public IDisposable Subscribe(IObserver<string> observer) {
observer.OnNext("<p>");
observer.OnNext("hello world");
observer.OnNext("</p>");
observer.OnCompleted();
return new Disposable(() => { }); // ignored, because we're sync
}
}
/// <summary>
/// test data source
/// </summary>
private class AsyncSource : IObservable<string> {
public IDisposable Subscribe(IObserver<string> observer) {
bool[] stop = { false }; // ignored, because we're sync, but... for example... here's a stop flag to close over
new Thread(_ => {
Thread.Sleep(250);
if (stop[0]) return;
observer.OnNext("<p>");
Thread.Sleep(250);
if (stop[0]) return;
observer.OnNext("hello world");
Thread.Sleep(250);
if (stop[0]) return;
observer.OnNext("</p>");
Thread.Sleep(250);
if (stop[0]) return;
observer.OnCompleted();
}).Start();
return new Disposable(() => stop[0] = true);
}
}
private class Disposable : IDisposable {
private readonly Action _dispose;
public Disposable(Action dispose) { _dispose = dispose; }
public void Dispose() { _dispose(); }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment