Single-threaded event loop scheduler, based on the Rx EventLoopScheduler, with added SynchronizationContext and TPL TaskScheduler APIs
namespace Noseratio;
/// <summary>
/// Single-threaded event loop scheduler, based on Rx <see cref="EventLoopScheduler"/>,
/// but with added <see cref="SynchronizationContext"/> and TPL <see cref="TaskScheduler"/> APIs.
/// </summary>
// NOTE(review): the base type / interface list and the opening brace of this class are not
// visible in this view; the overrides and regions below suggest TaskScheduler, IScheduler
// and IAsyncDisposable — confirm against the full file.
public sealed partial class EventLoopThreadScheduler :
// Rx event loop scheduler that the Schedule overloads, Now and QueueTask forward to.
private readonly EventLoopScheduler _rxScheduler;
// Continuation of _threadEndTcs.Task; this is the task handed out by DisposeAsync.
private readonly Task _completionTask;
// The single thread created by the constructor (named after this class, background).
private readonly Thread _thread;
// Its Task is the antecedent of _completionTask.
// NOTE(review): the code that completes it is not visible here — presumably the thread routine.
private readonly TaskCompletionSource<DBNull> _threadEndTcs;
// NOTE(review): brace lines and several statements of this constructor (the try block in
// ThreadStart, the calls completing the two local TaskCompletionSources, the body of the
// IsAlive branch and of the catch) are not visible in this view. The comments below only
// describe what the visible lines do.
/// <summary>
/// Creates the dedicated thread, the wrapped Rx <see cref="EventLoopScheduler"/> and the
/// completion task derived from the thread-end signal.
/// </summary>
public EventLoopThreadScheduler()
// Hand-off to the thread routine: ThreadStart blocks on this to obtain the IScheduler
// that it wraps in a SyncContext.
var syncContextSchedulerTcs = new TaskCompletionSource<IScheduler>(TaskCreationOptions.RunContinuationsAsynchronously);
// Hand-off to the thread routine: ThreadStart blocks on this to obtain the event loop routine.
// NOTE(review): presumably completed from the thread factory lambda below — confirm.
var eventLoopRoutineTcs = new TaskCompletionSource<ThreadStart>(TaskCreationOptions.RunContinuationsAsynchronously);
// Antecedent of _completionTask (see ContinueWith below).
_threadEndTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
// The thread runs the local ThreadStart function; it is a background thread named after this class.
_thread = new(ThreadStart)
IsBackground = true,
Name = nameof(EventLoopThreadScheduler)
// The delegate passed to EventLoopScheduler receives the event loop routine and returns
// the pre-created _thread instead of creating a new one.
_rxScheduler = new EventLoopScheduler(
eventLoopRoutine =>
// Refuse to hand the thread out again once it has left the Unstarted state.
if (!this.IsThreadUnstarted())
throw new InvalidOperationException($"The {_thread.Name} thread has already started.");
return _thread;
// Completion is observed through a continuation of the thread-end signal, run on the
// default (thread pool) task scheduler and not cancellable.
_completionTask = _threadEndTcs.Task.ContinueWith(
continuationFunction: anteTask =>
// NOTE(review): the statement guarded by this IsAlive check is not visible here.
if (_thread.IsAlive)
// The continuation returns the antecedent task itself.
return anteTask;
cancellationToken: CancellationToken.None,
scheduler: TaskScheduler.Default)
// Thread entry point (local function).
void ThreadStart()
// Block this thread until the scheduler is supplied, then install a SyncContext
// wrapping it as the thread's current SynchronizationContext.
var scheduler = syncContextSchedulerTcs.Task.GetAwaiter().GetResult();
SynchronizationContext.SetSynchronizationContext(new SyncContext(scheduler));
// Block until the event loop routine is supplied.
var eventLoopRoutine = eventLoopRoutineTcs.Task.GetAwaiter().GetResult();
// NOTE(review): handler body not visible; presumably reports the failure via _threadEndTcs — confirm.
catch (Exception ex)
/// <summary>
/// Tells whether <see cref="_thread"/> still carries the
/// <see cref="System.Threading.ThreadState.Unstarted"/> flag.
/// </summary>
/// <returns><c>true</c> while the Unstarted bit is set in the thread state; otherwise <c>false</c>.</returns>
private bool IsThreadUnstarted()
{
    // Unstarted is a single-bit flag, so HasFlag is equivalent to a non-zero mask test.
    var currentState = _thread.ThreadState;
    return currentState.HasFlag(System.Threading.ThreadState.Unstarted);
}
#region IScheduler members
/// <summary>
/// Schedules <paramref name="action"/> by forwarding the request, unchanged, to the
/// wrapped Rx event loop scheduler.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/>.</param>
/// <param name="action">The work to schedule.</param>
/// <returns>The disposable returned by the wrapped scheduler.</returns>
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    return _rxScheduler.Schedule(state, action);
}
/// <summary>
/// Schedules <paramref name="action"/> with a relative due time by forwarding the request,
/// unchanged, to the wrapped Rx event loop scheduler.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/>.</param>
/// <param name="dueTime">Relative due time passed through to the wrapped scheduler.</param>
/// <param name="action">The work to schedule.</param>
/// <returns>The disposable returned by the wrapped scheduler.</returns>
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
    return _rxScheduler.Schedule(state, dueTime, action);
}
/// <summary>
/// Schedules <paramref name="action"/> with an absolute due time by forwarding the request,
/// unchanged, to the wrapped Rx event loop scheduler.
/// </summary>
/// <typeparam name="TState">Type of the state passed to the action.</typeparam>
/// <param name="state">State handed to <paramref name="action"/>.</param>
/// <param name="dueTime">Absolute due time passed through to the wrapped scheduler.</param>
/// <param name="action">The work to schedule.</param>
/// <returns>The disposable returned by the wrapped scheduler.</returns>
public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
{
    return _rxScheduler.Schedule(state, dueTime, action);
}
/// <summary>
/// Gets the current time as reported by the wrapped Rx event loop scheduler.
/// </summary>
public DateTimeOffset Now
{
    get { return _rxScheduler.Now; }
}
#region TaskScheduler members
/// <summary>
/// Gets the maximum concurrency level reported by this scheduler, which is always 1.
/// </summary>
public override int MaximumConcurrencyLevel
{
    get { return 1; }
}
/// <summary>
/// Enumerating queued tasks is not supported by this scheduler.
/// </summary>
/// <returns>Never returns.</returns>
/// <exception cref="NotSupportedException">Always thrown.</exception>
protected override IEnumerable<Task>? GetScheduledTasks()
{
    throw new NotSupportedException();
}
/// <summary>
/// Queues <paramref name="task"/> by scheduling its execution on the wrapped Rx scheduler.
/// </summary>
/// <param name="task">The task to execute.</param>
protected override void QueueTask(Task task)
{
    // The disposable returned by Schedule is deliberately not kept.
    _ = _rxScheduler.Schedule(() => base.TryExecuteTask(task));
}
/// <summary>
/// Executes <paramref name="task"/> inline only when the caller is already running on
/// this scheduler's own thread.
/// </summary>
/// <param name="task">The task to execute.</param>
/// <param name="taskWasPreviouslyQueued">Not consulted by this implementation.</param>
/// <returns>
/// <c>false</c> when called from any other thread; otherwise the result of
/// <c>TryExecuteTask</c>.
/// </returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    if (Thread.CurrentThread != _thread)
    {
        return false;
    }

    return base.TryExecuteTask(task);
}
#region IAsyncDisposable members
// NOTE(review): brace lines and the statements between the unstarted-thread check and the
// final return (including whatever disposes _rxScheduler) are not visible in this view.
/// <summary>
/// Asynchronously disposes the scheduler.
/// </summary>
/// <returns>A <see cref="ValueTask"/> wrapping the completion task.</returns>
public ValueTask DisposeAsync()
// Special-cases a thread that never left the Unstarted state (branch body not visible here).
if (this.IsThreadUnstarted())
// Callers await the continuation of the thread-end signal.
return new ValueTask(_completionTask);
// Second part of the partial class: holds the nested SynchronizationContext implementation.
partial class EventLoopThreadScheduler
#region SyncContext
/// <summary>
/// <see cref="SynchronizationContext"/> whose <c>Post</c> schedules callbacks on the
/// supplied <see cref="IScheduler"/>; <c>Send</c> is not implemented.
/// </summary>
private class SyncContext : SynchronizationContext
// Scheduler that Post forwards callbacks to.
private readonly IScheduler _scheduler;
/// <summary>
/// Creates a context that posts callbacks to <paramref name="scheduler"/>.
/// </summary>
/// <param name="scheduler">The scheduler to store for later use by <c>Post</c>.</param>
public SyncContext(IScheduler scheduler)
{
    _scheduler = scheduler;
}
/// <summary>
/// Synchronous dispatch is not implemented by this context.
/// </summary>
/// <param name="callback">Ignored.</param>
/// <param name="state">Ignored.</param>
/// <exception cref="NotImplementedException">Always thrown, with the method name as message.</exception>
public override void Send(SendOrPostCallback callback, object? state)
{
    throw new NotImplementedException(nameof(Send));
}
// NOTE(review): the brace lines, the opening "try" and the call that receives the two
// trailing arguments below are not visible in this view.
/// <summary>
/// Asynchronously dispatches <paramref name="callback"/> by scheduling it on the wrapped scheduler.
/// </summary>
/// <param name="callback">The delegate to invoke.</param>
/// <param name="state">The argument passed to <paramref name="callback"/>.</param>
public override void Post(SendOrPostCallback callback, object? state)
_scheduler.Schedule(() => callback(state));
catch (Exception ex)
// Most likely an ObjectDisposedException.
// Rethrow it on the thread pool anyway, so the failure is not silently lost.
// NOTE(review): presumably these are the callback and preferLocal arguments of a
// thread-pool queueing call that is given an ExceptionDispatchInfo for ex — confirm.
static (edi) => edi.Throw(),
preferLocal: true);
/// <summary>
/// Returns this same instance rather than creating a new context.
/// </summary>
/// <returns>The current <see cref="SyncContext"/> instance.</returns>
public override SynchronizationContext CreateCopy()
{
    return this;
}
