Created
January 22, 2021 04:39
-
-
Save to11mtm/043eee408e887a64de43ff5345d1e02b to your computer and use it in GitHub Desktop.
First take at a quick-and-dirty way to turn an Akka.NET stream into an IAsyncEnumerable.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Akka; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
namespace GlutenFree.Akka.Streams.AsyncEnumerable | |
{ | |
/// <summary>
/// Extension methods that expose an Akka.Streams <see cref="Source{TOut,TMat}"/> as an
/// <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
public static class StageExtensions
{
    /// <summary>
    /// Connects <paramref name="source"/> to a queue sink and wraps the resulting runnable graph
    /// in a <see cref="StreamsAsyncEnumerable{T}"/>. The graph is not run here; it is run when the
    /// returned sequence is asked for an enumerator.
    /// </summary>
    /// <typeparam name="T">Element type emitted by the source.</typeparam>
    /// <param name="source">The source to enumerate.</param>
    /// <param name="materializer">Materializer used to run the graph.</param>
    /// <returns>An async sequence backed by the source.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="materializer"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this Source<T, NotUsed> source, IMaterializer materializer)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (materializer is null)
        {
            throw new ArgumentNullException(nameof(materializer));
        }

        // Keep.Right keeps the sink's materialized value (the queue) and drops the source's NotUsed.
        return new StreamsAsyncEnumerable<T>(
            source.ToMaterialized(Sink.Queue<T>(), Keep.Right), materializer);
    }

    /// <summary>
    /// Same as <see cref="ToAsyncEnumerable{T}(Source{T, NotUsed}, IMaterializer)"/>.
    /// Kept so existing call sites that spell out both type arguments continue to compile.
    /// </summary>
    /// <typeparam name="T">Element type emitted by the source.</typeparam>
    /// <typeparam name="TMat">
    /// Unused. It does not appear in any parameter, so it can never be inferred and must be
    /// written explicitly; prefer the single-type-parameter overload.
    /// </typeparam>
    /// <param name="tMat">The source to enumerate.</param>
    /// <param name="materializer">Materializer used to run the graph.</param>
    /// <returns>An async sequence backed by the source.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tMat"/> or <paramref name="materializer"/> is <c>null</c>.
    /// </exception>
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T, TMat>(
        this Source<T, NotUsed> tMat, IMaterializer materializer)
    {
        if (tMat is null)
        {
            // Checked here so the exception reports this overload's parameter name.
            throw new ArgumentNullException(nameof(tMat));
        }

        return ToAsyncEnumerable<T>(tMat, materializer);
    }
}
/// <summary>
/// An <see cref="IAsyncEnumerable{T}"/> over a runnable graph whose materialized value is an
/// <see cref="ISinkQueue{T}"/>. Every call to <see cref="GetAsyncEnumerator"/> runs the graph
/// again and returns an enumerator over that run's queue.
/// </summary>
/// <typeparam name="T">Element type produced by the stream.</typeparam>
public sealed class StreamsAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    private readonly IMaterializer _materializer;
    private readonly IRunnableGraph<ISinkQueue<T>> _graph;

    /// <summary>
    /// Stores the graph and materializer for later use; nothing is run at construction time.
    /// </summary>
    /// <param name="graph">Runnable graph that materializes a sink queue.</param>
    /// <param name="materializer">Materializer used to run <paramref name="graph"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="graph"/> or <paramref name="materializer"/> is <c>null</c>.
    /// </exception>
    public StreamsAsyncEnumerable(IRunnableGraph<ISinkQueue<T>> graph, IMaterializer materializer)
    {
        _graph = graph ?? throw new ArgumentNullException(nameof(graph));
        _materializer = materializer ?? throw new ArgumentNullException(nameof(materializer));
    }

    /// <summary>
    /// Runs the graph and returns an enumerator that pulls from the materialized queue.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked once before the graph is run. It is not handed to the enumerator, so cancelling
    /// it later does not interrupt an enumeration that has already started.
    /// </param>
    /// <returns>An enumerator over a fresh run of the graph.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was already cancelled when the method was called.
    /// </exception>
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return new SinkQueueAsyncEnumerator<T>(_graph.Run(_materializer));
    }
}
/// <summary>
/// Adapts an <see cref="ISinkQueue{T}"/> to <see cref="IAsyncEnumerator{T}"/> by pulling one
/// element from the queue per <see cref="MoveNextAsync"/> call.
/// </summary>
/// <typeparam name="T">Element type produced by the queue.</typeparam>
public class SinkQueueAsyncEnumerator<T> : IAsyncEnumerator<T>
{
    // Set to null by DisposeAsync; a null value therefore means "disposed".
    private ISinkQueue<T> _sinkQueue;

    /// <summary>
    /// Creates an enumerator over <paramref name="sinkQueue"/>.
    /// </summary>
    /// <param name="sinkQueue">Queue to pull elements from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sinkQueue"/> is <c>null</c>.</exception>
    public SinkQueueAsyncEnumerator(ISinkQueue<T> sinkQueue)
    {
        _sinkQueue = sinkQueue ?? throw new ArgumentNullException(nameof(sinkQueue));
    }

    /// <summary>
    /// The element obtained by the most recent <see cref="MoveNextAsync"/> call that returned
    /// <c>true</c>; <c>default(T)</c> before the first such call.
    /// </summary>
    public T Current { get; private set; }

    /// <summary>
    /// Drops the reference to the queue. Completes synchronously.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this only releases the reference; nothing here signals the running stream
    /// to stop. If the Akka.Streams version in use exposes a way to cancel the queue, it should
    /// probably be invoked here so an abandoned enumeration does not keep the stream alive - confirm.
    /// </remarks>
    public ValueTask DisposeAsync()
    {
        _sinkQueue = null;
        return default;
    }

    /// <summary>
    /// Pulls the next element from the queue.
    /// </summary>
    /// <returns>
    /// <c>true</c> and updates <see cref="Current"/> when the pull yields a value;
    /// <c>false</c> when the pull yields no value.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
    public async ValueTask<bool> MoveNextAsync()
    {
        // Copy to a local so a concurrent DisposeAsync cannot null the field mid-call.
        var queue = _sinkQueue;
        if (queue is null)
        {
            throw new ObjectDisposedException(nameof(SinkQueueAsyncEnumerator<T>));
        }

        var next = await queue.PullAsync().ConfigureAwait(false);
        if (!next.HasValue)
        {
            return false;
        }

        Current = next.Value;
        return true;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment