Skip to content

Instantly share code, notes, and snippets.

@noseratio
Created September 16, 2020 11:29
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noseratio/5d2d5f2a0cbb71b7880ce731c3958e62 to your computer and use it in GitHub Desktop.
Save noseratio/5d2d5f2a0cbb71b7880ce731c3958e62 to your computer and use it in GitHub Desktop.
Continue on the specified task scheduler, which becomes the current one
// by @noseratio
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Noseratio.Experimental
{
public static class TaskSchedulerExtensions
{
/// <summary>
/// Continue on the specified task scheduler, which becomes the current one
/// Inspired by <see cref="https://github.com/dotnet/runtime/issues/20025"/>this GitHub issue</see>.
/// </summary>
/// <param name="@this">A task scheduler instance, e.g., <c>TaskScheduler.Default</c></param>
/// <param name="alwaysSchedule">Always use the task scheduler to queue the continuations,
/// even if it can be executed synchronously.
/// </param>
/// <example>
/// <code>
/// await await TaskScheduler.Default.SwitchTo(alwaysSchedule: true);
/// </code>
/// </example>
/// <returns></returns>
public static TaskSchedulerAwaitable SwitchTo(this TaskScheduler @this, bool alwaysSchedule = false)
{
return new TaskSchedulerAwaitable(@this, alwaysSchedule);
}
public struct TaskSchedulerAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion
{
private readonly TaskScheduler _scheduler;
private bool _alwaysSchedule;
public TaskSchedulerAwaiter(TaskScheduler scheduler, bool alwaysSchedule = false)
{
_scheduler = scheduler;
_alwaysSchedule = alwaysSchedule;
}
private void Schedule(Action continuation)
{
Task.Factory.StartNew(
continuation,
CancellationToken.None,
TaskCreationOptions.None,
_scheduler);
}
public bool IsCompleted =>
// optimize if already on the default task scheduler
// and on a thread pool thread without sync context
!_alwaysSchedule &&
_scheduler == TaskScheduler.Default &&
TaskScheduler.Current == TaskScheduler.Default &&
Thread.CurrentThread.IsThreadPoolThread &&
SynchronizationContext.Current == null;
public void GetResult()
{
}
// a safe version that has to flow the execution context
public void OnCompleted(Action continuation)
{
throw new NotImplementedException(nameof(OnCompleted));
}
// an unsafe version that doesn't have to flow the execution context
public void UnsafeOnCompleted(Action continuation)
{
// use ThreadPool.UnsafeQueueUserWorkItem to optimize for TaskScheduler.Default
if (_scheduler == TaskScheduler.Default)
{
ThreadPool.UnsafeQueueUserWorkItem(
c => ((Action)c!).Invoke(),
continuation,
preferLocal: true);
return;
}
// use Task.Factory.StartNew for all non-default task schedulers
if (ExecutionContext.IsFlowSuppressed())
{
Schedule(continuation);
return;
}
// suppress execution context flow
ExecutionContext.SuppressFlow();
try
{
Schedule(continuation);
}
finally
{
ExecutionContext.RestoreFlow();
}
}
}
public struct TaskSchedulerAwaitable
{
private readonly TaskSchedulerAwaiter _awaiter;
public TaskSchedulerAwaitable(TaskScheduler scheduler, bool alwaysSchedule = false)
{
_awaiter = new TaskSchedulerAwaiter(scheduler, alwaysSchedule);
}
public TaskSchedulerAwaiter GetAwaiter()
{
return _awaiter;
}
}
}
/// <summary>
/// Testing TaskScheduler.Default.SwitchTo
/// </summary>
class Program
{
public class CustomTaskScheduler : TaskScheduler
{
protected override IEnumerable<Task>? GetScheduledTasks()
{
throw new NotImplementedException();
}
protected override void QueueTask(Task task)
{
ThreadPool.QueueUserWorkItem(t =>
base.TryExecuteTask((Task)t!), task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}
static async Task Main()
{
await TaskScheduler.Default.SwitchTo(alwaysSchedule: true);
Trace.Assert(TaskScheduler.Current == TaskScheduler.Default);
var scheduler = new CustomTaskScheduler();
await scheduler.SwitchTo();
Trace.Assert(TaskScheduler.Current == scheduler);
var tcs = new TaskCompletionSource<bool>();
ThreadPool.QueueUserWorkItem(_ =>
{
Debug.Assert(TaskScheduler.Current == TaskScheduler.Default);
tcs.SetResult(true);
});
await tcs.Task;
Trace.Assert(TaskScheduler.Current == scheduler);
await TaskScheduler.Default.SwitchTo();
Trace.Assert(TaskScheduler.Current == TaskScheduler.Default);
}
}
}
@AArnott
Copy link

AArnott commented Sep 30, 2020

Per the docs you shouldn't call RestoreFlow.

@noseratio
Copy link
Author

Per the docs you shouldn't call RestoreFlow.

Hi @AArnott, sorry I've only just noticed your comment. I actually borrowed this idea of calling RestoreFlow directly from .NET Core sources, particularly from here. I think the idea was to avoid creating AsyncFlowControl for a short synchronous scope.

But now I believe we shouldn't even be suppressing it for .NET Core, inline with Stephen's notes?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment