Skip to content

Instantly share code, notes, and snippets.

@noseratio
Last active February 2, 2022 03:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save noseratio/22a291e1d69f6d1cea547623ad9c9147 to your computer and use it in GitHub Desktop.
Save noseratio/22a291e1d69f6d1cea547623ad9c9147 to your computer and use it in GitHub Desktop.
Single thread event loop scheduler, based on Rx EventLoopScheduler but with added SynchronizationContext and TPL TaskScheduler API
namespace Noseratio;
/// <summary>
/// Single thread event loop scheduler, based on Rx <seealso cref="EventLoopScheduler"/>
/// but with added SynchronizationContext and TPL TaskScheduler API
/// https://gist.github.com/noseratio/22a291e1d69f6d1cea547623ad9c9147
/// </summary>
public sealed partial class EventLoopThreadScheduler :
TaskScheduler,
IScheduler,
IAsyncDisposable
{
private readonly EventLoopScheduler _rxScheduler;
private readonly Task _completionTask;
private readonly Thread _thread;
private readonly TaskCompletionSource<DBNull> _threadEndTcs;
public EventLoopThreadScheduler()
{
var syncContextSchedulerTcs = new TaskCompletionSource<IScheduler>(TaskCreationOptions.RunContinuationsAsynchronously);
var eventLoopRoutineTcs = new TaskCompletionSource<ThreadStart>(TaskCreationOptions.RunContinuationsAsynchronously);
_threadEndTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
// create a thread but don't start it
_thread = new(ThreadStart)
{
IsBackground = true,
Name = nameof(EventLoopThreadScheduler)
};
// Rx EventLoopScheduler will start our thread on-demand
_rxScheduler = new EventLoopScheduler(
eventLoopRoutine =>
{
if (!this.IsThreadUnstarted())
{
throw new InvalidOperationException($"The {_thread.Name} thread has already started.");
}
eventLoopRoutineTcs.SetResult(eventLoopRoutine);
return _thread;
});
// provide IScheduler to the thread routine,
// to wrap it as the thread's synchronization context
// (see SyncContext below)
syncContextSchedulerTcs.SetResult(this);
// wrap the thread completion moment as a Task,
// to reliably await it in DisposeAsync
// and catch any leaked thread routine exceptions
_completionTask = _threadEndTcs.Task.ContinueWith(
continuationFunction: anteTask =>
{
if (_thread.IsAlive)
{
_thread.Join();
}
return anteTask;
},
cancellationToken: CancellationToken.None,
TaskContinuationOptions.RunContinuationsAsynchronously,
scheduler: TaskScheduler.Default)
.Unwrap();
// the actual thread routine, where we just delegate
// to Rx EventLoopScheduler
void ThreadStart()
{
var scheduler = syncContextSchedulerTcs.Task.GetAwaiter().GetResult();
SynchronizationContext.SetSynchronizationContext(new SyncContext(scheduler));
try
{
// run the event loop routine provided by Rx
var eventLoopRoutine = eventLoopRoutineTcs.Task.GetAwaiter().GetResult();
eventLoopRoutine.Invoke();
_threadEndTcs.SetResult(DBNull.Value);
}
catch (Exception ex)
{
_threadEndTcs.SetException(ex);
}
finally
{
SynchronizationContext.SetSynchronizationContext(null);
}
}
}
private bool IsThreadUnstarted() =>
(_thread.ThreadState & System.Threading.ThreadState.Unstarted) != 0;
#region IScheduler members
public IDisposable Schedule<TState>(
TState state,
Func<IScheduler, TState, IDisposable> action)
=> _rxScheduler.Schedule(state, action);
public IDisposable Schedule<TState>(
TState state,
TimeSpan dueTime,
Func<IScheduler, TState, IDisposable> action)
=> _rxScheduler.Schedule(state, dueTime, action);
public IDisposable Schedule<TState>(
TState state,
DateTimeOffset dueTime,
Func<IScheduler, TState, IDisposable> action)
=> _rxScheduler.Schedule(state, dueTime, action);
public DateTimeOffset Now => _rxScheduler.Now;
#endregion
#region TaskScheduler members
public override int MaximumConcurrencyLevel => 1;
protected override IEnumerable<Task>? GetScheduledTasks() =>
throw new NotSupportedException();
protected override void QueueTask(Task task) =>
_rxScheduler.Schedule(() => base.TryExecuteTask(task));
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) =>
Thread.CurrentThread == _thread && base.TryExecuteTask(task);
#endregion
#region IAsyncDisposable members
public ValueTask DisposeAsync()
{
_rxScheduler.Dispose();
if (this.IsThreadUnstarted())
{
_threadEndTcs.TrySetResult(DBNull.Value);
}
return new ValueTask(_completionTask);
}
#endregion
}
partial class EventLoopThreadScheduler
{
#region SyncContext
// wrap IScheduler as SynchronizationContext
private class SyncContext : SynchronizationContext
{
private readonly IScheduler _scheduler;
public SyncContext(IScheduler scheduler) =>
_scheduler = scheduler;
public override void Send(SendOrPostCallback callback, object? state) =>
throw new NotImplementedException(nameof(Send));
public override void Post(SendOrPostCallback callback, object? state)
{
try
{
_scheduler.Schedule(() => callback(state));
}
catch (Exception ex)
{
// probably an ObjectDisposedException exception
// rethrow anyway on thread pool, to avoid neglecting it
ThreadPool.QueueUserWorkItem(
static (edi) => edi.Throw(),
ExceptionDispatchInfo.Capture(ex),
preferLocal: true);
}
}
public override SynchronizationContext CreateCopy() => this;
}
#endregion
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment