Created
May 7, 2019 15:31
-
-
Save buybackoff/82351815275f6f0026f654913cd7f544 to your computer and use it in GitHub Desktop.
ManualResetValueTaskSourceCore for net461
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
namespace System.Threading.Tasks.Sources
{
    /// <summary>Provides the core logic for implementing a manual-reset <see cref="IValueTaskSource"/> or <see cref="IValueTaskSource{TResult}"/>.</summary>
    /// <typeparam name="TResult">The type of the result returned by <see cref="GetResult"/>.</typeparam>
    [StructLayout(LayoutKind.Auto)]
    public struct ManualResetValueTaskSourceCore<TResult>
    {
        /// <summary>
        /// The callback to invoke when the operation completes if <see cref="OnCompleted"/> was called before the operation completed,
        /// or <see cref="ManualResetValueTaskSourceCoreShared.s_sentinel"/> if the operation completed before a callback was supplied,
        /// or null if a callback hasn't yet been provided and the operation hasn't yet completed.
        /// </summary>
        private Action<object> _continuation;

        /// <summary>State to pass to <see cref="_continuation"/>.</summary>
        private object _continuationState;

        /// <summary><see cref="ExecutionContext"/> to flow to the callback, or null if no flowing is required.</summary>
        private ExecutionContext _executionContext;

        /// <summary>
        /// A "captured" <see cref="SynchronizationContext"/> or <see cref="TaskScheduler"/> with which to invoke the callback,
        /// or null if no special context is required.
        /// </summary>
        private object _capturedContext;

        /// <summary>Whether the current operation has completed.</summary>
        private bool _completed;

        /// <summary>The result with which the operation succeeded, or the default value if it hasn't yet completed or failed.</summary>
        private TResult _result;

        /// <summary>The exception with which the operation failed, or null if it hasn't yet completed or completed successfully.</summary>
        private ExceptionDispatchInfo _error;

        /// <summary>The current version of this value, used to help prevent misuse.</summary>
        private short _version;

        /// <summary>Gets or sets whether to force continuations to run asynchronously.</summary>
        /// <remarks>Continuations may run asynchronously if this is false, but they'll never run synchronously if this is true.</remarks>
        public bool RunContinuationsAsynchronously { get; set; }

        /// <summary>Resets to prepare for the next operation.</summary>
        /// <remarks>Increments <see cref="Version"/>, so tokens handed out for the previous operation no longer validate.</remarks>
        public void Reset()
        {
            // Reset/update state for the next use/await of this instance.
            _version++;
            _completed = false;
            _result = default; // TODO-NULLABLE-GENERIC
            _error = null;
            _executionContext = null;
            _capturedContext = null;
            _continuation = null;
            _continuationState = null;
        }

        /// <summary>Completes with a successful result.</summary>
        /// <param name="result">The result.</param>
        /// <exception cref="InvalidOperationException">The operation has already completed.</exception>
        public void SetResult(TResult result)
        {
            _result = result;
            SignalCompletion();
        }

        /// <summary>Completes with an error.</summary>
        /// <param name="error">The exception to capture and rethrow from <see cref="GetResult"/>.</param>
        /// <exception cref="InvalidOperationException">The operation has already completed.</exception>
        public void SetException(Exception error)
        {
            _error = ExceptionDispatchInfo.Capture(error);
            SignalCompletion();
        }

        /// <summary>Gets the operation version.</summary>
        public short Version => _version;

        /// <summary>Gets the status of the operation.</summary>
        /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
        /// <returns>
        /// <see cref="ValueTaskSourceStatus.Pending"/> until completion has been signaled; afterwards
        /// <see cref="ValueTaskSourceStatus.Succeeded"/>, <see cref="ValueTaskSourceStatus.Canceled"/> (the stored exception is an
        /// <see cref="OperationCanceledException"/>) or <see cref="ValueTaskSourceStatus.Faulted"/>.
        /// </returns>
        /// <exception cref="InvalidOperationException"><paramref name="token"/> does not match the current version.</exception>
        public ValueTaskSourceStatus GetStatus(short token)
        {
            ValidateToken(token);
            return
                _continuation == null || !_completed ? ValueTaskSourceStatus.Pending :
                _error == null ? ValueTaskSourceStatus.Succeeded :
                _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
                ValueTaskSourceStatus.Faulted;
        }

        /// <summary>Gets the result of the operation.</summary>
        /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
        /// <returns>The value passed to <see cref="SetResult"/>.</returns>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="token"/> does not match the current version, or the operation has not completed yet.
        /// </exception>
        public TResult GetResult(short token)
        {
            ValidateToken(token);
            if (!_completed)
            {
                throw new InvalidOperationException();
            }

            // Rethrows the exception captured by SetException, if any.
            _error?.Throw();
            return _result;
        }

        /// <summary>Schedules the continuation action for this operation.</summary>
        /// <param name="continuation">The continuation to invoke when the operation has completed.</param>
        /// <param name="state">The state object to pass to <paramref name="continuation"/> when it's invoked.</param>
        /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
        /// <param name="flags">The flags describing the behavior of the continuation.</param>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="token"/> does not match the current version, or a different continuation has already been registered.
        /// </exception>
        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) // TODO-NULLABLE: https://github.com/dotnet/roslyn/issues/26761
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            ValidateToken(token);

            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                _executionContext = ExecutionContext.Capture();
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                // A plain base-class SynchronizationContext is treated as "no context";
                // in that case fall back to a non-default TaskScheduler, if any.
                SynchronizationContext sc = SynchronizationContext.Current;
                if (sc != null && sc.GetType() != typeof(SynchronizationContext))
                {
                    _capturedContext = sc;
                }
                else
                {
                    TaskScheduler ts = TaskScheduler.Current;
                    if (ts != TaskScheduler.Default)
                    {
                        _capturedContext = ts;
                    }
                }
            }

            // We need to set the continuation state before we swap in the delegate, so that
            // if there's a race between this and SetResult/Exception and SetResult/Exception
            // sees the _continuation as non-null, it'll be able to invoke it with the state
            // stored here. However, this also means that if this is used incorrectly (e.g.
            // awaited twice concurrently), _continuationState might get erroneously overwritten.
            // To minimize the chances of that, we check preemptively whether _continuation
            // is already set to something other than the completion sentinel.
            object oldContinuation = _continuation;
            if (oldContinuation == null)
            {
                _continuationState = state;
                oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
            }

            if (oldContinuation != null)
            {
                // Operation already completed, so we need to queue the supplied callback.
                if (!ReferenceEquals(oldContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
                {
                    throw new InvalidOperationException();
                }

                // Never invoked inline here: the callback is always handed to the thread pool
                // or to the captured context.
                ManualResetValueTaskSourceCoreShared.Schedule(_capturedContext, _executionContext != null, continuation, state);
            }
        }

        /// <summary>Ensures that the specified token matches the current version.</summary>
        /// <param name="token">The token supplied by <see cref="ValueTask"/>.</param>
        /// <exception cref="InvalidOperationException"><paramref name="token"/> differs from the current version.</exception>
        private void ValidateToken(short token)
        {
            if (token != _version)
            {
                throw new InvalidOperationException();
            }
        }

        /// <summary>Signals that the operation has completed. Invoked after the result or error has been set.</summary>
        /// <exception cref="InvalidOperationException">The operation has already completed.</exception>
        private void SignalCompletion()
        {
            if (_completed)
            {
                throw new InvalidOperationException();
            }

            _completed = true;

            // Either a continuation is already registered, or we try to publish the sentinel so that
            // a later OnCompleted call knows the operation is done. If the exchange returns non-null,
            // a continuation was registered concurrently and must be invoked here.
            if (_continuation != null || Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel, null) != null)
            {
                if (_executionContext != null)
                {
                    // 'this' is boxed (copied) to be passed as the callback state; InvokeContinuation
                    // only reads fields, so running it on the copy is sufficient.
                    ExecutionContext.Run(
                        _executionContext,
                        s => ((ManualResetValueTaskSourceCore<TResult>)s).InvokeContinuation(),
                        this);
                }
                else
                {
                    InvokeContinuation();
                }
            }
        }

        /// <summary>
        /// Invokes the continuation with the appropriate captured context / scheduler.
        /// This assumes that if <see cref="_executionContext"/> is not null we're already
        /// running within that <see cref="ExecutionContext"/>.
        /// </summary>
        private void InvokeContinuation()
        {
            Debug.Assert(_continuation != null);

            // Only with no captured context and no request for asynchronous continuations
            // is the callback run inline on the completing thread.
            if (_capturedContext == null && !RunContinuationsAsynchronously)
            {
                _continuation(_continuationState);
                return;
            }

            ManualResetValueTaskSourceCoreShared.Schedule(_capturedContext, _executionContext != null, _continuation, _continuationState);
        }
    }

    /// <summary>Non-generic helpers for <see cref="ManualResetValueTaskSourceCore{TResult}"/>.</summary>
    internal static class ManualResetValueTaskSourceCoreShared // separated out of generic to avoid unnecessary duplication
    {
        /// <summary>Marker stored as the continuation when the operation completes before a callback is supplied.</summary>
        internal static readonly Action<object> s_sentinel = CompletionSentinel;

        /// <summary>Cached callback used to unpack and run a (continuation, state) pair posted to a <see cref="SynchronizationContext"/>.</summary>
        private static readonly SendOrPostCallback s_invokePosted = InvokePosted;

        /// <summary>Schedules <paramref name="continuation"/> for asynchronous execution.</summary>
        /// <param name="capturedContext">
        /// A <see cref="SynchronizationContext"/> to post to, a <see cref="TaskScheduler"/> to start a task on,
        /// or null to queue to the thread pool.
        /// </param>
        /// <param name="flowExecutionContext">
        /// Only used when <paramref name="capturedContext"/> is null: true selects <c>ThreadPool.QueueUserWorkItem</c>,
        /// false selects <c>ThreadPool.UnsafeQueueUserWorkItem</c>.
        /// </param>
        /// <param name="continuation">The callback to run.</param>
        /// <param name="state">The state passed to <paramref name="continuation"/>.</param>
        internal static void Schedule(object capturedContext, bool flowExecutionContext, Action<object> continuation, object state)
        {
            switch (capturedContext)
            {
                case null:
                    QueueToThreadPool(flowExecutionContext, continuation, state);
                    break;

                case SynchronizationContext sc:
                    sc.Post(s_invokePosted, Tuple.Create(continuation, state));
                    break;

                case TaskScheduler ts:
                    Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                    break;
            }
        }

        /// <summary>Queues <paramref name="continuation"/> to the thread pool.</summary>
        private static void QueueToThreadPool(bool flowExecutionContext, Action<object> continuation, object state)
        {
#if NETCOREAPP3_0
            if (flowExecutionContext)
            {
                ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true);
            }
            else
            {
                ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true);
            }
#else
            // Method-group conversion creates one delegate over the existing continuation
            // instead of a capturing lambda (closure object + delegate).
            WaitCallback callback = continuation.Invoke;
            if (flowExecutionContext)
            {
                ThreadPool.QueueUserWorkItem(callback, state);
            }
            else
            {
                ThreadPool.UnsafeQueueUserWorkItem(callback, state);
            }
#endif
        }

        /// <summary>Unpacks the posted (continuation, state) pair and invokes the continuation.</summary>
        private static void InvokePosted(object boxedPair)
        {
            var pair = (Tuple<Action<object>, object>)boxedPair; // TODO-NULLABLE: https://github.com/dotnet/roslyn/issues/26761
            pair.Item1(pair.Item2);
        }

        private static void CompletionSentinel(object _) // named method to aid debugging
        {
            Debug.Fail("The sentinel delegate should never be invoked.");
            throw new InvalidOperationException();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment