Skip to content

Instantly share code, notes, and snippets.

@duarten
Created February 9, 2011 22:15
Show Gist options
  • Save duarten/819430 to your computer and use it in GitHub Desktop.
Save duarten/819430 to your computer and use it in GitHub Desktop.
An awaitable exchanger with timeout support
public class AsyncExchanger<T>
{
private ExchangerAwaiter _awaiter;
private class ExchangerAwaiter : IAwaiter<T>
{
private const int PENDING = 0;
private const int DONE = 1;
private const int TIMEOUT = 2;
private readonly AsyncExchanger<T> _asyncExchanger;
private T _instance;
private Action _continuation;
private volatile int _status = PENDING;
public ExchangerAwaiter(AsyncExchanger<T> asyncExchanger, T instance)
{
_asyncExchanger = asyncExchanger;
_instance = instance;
}
public bool BeginAwait(Action continuation)
{
_continuation = continuation;
do
{
ExchangerAwaiter you = _asyncExchanger._awaiter;
if (you != null
&& (you = Interlocked.Exchange(ref _asyncExchanger._awaiter, null)) != null
&& you.satisfy(ref _instance))
{
_status = DONE;
return false;
}
if (Interlocked.CompareExchange(ref _asyncExchanger._awaiter, this, null) == null)
{
return true;
}
} while (true);
}
public T EndAwait()
{
Debug.Assert(_status != PENDING);
return _instance;
}
public bool TryCancelAwait(out Action continuation)
{
continuation = _continuation;
if (change_state_to(TIMEOUT))
{
Interlocked.CompareExchange(ref _asyncExchanger._awaiter, null, this);
return true;
}
return false;
}
public IAwaiter<T> GetAwaiter()
{
return this;
}
private bool satisfy(ref T instance)
{
if (change_state_to(DONE) == false)
{
return false;
}
var myInstance = _instance;
_instance = instance;
instance = myInstance;
TaskEx.Run(_continuation);
return true;
}
private bool change_state_to(int state)
{
return Interlocked.CompareExchange(ref _status, state, PENDING) == PENDING;
}
}
public IAwaiter<T> Exchange(T instance)
{
return new ExchangerAwaiter(this, instance);
}
}
public static class AwaiterExtensions
{
public static IAwaiter<T> WithTimeout<T>(this IAwaiter<T> awaiter, int timeout)
{
return new AwaiterWithTimer<T>(awaiter, timeout);
}
}
public class AwaiterWithTimer<T> : IAwaiter<T>
{
private readonly IAwaiter<T> _innerAwaiter;
private readonly int _timeout;
private readonly Timer _timer;
private volatile bool _timedOut;
public AwaiterWithTimer(IAwaiter<T> innerAwaiter, int timeout)
{
_innerAwaiter = innerAwaiter;
_timeout = timeout;
_timer = new Timer(_ =>
{
_timer.Dispose();
Action continuation;
if (_innerAwaiter.TryCancelAwait(out continuation))
{
_timedOut = true;
continuation();
}
});
}
public bool BeginAwait(Action continuation)
{
if (_innerAwaiter.BeginAwait(continuation))
{
_timer.Change(_timeout, Timeout.Infinite);
return true;
}
return false;
}
public IAwaiter<T> GetAwaiter()
{
return this;
}
public T EndAwait()
{
if (_timedOut)
{
throw new TimeoutException();
}
return _innerAwaiter.EndAwait();
}
public bool TryCancelAwait(out Action continuation)
{
return _innerAwaiter.TryCancelAwait(out continuation);
}
}
public interface IAwaiter<out T>
{
bool BeginAwait(Action continuation);
T EndAwait();
bool TryCancelAwait(out Action continuation);
IAwaiter<T> GetAwaiter();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment