Created
December 28, 2010 07:59
-
-
Save loudej/757031 to your computer and use it in GitHub Desktop.
observable + task
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NUnit.Framework; | |
namespace FuncGist { | |
public static class ObservableExtensions { | |
public static Task Subscribe<T>(this IObservable<T> observable, Action<T> next) { | |
var source = new TaskCompletionSource<object>(); | |
try { | |
observable.Subscribe(new Shim<T>(next, source.SetException, () => source.SetResult(null))); | |
} | |
catch (Exception ex) { | |
source.SetException(ex); | |
} | |
return source.Task; | |
} | |
public static Task<T2> Aggregate<T, T2>(this IObservable<T> observable, T2 initialValue, Func<T2, T, T2> next) { | |
var source = new TaskCompletionSource<T2>(); | |
try { | |
var currentValue = initialValue; | |
observable.Subscribe(new Shim<T>(t => currentValue = next(currentValue, t), source.SetException, () => source.SetResult(currentValue))); | |
} | |
catch (Exception ex) { | |
source.SetException(ex); | |
} | |
return source.Task; | |
} | |
class Shim<T> : IObserver<T> { | |
private readonly Action<T> _next; | |
private readonly Action<Exception> _error; | |
private readonly Action _completed; | |
public Shim(Action<T> next, Action<Exception> error, Action completed) { | |
_next = next; | |
_error = error; | |
_completed = completed; | |
} | |
public void OnNext(T value) { _next(value); } | |
public void OnError(Exception error) { _error(error); } | |
public void OnCompleted() { _completed(); } | |
} | |
} | |
[TestFixture] | |
public class ObservableTests { | |
private readonly IObservable<string>[] _sources = new IObservable<string>[] { new SyncSource(), new AsyncSource() }; | |
[Test] | |
public void CallingSubscribe() { | |
foreach (var source in _sources) { | |
var sb = new StringBuilder(); | |
var task = source.Subscribe(data => sb.Append(data)) | |
.ContinueWith(t => Assert.That(sb.ToString(), Is.EqualTo("<p>hello world</p>"))); | |
task.Wait(); | |
} | |
} | |
[Test] | |
public void CallingSubscribeLikeAnAggregateMethod() { | |
foreach (var source in _sources) { | |
var task1 = source.Aggregate("", (agg, data) => agg + data) | |
.ContinueWith(t => Assert.That(t.Result, Is.EqualTo("<p>hello world</p>"))); | |
var task2 = source.Aggregate(0, (agg, data) => agg + data.Length) | |
.ContinueWith(t => Assert.That(t.Result, Is.EqualTo(18))); | |
Task.WaitAll(task1, task2); | |
} | |
} | |
[Test] | |
public void InlineBlockingAggregateCall() | |
{ | |
var source = new AsyncSource(); | |
// this is so wrong, but looks so right :) | |
var text = source.Aggregate("", (agg, data) => agg + data).Result; | |
var length = source.Aggregate(0, (agg, data) => agg + data.Length).Result; | |
Assert.That(text, Is.EqualTo("<p>hello world</p>")); | |
Assert.That(length, Is.EqualTo(18)); | |
} | |
/// <summary> | |
/// test data source | |
/// </summary> | |
private class SyncSource : IObservable<string> { | |
public IDisposable Subscribe(IObserver<string> observer) { | |
observer.OnNext("<p>"); | |
observer.OnNext("hello world"); | |
observer.OnNext("</p>"); | |
observer.OnCompleted(); | |
return new Disposable(() => { }); // ignored, because we're sync | |
} | |
} | |
/// <summary> | |
/// test data source | |
/// </summary> | |
private class AsyncSource : IObservable<string> { | |
public IDisposable Subscribe(IObserver<string> observer) { | |
bool[] stop = { false }; // ignored, because we're sync, but... for example... here's a stop flag to close over | |
new Thread(_ => { | |
Thread.Sleep(250); | |
if (stop[0]) return; | |
observer.OnNext("<p>"); | |
Thread.Sleep(250); | |
if (stop[0]) return; | |
observer.OnNext("hello world"); | |
Thread.Sleep(250); | |
if (stop[0]) return; | |
observer.OnNext("</p>"); | |
Thread.Sleep(250); | |
if (stop[0]) return; | |
observer.OnCompleted(); | |
}).Start(); | |
return new Disposable(() => stop[0] = true); | |
} | |
} | |
private class Disposable : IDisposable { | |
private readonly Action _dispose; | |
public Disposable(Action dispose) { _dispose = dispose; } | |
public void Dispose() { _dispose(); } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment