Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created January 11, 2024 01:37
Show Gist options
  • Save to11mtm/cce86ef7023c29f507fcc6a45f0bc8af to your computer and use it in GitHub Desktop.
Save to11mtm/cce86ef7023c29f507fcc6a45f0bc8af to your computer and use it in GitHub Desktop.
Proposed slim semi-unsafe valuetask continuation pooling pattern for unfold stages in Akka Streams
using System;
using System.Threading;
using Akka.Streams.Stage;
using Akka.Annotations;
namespace Akka.Streams.Implementation
{
/// <summary>
/// Semi-unsafe Helper intermediate for <see cref="OutGraphStageLogic"/>
/// that allows for a ValueTask Wait to be pooled.
/// <para/>
/// Inheritors are expected to utilize the <see cref="SetPooledCompletionCallback"/>
/// and call `base.PreStart()` in their `PreStart` conditions.
/// <para/>
/// Additionally, if inheritors have their own 'restart' logic,
/// They should utilize the `ResetHolder()` method,
/// to avoid callback clashes.
///
/// </summary>
/// <typeparam name="T"></typeparam>
internal abstract class
PooledAwaitOutGraphStageLogic<T> : OutGraphStageLogic
{
private SlimHolder<T> _slimHolder;
protected Action<SlimResult<T>> _completedCallback;
protected PooledAwaitOutGraphStageLogic(Shape shape) : base(shape)
{
_completedCallback = GetAsyncCallback<SlimResult<T>>(t =>
{
FailStage(new Exception("Callback was not set!"));
});
}
protected void SetPooledCompletionCallback(
Action<SlimResult<T>> completedCallback)
{
if (_completedCallback == null)
{
throw new ArgumentNullException(
nameof(completedCallback));
}
_completedCallback = GetAsyncCallback(completedCallback);
}
public override void PreStart()
{
ResetHolder();
}
/// <summary>
/// Sets a ValueTask to wire up the callback,
/// set via <see cref="SetPooledCompletionCallback"/>.
/// If <see cref="SetPooledCompletionCallback"/> has not been called,
/// The continuation will fail the stage!
/// </summary>
/// <param name="valueTask"></param>
protected void SetContinuation(ValueTask<T> valueTask)
{
_slimHolder.SetContinuation(valueTask);
}
/// <summary>
/// Use at own risk!
/// </summary>
protected void SetHolder(SlimHolder<T> holder)
{
Interlocked.Exchange(ref _slimHolder, holder);
}
public void ResetHolder()
{
Interlocked.Exchange(ref _slimHolder, new SlimHolder<T>(this));
}
internal void RunIfSame(SlimHolder<T> slimHolder, ValueTask<T> vt)
{
//We are explicitly using referenceEquals here,
//since we are explicitly resetting things.
if (object.ReferenceEquals(_slimHolder, slimHolder))
{
_completedCallback(vt.IsCompletedSuccessfully
? new SlimResult<T>(default, vt.Result)
: SlimResult<T>.FromTask(vt.AsTask()));
}
}
}
internal class SlimHolder<T>
{
private readonly PooledAwaitOutGraphStageLogic<T> _parent;
private ValueTask<T> _vt;
private readonly Action _continuation;
public SlimHolder(PooledAwaitOutGraphStageLogic<T> logic)
{
_parent = logic;
_continuation = ContinuationAction;
}
public void SetContinuation(ValueTask<T> vt)
{
_vt = vt;
_vt.GetAwaiter().OnCompleted(_continuation);
}
private void ContinuationAction()
{
var vt = _vt;
_vt = default;
_parent.RunIfSame(this, vt);
}
}
/// <summary>
/// INTERNAL API
/// </summary>
/// <typeparam name="TState">TBD</typeparam>
/// <typeparam name="TElement">TBD</typeparam>
[InternalApi]
public class
UnfoldValueTaskAsync<TState, TElement> : GraphStage<
SourceShape<TElement>>
{
#region stage logic
private sealed class Logic : PooledAwaitOutGraphStageLogic<
Option<(TState, TElement)>>
{
private readonly UnfoldValueTaskAsync<TState, TElement> _stage;
private TState _state;
//private Action _completionAction;
public Logic(UnfoldValueTaskAsync<TState, TElement> stage) :
base(stage.Shape)
{
_stage = stage;
_state = _stage.State;
SetPooledCompletionCallback(SyncResult);
SetHandler(_stage.Out, this);
}
public override void OnPull()
{
var vt = _stage.UnfoldFunc(_state);
if (vt.IsCompletedSuccessfully)
{
SyncResult(
new SlimResult<Option<(TState, TElement)>>(default,
vt.Result));
}
else
{
SetContinuation(vt);
}
}
private void SyncResult(
SlimResult<Option<(TState, TElement)>> result)
{
if (!result.IsSuccess())
Fail(_stage.Out, result.Error);
else
{
var option = result.Result;
if (!option.HasValue)
Complete(_stage.Out);
else
{
Push(_stage.Out, option.Value.Item2);
_state = option.Value.Item1;
}
}
}
}
#endregion
/// <summary>
/// TBD
/// </summary>
public readonly TState State;
/// <summary>
/// TBD
/// </summary>
public readonly Func<TState, ValueTask<Option<(TState, TElement)>>>
UnfoldFunc;
/// <summary>
/// TBD
/// </summary>
public readonly Outlet<TElement> Out =
new("UnfoldValueTaskAsync.out");
/// <summary>
/// TBD
/// </summary>
/// <param name="state">TBD</param>
/// <param name="unfoldFunc">TBD</param>
public UnfoldValueTaskAsync(TState state,
Func<TState, ValueTask<Option<(TState, TElement)>>> unfoldFunc)
{
State = state;
UnfoldFunc = unfoldFunc;
Shape = new SourceShape<TElement>(Out);
}
/// <summary>
/// TBD
/// </summary>
public override SourceShape<TElement> Shape { get; }
/// <summary>
/// TBD
/// </summary>
/// <param name="inheritedAttributes">TBD</param>
/// <returns>TBD</returns>
protected override GraphStageLogic CreateLogic(
Attributes inheritedAttributes) => new Logic(this);
}
public sealed class NotYetThereSentinel : Exception
{
public static readonly NotYetThereSentinel Instance = new();
}
public readonly struct SlimResult<T>
{
public readonly Exception Error;
public readonly T Result;
public static readonly SlimResult<T> NotYetReady =
new SlimResult<T>(NotYetThereSentinel.Instance, default);
public static SlimResult<T> FromTask(Task<T> task)
{
return task.IsCanceled || task.IsFaulted
? new SlimResult<T>(task.Exception, default)
: new SlimResult<T>(default, task.Result);
}
public SlimResult(Exception errorOrSentinel, T result)
{
if (result == null)
{
Error = errorOrSentinel ?? ReactiveStreamsCompliance
.ElementMustNotBeNullException;
}
else
{
Result = result;
}
}
public bool IsSuccess()
{
return Error == null;
}
public bool IsDone()
{
return Error != NotYetThereSentinel.Instance;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment