Skip to content

Instantly share code, notes, and snippets.

Last active November 24, 2020 06:04
Show Gist options
  • Save Horusiath/f988ab999d26b7f25d2d1f72834fbcb6 to your computer and use it in GitHub Desktop.
Save Horusiath/f988ab999d26b7f25d2d1f72834fbcb6 to your computer and use it in GitHub Desktop.
Custom Akka.NET Streams graph stage for prefetching elements when the buffer is close to empty
using Akka.Streams;
using Akka.Streams.Stage;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Demo
{
    /// <summary>
    /// Source stage that emits elements obtained in batches from an asynchronous
    /// <c>fetch</c> function. A new batch is requested whenever downstream pulls while
    /// fewer than <c>threshold</c> elements are buffered and no fetch is already running.
    /// </summary>
    /// <typeparam name="T">Type of the emitted elements.</typeparam>
    sealed class PreFetch<T> : GraphStage<SourceShape<T>>
    {
        private readonly int threshold;
        private readonly Func<Task<IEnumerable<T>>> fetch;
        private readonly Outlet<T> outlet = new Outlet<T>("prefetch");

        /// <summary>
        /// Creates a new prefetching source stage.
        /// </summary>
        /// <param name="threshold">
        /// Buffer size below which the next batch is requested. Must be at least 1:
        /// with a smaller value the buffer size could never drop below it, so
        /// <paramref name="fetch"/> would never be called and nothing would be emitted.
        /// </param>
        /// <param name="fetch">Function returning the next batch of elements.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threshold"/> is less than 1.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="fetch"/> is null.</exception>
        public PreFetch(int threshold, Func<Task<IEnumerable<T>>> fetch)
        {
            if (threshold < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(threshold), threshold, "Threshold must be at least 1.");
            }

            if (fetch == null)
            {
                throw new ArgumentNullException(nameof(fetch));
            }

            this.threshold = threshold;
            this.fetch = fetch;
            this.Shape = new SourceShape<T>(this.outlet);
        }

        public override SourceShape<T> Shape { get; }

        protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);

        /// <summary>
        /// Per-materialization state: the element buffer plus the flags tracking
        /// unfulfilled downstream demand and an in-flight fetch.
        /// </summary>
        private sealed class Logic : GraphStageLogic
        {
            private readonly PreFetch<T> stage;

            // queue for batched elements
            private readonly Queue<T> queue = new Queue<T>();

            // in order to cooperate with async calls without data races, the task
            // continuation only talks to the stage through these async callbacks
            private readonly Action<IEnumerable<T>> onSuccess;
            private readonly Action<Exception> onFailure;

            // indicates that a pull from downstream was made,
            // but there were no elements to push at that moment
            private bool wasPulled;

            // determines if fetch was already called and has not been handled yet
            private bool fetchInProgress;

            public Logic(PreFetch<T> stage) : base(stage.Shape)
            {
                this.stage = stage;
                this.onSuccess = this.GetAsyncCallback<IEnumerable<T>>(this.HandleBatch);
                this.onFailure = this.GetAsyncCallback<Exception>(this.FailStage);

                SetHandler(stage.outlet, onPull: () => this.HandlePull());
            }

            /// <summary>
            /// Downstream demand: tops up the buffer if it ran below the threshold,
            /// then either pushes a buffered element or records the unfulfilled pull.
            /// </summary>
            private void HandlePull()
            {
                if (this.queue.Count < this.stage.threshold && !this.fetchInProgress)
                {
                    // queue occupation dropped below the expected capacity
                    this.StartFetch();
                }

                if (this.queue.Count == 0)
                {
                    // queue is empty, we cannot push immediately, so we only mark
                    // that a pull request has been made but not fulfilled
                    this.wasPulled = true;
                }
                else
                {
                    Push(this.stage.outlet, this.queue.Dequeue());
                    this.wasPulled = false;
                }
            }

            /// <summary>
            /// Calls the fetch function and routes its outcome to the matching async callback.
            /// </summary>
            private void StartFetch()
            {
                // mark before starting, so the flag is already set whenever the result is handled
                this.fetchInProgress = true;

                this.stage.fetch().ContinueWith(task =>
                {
                    // depending on if task failed or not, we call the corresponding callback
                    if (task.IsFaulted || task.IsCanceled)
                    {
                        this.onFailure(task.Exception as Exception ?? new TaskCanceledException(task));
                    }
                    else
                    {
                        this.onSuccess(task.Result);
                    }
                });
            }

            /// <summary>
            /// Buffers a fetched batch and fulfills a pending pull, if there is one.
            /// </summary>
            /// <param name="batch">Fetched elements; null is treated as an empty batch.</param>
            private void HandleBatch(IEnumerable<T> batch)
            {
                this.fetchInProgress = false;

                if (batch != null)
                {
                    foreach (var item in batch)
                    {
                        this.queue.Enqueue(item);
                    }
                }

                if (!this.wasPulled)
                {
                    return;
                }

                if (this.queue.Count > 0)
                {
                    // pull was requested but not fulfilled, push now that we have elements
                    Push(this.stage.outlet, this.queue.Dequeue());
                    this.wasPulled = false;
                }
                else
                {
                    // The batch was empty while a pull is still outstanding. HandlePull is
                    // the only other place a fetch starts, and it will not run again before
                    // we push, so fetch again here instead of dequeuing from an empty queue.
                    // NOTE(review): retries are not throttled - pacing is left to `fetch`.
                    this.StartFetch();
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment