Last active
February 2, 2022 03:15
-
-
Save noseratio/22a291e1d69f6d1cea547623ad9c9147 to your computer and use it in GitHub Desktop.
Single thread event loop scheduler, based on Rx EventLoopScheduler but with added SynchronizationContext and TPL TaskScheduler API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Noseratio; | |
/// <summary> | |
/// Single thread event loop scheduler, based on Rx <seealso cref="EventLoopScheduler"/> | |
/// but with added SynchronizationContext and TPL TaskScheduler API | |
/// https://gist.github.com/noseratio/22a291e1d69f6d1cea547623ad9c9147 | |
/// </summary> | |
public sealed partial class EventLoopThreadScheduler : | |
TaskScheduler, | |
IScheduler, | |
IAsyncDisposable | |
{ | |
private readonly EventLoopScheduler _rxScheduler; | |
private readonly Task _completionTask; | |
private readonly Thread _thread; | |
private readonly TaskCompletionSource<DBNull> _threadEndTcs; | |
public EventLoopThreadScheduler() | |
{ | |
var syncContextSchedulerTcs = new TaskCompletionSource<IScheduler>(TaskCreationOptions.RunContinuationsAsynchronously); | |
var eventLoopRoutineTcs = new TaskCompletionSource<ThreadStart>(TaskCreationOptions.RunContinuationsAsynchronously); | |
_threadEndTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); | |
// create a thread but don't start it | |
_thread = new(ThreadStart) | |
{ | |
IsBackground = true, | |
Name = nameof(EventLoopThreadScheduler) | |
}; | |
// Rx EventLoopScheduler will start our thread on-demand | |
_rxScheduler = new EventLoopScheduler( | |
eventLoopRoutine => | |
{ | |
if (!this.IsThreadUnstarted()) | |
{ | |
throw new InvalidOperationException($"The {_thread.Name} thread has already started."); | |
} | |
eventLoopRoutineTcs.SetResult(eventLoopRoutine); | |
return _thread; | |
}); | |
// provide IScheduler to the thread routine, | |
// to wrap it as the thread's synchronization context | |
// (see SyncContext below) | |
syncContextSchedulerTcs.SetResult(this); | |
// wrap the thread completion moment as a Task, | |
// to reliably await it in DisposeAsync | |
// and catch any leaked thread routine exceptions | |
_completionTask = _threadEndTcs.Task.ContinueWith( | |
continuationFunction: anteTask => | |
{ | |
if (_thread.IsAlive) | |
{ | |
_thread.Join(); | |
} | |
return anteTask; | |
}, | |
cancellationToken: CancellationToken.None, | |
TaskContinuationOptions.RunContinuationsAsynchronously, | |
scheduler: TaskScheduler.Default) | |
.Unwrap(); | |
// the actual thread routine, where we just delegate | |
// to Rx EventLoopScheduler | |
void ThreadStart() | |
{ | |
var scheduler = syncContextSchedulerTcs.Task.GetAwaiter().GetResult(); | |
SynchronizationContext.SetSynchronizationContext(new SyncContext(scheduler)); | |
try | |
{ | |
// run the event loop routine provided by Rx | |
var eventLoopRoutine = eventLoopRoutineTcs.Task.GetAwaiter().GetResult(); | |
eventLoopRoutine.Invoke(); | |
_threadEndTcs.SetResult(DBNull.Value); | |
} | |
catch (Exception ex) | |
{ | |
_threadEndTcs.SetException(ex); | |
} | |
finally | |
{ | |
SynchronizationContext.SetSynchronizationContext(null); | |
} | |
} | |
} | |
private bool IsThreadUnstarted() => | |
(_thread.ThreadState & System.Threading.ThreadState.Unstarted) != 0; | |
#region IScheduler members | |
public IDisposable Schedule<TState>( | |
TState state, | |
Func<IScheduler, TState, IDisposable> action) | |
=> _rxScheduler.Schedule(state, action); | |
public IDisposable Schedule<TState>( | |
TState state, | |
TimeSpan dueTime, | |
Func<IScheduler, TState, IDisposable> action) | |
=> _rxScheduler.Schedule(state, dueTime, action); | |
public IDisposable Schedule<TState>( | |
TState state, | |
DateTimeOffset dueTime, | |
Func<IScheduler, TState, IDisposable> action) | |
=> _rxScheduler.Schedule(state, dueTime, action); | |
public DateTimeOffset Now => _rxScheduler.Now; | |
#endregion | |
#region TaskScheduler members | |
public override int MaximumConcurrencyLevel => 1; | |
protected override IEnumerable<Task>? GetScheduledTasks() => | |
throw new NotSupportedException(); | |
protected override void QueueTask(Task task) => | |
_rxScheduler.Schedule(() => base.TryExecuteTask(task)); | |
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => | |
Thread.CurrentThread == _thread && base.TryExecuteTask(task); | |
#endregion | |
#region IAsyncDisposable members | |
public ValueTask DisposeAsync() | |
{ | |
_rxScheduler.Dispose(); | |
if (this.IsThreadUnstarted()) | |
{ | |
_threadEndTcs.TrySetResult(DBNull.Value); | |
} | |
return new ValueTask(_completionTask); | |
} | |
#endregion | |
} | |
partial class EventLoopThreadScheduler | |
{ | |
#region SyncContext | |
// wrap IScheduler as SynchronizationContext | |
private class SyncContext : SynchronizationContext | |
{ | |
private readonly IScheduler _scheduler; | |
public SyncContext(IScheduler scheduler) => | |
_scheduler = scheduler; | |
public override void Send(SendOrPostCallback callback, object? state) => | |
throw new NotImplementedException(nameof(Send)); | |
public override void Post(SendOrPostCallback callback, object? state) | |
{ | |
try | |
{ | |
_scheduler.Schedule(() => callback(state)); | |
} | |
catch (Exception ex) | |
{ | |
// probably an ObjectDisposedException exception | |
// rethrow anyway on thread pool, to avoid neglecting it | |
ThreadPool.QueueUserWorkItem( | |
static (edi) => edi.Throw(), | |
ExceptionDispatchInfo.Capture(ex), | |
preferLocal: true); | |
} | |
} | |
public override SynchronizationContext CreateCopy() => this; | |
} | |
#endregion | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment