Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active November 24, 2020 06:04
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/f988ab999d26b7f25d2d1f72834fbcb6 to your computer and use it in GitHub Desktop.
Save Horusiath/f988ab999d26b7f25d2d1f72834fbcb6 to your computer and use it in GitHub Desktop.
Custom Akka.NET Streams graph stage for prefetching elements, when the buffer comes close to the empty
using Akka.Streams;
using Akka.Streams.Stage;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Demo
{
sealed class PreFetch<T> : GraphStage<SourceShape<T>>
{
private readonly int threshold;
private readonly Func<Task<IEnumerable<T>>> fetch;
private readonly Outlet<T> outlet = new Outlet<T>("prefetch");
public PreFetch(int threshold, Func<Task<IEnumerable<T>>> fetch)
{
this.threshold = threshold;
this.fetch = fetch;
this.Shape = new SourceShape<T>(this.outlet);
}
public override SourceShape<T> Shape { get; }
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);
private sealed class Logic : GraphStageLogic
{
public Logic(PreFetch<T> stage) : base(stage.Shape)
{
// queue for batched elements
var queue = new Queue<T>();
// flag which indicates, that pull from downstream was made,
// but we didn't have any elements at that moment
var wasPulled = false;
// determines if fetch was already called
var fetchInProgress = false;
// in order to cooperate with async calls without data races,
// we need to register async callbacks for success and failure scenarios
var onSuccess = this.GetAsyncCallback<IEnumerable<T>>(batch =>
{
foreach (var item in batch) queue.Enqueue(item);
if (wasPulled)
{
// if pull was requested but not fulfilled, we need to push now, as we have elements
// this assumes, that `batch` had at least one element
Push(stage.outlet, queue.Dequeue());
wasPulled = false;
}
fetchInProgress = false;
});
var onFailure = this.GetAsyncCallback<Exception>(this.FailStage);
SetHandler(stage.outlet, onPull: () => {
if (queue.Count < stage.threshold && !fetchInProgress)
{
// if queue occupation reached bellow expected capacity
// call fetch on a side thread and handle its result asynchronously
stage.fetch().ContinueWith(task =>
{
// depending on if task was failed or not, we call corresponding callback
if (task.IsFaulted || task.IsCanceled)
onFailure(task.Exception as Exception ?? new TaskCanceledException(task));
else onSuccess(task.Result);
});
fetchInProgress = true;
}
// if queue is empty, we cannot push immediatelly, so we only mark
// that pull request has been made but not fulfilled
if (queue.Count == 0)
wasPulled = true;
else
{
Push(stage.outlet, queue.Dequeue());
wasPulled = false;
}
});
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment