Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created November 30, 2021 02:26
Show Gist options
  • Save to11mtm/dc9a350080fcbcb14098c14509d70e7f to your computer and use it in GitHub Desktop.
Save to11mtm/dc9a350080fcbcb14098c14509d70e7f to your computer and use it in GitHub Desktop.
Akka Streams Linq2Db Streaming Reader source from IQueryable
using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Streams.Dsl;
using LinqToDB;
using LinqToDB.Data;
namespace Akka.Persistence.Sql.Linq2Db.Streams
{
/// <summary>
/// Immutable pair holding a <see cref="DataConnection"/> together with the
/// async enumerator that was supplied alongside it.
/// </summary>
/// <typeparam name="T">Element type yielded by the enumerator.</typeparam>
public class ReaderSource<T>
{
    /// <summary>
    /// Stores the given connection and enumerator; neither is validated or copied.
    /// </summary>
    /// <param name="dc">Connection exposed through <see cref="DataConnection"/>.</param>
    /// <param name="en">Enumerator exposed through <see cref="Enumerable"/>.</param>
    public ReaderSource(DataConnection dc, IAsyncEnumerator<T> en)
        => (DataConnection, Enumerable) = (dc, en);

    /// <summary>The connection passed to the constructor.</summary>
    public DataConnection DataConnection { get; }

    /// <summary>The async enumerator passed to the constructor.</summary>
    public IAsyncEnumerator<T> Enumerable { get; }
}
/// <summary>
/// Builds Akka Streams sources that stream the results of a linq2db query.
/// </summary>
public class Linq2DbSource
{
    /// <summary>
    /// Creates a source that, on materialization, opens a connection via
    /// <paramref name="readerCreator"/>, builds a query via <paramref name="producer"/>,
    /// and emits the query's elements one by one through its async enumerator.
    /// The enumerator and the connection are disposed when the stream closes the resource.
    /// </summary>
    /// <typeparam name="T">Element type produced by the query.</typeparam>
    /// <param name="readerCreator">Factory invoked once per materialization to obtain the connection.</param>
    /// <param name="producer">Builds the query to enumerate from the created connection.</param>
    /// <returns>A source emitting the elements of the query.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="readerCreator"/> or <paramref name="producer"/> is <c>null</c>.
    /// </exception>
    public static Source<T, NotUsed> FromReader<T>(Func<DataConnection> readerCreator, Func<DataConnection,IQueryable<T>> producer)
    {
        if (readerCreator is null)
        {
            ThrowArgumentNullException(nameof(readerCreator));
        }

        // Fail fast here instead of with a NullReferenceException inside the running stream.
        if (producer is null)
        {
            ThrowArgumentNullException(nameof(producer));
        }

        return Source.UnfoldResourceAsync(async () =>
        {
            var connection = readerCreator();
            try
            {
                var enumerator = producer(connection).AsAsyncEnumerable()
                    .GetAsyncEnumerator();
                return new ReaderSource<T>(connection, enumerator);
            }
            catch
            {
                // The close callback never runs when creation fails, so the
                // connection must be released here to avoid leaking it.
                await connection.DisposeAsync();
                throw;
            }
        }, async (reader) =>
        {
            if (await reader.Enumerable.MoveNextAsync())
            {
                return reader.Enumerable.Current;
            }
            // No more rows: signal completion to the stream.
            return Util.Option<T>.None;
        }, async (readerContext) =>
        {
            try
            {
                await readerContext.Enumerable.DisposeAsync();
            }
            finally
            {
                // Always release the connection, even if disposing the enumerator throws.
                await readerContext.DataConnection.DisposeAsync();
            }
            return Done.Instance;
        });
    }

    /// <summary>
    /// Throws an <see cref="ArgumentNullException"/> for the named argument.
    /// </summary>
    /// <param name="arg">Name of the offending parameter.</param>
    private static void ThrowArgumentNullException(string arg)
    {
        throw new ArgumentNullException(arg);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment