Skip to content

Instantly share code, notes, and snippets.

@loudej
Created May 13, 2011 21:58
Show Gist options
  • Save loudej/971392 to your computer and use it in GitHub Desktop.
Save loudej/971392 to your computer and use it in GitHub Desktop.
Larger example
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ObsBody
{
internal class Program
{
static void Main(string[] args)
{
Launch<SyncProducer, SyncConsumer>();
Launch<PausableProducer, SyncConsumer>();
Launch<ThreadedProducer, SyncConsumer>();
Launch<SyncProducer, PausingConsumer>();
Launch<PausableProducer, PausingConsumer>();
Launch<ThreadedProducer, PausingConsumer>();
WriteLine();
WriteLine("== Sleeping ==");
Thread.Sleep(200);
Magic<SyncProducer>();
Magic<PausableProducer>();
Magic<ThreadedProducer>();
WriteLine();
WriteLine("== Press enter to exit ==");
Console.ReadLine();
}
static void Launch<TProducer, TConsumer>()
where TProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>, new()
where TConsumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>, new()
{
WriteLine();
WriteLine("== Launching {0} << {1} ==", typeof(TConsumer).Name, typeof(TProducer).Name);
var producer = new TProducer();
var consumer = new TConsumer();
producer.Subscribe(consumer);
// example of linq/reactive-like transformation
//producer.Select(ToLower).Subscribe(consumer);
WriteLine("== Launched {0} << {1} ==", typeof (TConsumer).Name, typeof (TProducer).Name);
}
static void Magic<TProducer>()
where TProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>, new()
{
WriteLine();
WriteLine("== Magic {0} ==", typeof(TProducer).Name);
var producer = new TProducer();
var bytes = producer
.Select(ToLower)
.Aggregate(
new MemoryStream(),
(stream, tuple) =>
{
var data = tuple.Item1;
WriteLine("Magic: Aggregate {0}", Encoding.UTF8.GetString(data.Array, data.Offset, data.Count));
stream.Write(data.Array, data.Offset, data.Count);
return stream;
},
stream => stream.ToArray());
WriteLine("Magic: Bytes {0}", Encoding.UTF8.GetString(bytes));
}
static ArraySegment<byte> ToLower(ArraySegment<byte> value)
{
var text = Encoding.UTF8.GetString(value.Array, value.Offset, value.Count);
var bytes = Encoding.UTF8.GetBytes(text.ToLowerInvariant());
return new ArraySegment<byte>(bytes);
}
public static void WriteLine(string text)
{
Console.WriteLine(string.Format("[{0}] {1}", Thread.CurrentThread.ManagedThreadId, text));
}
public static void WriteLine(string format, params object[] args)
{
WriteLine(string.Format(format, args));
}
public static void WriteLine()
{
WriteLine("");
}
}
public class SyncConsumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>
{
public void OnNext(Tuple<ArraySegment<byte>, Action, Action> value)
{
Program.WriteLine("SyncConsumer: {0}", Encoding.UTF8.GetString(value.Item1.Array, value.Item1.Offset, value.Item1.Count));
}
public void OnError(Exception error)
{
Program.WriteLine("SyncConsumer: OnError {0}", error.Message);
}
public void OnCompleted()
{
Program.WriteLine("SyncConsumer: OnCompleted");
}
}
public class PausingConsumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>
{
public void OnNext(Tuple<ArraySegment<byte>, Action, Action> value)
{
var text = Encoding.UTF8.GetString(value.Item1.Array, value.Item1.Offset, value.Item1.Count);
var pause = value.Item2;
var resume = value.Item3;
if (pause == null)
{
Program.WriteLine("AsyncConsumer: Sync {0}", text);
}
else
{
// using thread to simulate a callback on an async api (e.g. network BeginWrite)
Program.WriteLine("AsyncConsumer: Begin {0}", text);
pause();
new Thread(() =>
{
Thread.Sleep(25);
Program.WriteLine("AsyncConsumer: End {0}", text);
resume();
}).Start();
}
}
public void OnError(Exception error)
{
Program.WriteLine("AsyncConsumer: OnError {0}", error.Message);
}
public void OnCompleted()
{
Program.WriteLine("AsyncConsumer: OnCompleted");
}
}
public class SyncProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
{
var data = new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello world from SyncProducer. "));
// shorter sync (extension method)
observer.OnNext(data);
// longer sync (actual method)
observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(
data,
null,
null));
observer.OnCompleted();
return Disposable.Noop;
}
}
public class PausableProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
{
// ReSharper disable AccessToModifiedClosure
var data = new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello world from PausableProducer. "));
var cancelled = false;
var go = default(Action<int>);
// this is only one possible way of implementing an async mechanism -
// it's a re-enterable closure (inspired by c# await)
go = mark =>
{
switch (cancelled ? -1 : mark)
{
case 0:
break;
case 1:
goto mark1;
case 2:
goto mark2;
default:
return;
}
// shorter async (extension method)
if (observer.OnNext(data, () => go(1)) || cancelled)
return;
mark1:
// longer async (actual method)
var paused = false;
observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(
data,
() => paused = true,
() => go(2)));
if (paused || cancelled) return;
mark2:
observer.OnCompleted();
};
go(0);
return new Disposable(() => cancelled = true);
}
}
public class ThreadedProducer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
{
var data = new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello world from ThreadedProducer. "));
// using threads is clearly a Very Bad Thing.
// this is only as a brute-force way of simulating calls
// coming in to the observer from distant callbacks on alien stack frames
var threads = new Thread[3];
threads[0] = new Thread(() =>
{
Thread.Sleep(50);
// shorter async (extension method)
if (!observer.OnNext(data, threads[1].Start))
threads[1].Start();
});
threads[1] = new Thread(() =>
{
// longer async (actual method)
var paused = false;
observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(
data,
() => paused = true,
threads[2].Start));
if (!paused)
threads[2].Start();
});
threads[2] = new Thread(observer.OnCompleted);
threads[0].Start();
return Disposable.Noop;
}
}
public class Disposable : IDisposable
{
readonly Action _dispose;
public static readonly IDisposable Noop = new Disposable(() => { });
public Disposable(Action dispose)
{
_dispose = dispose;
}
public void Dispose()
{
_dispose();
}
}
public static class BodyObservableExtensions
{
public static bool OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data, Action continuation)
{
var delayed = false;
observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(data, () => delayed = true, continuation));
return delayed;
}
public static void OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data)
{
observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(data, null, null));
}
}
public static class MagicExtensions
{
class Observable<T> : IObservable<T>
{
readonly Func<Action<T>, Action<Exception>, Action, IDisposable> _subscribe;
public Observable(Func<Action<T>, Action<Exception>, Action, IDisposable> subscribe)
{
_subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
}
}
class Observer<T> : IObserver<T>
{
readonly Action<T> _next;
readonly Action<Exception> _error;
readonly Action _complete;
public Observer(Action<T> next, Action<Exception> error, Action complete)
{
_next = next;
_error = error;
_complete = complete;
}
public void OnNext(T value)
{
_next(value);
}
public void OnError(Exception error)
{
_error(error);
}
public void OnCompleted()
{
_complete();
}
}
public static IDisposable Subscribe<T>(this IObservable<T> observable, Action<T> next, Action<Exception> error, Action complete)
{
return observable.Subscribe(new Observer<T>(next, error, complete));
}
public static TResult Aggregate<TSource, TAccumulate, TResult>(
this IObservable<TSource> observable,
TAccumulate seed,
Func<TAccumulate, TSource, TAccumulate> func,
Func<TAccumulate, TResult> resultSelector)
{
var tcs = new TaskCompletionSource<TResult>();
var accumulate = seed;
observable.Subscribe(
source => accumulate = func(accumulate, source),
tcs.SetException,
() => tcs.SetResult(resultSelector(accumulate)));
return tcs.Task.Result;
}
public static IObservable<TResult> Transform<TSource, TResult>(this IObservable<TSource> observable, Func<Func<Action<TSource>, Action<Exception>, Action, IDisposable>, Action<TResult>, Action<Exception>, Action, IDisposable> transformation)
{
return new Observable<TResult>((next, fault, done) => transformation((next2, fault2, done2) => observable.Subscribe(new Observer<TSource>(next2, fault2, done2)), next, fault, done));
}
public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> observable, Func<TSource, TResult> selector)
{
return observable.Transform<TSource, TResult>((subscribe, next, error, complete) => subscribe(value => next(selector(value)), error, complete));
}
public static IObservable<Tuple<TResult, Action, Action>> Select<TSource, TResult>(this IObservable<Tuple<TSource, Action, Action>> observable, Func<TSource, TResult> selector)
{
return observable.Select(value => Tuple.Create(selector(value.Item1), value.Item2, value.Item3));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment