Skip to content

Instantly share code, notes, and snippets.

@bvanderveen
Created December 25, 2010 04:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bvanderveen/754691 to your computer and use it in GitHub Desktop.
Save bvanderveen/754691 to your computer and use it in GitHub Desktop.
Examples of implementing an APM operation using the Reactive Framework and the Task Parallel Library
using System;
using System.Disposables;
using System.Linq;
using System.Threading;
namespace RxVsTplSamples
{
/// <summary>
/// Exposes an operation that returns an <see cref="IObservable{T}"/> through the
/// APM (BeginInvoke/EndInvoke) pattern. The first terminal event produced by the
/// observable (a value, an error, or completion without a value) decides the
/// outcome of the invocation; everything after that is ignored.
/// </summary>
/// <typeparam name="TArg">Type of the argument handed to the operation.</typeparam>
/// <typeparam name="TResult">Type of the value produced by the operation.</typeparam>
public class RxAsyncOperation<TArg, TResult>
{
    readonly Func<TArg, IObservable<TResult>> operation;

    /// <summary>Creates the adapter around <paramref name="operation"/>.</summary>
    /// <param name="operation">Factory producing the observable for one invocation.</param>
    public RxAsyncOperation(Func<TArg, IObservable<TResult>> operation)
    {
        this.operation = operation;
    }

    /// <summary>
    /// Starts the operation and subscribes to the observable it returns.
    /// </summary>
    /// <param name="arg">Argument passed to the operation.</param>
    /// <param name="callback">Optional callback invoked once the operation completes.</param>
    /// <param name="state">Caller state exposed through <see cref="IAsyncResult.AsyncState"/>.</param>
    /// <returns>The handle to pass to <see cref="EndInvoke"/>.</returns>
    public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state)
    {
        AsyncResult<TResult> asyncResult = new AsyncResult<TResult>(callback, state);

        // All bookkeeping is local to this invocation, so overlapping BeginInvoke
        // calls on the same instance can no longer dispose each other's subscription.
        IDisposable subscription = null;
        int finished = 0;

        // Disposes the subscription at most once, whichever thread gets here first.
        Action unsubscribe = () =>
        {
            IDisposable pending = Interlocked.Exchange(ref subscription, null);
            if (pending != null)
                pending.Dispose();
        };

        // Returns true only for the first notification; AsyncResult.SetAsCompleted
        // throws when called twice, so later notifications must be dropped.
        Func<bool> claimCompletion = () => Interlocked.Exchange(ref finished, 1) == 0;

        IDisposable created = operation(arg).Subscribe(
            // the boolean arguments to SetAsCompleted indicate whether the operation
            // completed synchronously. IObservable doesn't relay that information,
            // so we assume false.
            r =>
            {
                if (!claimCompletion()) return;
                unsubscribe();
                asyncResult.SetAsCompleted(r, false);
            },
            e =>
            {
                if (!claimCompletion()) return;
                unsubscribe();
                asyncResult.SetAsCompleted(e, false);
            },
            () =>
            {
                // Completion without any value: fail the operation instead of
                // leaving EndInvoke blocked forever.
                if (!claimCompletion()) return;
                unsubscribe();
                asyncResult.SetAsCompleted(
                    new InvalidOperationException("Sequence contains no elements."), false);
            });

        // Publish the subscription first, then look at the flag. A callback sets the
        // flag first and then looks at the subscription, and both sides use
        // interlocked operations, so at least one of them sees the other's write and
        // disposes. If the operation finished during Subscribe, this is where the
        // subscription gets released.
        Interlocked.Exchange(ref subscription, created);
        if (Interlocked.CompareExchange(ref finished, 0, 0) != 0)
            unsubscribe();

        return asyncResult;
    }

    /// <summary>
    /// Waits for the operation started by <see cref="BeginInvoke"/> and returns its value.
    /// </summary>
    /// <param name="asyncResult">The object returned by <see cref="BeginInvoke"/>.</param>
    /// <returns>The first value produced by the observable.</returns>
    public TResult EndInvoke(IAsyncResult asyncResult)
    {
        // the AsyncResult<T> implementation takes care of waiting and throwing exceptions for us.
        return ((AsyncResult<TResult>)asyncResult).EndInvoke();
    }
}
#region IAsyncResult implementation
// the next two classes lifted from http://msdn.microsoft.com/en-us/magazine/cc163467.aspx
// IAsyncResult implementation for an operation that produces no value: it records
// completion (optionally with an exception), lazily creates a wait handle, and
// invokes the caller's callback when completion is signalled.
class AsyncResult : IAsyncResult
{
// Fields set at construction which never change while
// operation is pending
private readonly AsyncCallback m_AsyncCallback;
private readonly Object m_AsyncState;
// Fields set at construction which do change after
// operation completes
private const Int32 c_StatePending = 0;
private const Int32 c_StateCompletedSynchronously = 1;
private const Int32 c_StateCompletedAsynchronously = 2;
private Int32 m_CompletedState = c_StatePending;
// Field that may or may not get set depending on usage
private ManualResetEvent m_AsyncWaitHandle;
// Fields set when operation completes
private Exception m_exception;
// Stores the callback (may be null) and the caller's state object.
public AsyncResult(AsyncCallback asyncCallback, Object state)
{
m_AsyncCallback = asyncCallback;
m_AsyncState = state;
}
// Marks the operation as finished. Stores the exception (null for success),
// switches the state away from pending, signals the wait handle if one has been
// created, and finally invokes the callback if one was supplied.
// Throws InvalidOperationException when the state was already non-pending.
// NOTE: m_exception is assigned before the state check, so a duplicate call
// overwrites the stored exception before it throws.
public void SetAsCompleted(
Exception exception, Boolean completedSynchronously)
{
// Passing null for exception means no error occurred.
// This is the common case
m_exception = exception;
// The m_CompletedState field MUST be set prior calling the callback
Int32 prevState = Interlocked.Exchange(ref m_CompletedState,
completedSynchronously ? c_StateCompletedSynchronously :
c_StateCompletedAsynchronously);
if (prevState != c_StatePending)
throw new InvalidOperationException(
"You can set a result only once");
// If the event exists, set it
if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();
// If a callback method was set, call it
if (m_AsyncCallback != null) m_AsyncCallback(this);
}
// Blocks until the operation has completed, then throws if it failed.
// The stored exception is not rethrown directly: it is wrapped in a new
// System.Exception and is available as InnerException.
public void EndInvoke()
{
// This method assumes that only 1 thread calls EndInvoke
// for this object
if (!IsCompleted)
{
// If the operation isn't done, wait for it
AsyncWaitHandle.WaitOne();
// The handle is closed only on this path; when the operation was already
// complete on entry, a handle created earlier via AsyncWaitHandle is not closed here.
AsyncWaitHandle.Close();
m_AsyncWaitHandle = null; // Allow early GC
}
// Operation is done: if an exception occurred, throw it
if (m_exception != null) throw new Exception("Exception during operation.", m_exception);
}
#region Implementation of IAsyncResult
// The state object supplied to the constructor.
public Object AsyncState { get { return m_AsyncState; } }
// True only when SetAsCompleted was called with completedSynchronously == true.
public Boolean CompletedSynchronously
{
get
{
return Thread.VolatileRead(ref m_CompletedState) ==
c_StateCompletedSynchronously;
}
}
// Lazily creates the event on first access. The event starts signalled when the
// operation is already complete; CompareExchange ensures only one event is
// published when several threads race to create it.
public WaitHandle AsyncWaitHandle
{
get
{
if (m_AsyncWaitHandle == null)
{
Boolean done = IsCompleted;
ManualResetEvent mre = new ManualResetEvent(done);
if (Interlocked.CompareExchange(ref m_AsyncWaitHandle,
mre, null) != null)
{
// Another thread created this object's event; dispose
// the event we just created
mre.Close();
}
else
{
if (!done && IsCompleted)
{
// If the operation wasn't done when we created
// the event but now it is done, set the event
m_AsyncWaitHandle.Set();
}
}
}
return m_AsyncWaitHandle;
}
}
// True once the state has left c_StatePending (either completion flavour).
public Boolean IsCompleted
{
get
{
return Thread.VolatileRead(ref m_CompletedState) !=
c_StatePending;
}
}
#endregion
}
// AsyncResult flavour for operations that yield a value: it keeps the value
// alongside the completion bookkeeping inherited from AsyncResult.
class AsyncResult<TResult> : AsyncResult
{
    // Value of the finished operation; holds default(TResult) until completion.
    private TResult m_value;

    // Forwards the callback and the caller's state to the base class.
    public AsyncResult(AsyncCallback asyncCallback, Object state)
        : base(asyncCallback, state)
    {
    }

    // Records the value first, then reports a successful (exception-free)
    // completion through the base class.
    public void SetAsCompleted(TResult result, Boolean completedSynchronously)
    {
        m_value = result;
        Exception noError = null;
        base.SetAsCompleted(noError, completedSynchronously);
    }

    // Waits for completion via the base class (which throws on failure) and
    // then hands back the recorded value.
    public new TResult EndInvoke()
    {
        base.EndInvoke();
        TResult outcome = m_value;
        return outcome;
    }
}
#endregion
}
using System;
using System.Threading.Tasks;
namespace RxVsTplSample
{
/// <summary>
/// Exposes an operation that returns a <see cref="Task{TResult}"/> through the
/// APM (BeginInvoke/EndInvoke) pattern.
/// </summary>
/// <typeparam name="TArg">Type of the argument handed to the operation.</typeparam>
/// <typeparam name="TResult">Type of the value produced by the operation.</typeparam>
public class TplAsyncOperation<TArg, TResult>
{
    readonly Func<TArg, Task<TResult>> operation;

    /// <summary>Creates the adapter around <paramref name="operation"/>.</summary>
    /// <param name="operation">Factory producing the task for one invocation.</param>
    public TplAsyncOperation(Func<TArg, Task<TResult>> operation)
    {
        this.operation = operation;
    }

    /// <summary>
    /// Starts the operation and returns an <see cref="IAsyncResult"/> tracking it.
    /// </summary>
    /// <param name="arg">Argument passed to the operation.</param>
    /// <param name="callback">Optional callback invoked once the operation has finished; may be null.</param>
    /// <param name="state">Caller state, exposed through <see cref="IAsyncResult.AsyncState"/>.</param>
    /// <returns>A <see cref="Task{TResult}"/> to pass to <see cref="EndInvoke"/>.</returns>
    public IAsyncResult BeginInvoke(TArg arg, AsyncCallback callback, object state)
    {
        var task = operation(arg);

        // The task produced by the operation does not carry the caller's state, so
        // mirror its outcome onto a proxy task whose AsyncState is the supplied
        // state object.
        var proxy = new TaskCompletionSource<TResult>(state);

        task.ContinueWith(finished =>
        {
            if (finished.IsFaulted)
                proxy.TrySetException(finished.Exception.InnerExceptions);
            else if (finished.IsCanceled)
                proxy.TrySetCanceled();
            else
                proxy.TrySetResult(finished.Result);

            // APM allows a null callback; only notify when one was supplied, and do
            // so after the proxy is complete so EndInvoke inside the callback does
            // not block.
            if (callback != null)
                callback(proxy.Task);
        });

        return proxy.Task;
    }

    /// <summary>
    /// Waits for the operation started by <see cref="BeginInvoke"/> and returns its value.
    /// </summary>
    /// <param name="result">The object returned by <see cref="BeginInvoke"/>.</param>
    /// <returns>The value produced by the task.</returns>
    /// <exception cref="AggregateException">The operation faulted or was cancelled.</exception>
    public TResult EndInvoke(IAsyncResult result)
    {
        return ((Task<TResult>)result).Result;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment