Created
May 13, 2011 18:08
-
-
Save loudej/971013 to your computer and use it in GitHub Desktop.
IObservable body
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using IBodyObservable = IObservable<Tuple< // body | |
ArraySegment<byte>, // data | |
Action, // delay | |
Action>>; //continuation | |
using BodyDelegate = Func< // body | |
Action< // next | |
ArraySegment<byte>, //data | |
Action, // continuation | |
bool>, // delayed | |
Action<Exception>, // error | |
Action, // complete | |
Action>; // cancel | |
/// <summary>
/// MVC controller for the sample site. Its Index action also wires a
/// <see cref="Consumer"/> to a <see cref="Producer"/> to exercise the
/// tuple-based body streaming calls.
/// </summary>
public class HomeController : Controller
{
    /// <summary>
    /// Stores the welcome message in the view bag, subscribes a new consumer
    /// to a new producer, and returns the default view.
    /// </summary>
    public ActionResult Index()
    {
        ViewBag.Message = "Welcome to ASP.NET MVC!";

        var source = new Producer();
        var sink = new Consumer();

        // The subscription handle returned by Subscribe is not retained.
        source.Subscribe(sink);

        return View();
    }

    /// <summary>
    /// Returns the default view for the About page.
    /// </summary>
    public ActionResult About()
    {
        return View();
    }
}
/// <summary>
/// Sample body source. On subscription it pushes the UTF-8 bytes of
/// "Hello world" to the observer four times, once per supported calling
/// form, and then signals completion.
/// </summary>
/// <remarks>
/// Each notification is a tuple of (data, delay callback, continuation).
/// The two synchronous forms pass null for both callbacks.
/// </remarks>
public class Producer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Sends the four sample notifications followed by OnCompleted.
    /// </summary>
    /// <param name="observer">Receiver of the notifications.</param>
    /// <returns>A disposable whose dispose action does nothing.</returns>
    public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
    {
        var payload = new ArraySegment<byte>(Encoding.UTF8.GetBytes("Hello world"));
        Action resume = () => { };

        // Form 1: synchronous, through the one-argument extension method.
        observer.OnNext(payload);

        // Form 2: asynchronous-capable, through the extension method. The result
        // says whether the observer invoked the delay callback during the call.
        bool delayed1 = observer.OnNext(payload, resume);

        // Form 3: synchronous, calling the interface method with a hand-built
        // tuple that carries no callbacks.
        var syncItem = Tuple.Create(payload, (Action)null, (Action)null);
        observer.OnNext(syncItem);

        // Form 4: asynchronous-capable, calling the interface method with a
        // hand-built tuple whose delay callback flips a local flag.
        bool delayed2 = false;
        Action markDelayed = () => { delayed2 = true; };
        var asyncItem = Tuple.Create(payload, markDelayed, resume);
        observer.OnNext(asyncItem);

        observer.OnCompleted();

        return new Disposable(() => { });
    }
}
/// <summary>
/// Sample body sink. Notifications without callbacks are treated as
/// synchronous; notifications carrying both a delay callback and a
/// continuation are acknowledged as delayed and resumed from the thread pool.
/// </summary>
public class Consumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Handles one notification of (data, delay callback, continuation).
    /// </summary>
    /// <param name="value">The notification tuple; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="value"/> is null.
    /// </exception>
    public void OnNext(Tuple<ArraySegment<byte>, Action, Action> value)
    {
        if (value == null)
        {
            throw new ArgumentNullException("value");
        }

        // Payload a real consumer would process; this sample does not read it.
        var data = value.Item1;
        var delay = value.Item2;
        var continuation = value.Item3;

        // Going asynchronous needs both callbacks. Without a continuation there
        // is nothing to invoke later, and calling a null delegate on a pool
        // thread would raise an unhandled exception, so such a notification is
        // handled synchronously and the delay callback is left uncalled.
        if (delay == null || continuation == null)
        {
            // synchronous processing
            return;
        }

        // asynchronous processing: report the delay first, then resume the
        // producer from a thread-pool work item.
        delay();
        ThreadPool.QueueUserWorkItem(_ => continuation());
    }

    /// <summary>
    /// Error notification; this sample ignores it.
    /// </summary>
    public void OnError(Exception error)
    {
    }

    /// <summary>
    /// Completion notification; this sample ignores it.
    /// </summary>
    public void OnCompleted()
    {
    }
}
/// <summary>
/// Adapts an <see cref="Action"/> to <see cref="IDisposable"/>. The action
/// runs on the first call to <see cref="Dispose"/>; later calls do nothing.
/// </summary>
public class Disposable : IDisposable
{
    // Cleared on the first Dispose so the action cannot run a second time.
    private Action _dispose;

    /// <summary>
    /// Creates a disposable that runs <paramref name="dispose"/> when disposed.
    /// </summary>
    /// <param name="dispose">
    /// Action to run on disposal. May be null, in which case disposal is a no-op.
    /// </param>
    public Disposable(Action dispose)
    {
        _dispose = dispose;
    }

    /// <summary>
    /// Runs the dispose action if it has not run yet.
    /// </summary>
    public void Dispose()
    {
        var action = _dispose;
        if (action == null)
        {
            return;
        }

        // Clear the field before invoking so a re-entrant or repeated Dispose
        // call finds nothing left to run.
        _dispose = null;
        action();
    }
}
/// <summary>
/// Convenience overloads of OnNext for observers of
/// (data, delay callback, continuation) tuples, so callers do not have to
/// build the tuple by hand.
/// </summary>
public static class BodyObservableExtensions
{
    /// <summary>
    /// Sends <paramref name="data"/> with a continuation, allowing the observer
    /// to signal that it will finish handling the data later.
    /// </summary>
    /// <param name="observer">Observer to notify; must not be null.</param>
    /// <param name="data">Bytes to deliver.</param>
    /// <param name="continuation">
    /// Callback passed to the observer as the tuple's third item. When null,
    /// the data is sent in the synchronous form with no callbacks.
    /// </param>
    /// <returns>
    /// True if the observer invoked the delay callback before its OnNext
    /// returned; otherwise false.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observer"/> is null.
    /// </exception>
    public static bool OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data, Action continuation)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        if (continuation == null)
        {
            // With nothing to call back, the observer must not be offered a
            // delay callback; send the synchronous form instead.
            observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(data, null, null));
            return false;
        }

        // The flag is only observed up to the point OnNext returns; a delay
        // callback invoked after that does not change the returned value.
        var delayed = false;
        observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(data, () => delayed = true, continuation));
        return delayed;
    }

    /// <summary>
    /// Sends <paramref name="data"/> in the synchronous form: the tuple carries
    /// null for both the delay callback and the continuation.
    /// </summary>
    /// <param name="observer">Observer to notify; must not be null.</param>
    /// <param name="data">Bytes to deliver.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observer"/> is null.
    /// </exception>
    public static void OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        observer.OnNext(new Tuple<ArraySegment<byte>, Action, Action>(data, null, null));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment