Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created January 22, 2021 04:39
Show Gist options
  • Save to11mtm/043eee408e887a64de43ff5345d1e02b to your computer and use it in GitHub Desktop.
Save to11mtm/043eee408e887a64de43ff5345d1e02b to your computer and use it in GitHub Desktop.
First take at a quick and dirty way to make an Akka stream into AsyncEnumerable.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
namespace GlutenFree.Akka.Streams.AsyncEnumerable
{
public static class StageExtensions
{
public static IAsyncEnumerable<T> ToAsyncEnumerable<T, TMat>(
this Source<T, NotUsed> tMat, IMaterializer materializer)
{
return new StreamsAsyncEnumerable<T>(
tMat.ToMaterialized(Sink.Queue<T>(), Keep.Right), materializer);
}
}
public sealed class StreamsAsyncEnumerable<T> : IAsyncEnumerable<T>
{
private readonly Func<CancellationToken, IAsyncEnumerator<T>> _getEnumerator;
private IMaterializer _materializer;
private IRunnableGraph<ISinkQueue<T>> _graph;
public StreamsAsyncEnumerable(IRunnableGraph<ISinkQueue<T>> graph, IMaterializer materializer)
{
_graph = graph;
_materializer = materializer;
}
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return new SinkQueueAsyncEnumerator<T>(_graph.Run(_materializer));
}
}
public class SinkQueueAsyncEnumerator<T> : IAsyncEnumerator<T>
{
private ISinkQueue<T> _sinkQueue;
public SinkQueueAsyncEnumerator(ISinkQueue<T> sinkQueue)
{
_sinkQueue = sinkQueue;
}
public async ValueTask DisposeAsync()
{
_sinkQueue = null;
}
public async ValueTask<bool> MoveNextAsync()
{
var opt = await _sinkQueue.PullAsync();
if (opt.HasValue)
{
Current = opt.Value;
return true;
}
else
{
return false;
}
}
public T Current { get; private set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment