Skip to content

Instantly share code, notes, and snippets.

@bvanderveen
Created December 22, 2010 02:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bvanderveen/750990 to your computer and use it in GitHub Desktop.
Save bvanderveen/750990 to your computer and use it in GitHub Desktop.
Examples of wrapping an APM operation using the Reactive Framework and the Task Parallel Library
using System;
using System.Disposables;
using System.IO;
using System.Linq;
using System.Threading;
namespace RxVsTplSamples
{
// Rx-flavoured wrappers around the Stream.BeginRead / Stream.EndRead APM pair.
public static partial class Extensions
{
    // Builds the read wrapper with Observable.FromAsyncPattern and returns the
    // factory that call produces. The stream, buffer, offset and count are
    // captured by the delegates and only used when the read is started.
    public static Func<IObservable<int>> ReadAsyncRx_FromAsync(this Stream stream, byte[] buffer, int offset, int count)
    {
        Func<AsyncCallback, object, IAsyncResult> beginRead =
            (callback, state) => stream.BeginRead(buffer, offset, count, callback, state);
        Func<IAsyncResult, int> endRead =
            asyncResult => stream.EndRead(asyncResult);

        return Observable.FromAsyncPattern<int>(beginRead, endRead);
    }

    // Same read, but wrapped in ApmObservable<int> instead of
    // Observable.FromAsyncPattern. Nothing touches the stream here; the
    // delegates are simply handed to the ApmObservable constructor.
    public static IObservable<int> ReadAsyncRx_ApmObservable(this Stream stream, byte[] buffer, int offset, int count)
    {
        Func<AsyncCallback, object, IAsyncResult> beginRead =
            (callback, state) => stream.BeginRead(buffer, offset, count, callback, state);
        Func<IAsyncResult, int> endRead =
            asyncResult => stream.EndRead(asyncResult);

        return new ApmObservable<int>(beginRead, endRead);
    }
}
// Observable.FromAsyncPattern reschedules the callback on the ThreadPoolScheduler. Lame!
// We need to allow the underlying APM operation to determine what thread the callback
// comes in on.
//
// And so we roll our own.
// An observable over one APM (Begin/End) operation. Each call to Subscribe
// invokes the Begin delegate once; when the operation's callback fires, the
// End delegate is invoked and the observer receives either OnNext followed by
// OnCompleted, or OnError. The observer is notified directly from the callback,
// so whatever thread the APM operation calls back on is the thread the
// observer sees.
public class ApmObservable<T> : IObservable<T>
{
    readonly Func<AsyncCallback, object, IAsyncResult> begin;
    readonly Func<IAsyncResult, T> end;

    // begin: starts the operation, given the completion callback and a state object.
    // end:   completes the operation and produces its result (or throws).
    // Throws ArgumentNullException if either delegate is null.
    public ApmObservable(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, T> end)
    {
        if (begin == null)
        {
            throw new ArgumentNullException("begin");
        }

        if (end == null)
        {
            throw new ArgumentNullException("end");
        }

        this.begin = begin;
        this.end = end;
    }

    // Starts the APM operation on behalf of the observer.
    //
    // Failures of the Begin and End delegates are reported through
    // observer.OnError rather than thrown. Exceptions thrown by the observer
    // itself are never fed back into the observer; if the callback ran
    // synchronously inside Begin they propagate out of this method.
    //
    // Returns Disposable.Empty: the pending operation is not cancelled when the
    // subscription is disposed (presumably disposing it is a no-op).
    //
    // Throws ArgumentNullException if observer is null.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Fail before starting the operation; otherwise the null would only
        // surface as a NullReferenceException on the callback thread.
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        // Set to 1 by whichever path (the callback, or the Begin-failure handler
        // below) gets to notify the observer. The callback may run on another
        // thread, hence Interlocked.
        int claimed = 0;

        try
        {
            // invoke the APM Begin method, provide a callback
            begin(iasr =>
            {
                // when the callback is executed, attempt to retrieve the value
                // from the APM End method and yield an exception if necessary.
                T value;
                try
                {
                    value = end(iasr);
                }
                catch (Exception e)
                {
                    if (Interlocked.Exchange(ref claimed, 1) == 0)
                    {
                        observer.OnError(e);
                    }

                    return;
                }

                if (Interlocked.Exchange(ref claimed, 1) == 0)
                {
                    observer.OnNext(value);
                    observer.OnCompleted();
                }
            }, null);
        }
        catch (Exception e)
        {
            // If the callback already claimed the observer, it ran (possibly
            // synchronously inside Begin) and this exception most likely came
            // from the observer itself. Sending OnError now could follow
            // OnCompleted or a previous OnError, so let it propagate instead.
            if (Interlocked.Exchange(ref claimed, 1) != 0)
            {
                throw;
            }

            // Begin failed before any notification was sent.
            observer.OnError(e);
        }

        return Disposable.Empty;
    }
}
}
using System;
using System.IO;
using System.Threading.Tasks;
namespace RxVsTplSample
{
// TPL-flavoured wrapper around the Stream.BeginRead / Stream.EndRead APM pair.
public static partial class Extensions
{
    // Starts a read via stream.BeginRead and returns a task that is completed
    // from the APM callback: with the value returned by stream.EndRead, or
    // with the exception caught while completing the read.
    public static Task<int> ReadAsyncTpl(this Stream stream, byte[] buffer, int offset, int count)
    {
        var completion = new TaskCompletionSource<int>();

        AsyncCallback onReadFinished = asyncResult =>
        {
            try
            {
                int bytesRead = stream.EndRead(asyncResult);
                completion.SetResult(bytesRead);
            }
            catch (Exception failure)
            {
                completion.SetException(failure);
            }
        };

        stream.BeginRead(buffer, offset, count, onReadFinished, null);

        return completion.Task;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment