Created
November 30, 2021 02:26
-
-
Save to11mtm/dc9a350080fcbcb14098c14509d70e7f to your computer and use it in GitHub Desktop.
Akka Streams Linq2Db Streaming Reader source from IQueryable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using Akka.Streams.Dsl; | |
using LinqToDB; | |
using LinqToDB.Data; | |
namespace Akka.Persistence.Sql.Linq2Db.Streams | |
{ | |
public class ReaderSource<T> | |
{ | |
public DataConnection DataConnection { get; } | |
public IAsyncEnumerator<T> Enumerable { get; } | |
public ReaderSource(DataConnection dc, IAsyncEnumerator<T> en) | |
{ | |
DataConnection = dc; | |
Enumerable = en; | |
} | |
} | |
public class Linq2DbSource | |
{ | |
public static Source<T, NotUsed> FromReader<T>(Func<DataConnection> readerCreator, Func<DataConnection,IQueryable<T>> producer) | |
{ | |
if (readerCreator is null) | |
ThrowArgumentNullException("reader"); | |
return Source.UnfoldResourceAsync(async () => | |
{ | |
var readerRes = readerCreator(); | |
var v = producer(readerRes).AsAsyncEnumerable() | |
.GetAsyncEnumerator(); | |
return new ReaderSource<T>(readerRes, v); | |
}, async (reader) => | |
{ | |
if (await reader.Enumerable.MoveNextAsync()) | |
{ | |
return reader.Enumerable.Current; | |
} | |
return Util.Option<T>.None; | |
}, async (readerContext) => | |
{ | |
await readerContext.Enumerable.DisposeAsync(); | |
await readerContext.DataConnection.DisposeAsync(); | |
return Done.Instance; | |
}); | |
} | |
private static void ThrowArgumentNullException(string arg) | |
{ | |
throw new ArgumentNullException(arg); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment