Created
May 13, 2011 21:58
-
-
Save loudej/971392 to your computer and use it in GitHub Desktop.
Larger example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ObsBody | |
{ | |
/// <summary>
/// Console driver: pairs every producer with every consumer, then runs the
/// Select/Aggregate pipeline ("Magic") over each producer.
/// </summary>
internal class Program
{
    /// <summary>Entry point; runs all launch combinations, then the Magic pipeline.</summary>
    static void Main(string[] args)
    {
        // Every producer against the plain synchronous consumer.
        Launch<SyncProducer, SyncConsumer>();
        Launch<PausableProducer, SyncConsumer>();
        Launch<ThreadedProducer, SyncConsumer>();

        // Every producer against the consumer that pauses and resumes.
        Launch<SyncProducer, PausingConsumer>();
        Launch<PausableProducer, PausingConsumer>();
        Launch<ThreadedProducer, PausingConsumer>();

        WriteLine();
        WriteLine("== Sleeping ==");
        // NOTE(review): presumably gives the asynchronous launches time to finish
        // before the Magic output starts - confirm.
        Thread.Sleep(200);

        Magic<SyncProducer>();
        Magic<PausableProducer>();
        Magic<ThreadedProducer>();

        WriteLine();
        WriteLine("== Press enter to exit ==");
        Console.ReadLine();
    }

    /// <summary>
    /// Creates one producer and one consumer, subscribes the consumer, and logs a
    /// line before and after the Subscribe call.
    /// </summary>
    /// <typeparam name="TProducer">Observable body source to instantiate.</typeparam>
    /// <typeparam name="TConsumer">Observer to subscribe to it.</typeparam>
    static void Launch<TProducer, TConsumer>()
        where TProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>, new()
        where TConsumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>, new()
    {
        var consumerName = typeof(TConsumer).Name;
        var producerName = typeof(TProducer).Name;

        WriteLine();
        WriteLine("== Launching {0} << {1} ==", consumerName, producerName);

        var source = new TProducer();
        var sink = new TConsumer();
        source.Subscribe(sink);

        // A linq/reactive-like transformation could be inserted here instead,
        // e.g. source.Select(ToLower).Subscribe(sink).

        // Written as soon as Subscribe returns.
        WriteLine("== Launched {0} << {1} ==", consumerName, producerName);
    }

    /// <summary>
    /// Lower-cases every chunk a fresh producer emits, concatenates the chunks into
    /// a single byte array through Aggregate, and logs the result.
    /// </summary>
    /// <typeparam name="TProducer">Observable body source to instantiate.</typeparam>
    static void Magic<TProducer>()
        where TProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>, new()
    {
        WriteLine();
        WriteLine("== Magic {0} ==", typeof(TProducer).Name);

        var source = new TProducer();
        var lowered = source.Select(ToLower);

        var bytes = lowered.Aggregate(
            new MemoryStream(),
            (buffer, item) =>
            {
                var segment = item.Item1;
                var chunk = Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
                WriteLine("Magic: Aggregate {0}", chunk);
                buffer.Write(segment.Array, segment.Offset, segment.Count);
                return buffer;
            },
            buffer => buffer.ToArray());

        WriteLine("Magic: Bytes {0}", Encoding.UTF8.GetString(bytes));
    }

    /// <summary>
    /// Decodes the segment as UTF-8, lower-cases it with the invariant culture, and
    /// returns the re-encoded bytes in a new segment.
    /// </summary>
    static ArraySegment<byte> ToLower(ArraySegment<byte> value)
    {
        var utf8 = Encoding.UTF8;
        var lowered = utf8.GetString(value.Array, value.Offset, value.Count).ToLowerInvariant();
        return new ArraySegment<byte>(utf8.GetBytes(lowered));
    }

    /// <summary>Writes <paramref name="text"/> to the console, prefixed with the current managed thread id.</summary>
    public static void WriteLine(string text)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        var line = string.Format("[{0}] {1}", threadId, text);
        Console.WriteLine(line);
    }

    /// <summary>Formats the message with <c>string.Format</c> and writes it with the thread-id prefix.</summary>
    public static void WriteLine(string format, params object[] args)
    {
        var text = string.Format(format, args);
        WriteLine(text);
    }

    /// <summary>Writes an empty message (the thread-id prefix is still emitted).</summary>
    public static void WriteLine()
    {
        WriteLine("");
    }
}
/// <summary>
/// Observer that logs each chunk immediately and never touches the pause/resume
/// callbacks carried in the tuple.
/// </summary>
public class SyncConsumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>Decodes the chunk (Item1) as UTF-8 and logs it.</summary>
    public void OnNext(Tuple<ArraySegment<byte>, Action, Action> value)
    {
        var segment = value.Item1;
        var text = Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
        Program.WriteLine("SyncConsumer: {0}", text);
    }

    /// <summary>Logs the message of the reported error.</summary>
    public void OnError(Exception error)
    {
        var message = error.Message;
        Program.WriteLine("SyncConsumer: OnError {0}", message);
    }

    /// <summary>Logs that the sequence has completed.</summary>
    public void OnCompleted()
    {
        Program.WriteLine("SyncConsumer: OnCompleted");
    }
}
/// <summary>
/// Observer that, when the producer supplies pause/resume callbacks, pauses the
/// producer for each chunk and resumes it from a worker thread after a short delay.
/// Log lines use the "AsyncConsumer" prefix; the strings are kept as-is so the
/// program output does not change.
/// </summary>
public class PausingConsumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Logs the chunk. With both callbacks present it calls pause (Item2), then
    /// calls resume (Item3) from a new thread; otherwise it just logs synchronously.
    /// </summary>
    /// <param name="value">Chunk (Item1), pause callback (Item2), resume callback (Item3).</param>
    public void OnNext(Tuple<ArraySegment<byte>, Action, Action> value)
    {
        var segment = value.Item1;
        var text = Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
        var pause = value.Item2;
        var resume = value.Item3;

        // Pausing is only safe when the producer can also be resumed. With a pause
        // callback but no resume callback the worker thread below would throw a
        // NullReferenceException after the producer had already been suspended.
        if (pause == null || resume == null)
        {
            Program.WriteLine("AsyncConsumer: Sync {0}", text);
            return;
        }

        // using thread to simulate a callback on an async api (e.g. network BeginWrite)
        Program.WriteLine("AsyncConsumer: Begin {0}", text);
        pause();
        new Thread(() =>
        {
            Thread.Sleep(25);
            Program.WriteLine("AsyncConsumer: End {0}", text);
            resume();
        }).Start();
    }

    /// <summary>Logs the message of the reported error.</summary>
    public void OnError(Exception error)
    {
        Program.WriteLine("AsyncConsumer: OnError {0}", error.Message);
    }

    /// <summary>Logs that the sequence has completed.</summary>
    public void OnCompleted()
    {
        Program.WriteLine("AsyncConsumer: OnCompleted");
    }
}
/// <summary>
/// Producer that pushes the same chunk twice and then completes, all inside the
/// Subscribe call. It offers no pause/resume callbacks.
/// </summary>
public class SyncProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Emits two chunks and the completion signal to <paramref name="observer"/>
    /// before returning.
    /// </summary>
    /// <returns>The shared no-op disposable.</returns>
    public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
    {
        var greeting = Encoding.UTF8.GetBytes("Hello world from SyncProducer. ");
        var payload = new ArraySegment<byte>(greeting);

        // shorter sync: the extension method builds the tuple with null callbacks
        observer.OnNext(payload);

        // longer sync: the tuple is spelled out and passed to the interface method
        Action none = null;
        var message = Tuple.Create(payload, none, none);
        observer.OnNext(message);

        observer.OnCompleted();
        return Disposable.Noop;
    }
}
/// <summary>
/// Producer that emits two chunks and then completes, stopping after a chunk when
/// the observer pauses and continuing from the same point when the observer
/// invokes the resume callback.
/// </summary>
public class PausableProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Starts the emission sequence on the calling thread.
    /// </summary>
    /// <returns>A disposable that flags the subscription as cancelled.</returns>
    public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
    {
        // ReSharper disable AccessToModifiedClosure
        var payload = new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello world from PausableProducer. "));
        var cancelled = false;
        Action<int> step = null;

        // this is only one possible way of implementing an async mechanism -
        // it's a re-enterable closure (inspired by c# await). 'stage' says where
        // to pick up: 0 = from the start, 1 = second chunk, 2 = completion.
        step = stage =>
        {
            // Cancelled subscriptions and unknown stages do nothing.
            if (cancelled || stage < 0 || stage > 2)
            {
                return;
            }

            if (stage == 0)
            {
                // shorter async (extension method): true means the observer paused,
                // and it will re-enter at stage 1 through the continuation.
                var deferred = observer.OnNext(payload, () => step(1));
                if (deferred || cancelled)
                {
                    return;
                }
            }

            if (stage <= 1)
            {
                // longer async (actual method): pause flag and resume callback are
                // wired by hand; resume re-enters at stage 2.
                var paused = false;
                observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(
                    payload,
                    () => paused = true,
                    () => step(2)));
                if (paused || cancelled)
                {
                    return;
                }
            }

            observer.OnCompleted();
        };

        step(0);
        return new Disposable(() => cancelled = true);
    }
}
/// <summary>
/// Producer that delivers each of its three signals (chunk, chunk, completion)
/// from its own thread.
/// </summary>
public class ThreadedProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Prepares the three threads, starts the first one and returns immediately.
    /// </summary>
    /// <returns>The shared no-op disposable.</returns>
    public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
    {
        var payload = new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello world from ThreadedProducer. "));

        // using threads is clearly a Very Bad Thing.
        // this is only as a brute-force way of simulating calls
        // coming in to the observer from distant callbacks on alien stack frames
        Thread secondChunk = null;
        Thread completion = null;

        // The threads are created in the order first chunk, second chunk, completion.
        var firstChunk = new Thread(() =>
        {
            Thread.Sleep(50);
            // shorter async (extension method): if the observer paused, starting
            // the next thread is left to the observer's resume call.
            var deferred = observer.OnNext(payload, secondChunk.Start);
            if (!deferred)
            {
                secondChunk.Start();
            }
        });

        secondChunk = new Thread(() =>
        {
            // longer async (actual method)
            var paused = false;
            var message = new Tuple<ArraySegment<byte>, Action, Action>(
                payload,
                () => paused = true,
                completion.Start);
            observer.OnNext(message);
            if (!paused)
            {
                completion.Start();
            }
        });

        completion = new Thread(observer.OnCompleted);

        firstChunk.Start();
        return Disposable.Noop;
    }
}
/// <summary>
/// Adapts a callback to <see cref="IDisposable"/>. The callback runs at most once,
/// on the first call to <see cref="Dispose"/>.
/// </summary>
public class Disposable : IDisposable
{
    // Cleared on the first Dispose so that later calls are no-ops.
    Action _dispose;

    /// <summary>Shared instance whose disposal does nothing.</summary>
    public static readonly IDisposable Noop = new Disposable(() => { });

    /// <summary>Creates a disposable that invokes <paramref name="dispose"/> when disposed.</summary>
    /// <param name="dispose">Callback to run on disposal; null means there is nothing to do.</param>
    public Disposable(Action dispose)
    {
        _dispose = dispose;
    }

    /// <summary>
    /// Runs the callback the first time it is called; further calls, and instances
    /// created with a null callback, do nothing.
    /// </summary>
    public void Dispose()
    {
        // Swap the callback out atomically so concurrent or repeated calls
        // cannot run it twice.
        var dispose = Interlocked.Exchange(ref _dispose, null);
        if (dispose != null)
        {
            dispose();
        }
    }
}
/// <summary>
/// Shorthand OnNext overloads that build the (data, pause, resume) tuple for the caller.
/// </summary>
public static class BodyObservableExtensions
{
    /// <summary>
    /// Sends <paramref name="data"/> together with a pause callback and
    /// <paramref name="continuation"/> as the resume callback.
    /// </summary>
    /// <returns>
    /// True if the observer invoked the pause callback before its OnNext returned.
    /// </returns>
    public static bool OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data, Action continuation)
    {
        var pauseRequested = false;
        Action pause = () => pauseRequested = true;

        observer.OnNext(Tuple.Create(data, pause, continuation));

        return pauseRequested;
    }

    /// <summary>
    /// Sends <paramref name="data"/> with both callbacks set to null.
    /// </summary>
    public static void OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data)
    {
        Action none = null;
        observer.OnNext(Tuple.Create(data, none, none));
    }
}
/// <summary>
/// Minimal linq/reactive-style operators (Subscribe, Aggregate, Transform, Select)
/// built on delegate-backed observable and observer adapters.
/// </summary>
public static class MagicExtensions
{
    /// <summary>Observable whose Subscribe forwards the observer's three callbacks to a delegate.</summary>
    class Observable<T> : IObservable<T>
    {
        readonly Func<Action<T>, Action<Exception>, Action, IDisposable> _subscribe;

        public Observable(Func<Action<T>, Action<Exception>, Action, IDisposable> subscribe)
        {
            _subscribe = subscribe;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
        }
    }

    /// <summary>Observer that forwards each notification to the matching delegate.</summary>
    class Observer<T> : IObserver<T>
    {
        readonly Action<T> _next;
        readonly Action<Exception> _error;
        readonly Action _complete;

        public Observer(Action<T> next, Action<Exception> error, Action complete)
        {
            _next = next;
            _error = error;
            _complete = complete;
        }

        public void OnNext(T value)
        {
            _next(value);
        }

        public void OnError(Exception error)
        {
            _error(error);
        }

        public void OnCompleted()
        {
            _complete();
        }
    }

    /// <summary>Subscribes three delegates as an observer.</summary>
    /// <returns>Whatever the observable's Subscribe returns.</returns>
    public static IDisposable Subscribe<T>(this IObservable<T> observable, Action<T> next, Action<Exception> error, Action complete)
    {
        return observable.Subscribe(new Observer<T>(next, error, complete));
    }

    /// <summary>
    /// Folds every value of the sequence into an accumulator and blocks the calling
    /// thread until the sequence completes or fails.
    /// </summary>
    /// <param name="observable">Sequence to consume.</param>
    /// <param name="seed">Initial accumulator value.</param>
    /// <param name="func">Combines the accumulator with the next value.</param>
    /// <param name="resultSelector">Maps the final accumulator to the result.</param>
    /// <returns>The selected result once the sequence has completed.</returns>
    /// <exception cref="AggregateException">
    /// The sequence reported an error; the error is the inner exception.
    /// </exception>
    public static TResult Aggregate<TSource, TAccumulate, TResult>(
        this IObservable<TSource> observable,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> func,
        Func<TAccumulate, TResult> resultSelector)
    {
        var tcs = new TaskCompletionSource<TResult>();
        var accumulate = seed;

        // TrySet*: only the first terminal notification counts, so a producer that
        // signals completion or failure more than once does not get an
        // InvalidOperationException thrown back at it.
        // using: the subscription is released once the result (or failure) is known;
        // a null subscription is accepted.
        using (observable.Subscribe(
            source => accumulate = func(accumulate, source),
            error => tcs.TrySetException(error),
            () => tcs.TrySetResult(resultSelector(accumulate))))
        {
            return tcs.Task.Result;
        }
    }

    /// <summary>
    /// Builds a new observable. On each subscription <paramref name="transformation"/>
    /// receives a function that subscribes to <paramref name="observable"/> plus the
    /// downstream next/error/complete callbacks, and returns the subscription.
    /// <paramref name="observable"/> is only touched if the transformation invokes
    /// that subscribe function.
    /// </summary>
    public static IObservable<TResult> Transform<TSource, TResult>(this IObservable<TSource> observable, Func<Func<Action<TSource>, Action<Exception>, Action, IDisposable>, Action<TResult>, Action<Exception>, Action, IDisposable> transformation)
    {
        return new Observable<TResult>((next, fault, done) =>
            transformation(
                (next2, fault2, done2) => observable.Subscribe(new Observer<TSource>(next2, fault2, done2)),
                next,
                fault,
                done));
    }

    /// <summary>
    /// Projects each value through <paramref name="selector"/>; errors and completion
    /// pass through unchanged.
    /// </summary>
    public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> observable, Func<TSource, TResult> selector)
    {
        return observable.Transform<TSource, TResult>((subscribe, next, error, complete) =>
            subscribe(value => next(selector(value)), error, complete));
    }

    /// <summary>
    /// Projects only the first tuple item through <paramref name="selector"/>,
    /// carrying the second and third items (the two callbacks) over untouched.
    /// </summary>
    public static IObservable<Tuple<TResult, Action, Action>> Select<TSource, TResult>(this IObservable<Tuple<TSource, Action, Action>> observable, Func<TSource, TResult> selector)
    {
        // Type arguments are spelled out so the call binds to the general Select
        // overload above rather than back to this one.
        return Select<Tuple<TSource, Action, Action>, Tuple<TResult, Action, Action>>(
            observable,
            value => Tuple.Create(selector(value.Item1), value.Item2, value.Item3));
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment