Skip to content

Instantly share code, notes, and snippets.

@loudej
Created May 13, 2011 18:08
Show Gist options
  • Save loudej/971013 to your computer and use it in GitHub Desktop.
Save loudej/971013 to your computer and use it in GitHub Desktop.
IObservable body
using IBodyObservable = IObservable<Tuple< // body
ArraySegment<byte>, // data
Action, // delay
Action>>; //continuation
using BodyDelegate = Func< // body
Action< // next
ArraySegment<byte>, //data
Action, // continuation
bool>, // delayed
Action<Exception>, // error
Action, // complete
Action>; // cancel
/// <summary>
/// MVC controller used to exercise the body-observable sketch: the Index
/// action wires a <see cref="Producer"/> to a <see cref="Consumer"/> before
/// rendering its view.
/// </summary>
public class HomeController : Controller
{
    /// <summary>
    /// Sets the welcome message, subscribes a fresh consumer to a fresh
    /// producer, and returns the default view for the action.
    /// </summary>
    public ActionResult Index()
    {
        ViewBag.Message = "Welcome to ASP.NET MVC!";

        // The subscription handle returned by Subscribe is intentionally not kept.
        var source = new Producer();
        var sink = new Consumer();
        source.Subscribe(sink);

        return View();
    }

    /// <summary>
    /// Returns the default view for the About action.
    /// </summary>
    public ActionResult About()
    {
        return View();
    }
}
/// <summary>
/// Sample body producer. Each item is a tuple of
/// (data, delay callback, continuation callback).
/// </summary>
public class Producer : IObservable<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Pushes the UTF-8 bytes of "Hello world" to <paramref name="observer"/>
    /// four times, once per supported call shape, then signals completion.
    /// </summary>
    /// <param name="observer">Receiver of the body items.</param>
    /// <returns>A <see cref="Disposable"/> wrapping a no-op action.</returns>
    public IDisposable Subscribe(IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer)
    {
        byte[] helloBytes = Encoding.UTF8.GetBytes("Hello world");
        var payload = new ArraySegment<byte>(helloBytes);
        Action resume = () => { };

        // 1) synchronous, via the short extension-method overload
        observer.OnNext(payload);

        // 2) asynchronous, via the short extension-method overload;
        //    the result reports whether the observer invoked the delay callback
        bool delayedViaExtension = observer.OnNext(payload, resume);

        // 3) synchronous, via the real IObserver method: both callbacks are null
        var syncItem = new Tuple<ArraySegment<byte>, Action, Action>(payload, null, null);
        observer.OnNext(syncItem);

        // 4) asynchronous, via the real IObserver method: the observer flips
        //    the flag by invoking the delay callback
        bool delayedViaTuple = false;
        Action markDelayed = () => delayedViaTuple = true;
        var asyncItem = new Tuple<ArraySegment<byte>, Action, Action>(payload, markDelayed, resume);
        observer.OnNext(asyncItem);

        observer.OnCompleted();

        return new Disposable(() => { });
    }
}
/// <summary>
/// Sample body consumer. Each item is a tuple of
/// (data, delay callback, continuation callback).
/// </summary>
public class Consumer : IObserver<Tuple<ArraySegment<byte>, Action, Action>>
{
    /// <summary>
    /// Handles one body item. When both callbacks are supplied the item is
    /// handled asynchronously: the delay callback is invoked immediately and
    /// the continuation is queued to the thread pool. Otherwise the item is
    /// handled synchronously and no callback is invoked.
    /// </summary>
    /// <param name="value">
    /// Item1 is the data segment, Item2 the delay callback, Item3 the
    /// continuation callback. Item2 and Item3 may be null.
    /// </param>
    public void OnNext(Tuple<ArraySegment<byte>, Action, Action> value)
    {
        var data = value.Item1;
        var delay = value.Item2;
        var continuation = value.Item3;

        // The async path needs BOTH callbacks. Signalling delay without a
        // continuation to call back would leave the producer waiting forever,
        // and queuing a null continuation would throw on a thread-pool thread,
        // where an unhandled exception terminates the process.
        if (delay == null || continuation == null)
        {
            // synchronous processing
            return;
        }

        // asynchronous processing
        delay();
        ThreadPool.QueueUserWorkItem(_ => continuation());
    }

    /// <summary>Error notification; this sample ignores it.</summary>
    /// <param name="error">The error reported by the producer.</param>
    public void OnError(Exception error)
    {
    }

    /// <summary>Completion notification; this sample ignores it.</summary>
    public void OnCompleted()
    {
    }
}
/// <summary>
/// Adapts an <see cref="Action"/> to <see cref="IDisposable"/>: the action
/// runs the first time <see cref="Dispose"/> is called and never again.
/// </summary>
public class Disposable : IDisposable
{
    // Not readonly: Dispose swaps this to null so the action runs at most once.
    Action _dispose;

    /// <summary>Creates a disposable that runs <paramref name="dispose"/> on disposal.</summary>
    /// <param name="dispose">
    /// Action to run on the first <see cref="Dispose"/> call. A null action
    /// makes <see cref="Dispose"/> a no-op.
    /// </param>
    public Disposable(Action dispose)
    {
        _dispose = dispose;
    }

    /// <summary>
    /// Runs the wrapped action once. Later calls, and calls on an instance
    /// built with a null action, do nothing.
    /// </summary>
    public void Dispose()
    {
        // Atomically take ownership of the action so repeated or concurrent
        // Dispose calls cannot run it twice.
        var dispose = System.Threading.Interlocked.Exchange(ref _dispose, null);
        if (dispose != null)
        {
            dispose();
        }
    }
}
/// <summary>
/// Shorthand <c>OnNext</c> overloads for body observers, so callers do not
/// have to build the (data, delay, continuation) tuple themselves.
/// </summary>
public static class BodyObservableExtensions
{
    /// <summary>
    /// Sends <paramref name="data"/> with a delay callback and the supplied
    /// continuation, and reports whether the observer invoked the delay
    /// callback before its <c>OnNext</c> returned.
    /// </summary>
    /// <param name="observer">Observer that receives the item.</param>
    /// <param name="data">Body bytes to deliver.</param>
    /// <param name="continuation">Callback passed to the observer as the tuple's third element.</param>
    /// <returns>True if the delay callback was invoked during the call; otherwise false.</returns>
    public static bool OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data, Action continuation)
    {
        bool wasDelayed = false;
        Action markDelayed = () => wasDelayed = true;

        var item = Tuple.Create(data, markDelayed, continuation);
        observer.OnNext(item);

        return wasDelayed;
    }

    /// <summary>
    /// Sends <paramref name="data"/> with both callbacks set to null.
    /// </summary>
    /// <param name="observer">Observer that receives the item.</param>
    /// <param name="data">Body bytes to deliver.</param>
    public static void OnNext(this IObserver<Tuple<ArraySegment<byte>, Action, Action>> observer, ArraySegment<byte> data)
    {
        var item = Tuple.Create(data, (Action)null, (Action)null);
        observer.OnNext(item);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment