Last active
June 25, 2021 02:32
-
-
Save to11mtm/07bd895b803d1996f94da3547c1b9e2d to your computer and use it in GitHub Desktop.
A Helper to create a Batched stream based on a producer function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Pattern; | |
using Akka.Streams.Dsl; | |
using LanguageExt; | |
namespace Akka.Persistence.Sql.Linq2Db.Journal.DAO | |
{ | |
public class BatchFlowControl | |
{ | |
public class Continue : BatchFlowControl | |
{ | |
public static Continue Instance = new Continue(); | |
} | |
public class ContinueDelayed : BatchFlowControl | |
{ | |
public static ContinueDelayed Instance = new ContinueDelayed(); | |
} | |
public class Stop : BatchFlowControl | |
{ | |
public static Stop Instance = new Stop(); | |
} | |
} | |
public static class BatchStream | |
{ | |
/// <summary> | |
/// Creates a Source that will read in batches | |
/// </summary> | |
/// <param name="startAt">The starting point</param> | |
/// <param name="endAt">The end point</param> | |
/// <param name="batchSize">number of records expected in batch</param> | |
/// <param name="batchProducer"> | |
/// A function to produce a batch of records given a start and end | |
/// This function should return records such that the last returned | |
/// Has the 'last' record based on the sequencenumber | |
/// </param> | |
/// <param name="getSequenceNumber"> | |
/// A function to retrieve the sequence number after each function, | |
/// Expected to return | |
/// </param> | |
/// <param name="refreshInterval"> | |
/// If provided, this may be used to continually 'poll' | |
/// until <see cref="endAt"/> has been reached, | |
/// Rather than completing as soon as there are no more records to return | |
/// </param> | |
/// <typeparam name="TElem"></typeparam> | |
/// <returns></returns> | |
public static Source<TElem, NotUsed> Create<TElem>( | |
long startAt, | |
long endAt, | |
int batchSize, | |
Func<(long startBatchAt, long endBatchAt, int batchSize), Task<Seq<TElem>>> batchProducer, | |
Func<TElem, long> getSequenceNumber, | |
Util.Option<(TimeSpan, IScheduler)> refreshInterval | |
) => | |
Source | |
.UnfoldAsync<(long, BatchFlowControl), | |
Seq<TElem>>( | |
(Math.Max(1, startAt), | |
BatchFlowControl.Continue.Instance), | |
async opt => | |
{ | |
async Task<Util.Option<((long, BatchFlowControl), Seq<TElem>)>> | |
RetrieveNextBatch() | |
{ | |
Seq<TElem> msg; | |
msg = await batchProducer((opt.Item1, endAt,batchSize)); | |
var hasMoreEvents = msg.Count == batchSize; | |
var lastMsg = msg.LastOrDefault(); | |
Util.Option<long> lastSeq = Util.Option<long>.None; | |
if (lastMsg != null) | |
{ | |
lastSeq = | |
getSequenceNumber(lastMsg); | |
} | |
var hasLastEvent = | |
lastSeq.HasValue && | |
lastSeq.Value >= endAt; | |
BatchFlowControl nextControl = null; | |
if (hasLastEvent || opt.Item1 > endAt) | |
{ | |
nextControl = BatchFlowControl.Stop.Instance; | |
} | |
else if (hasMoreEvents) | |
{ | |
nextControl = BatchFlowControl.Continue.Instance; | |
} | |
else if (refreshInterval.HasValue == false) | |
{ | |
nextControl = BatchFlowControl.Stop.Instance; | |
} | |
else | |
{ | |
nextControl = BatchFlowControl.ContinueDelayed | |
.Instance; | |
} | |
long nextFrom = 0; | |
if (lastSeq.HasValue) | |
{ | |
nextFrom = lastSeq.Value + 1; | |
} | |
else | |
{ | |
nextFrom = opt.Item1; | |
} | |
return new Util.Option<((long, BatchFlowControl), | |
Seq<TElem>)>(( | |
(nextFrom, nextControl), msg)); | |
} | |
switch (opt.Item2) | |
{ | |
case BatchFlowControl.Stop _: | |
return Util | |
.Option<((long, BatchFlowControl), Seq<TElem>)> | |
.None; | |
case BatchFlowControl.Continue _: | |
return await RetrieveNextBatch(); | |
case BatchFlowControl.ContinueDelayed _ | |
when refreshInterval.HasValue: | |
return await FutureTimeoutSupport.After( | |
refreshInterval.Value.Item1, | |
refreshInterval.Value.Item2, | |
RetrieveNextBatch); | |
default: | |
throw new Exception( | |
$"Got invalid BatchFlowControl from Queue! Type : {opt.Item2.GetType()}"); | |
} | |
}).SelectMany(r => r); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment