Skip to content

Instantly share code, notes, and snippets.

@natalie-o-perret
Last active June 16, 2020 21:52
Show Gist options
  • Save natalie-o-perret/50495893909b0ddf5e6c668bd74cd450 to your computer and use it in GitHub Desktop.
Save natalie-o-perret/50495893909b0ddf5e6c668bd74cd450 to your computer and use it in GitHub Desktop.
Kevin Goss - Design and Code Tasks from Scratch
using System;
using System.Threading;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
namespace CSharpStuff
{
// Alternative to referencing Microsoft.VisualStudio.Threading.
// A SynchronizationContext that runs every posted or sent callback, in queue order,
// on one dedicated pump thread created by the constructor.
public class SingleThreadedSynchronizationContext : SynchronizationContext
{
    // Pending callbacks; consumed only by the pump thread.
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();

    // The pump thread, kept so Send can detect calls made from the pump itself.
    private readonly Thread _thread;

    // Starts the pump thread, which installs this instance as its current context
    // and then executes queued callbacks one at a time.
    public SingleThreadedSynchronizationContext()
    {
        _thread = new Thread(() =>
        {
            SetSynchronizationContext(this);
            foreach (var callback in _queue.GetConsumingEnumerable())
            {
                callback();
            }
        })
        {
            // The queue is never marked complete, so the loop above never ends:
            // as a foreground thread it would keep the process alive forever.
            IsBackground = true
        };
        _thread.Start();
    }

    // Queues d(state) for the pump thread and returns immediately.
    public override void Post(SendOrPostCallback d, object state) => _queue.Add(() => d(state));

    // Runs d(state) on the pump thread and returns once it has finished.
    public override void Send(SendOrPostCallback d, object state)
    {
        // Already on the pump thread: queueing and then waiting would block the only
        // thread able to dequeue the callback, so run it in place instead.
        if (Thread.CurrentThread == _thread)
        {
            d(state);
            return;
        }

        using var done = new ManualResetEventSlim();
        _queue.Add(() =>
        {
            d(state);
            // ReSharper disable once AccessToDisposedClosure
            done.Set();
        });
        // The event is disposed only after this wait returns, i.e. after Set() above.
        done.Wait();
    }
}
// Console helpers shared by the examples.
public static class Helpers
{
    // Prints "SynchronizationContext.Current: " followed by the type name of the
    // calling thread's current context, or "null" when there is none.
    public static void DisplayCurrentSynchronizationContext()
    {
        var context = SynchronizationContext.Current;

        string description;
        if (context == null)
        {
            description = "null";
        }
        else
        {
            description = context.GetType().Name;
        }

        Console.Write($"{nameof(SynchronizationContext)}.{nameof(SynchronizationContext.Current)}: ");
        Console.WriteLine(description);
    }
}
// Demo A: several continuations attached to one future, including a chained pair.
public class ExampleA
{
    // Returns a future that a dedicated thread completes with 42 after sleeping 100 ms.
    private Future<int> DoSomethingAsync()
    {
        var source = new Promise<int>();

        void CompleteLater()
        {
            Thread.Sleep(100);
            source.Complete(42);
        }

        new Thread(CompleteLater).Start();
        return source.Future;
    }

    // Wires up the continuations, waits for all of them and prints progress along the way.
    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleA)}: Start");

        var future = DoSomethingAsync();

        // Chain: a slow first step doubles the value, a second step then reports it.
        var doubled = future.ContinueWith(f =>
        {
            Thread.Sleep(1000);
            Console.WriteLine($"First future completed with value {f.Result}");
            return f.Result * 2;
        });
        var chainEnd = doubled.ContinueWith(f =>
        {
            Console.WriteLine($"Second future completed with value {f.Result}");
        });

        // Second continuation on the original future: it should not have to wait
        // for the chain above to finish before it executes.
        var sibling = future.ContinueWith(f =>
        {
            Console.WriteLine("Second continuation");
        });

        future.Wait();
        Console.WriteLine($"{nameof(future)}.{nameof(future.Result)} = {future.Result}");

        // Wait for both branches so the demo output is a little more deterministic.
        sibling.Wait();
        chainEnd.Wait();

        Console.WriteLine($"{nameof(ExampleA)}: Done");
        Console.WriteLine();
    }
}
// Demo B: the state machine behind an async method that awaits four futures in turn,
// written out by hand instead of being generated by the compiler.
public class ExampleB
{
// Returns a future that a new thread completes after sleeping for `ms` milliseconds.
private Future Delay(int ms)
{
var promise = new Promise();
var thread = new Thread(() =>
{
Thread.Sleep(ms);
promise.Complete();
});
thread.Start();
return promise.Future;
}
// Each helper below chains a value-producing continuation onto Delay(500).
private Future<int> DoSomething1Async() =>
Delay(500).ContinueWith(_ => 1);
private Future<int> DoSomething2Async() =>
Delay(500).ContinueWith(_ => 10);
private Future<int> DoSomething3Async() =>
Delay(500).ContinueWith(_ => 100);
private Future<int> DoSomethingElseAsync(int input) =>
Delay(500).ContinueWith(_ => input * 2);
// Hand-written stand-in for what the compiler generates from an async method
// (ExampleC.CallAsync is the async/await form of the same sequence of awaits).
private Future<int> CallAsync()
{
var stateMachine = new StateMachine { This = this };
// The real generated code calls Start on the builder here rather than MoveNext directly,
// => the builder can perform some initialization work
stateMachine.MoveNext();
// The builder hands out the future that case 4 of MoveNext eventually completes.
return stateMachine.Builder.Task;
}
// Nested: because it needs access to the methods it calls (i.e. even if they're private)
// In debug mode the struct can be converted to a class so that the tracking is easier when debugging.
private struct StateMachine : IAsyncStateMachine
{
// Selects the case MoveNext runs next; 0 on a freshly created instance.
private int _state;
// Reference to the parent class if methods are not static
public ExampleB This;
public FutureBuilder<int> Builder;
// Awaiter of the operation currently being awaited.
private FutureAwaiter<int> _future;
// Results of the first three awaits, kept across resumptions.
private int _i;
private int _j;
private int _k;
// Forwards the given state machine reference to the builder, which stores it.
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
Builder.SetStateMachine(stateMachine);
}
// Runs the method body from the current state up to the next await that is not yet
// complete. Each case first records the state to resume in, starts the next operation,
// then either falls straight into that state (already completed) or registers itself
// with the builder to be resumed on completion and returns.
public void MoveNext()
{
switch (_state)
{
case 0:
{
_state = 1;
_future = This.DoSomething1Async().GetAwaiter();
// Optimization: already completed => continue synchronously, no continuation needed
if (_future.IsCompleted)
{
goto case 1;
}
Builder.AwaitOnCompleted(ref _future, ref this);
return;
}
case 1:
{
_state = 2;
// Result of DoSomething1Async
_i = _future.GetResult();
_future = This.DoSomething2Async().GetAwaiter();
// Optimization
if (_future.IsCompleted)
{
goto case 2;
}
Builder.AwaitOnCompleted(ref _future, ref this);
return;
}
case 2:
{
_state = 3;
// Result of DoSomething2Async
_j = _future.GetResult();
_future = This.DoSomething3Async().GetAwaiter();
// Optimization
if (_future.IsCompleted)
{
goto case 3;
}
Builder.AwaitOnCompleted(ref _future, ref this);
return;
}
case 3:
{
_state = 4;
// Result of DoSomething3Async
_k = _future.GetResult();
_future = This.DoSomethingElseAsync(_i + _j + _k).GetAwaiter();
// Optimization
if (_future.IsCompleted)
{
goto case 4;
}
Builder.AwaitOnCompleted(ref _future, ref this);
return;
}
case 4:
{
// Final state: publish the result of DoSomethingElseAsync through the builder.
Builder.SetResult(_future.GetResult());
return;
}
}
}
}
// Runs the hand-written state machine, blocks until its future completes and prints
// the result ((1 + 10 + 100) * 2 given the helpers above).
public void Run()
{
Console.WriteLine($"{nameof(ExampleB)}: Start");
var future = CallAsync();
future.Wait();
Console.WriteLine(future.Result);
Console.WriteLine($"{nameof(ExampleB)}: Done");
Console.WriteLine();
}
}
// Demo C: the same sequence of awaits as ExampleB, written with async/await and
// returning the custom Future<int> type.
public class ExampleC
{
    // Returns a future that a new thread completes after sleeping for `ms` milliseconds.
    private Future Delay(int ms)
    {
        var gate = new Promise();
        new Thread(() =>
        {
            Thread.Sleep(ms);
            gate.Complete();
        }).Start();
        return gate.Future;
    }

    // Each helper chains a value-producing continuation onto Delay(500).
    private Future<int> DoSomethingAsync1Async()
    {
        return Delay(500).ContinueWith(_ => 1);
    }

    private Future<int> DoSomethingAsync2Async()
    {
        return Delay(500).ContinueWith(_ => 10);
    }

    private Future<int> DoSomethingAsync3Async()
    {
        return Delay(500).ContinueWith(_ => 100);
    }

    private Future<int> DoSomethingElseAsync(int input)
    {
        return Delay(500).ContinueWith(_ => input * 2);
    }

    // Awaits the three producers one after another, printing the current
    // synchronization context before each await, then awaits the final step on their sum.
    private async Future<int> CallAsync()
    {
        Helpers.DisplayCurrentSynchronizationContext();
        var ones = await DoSomethingAsync1Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var tens = await DoSomethingAsync2Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var hundreds = await DoSomethingAsync3Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var sum = ones + tens + hundreds;
        return await DoSomethingElseAsync(sum);
    }

    // Starts the async method, blocks until its future completes and prints the result.
    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleC)}: Start");

        var pending = CallAsync();
        pending.Wait();
        Console.WriteLine(pending.Result);

        Console.WriteLine($"{nameof(ExampleC)}: Done");
        Console.WriteLine();
    }
}
// Demo D: the async method of ExampleC started from a thread that has a
// synchronization context installed, with the first await opting out of context capture.
public class ExampleD
{
    // Returns a future that a new thread completes after sleeping for `ms` milliseconds.
    private Future Delay(int ms)
    {
        var gate = new Promise();
        new Thread(() =>
        {
            Thread.Sleep(ms);
            gate.Complete();
        }).Start();
        return gate.Future;
    }

    // Each helper chains a value-producing continuation onto Delay(500).
    private Future<int> DoSomethingAsync1Async() =>
        Delay(500).ContinueWith(_ => 1);

    private Future<int> DoSomethingAsync2Async() =>
        Delay(500).ContinueWith(_ => 10);

    private Future<int> DoSomethingAsync3Async() =>
        Delay(500).ContinueWith(_ => 100);

    private Future<int> DoSomethingElseAsync(int input) =>
        Delay(500).ContinueWith(_ => input * 2);

    // Same flow as ExampleC.CallAsync, except that the first await is configured
    // with ConfigureAwait(false).
    private async Future<int> CallAsync()
    {
        Helpers.DisplayCurrentSynchronizationContext();
        var ones = await DoSomethingAsync1Async().ConfigureAwait(false);

        Helpers.DisplayCurrentSynchronizationContext();
        var tens = await DoSomethingAsync2Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var hundreds = await DoSomethingAsync3Async();

        Helpers.DisplayCurrentSynchronizationContext();
        var sum = ones + tens + hundreds;
        return await DoSomethingElseAsync(sum);
    }

    // Runs CallAsync on the thread of a SingleThreadedSynchronizationContext and
    // blocks there until the result is available.
    public void Run()
    {
        Console.WriteLine($"{nameof(ExampleD)}: Start");

        // Stand-in for a UI-style context (WPF, WinForms, etc.), used here instead of
        // copying the equivalent from Microsoft.VisualStudio.Threading.
        var uiLikeContext = new SingleThreadedSynchronizationContext();

        void RunOnContext(object unused)
        {
            var pending = CallAsync();
            pending.Wait();
            Console.WriteLine(pending.Result);
        }

        uiLikeContext.Send(RunOnContext, null);

        Console.WriteLine($"{nameof(ExampleD)}: Done");
        Console.WriteLine();
    }
}
// Marker interface for awaiters; FutureBuilder<T>.AwaitOnCompleted uses it as the
// constraint on its TAwaiter type parameter. It declares no members of its own:
// OnCompleted(Action) comes from INotifyCompletion, and T is not used by any member.
// ReSharper disable once UnusedTypeParameter
public interface IAwaiter<out T> : INotifyCompletion
{
// Shape of the members an awaiter provides, left commented out here
// (FutureAwaiter<T> declares GetResult and IsCompleted itself):
// T GetResult();
// bool IsCompleted { get; }
// To chain a continuation
// void OnCompleted(Action action);
}
// An awaiter relying on spoiler alert... : Future<T>
// Exposes the wrapped future's completion state and result, and resumes the awaiting
// code either on the synchronization context captured at await time or directly on
// the thread that runs the continuation.
public readonly struct FutureAwaiter<T> : IAwaiter<T>
{
    private readonly Future<T> _future;

    // True when OnCompleted should capture SynchronizationContext.Current.
    private readonly bool _captureContext;

    public FutureAwaiter(Future<T> future, bool captureContext)
    {
        _future = future;
        _captureContext = captureContext;
    }

    // Returns the wrapped future's Result as-is; does not wait for completion.
    public T GetResult() => _future.Result;

    public bool IsCompleted => _future.IsCompleted;

    // Registers `action` to run once the wrapped future has completed.
    public void OnCompleted(Action action)
    {
        // Captured now, on the awaiting thread; the continuation below runs elsewhere.
        var capturedContext = _captureContext ? SynchronizationContext.Current : null;

        _future.ContinueWith(_ =>
        {
            if (capturedContext != null)
            {
                // Post rather than Send: the thread completing the future only has to hand
                // the continuation over, not block until the awaiting code has finished running.
                // ReSharper disable once VSTHRD001
                capturedContext.Post(s => action(), null);
            }
            else
            {
                action();
            }
        });
    }
}
// Extension methods that make Future<T> usable with `await`.
public static class FutureExtensions
{
    // Awaiter used by a plain `await future`: captures the current synchronization context.
    public static FutureAwaiter<T> GetAwaiter<T>(this Future<T> future)
    {
        return new FutureAwaiter<T>(future, true);
    }

    // Wraps the future in an awaitable whose awaiter captures the context only when
    // `captureContext` is true.
    public static ConfigureFutureAwaitable<T> ConfigureAwait<T>(this Future<T> future, bool captureContext)
    {
        return new ConfigureFutureAwaitable<T>(future, captureContext);
    }
}
// Awaitable returned by FutureExtensions.ConfigureAwait: remembers the future and the
// context-capture choice, and passes both to the awaiter it creates.
public readonly struct ConfigureFutureAwaitable<T>
{
    private readonly Future<T> _source;
    private readonly bool _captureContext;

    public ConfigureFutureAwaitable(Future<T> future, bool capturedContext)
    {
        _source = future;
        _captureContext = capturedContext;
    }

    // Creates the awaiter for the stored future with the stored capture flag.
    public FutureAwaiter<T> GetAwaiter()
    {
        return new FutureAwaiter<T>(_source, _captureContext);
    }
}
// Async method builder for Future<T> (referenced by the AsyncMethodBuilder attribute on
// Future<T>): creates the future an async method returns, completes it with the result
// and hooks the state machine up to awaiters.
// Mutable struct: it lives inside the state machine, so copies of the state machine
// carry their own copy of these fields.
public struct FutureBuilder<T>
{
private Future<T> _future;
// Store boxed state machine, so that it happens only once.
private IAsyncStateMachine _stateMachine;
// Lazy initialization: the future is created on first access and reused afterwards.
public Future<T> Task =>
_future ??= new Future<T>();
// Returns a default-initialized builder.
public static FutureBuilder<T> Create() =>
new FutureBuilder<T>();
// Runs the state machine's first step on the calling thread.
public void Start<TStateMachine>(ref TStateMachine stateMachine)
where TStateMachine : IAsyncStateMachine =>
stateMachine.MoveNext();
// Note: we don't really have exception in our case
// => oversimplified impl.: always throws NotImplementedException.
public void SetException(Exception exception) =>
throw new NotImplementedException();
// Stores the result, then marks the future as completed.
public void SetResult(T result)
{
Task.Result = result;
Task.IsCompleted = true;
}
// Registers the state machine's MoveNext as the continuation of `awaiter`.
// TAwaiter + TAsyncStateMachine => avoid additional boxings and hence heap allocations
// ref => avoid making copies of value type instances (and both structures can potentially be rather "big")
public void AwaitOnCompleted<TAwaiter, TAsyncStateMachine>(
ref TAwaiter awaiter, ref TAsyncStateMachine stateMachine)
where TAwaiter : IAwaiter<T>
where TAsyncStateMachine : IAsyncStateMachine
{
// Now it's time to force the future (i.e. the Task property) to exist: the container is
// a value type, so it has to be created before the state machine is copied into a box below.
// Task;
// => A bare property access is not a valid statement, so GC.KeepAlive is used here
// only as a way to evaluate the getter:
GC.KeepAlive(Task);
// Another alternative is to write something like:
// _future ??= new Future<T>();
if (_stateMachine == null)
{
Console.WriteLine("Boxing: should happen only once");
// Explicit cast much needed here, or _stateMachine will be "null-ed" every time.
var boxedStateMachine = (IAsyncStateMachine)stateMachine;
_stateMachine = boxedStateMachine;
// Hands the boxed instance its own reference (the state machines in this file
// forward it to their builder's SetStateMachine).
boxedStateMachine.SetStateMachine(boxedStateMachine);
}
awaiter.OnCompleted(_stateMachine.MoveNext);
}
// In theory to support specifics about Execution Context; here it simply delegates
// to AwaitOnCompleted.
public void AwaitUnsafeOnCompleted<TAwaiter, TAsyncStateMachine>(
ref TAwaiter awaiter, ref TAsyncStateMachine stateMachine)
where TAwaiter : IAwaiter<T>
where TAsyncStateMachine : IAsyncStateMachine =>
AwaitOnCompleted(ref awaiter, ref stateMachine);
// Remembers the (boxed) state machine whose MoveNext is used for later continuations.
public void SetStateMachine(IAsyncStateMachine stateMachine) =>
_stateMachine = stateMachine;
}
// Entry point: runs the four examples in order.
public static class Program
{
    // .NET terminology used throughout this file:
    //   Promise => TaskCompletionSource (TCS)
    //   Future  => Task
    // Calling a method => creates a Promise => returns the associated Future
    public static void Main()
    {
        var exampleA = new ExampleA();
        exampleA.Run();

        var exampleB = new ExampleB();
        exampleB.Run();

        var exampleC = new ExampleC();
        exampleC.Run();

        var exampleD = new ExampleD();
        exampleD.Run();
    }
}
// Task Equivalent
// Represents an operation that completes at some point: callers can block on it with
// Wait() or attach continuations that are started once it has completed.
public class Future
{
    // Continuations registered before completion, started by InvokeContinuations.
    private readonly ConcurrentQueue<Future> _continuations = new ConcurrentQueue<Future>();

    // Signalled on completion; Wait() blocks on it.
    private readonly ManualResetEventSlim _mutex = new ManualResetEventSlim();

    // Makes "check completed, then enqueue" in AddContinuation atomic with respect to
    // the completion flag being set; without it a continuation enqueued right after the
    // queue was drained would never run.
    private readonly object _gate = new object();

    // volatile: read by waiting/awaiting threads without taking the lock.
    private volatile bool _isCompleted;

    // Scheduler used when this future is itself started as a continuation; may be null
    // for futures created without one.
    public FutureScheduler Scheduler { get; }

    internal Future(FutureScheduler scheduler = null)
    {
        Scheduler = scheduler;
    }

    // Setting this to true releases Wait() callers and starts the registered continuations.
    public bool IsCompleted
    {
        get => _isCompleted;
        internal set
        {
            lock (_gate)
            {
                _isCompleted = value;
            }

            if (value)
            {
                _mutex.Set();
                // Outside the lock: a continuation may execute inline and run arbitrary code.
                InvokeContinuations();
            }
        }
    }

    // Convention: returned Future has completed when the callback is done
    public Future ContinueWith(Action<Future> continuation, FutureScheduler scheduler = null)
    {
        var future = new FutureContinuation(continuation, this, scheduler);
        AddContinuation(future);
        return future;
    }

    // Same as above for a continuation that produces a value.
    // ReSharper disable once VSTHRD200
    public Future<T> ContinueWith<T>(Func<Future, T> continuation, FutureScheduler scheduler = null)
    {
        var future = new FutureContinuation<T>(continuation, this, scheduler);
        AddContinuation(future);
        return future;
    }

    // Careful to call that one only when it makes sense
    // Aka when there is a proper impl. available (this base version always throws)
    internal virtual void Invoke() =>
        throw new NotImplementedException();

    // Hands this future to its scheduler for execution.
    internal void ScheduleAndStart() =>
        Scheduler.QueueFuture(this);

    // Registers a continuation, or starts it right away when this future has already completed.
    private protected void AddContinuation(Future continuation)
    {
        lock (_gate)
        {
            if (!_isCompleted)
            {
                // Still pending: the completing thread drains the queue after it has set
                // the flag under the same lock, so this entry is guaranteed to be seen.
                _continuations.Enqueue(continuation);
                return;
            }
        }

        // Already completed: nothing would ever dequeue it, so start it directly.
        continuation.ScheduleAndStart();
    }

    // Starts every registered continuation. A single continuation is first offered to its
    // scheduler for inline execution; otherwise each one is queued on its scheduler.
    private void InvokeContinuations()
    {
        if (_continuations.Count == 1)
        {
            _continuations.TryDequeue(out var only);
            if (only != null && !only.Scheduler.TryExecuteFutureInline(only))
            {
                only.ScheduleAndStart();
            }

            return;
        }

        while (_continuations.TryDequeue(out var next))
        {
            next.ScheduleAndStart();
        }
    }

    // Blocks the calling thread until this future has been marked as completed.
    public void Wait() =>
        _mutex.Wait();
}
// Task<T> Equivalent: a Future that also carries a result.
// The attribute tells the compiler to use FutureBuilder<> whenever an async method
// returns this type.
[AsyncMethodBuilder(typeof(FutureBuilder<>))]
public class Future<T> : Future
{
    // Value produced by the operation; assigned by whoever completes the future.
    public T Result { get; internal set; }

    internal Future(FutureScheduler scheduler = null)
        : base(scheduler)
    {
    }

    // Attaches a continuation that receives this future typed as Future<T>.
    public Future ContinueWith(Action<Future<T>> continuation, FutureScheduler scheduler = null)
    {
        // Corner cut: four dedicated continuation types would be needed to avoid this
        // adapter delegate, which costs an extra heap allocation.
        Action<Future> adapter = antecedent => continuation((Future<T>)antecedent);

        var next = new FutureContinuation(adapter, this, scheduler);
        AddContinuation(next);
        return next;
    }

    // Same as above for a continuation that produces a value of its own.
    // ReSharper disable once VSTHRD200
    public Future<TResult> ContinueWith<TResult>(Func<Future<T>, TResult> continuation, FutureScheduler scheduler = null)
    {
        Func<Future, TResult> adapter = antecedent => continuation((Future<T>)antecedent);

        var next = new FutureContinuation<TResult>(adapter, this, scheduler);
        AddContinuation(next);
        return next;
    }
}
// TaskScheduler Equivalent: decides where and when a future's Invoke gets called.
public abstract class FutureScheduler
{
    // Scheduler used when a continuation is created without an explicit one.
    public static readonly FutureScheduler Default = new ThreadPoolFutureScheduler();

    // Original: QueueTask Equivalent
    protected internal abstract void QueueFuture(Future future);

    // Invokes the future after checking that it was created for this scheduler;
    // throws InvalidOperationException otherwise.
    protected void ExecuteFuture(Future future)
    {
        var ownedByThisScheduler = future.Scheduler == this;
        if (!ownedByThisScheduler)
        {
            throw new InvalidOperationException();
        }

        future.Invoke();
    }

    // Original: TryExecuteTaskInline
    // Returns true when the future was executed on the calling thread; this base
    // implementation never inlines.
    protected internal virtual bool TryExecuteFutureInline(Future future) => false;
}
// Scheduler that runs futures on the .NET thread pool.
public class ThreadPoolFutureScheduler : FutureScheduler
{
    // Queues the future's execution as a thread-pool work item.
    protected internal override void QueueFuture(Future future)
    {
        ThreadPool.QueueUserWorkItem(_ => ExecuteFuture(future));
    }

    // Executes the future on the calling thread only when that thread is a thread-pool
    // thread. Note that the message is printed on every attempt, successful or not.
    protected internal override bool TryExecuteFutureInline(Future future)
    {
        Console.WriteLine("Inlining");

        if (!Thread.CurrentThread.IsThreadPoolThread)
        {
            return false;
        }

        ExecuteFuture(future);
        return true;
    }
}
// Combines the roles of Promise and Future in a single class:
// fewer (heap) allocations and an easier implementation later on.
// Could allocate even less by adding the two other specific TPL-equivalent
// continuation types that are missing here (i.e. four in total).
internal class FutureContinuation : Future
{
    // Future this continuation was attached to; passed to the callback.
    private readonly Future _antecedent;
    private readonly Action<Future> _callback;

    // Falls back to FutureScheduler.Default when no scheduler is given.
    public FutureContinuation(Action<Future> action, Future parent, FutureScheduler scheduler)
        : base(scheduler ?? FutureScheduler.Default)
    {
        _antecedent = parent;
        _callback = action;
    }

    // Runs the callback with the antecedent future, then marks this future as completed.
    internal override void Invoke()
    {
        _callback(_antecedent);
        IsCompleted = true;
    }
}
// Value-producing counterpart of FutureContinuation: the callback's return value
// becomes this future's Result.
internal class FutureContinuation<T> : Future<T>
{
    // Future this continuation was attached to; passed to the callback.
    private readonly Future _antecedent;
    private readonly Func<Future, T> _callback;

    // Falls back to FutureScheduler.Default when no scheduler is given.
    public FutureContinuation(Func<Future, T> func, Future parent, FutureScheduler scheduler)
        : base(scheduler ?? FutureScheduler.Default)
    {
        _antecedent = parent;
        _callback = func;
    }

    // Stores the callback's return value as the result, then marks this future as completed.
    internal override void Invoke()
    {
        var value = _callback(_antecedent);
        Result = value;
        IsCompleted = true;
    }
}
// TCS Equivalent: the producer side of a Future. The promise's owner completes it;
// consumers only ever see the Future.
public class Promise
{
    // Future handed out to consumers; created together with the promise.
    public Future Future { get; }

    public Promise()
    {
        Future = new Future();
    }

    // Marks the associated future as completed.
    public void Complete() =>
        Future.IsCompleted = true;
}
// TCS<T> Equivalent: same as Promise, for a future that carries a result.
public class Promise<T>
{
    // Future handed out to consumers; created together with the promise.
    public Future<T> Future { get; }

    public Promise()
    {
        Future = new Future<T>();
    }

    // Stores the result first, then marks the future as completed, so the value is
    // already in place when completion is signalled.
    public void Complete(T result)
    {
        var target = Future;
        target.Result = result;
        target.IsCompleted = true;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment