Created
January 11, 2024 01:37
-
-
Save to11mtm/cce86ef7023c29f507fcc6a45f0bc8af to your computer and use it in GitHub Desktop.
Proposed slim semi-unsafe valuetask continuation pooling pattern for unfold stages in Akka Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using Akka.Streams.Stage; | |
using Akka.Annotations; | |
namespace Akka.Streams.Implementation | |
{ | |
/// <summary> | |
/// Semi-unsafe Helper intermediate for <see cref="OutGraphStageLogic"/> | |
/// that allows for a ValueTask Wait to be pooled. | |
/// <para/> | |
/// Inheritors are expected to utilize the <see cref="SetPooledCompletionCallback"/> | |
/// and call `base.PreStart()` in their `PreStart` conditions. | |
/// <para/> | |
/// Additionally, if inheritors have their own 'restart' logic, | |
/// They should utilize the `ResetHolder()` method, | |
/// to avoid callback clashes. | |
/// | |
/// </summary> | |
/// <typeparam name="T"></typeparam> | |
internal abstract class | |
PooledAwaitOutGraphStageLogic<T> : OutGraphStageLogic | |
{ | |
private SlimHolder<T> _slimHolder; | |
protected Action<SlimResult<T>> _completedCallback; | |
protected PooledAwaitOutGraphStageLogic(Shape shape) : base(shape) | |
{ | |
_completedCallback = GetAsyncCallback<SlimResult<T>>(t => | |
{ | |
FailStage(new Exception("Callback was not set!")); | |
}); | |
} | |
protected void SetPooledCompletionCallback( | |
Action<SlimResult<T>> completedCallback) | |
{ | |
if (_completedCallback == null) | |
{ | |
throw new ArgumentNullException( | |
nameof(completedCallback)); | |
} | |
_completedCallback = GetAsyncCallback(completedCallback); | |
} | |
public override void PreStart() | |
{ | |
ResetHolder(); | |
} | |
/// <summary> | |
/// Sets a ValueTask to wire up the callback, | |
/// set via <see cref="SetPooledCompletionCallback"/>. | |
/// If <see cref="SetPooledCompletionCallback"/> has not been called, | |
/// The continuation will fail the stage! | |
/// </summary> | |
/// <param name="valueTask"></param> | |
protected void SetContinuation(ValueTask<T> valueTask) | |
{ | |
_slimHolder.SetContinuation(valueTask); | |
} | |
/// <summary> | |
/// Use at own risk! | |
/// </summary> | |
protected void SetHolder(SlimHolder<T> holder) | |
{ | |
Interlocked.Exchange(ref _slimHolder, holder); | |
} | |
public void ResetHolder() | |
{ | |
Interlocked.Exchange(ref _slimHolder, new SlimHolder<T>(this)); | |
} | |
internal void RunIfSame(SlimHolder<T> slimHolder, ValueTask<T> vt) | |
{ | |
//We are explicitly using referenceEquals here, | |
//since we are explicitly resetting things. | |
if (object.ReferenceEquals(_slimHolder, slimHolder)) | |
{ | |
_completedCallback(vt.IsCompletedSuccessfully | |
? new SlimResult<T>(default, vt.Result) | |
: SlimResult<T>.FromTask(vt.AsTask())); | |
} | |
} | |
} | |
internal class SlimHolder<T> | |
{ | |
private readonly PooledAwaitOutGraphStageLogic<T> _parent; | |
private ValueTask<T> _vt; | |
private readonly Action _continuation; | |
public SlimHolder(PooledAwaitOutGraphStageLogic<T> logic) | |
{ | |
_parent = logic; | |
_continuation = ContinuationAction; | |
} | |
public void SetContinuation(ValueTask<T> vt) | |
{ | |
_vt = vt; | |
_vt.GetAwaiter().OnCompleted(_continuation); | |
} | |
private void ContinuationAction() | |
{ | |
var vt = _vt; | |
_vt = default; | |
_parent.RunIfSame(this, vt); | |
} | |
} | |
/// <summary> | |
/// INTERNAL API | |
/// </summary> | |
/// <typeparam name="TState">TBD</typeparam> | |
/// <typeparam name="TElement">TBD</typeparam> | |
[InternalApi] | |
public class | |
UnfoldValueTaskAsync<TState, TElement> : GraphStage< | |
SourceShape<TElement>> | |
{ | |
#region stage logic | |
private sealed class Logic : PooledAwaitOutGraphStageLogic< | |
Option<(TState, TElement)>> | |
{ | |
private readonly UnfoldValueTaskAsync<TState, TElement> _stage; | |
private TState _state; | |
//private Action _completionAction; | |
public Logic(UnfoldValueTaskAsync<TState, TElement> stage) : | |
base(stage.Shape) | |
{ | |
_stage = stage; | |
_state = _stage.State; | |
SetPooledCompletionCallback(SyncResult); | |
SetHandler(_stage.Out, this); | |
} | |
public override void OnPull() | |
{ | |
var vt = _stage.UnfoldFunc(_state); | |
if (vt.IsCompletedSuccessfully) | |
{ | |
SyncResult( | |
new SlimResult<Option<(TState, TElement)>>(default, | |
vt.Result)); | |
} | |
else | |
{ | |
SetContinuation(vt); | |
} | |
} | |
private void SyncResult( | |
SlimResult<Option<(TState, TElement)>> result) | |
{ | |
if (!result.IsSuccess()) | |
Fail(_stage.Out, result.Error); | |
else | |
{ | |
var option = result.Result; | |
if (!option.HasValue) | |
Complete(_stage.Out); | |
else | |
{ | |
Push(_stage.Out, option.Value.Item2); | |
_state = option.Value.Item1; | |
} | |
} | |
} | |
} | |
#endregion | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public readonly TState State; | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public readonly Func<TState, ValueTask<Option<(TState, TElement)>>> | |
UnfoldFunc; | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public readonly Outlet<TElement> Out = | |
new("UnfoldValueTaskAsync.out"); | |
/// <summary> | |
/// TBD | |
/// </summary> | |
/// <param name="state">TBD</param> | |
/// <param name="unfoldFunc">TBD</param> | |
public UnfoldValueTaskAsync(TState state, | |
Func<TState, ValueTask<Option<(TState, TElement)>>> unfoldFunc) | |
{ | |
State = state; | |
UnfoldFunc = unfoldFunc; | |
Shape = new SourceShape<TElement>(Out); | |
} | |
/// <summary> | |
/// TBD | |
/// </summary> | |
public override SourceShape<TElement> Shape { get; } | |
/// <summary> | |
/// TBD | |
/// </summary> | |
/// <param name="inheritedAttributes">TBD</param> | |
/// <returns>TBD</returns> | |
protected override GraphStageLogic CreateLogic( | |
Attributes inheritedAttributes) => new Logic(this); | |
} | |
public sealed class NotYetThereSentinel : Exception | |
{ | |
public static readonly NotYetThereSentinel Instance = new(); | |
} | |
public readonly struct SlimResult<T> | |
{ | |
public readonly Exception Error; | |
public readonly T Result; | |
public static readonly SlimResult<T> NotYetReady = | |
new SlimResult<T>(NotYetThereSentinel.Instance, default); | |
public static SlimResult<T> FromTask(Task<T> task) | |
{ | |
return task.IsCanceled || task.IsFaulted | |
? new SlimResult<T>(task.Exception, default) | |
: new SlimResult<T>(default, task.Result); | |
} | |
public SlimResult(Exception errorOrSentinel, T result) | |
{ | |
if (result == null) | |
{ | |
Error = errorOrSentinel ?? ReactiveStreamsCompliance | |
.ElementMustNotBeNullException; | |
} | |
else | |
{ | |
Result = result; | |
} | |
} | |
public bool IsSuccess() | |
{ | |
return Error == null; | |
} | |
public bool IsDone() | |
{ | |
return Error != NotYetThereSentinel.Instance; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment