Last active
December 7, 2023 22:23
-
-
Save to11mtm/27f88ee13ea2349a64a8653bfd9e6c97 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Linq.Expressions; | |
using System.Threading; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
using Akka; | |
using Akka.Actor; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
using Akka.Streams.Stage; | |
using Akka.Streams.Supervision; | |
using LinqToDB; | |
using LinqToDB.Data; | |
using LinqToDB.Mapping; | |
using LinqToDB.Tools; | |
using Directive = Akka.Streams.Supervision.Directive; | |
// ReSharper disable PrivateFieldCanBeConvertedToLocalVariable | |
// ReSharper disable RedundantArgumentDefaultValue | |
// ReSharper disable RedundantDefaultMemberInitializer | |
//So, there's this other project I am working on called 'Venti'. | |
//It's an event related thing... That's all I will say for now. | |
//However, I wanted to keep parts of it in the 'GlutenFree' spirit; | |
// 1. Keep things minimal and hackable | |
// 2. Prefer Composition of minimal components to a larger whole | |
// To that end, it's best to start such a project with the storage layer...
// So, this is a 'Small Part' of Venti... | |
// Thus, the name ShotGlass. | |
namespace GlutenFree.ShotGlass.Stiff | |
{ | |
/// <summary>
/// Base shape shared by every row ShotGlass stores. The queries in this file
/// filter rows by <see cref="Type"/> and <see cref="Key"/> and position them
/// by <see cref="Sequence"/>.
/// </summary>
public abstract class TypedKeyedSequencedRow
{
    /// <summary>Type discriminator; mapped as a non-nullable column of length 128.</summary>
    [Column(Length = 128, CanBeNull = false)]
    public string Type { get; set; } = null!;

    /// <summary>Key within <see cref="Type"/>; mapped as a non-nullable column of length 128.</summary>
    [Column(Length = 128, CanBeNull = false)]
    public string Key { get; set; } = null!;

    /// <summary>Sequence number of the row for its type/key pair.</summary>
    // ReSharper disable once UnusedAutoPropertyAccessor.Global
    public long Sequence { get; set; }

    /// <summary>
    /// Left out of INSERT statements (SkipOnInsert).
    /// NOTE(review): presumably generated by the database - confirm against the schema.
    /// </summary>
    [Column(SkipOnInsert = true)]
    public long Ordering { get; set; }

    /// <summary>
    /// Meant to be a Unix UTC timestamp in milliseconds. Left out of INSERT
    /// statements (SkipOnInsert); sane databases can fill this in themselves.
    /// </summary>
    [Column(SkipOnInsert = true)]
    public virtual long InsertTimestamp { get; set; }
}
/// <summary>
/// Closed set of flow-control signals, each exposed as a shared singleton.
/// NOTE(review): nothing in the visible part of this file consumes these
/// signals; the meaning of each one is inferred from its name only.
/// </summary>
public class BatchFlowControl
{
    /// <summary>"Continue" signal.</summary>
    public class Continue : BatchFlowControl
    {
        // readonly: a shared singleton must not be replaceable by callers.
        public static readonly Continue Instance = new Continue();
    }

    /// <summary>"Continue after a delay" signal.</summary>
    public class ContinueDelayed : BatchFlowControl
    {
        public static readonly ContinueDelayed Instance = new ContinueDelayed();
    }

    /// <summary>"Stop" signal.</summary>
    public class Stop : BatchFlowControl
    {
        public static readonly Stop Instance = new Stop();
    }
}
/// <summary>
/// One queued write request: the rows to persist, the completion source that
/// is resolved once the write has been handled, and the caller's cancellation
/// token.
/// </summary>
public class WriteQueueEntry<T> where T : TypedKeyedSequencedRow
{
    /// <summary>Captures the three parts of the request as given; nothing is copied or validated.</summary>
    public WriteQueueEntry(List<T> rows,
        TaskCompletionSource<Done> completion,
        CancellationToken token)
        => (Rows, Completion, Token) = (rows, completion, token);

    /// <summary>Rows to be written for this request.</summary>
    public List<T> Rows { get; }

    /// <summary>Completed, cancelled or faulted by the code that processes the request.</summary>
    public TaskCompletionSource<Done> Completion { get; }

    /// <summary>Cancellation token supplied by the caller.</summary>
    public CancellationToken Token { get; }
}
/// <summary>Factory helpers for <see cref="WriteQueueSet{T}"/>.</summary>
public static class WriteQueueSet
{
    /// <summary>
    /// Wraps <paramref name="entry"/> in a freshly allocated (non-pooled) set
    /// that contains only that entry.
    /// </summary>
    /// <remarks>
    /// The consumer is responsible for NOT returning this set to a pool,
    /// <para/>
    /// or for making sure the pool's return logic (and/or other logic)
    /// discards it.
    /// <para/>
    /// In our case the discard logic is tied to the buffer size: the seed
    /// function only checks whether the entry is bigger or not. If it is
    /// bigger, the Akka Streams Batch stage is already treating it as a
    /// snowflake, and the discard will see that it is too big.
    /// </remarks>
    internal static WriteQueueSet<T> UnPooled<T>(
        this WriteQueueEntry<T> entry) where T : TypedKeyedSequencedRow
        => new WriteQueueSet<T>(entry);
}
/// <summary>
/// Convenience bundle that creates a reader, a write queue and a sequence
/// queue for row type <typeparamref name="T"/> from a single
/// <see cref="ShotGlassRack"/>.
/// </summary>
public class Bartender<T> : IAsyncDisposable
    where T : TypedKeyedSequencedRow
{
    private readonly ShotGlassRack _rack;

    /// <summary>Reader used to query / spool stored rows.</summary>
    public readonly ReadReader<T> Reader;

    /// <summary>Batched write queue.</summary>
    public readonly BatchQueue<T> Writer;

    /// <summary>Batched "highest sequence" lookup queue.</summary>
    public readonly SequenceQueue<T> SequenceGetter;

    /// <param name="connectionFactory">Source of database connections.</param>
    /// <param name="materializer">
    /// Materializer to run the streams on; when null the rack is obtained via
    /// <see cref="ShotGlassRack.ViaStatic"/>.
    /// </param>
    /// <param name="tableName">Optional table name handed to each rack factory method.</param>
    public Bartender(IDataConnectionFactory connectionFactory,
        IMaterializer? materializer = null, string? tableName = null)
    {
        _rack = materializer == null
            ? ShotGlassRack.ViaStatic(connectionFactory)
            : new ShotGlassRack(connectionFactory, materializer);

        //TODO: Confs
        Reader = _rack.ReadReader<T>(tableName);
        Writer = _rack.WriteQueue<T>(tableName);
        SequenceGetter = _rack.SequenceQueue<T>(tableName);
    }

    /// <summary>Disposes the write queue first, then the sequence queue.</summary>
    public async ValueTask DisposeAsync()
    {
        await Writer.DisposeAsync();
        await SequenceGetter.DisposeAsync();
    }
}
/// <summary>
/// Factory for the ShotGlass building blocks (reader, write queue, sequence
/// queue) that share one connection factory and one materializer.
/// </summary>
public class ShotGlassRack
{
    // volatile: the field is read outside the lock in ViaStatic, so the
    // double-checked initialization needs acquire/release semantics to be
    // correct under the .NET memory model.
    private static volatile RackStaticFactory? _staticMaterializerFactory;
    private static readonly object StaticFactoryLockObj = new();

    private readonly IDataConnectionFactory _factory;
    private readonly IMaterializer _materializer;

    /// <param name="factory">Source of database connections.</param>
    /// <param name="materializer">Materializer every created stream runs on.</param>
    public ShotGlassRack(IDataConnectionFactory factory,
        IMaterializer materializer)
    {
        _factory = factory;
        _materializer = materializer;
    }

    /// <summary>Creates a <see cref="SequenceQueue{T}"/>, forwarding all arguments as given.</summary>
    public SequenceQueue<T> SequenceQueue<T>(string? tableName = null,
        int maxBatch = 20, int? maxQueue = null, int? maxDop = null) where T : TypedKeyedSequencedRow
    {
        return new SequenceQueue<T>(_factory, _materializer, tableName,
            maxBatch, maxQueue, maxDop);
    }

    /// <summary>Creates a <see cref="ReadReader{T}"/> for the given table name.</summary>
    public ReadReader<T> ReadReader<T>(string? tableName = null) where T : TypedKeyedSequencedRow
    {
        return new ReadReader<T>(_factory, _materializer, tableName);
    }

    /// <summary>
    /// Creates a <see cref="BatchQueue{T}"/>. <paramref name="maxDop"/>
    /// defaults to <see cref="Environment.ProcessorCount"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <paramref name="tableName"/> is accepted but is not
    /// forwarded to the queue by this call.
    /// </remarks>
    public BatchQueue<T> WriteQueue<T>(string? tableName = null, int bufferSize = 8192, int batchSize = 512, int? maxDop = null)
        where T : TypedKeyedSequencedRow
    {
        return new BatchQueue<T>(_factory, bufferSize, batchSize,
            maxDop.GetValueOrDefault(Environment.ProcessorCount),
            _materializer);
    }

    /// <summary>
    /// Creates a rack backed by a process-wide <see cref="RackStaticFactory"/>
    /// that is created on first use and shared by later calls.
    /// </summary>
    public static ShotGlassRack ViaStatic(IDataConnectionFactory factory)
    {
        // Read once into a local so the value we test is the value we use.
        var shared = _staticMaterializerFactory;
        if (shared == null)
        {
            lock (StaticFactoryLockObj)
            {
                shared = _staticMaterializerFactory;
                if (shared == null)
                {
                    shared = new RackStaticFactory();
                    _staticMaterializerFactory = shared;
                }
            }
        }
        return new ShotGlassRack(factory, shared.Materializer);
    }
}
/// <summary>
/// Owns an actor system named "shotglass-rack-static" together with a
/// materializer created from it (name prefix "shotglass").
/// </summary>
public class RackStaticFactory
{
    private readonly ActorSystem _system;
    private readonly ActorMaterializer _materializer;

    /// <summary>Creates the actor system, then the materializer on top of it.</summary>
    public RackStaticFactory()
    {
        var system = ActorSystem.Create("shotglass-rack-static");
        _system = system;
        _materializer = system.Materializer(namePrefix: "shotglass");
    }

    /// <summary>The materializer owned by this factory.</summary>
    public IMaterializer Materializer
    {
        get { return _materializer; }
    }
}
/// <summary>
/// Accumulator for the write entries that make up one batch.
/// </summary>
public class WriteQueueSet<T> where T : TypedKeyedSequencedRow
{
    /// <summary>Entries in the order they were added (seed entry first).</summary>
    public readonly List<WriteQueueEntry<T>> Entries;

    /// <summary>Total number of rows across all <see cref="Entries"/>.</summary>
    public int Count { get; private set; }

    /// <summary>Starts a set that contains only <paramref name="seedEntry"/>.</summary>
    internal WriteQueueSet(WriteQueueEntry<T> seedEntry)
    {
        Entries = new List<WriteQueueEntry<T>> { seedEntry };
        Count = seedEntry.Rows.Count;
    }

    /// <summary>
    /// Appends <paramref name="newEntry"/> and adds its row count to
    /// <see cref="Count"/>.
    /// </summary>
    /// <returns>This instance, so the call can be used as an aggregate step.</returns>
    public WriteQueueSet<T> Add(WriteQueueEntry<T> newEntry)
    {
        Entries.Add(newEntry);
        // Accumulate: the previous code overwrote the running total with the
        // size of the most recent entry.
        Count += newEntry.Rows.Count;
        return this;
    }
}
/// <summary>
/// Supplies linq2db <see cref="DataConnection"/> instances to the queues and
/// readers in this file.
/// </summary>
public interface IDataConnectionFactory
{
    /// <summary>
    /// Returns a connection for the caller to use. Every caller in this file
    /// wraps the result in a <c>using</c> block and therefore disposes it.
    /// </summary>
    DataConnection GetConnection();
}
/// <summary>Factory helpers for <see cref="BatchQueue{T}"/>.</summary>
public static class BatchQueue
{
    /// <summary>
    /// Creates a <see cref="BatchQueue{T}"/> for the row type of
    /// <paramref name="item"/>. The item itself is not read; it only lets the
    /// compiler infer <typeparamref name="T"/>.
    /// </summary>
    public static BatchQueue<T> AsBatchQueue<T>(this T item,
        IDataConnectionFactory factory, int bufferSize,
        int batchSize, int maxDop, IMaterializer materializer)
        where T : TypedKeyedSequencedRow
        => new BatchQueue<T>(factory, bufferSize, batchSize, maxDop, materializer);
}
/// <summary>
/// Immutable pair of a key and a sequence number; used in this file as the
/// projection target of the "max sequence per key" query.
/// </summary>
public class KeyAndSequenceFor
{
    /// <summary>The key.</summary>
    public string Key { get; }

    /// <summary>The sequence number associated with <see cref="Key"/>.</summary>
    public long Seq { get; }

    /// <summary>Stores both values as given.</summary>
    public KeyAndSequenceFor(string key, long seq)
        => (Key, Seq) = (key, seq);
}
/// <summary>linq2db helper extensions.</summary>
public static class L2DbExts
{
    /// <summary>
    /// Applies <paramref name="tableName"/> to <paramref name="table"/> only
    /// when it contains something other than whitespace; otherwise the table
    /// is returned unchanged.
    /// </summary>
    public static ITable<T> SafeTableName<T>(this ITable<T> table,
        string? tableName) where T : notnull
        => string.IsNullOrWhiteSpace(tableName)
            ? table
            : table.TableName(tableName);
}
/// <summary>
/// Adapts a reader of <see cref="Carrier{T}"/> records into a reader of mapped
/// values, remembering the sequence of the last record handed out.
/// </summary>
public class SequenceTrackingChannelReader<TRecord, TOut> : ChannelReader<TOut>
    where TRecord : TypedKeyedSequencedRow
{
    private readonly ChannelReader<Carrier<TRecord>> _reader;

    // This is useful later...
    // ReSharper disable once NotAccessedField.Local
    private long _lastSeq;

    private readonly Func<TRecord, TOut> _map;

    /// <param name="reader">Underlying reader of carriers.</param>
    /// <param name="map">Projection applied to every record before it is returned.</param>
    public SequenceTrackingChannelReader(ChannelReader<Carrier<TRecord>> reader, Func<TRecord, TOut> map)
    {
        _reader = reader;
        _map = map;
    }

    /// <summary>
    /// Reads the next record that is immediately available. Carriers of type
    /// <see cref="CarrierType.Ignore"/> are skipped; an error carrier makes
    /// this method throw the carried exception.
    /// </summary>
    /// <returns>False when no further item is immediately available.</returns>
    public override bool TryRead(out TOut item)
    {
        while (_reader.TryRead(out var carrier))
        {
            if (carrier.CarrierType == CarrierType.Ignore)
            {
                // Previously an Ignore carrier was consumed and reported as
                // "nothing to read" even when more items were buffered.
                continue;
            }

            var record = carrier.GetValueOrThrow();
            _lastSeq = record.Sequence;
            item = _map(record);
            return true;
        }

        item = default!;
        return false;
    }

#if NETCOREAPP3_1 || NETSTANDARD2_0
#else
    /// <summary>
    /// Peeks at the head of the underlying reader; succeeds only when the
    /// head is an event carrier. Nothing is consumed.
    /// </summary>
    public override bool TryPeek(out TOut item)
    {
        if (_reader.TryPeek(out var carrier) &&
            carrier.CarrierType == CarrierType.Event)
        {
            item = _map(carrier.Value);
            return true;
        }

        item = default!;
        return false;
    }
#endif

    /// <summary>Delegates to the underlying reader.</summary>
    public override async ValueTask<bool> WaitToReadAsync(
        CancellationToken cancellationToken = new CancellationToken())
    {
        return await _reader.WaitToReadAsync(cancellationToken);
    }
}
/// <summary>
/// Queue that answers "what is the highest stored sequence for this type and
/// key?". Requests are grouped by type, batched, and resolved with one
/// grouped query per batch.
/// </summary>
public class SequenceQueue<T> : IAsyncDisposable
    where T : TypedKeyedSequencedRow
{
    // Cached expression trees used by the lookup query.
    private static readonly Expression<Func<T, string>> KeySelector = row => row.Key;
    private static readonly Expression<Func<IGrouping<string, T>, KeyAndSequenceFor>> SequenceGroupByExpression =
        grp => new KeyAndSequenceFor(grp.Key, grp.Max(row => row.Sequence));

    /// <summary>Table name applied to the lookup query (null/blank leaves the mapped table as is).</summary>
    public readonly string? TableName;

    private readonly Task<Done> _completion;
    private readonly ChannelWriter<SequenceRequest> _requestQueue;

    // 0 = running, 1 = completion has been requested.
    private int _shutdownState = 0;

    /// <summary>
    /// Builds and runs the stream: channel source, grouped by request type
    /// (at most 16384 substreams), batched by distinct key, resolved with the
    /// given parallelism, merged, and drained into an ignoring sink.
    /// </summary>
    /// <param name="dcf">Source of database connections.</param>
    /// <param name="mat">Materializer the stream runs on.</param>
    /// <param name="tableName">Optional table name for the lookup query.</param>
    /// <param name="maxBatch">Weight budget of one batch; every distinct key costs 1.</param>
    /// <param name="maxQueue">Channel capacity; defaults to parallelism * <paramref name="maxBatch"/>.</param>
    /// <param name="maxDop">Lookup parallelism; defaults to twice the processor count.</param>
    public SequenceQueue(IDataConnectionFactory dcf, IMaterializer mat,
        string? tableName = null, int maxBatch = 20, int? maxQueue = null,
        int? maxDop = null)
    {
        TableName = tableName;
        var parallelism = maxDop ?? Environment.ProcessorCount * 2;
        var queueCapacity = maxQueue ?? parallelism * maxBatch;

        (_requestQueue, _completion) = Source
            .Channel<SequenceRequest>(queueCapacity, false)
            .GroupBy(16384, request => request.Type)
            .BatchWeightedWithContext(maxBatch,
                request => new SequenceRequestGroup(request.Type, request.Key,
                    request.Response),
                // A key already present in the batch is free; a new key costs 1.
                (request, group) => group != null && group.Has(request.Key) ? 0 : 1,
                (group, request) => group.Add(request.Key, request.Response))
            .SelectAsync(parallelism, group => ResolveGroupAsync(dcf, group))
            .MergeSubstreamsAsSource()
            .ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Both)
            .Run(mat);
    }

    /// <summary>
    /// Runs the grouped "max sequence per key" query for one batch and
    /// completes every waiting request. A failure faults every request in the
    /// batch instead of failing the stream.
    /// </summary>
    private async Task<NotUsed> ResolveGroupAsync(IDataConnectionFactory dcf,
        SequenceRequestGroup group)
    {
        try
        {
            using (var ctx = dcf.GetConnection())
            {
                var found = await ctx.GetTable<T>()
                    .SafeTableName(TableName)
                    .Where(t => t.Type == group.Type && t.Key.In(group.Keys))
                    .GroupBy(KeySelector)
                    .Select(SequenceGroupByExpression)
                    .ToListAsync();

                if (found.Count != group.Count)
                {
                    // Some keys have no rows; the slow path answers those with 0.
                    group.SetResultsSlow(found);
                }
                else
                {
                    // Fast path: every requested key came back, so each result
                    // can be set directly without filtering dictionary entries.
                    foreach (var hit in found)
                    {
                        group.SetResultsForKey(hit.Key, hit.Seq);
                    }
                }
            }
        }
        catch (Exception e)
        {
            foreach (var waitersForKey in group.Sets)
            {
                foreach (var waiter in waitersForKey.Value)
                {
                    waiter.TrySetException(e);
                }
            }
        }

        return NotUsed.Instance;
    }

    /// <summary>
    /// Returns a completion that will be closed once the Queue is shut down.
    /// </summary>
    public async Task ClosedCompletion() => await _completion;

    /// <summary>
    /// Requests completion of the Queue (i.e. for shutdown).
    /// Note that until the result of <see cref="ClosedCompletion"/> completes,
    /// the queue may still be running entries.
    /// </summary>
    /// <param name="error">
    /// Optional: If provided, closes with an error instead of gracefully.
    /// </param>
    /// <returns>
    /// True if this request caused Completion.
    /// <para/>
    /// False if a different request caused completion.
    /// </returns>
    public bool TryFlagCompletion(Exception? error = null)
    {
        var alreadyFlagged = Interlocked.CompareExchange(ref _shutdownState, 1, 0) != 0;
        if (alreadyFlagged)
        {
            return false;
        }

        _requestQueue.TryComplete(error);
        return true;
    }

    /// <summary>
    /// Enqueues a lookup for <paramref name="type"/>/<paramref name="key"/>
    /// and awaits its answer. <paramref name="token"/> applies to the enqueue
    /// step only.
    /// </summary>
    public async ValueTask<long> GetSequenceFor(string type, string key,
        CancellationToken token = default)
    {
        var request = new SequenceRequest(type, key);
        await _requestQueue.WriteAsync(request, token);
        return await request.Response.Task;
    }

    /// <summary>Requests completion and waits for the stream to finish, ignoring any failure.</summary>
    public async ValueTask DisposeAsync()
    {
        TryFlagCompletion();
        try
        {
            await _completion;
        }
        catch
        {
            //Intentional
        }
    }
}
/// <summary>Helpers for working with Akka.Streams sub-flows.</summary>
public static class SubFlowExtensions
{
    /// <summary>
    /// Merges the substreams of <paramref name="subFlow"/> and casts the
    /// result to a <c>Source</c>. The cast throws
    /// <see cref="InvalidCastException"/> if the merged flow is not a source.
    /// </summary>
    public static Source<TOut, TMat> MergeSubstreamsAsSource<TOut, TMat,
        TClosed>(this SubFlow<TOut, TMat, TClosed> subFlow)
    {
        var merged = subFlow.MergeSubstreams();
        return (Source<TOut, TMat>)merged;
    }
}
/// <summary>
/// Small envelope that carries either a value (<see cref="CarrierType.Event"/>),
/// an exception (<see cref="CarrierType.Error"/>), or nothing
/// (<see cref="Ignore"/>).
/// </summary>
public readonly struct Carrier<T> where T : class
{
    /// <summary>Shared "nothing to deliver" carrier.</summary>
    public static readonly Carrier<T> Ignore =
        new Carrier<T>(CarrierType.Ignore);

    // Holds either the T value or the Exception, depending on CarrierType.
    private readonly object _carrierVal;

    /// <summary>Which kind of payload this carrier holds.</summary>
    public readonly CarrierType CarrierType;

    /// <summary>Creates an event carrier around <paramref name="value"/>.</summary>
    public Carrier(T value)
    {
        CarrierType = CarrierType.Event;
        _carrierVal = value;
    }

    /// <summary>Creates an error carrier around <paramref name="error"/>.</summary>
    public Carrier(Exception error)
    {
        CarrierType = CarrierType.Error;
        _carrierVal = error;
    }

    private Carrier(CarrierType use)
    {
        _carrierVal = default;
        CarrierType = use;
    }

    /// <summary>The payload as <typeparamref name="T"/>, or null when it is not a <typeparamref name="T"/>.</summary>
    public T Value => _carrierVal as T;

    /// <summary>The payload as an exception, or null when it is not one.</summary>
    public Exception Error => _carrierVal as Exception;

    /// <summary>True when this is an <see cref="CarrierType.Ignore"/> carrier.</summary>
    public bool IsIgnore => CarrierType == CarrierType.Ignore;

    /// <summary>
    /// Returns the value of an event carrier, or rethrows the exception of an
    /// error carrier.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The carrier is neither an event nor an error (for example
    /// <see cref="Ignore"/> or a default-initialized carrier), or it is an
    /// error carrier without an exception.
    /// </exception>
    public T GetValueOrThrow()
    {
        if (CarrierType.HasFlag(CarrierType.Event))
        {
            return Value;
        }

        if (CarrierType.HasFlag(CarrierType.Error))
        {
            var error = Error;
            if (error != null)
            {
                // Rethrow through ExceptionDispatchInfo so the stack trace
                // captured where the error originally happened is preserved;
                // a plain `throw error;` would overwrite it with this frame.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo
                    .Capture(error).Throw();
            }
        }

        throw new InvalidOperationException("Invalid state!");
    }
}
/// <summary>
/// Kind of payload held by a carrier. Declared as flags; <see cref="Invalid"/>
/// is the zero value.
/// </summary>
[Flags]
public enum CarrierType
{
    /// <summary>Zero value: no kind has been set.</summary>
    Invalid = 0,

    /// <summary>Nothing to deliver.</summary>
    Ignore = 1 << 0,

    /// <summary>A value is carried.</summary>
    Event = 1 << 1,

    /// <summary>An exception is carried.</summary>
    Error = 1 << 2
}
/// <summary>
/// Shared marker exceptions.
/// NOTE(review): nothing in the visible part of this file reads them.
/// </summary>
internal static class CarrierTypeInternals
{
    /// <summary>Marker exception with message "event".</summary>
    internal static readonly Exception Event = new("event");

    /// <summary>Marker exception with message "ignore".</summary>
    internal static readonly Exception Ignore = new("ignore");
}
/// <summary>
/// Read side of the store: runs bounded queries for one type/key pair and can
/// spool the matching rows through a channel, page by page.
/// </summary>
public class ReadReader<T>
    where T : TypedKeyedSequencedRow
{
    private readonly IDataConnectionFactory _dataConnectionFactory;
    private readonly string? _tableName;
    private readonly IMaterializer _mat;

    /// <param name="dataConnectionFactory">Source of database connections.</param>
    /// <param name="materializer">Materializer the spooling streams run on.</param>
    /// <param name="tableName">Optional table name (null/blank leaves the mapped table as is).</param>
    public ReadReader(IDataConnectionFactory dataConnectionFactory,
        IMaterializer materializer,
        string? tableName)
    {
        _dataConnectionFactory = dataConnectionFactory;
        _tableName = tableName;
        _mat = materializer;
    }

    /// <summary>
    /// Returns up to <paramref name="maxBatch"/> rows for the type/key pair
    /// whose sequence is at least <paramref name="startAt"/> and, when
    /// <paramref name="argEndAt"/> is given, at most that value.
    /// </summary>
    /// <remarks>
    /// Rows are ordered by ascending sequence. The spooler uses the last row
    /// of a page as its cursor, and without an explicit ORDER BY the database
    /// is free to return rows in any order, which could skip rows.
    /// </remarks>
    public async ValueTask<List<T>> RunQuery(string type, string key, long startAt,
        long? argEndAt, int maxBatch)
    {
        using (var ctx = _dataConnectionFactory.GetConnection())
        {
            var query = ctx.GetTable<T>().SafeTableName(_tableName)
                .Where(t =>
                    t.Type == type && t.Key == key &&
                    t.Sequence >= startAt);
            if (argEndAt != null)
            {
                query = query.Where(t => t.Sequence <= argEndAt);
            }

            return await query
                .OrderBy(t => t.Sequence)
                .Take(maxBatch)
                .ToListAsync();
        }
    }

    /// <summary>
    /// Same as <see cref="GetMappedSpooler{U}"/> with an identity mapping.
    /// </summary>
    public ChannelReader<T> GetSpooler(string type, string key, long startAt,
        long? endAt, int maxBatch,
        Func<T, bool>? filter = null,
        CancellationToken token = default)
    {
        return GetMappedSpooler(type, key, startAt, endAt, maxBatch, t => t,
            filter,
            token);
    }

    /// <summary>
    /// Starts a stream that pages through the rows of one type/key pair in
    /// batches of <paramref name="maxBatch"/> and exposes them, mapped through
    /// <paramref name="map"/>, as a channel reader.
    /// </summary>
    /// <param name="type">Row type to read.</param>
    /// <param name="key">Row key to read.</param>
    /// <param name="startAt">First sequence to read (inclusive).</param>
    /// <param name="endAt">Last sequence to read (inclusive); null reads until no more rows are found.</param>
    /// <param name="maxBatch">Page size of each query.</param>
    /// <param name="map">Projection applied to each row by the returned reader.</param>
    /// <param name="baseFilter">Optional predicate; rows it rejects are dropped. Null keeps every row.</param>
    /// <param name="token">Checked before every page; cancellation surfaces as an error item.</param>
    /// <returns>
    /// A reader over the mapped rows. A failure while paging is delivered as
    /// an error item, which the reader rethrows when that item is read; the
    /// stream ends after it.
    /// </returns>
    public ChannelReader<U> GetMappedSpooler<U>(string type, string key, long startAt,
        long? endAt, int maxBatch, Func<T, U> map, Func<T, bool>? baseFilter = null,
        CancellationToken token = default)
    {
        Func<T, bool> filter = baseFilter ?? (static t => true);

        var pages = Source.UnfoldAsync<spoolCapt, Carrier<List<T>>>(
            new spoolCapt(type, key, startAt, endAt, maxBatch),
            async c =>
            {
                try
                {
                    token.ThrowIfCancellationRequested();
                    // A null endAt means "no upper bound". The lifted
                    // comparison `startAt <= (long?)null` is always false,
                    // so the null case must be tested explicitly.
                    if (c.endAt == null || c.startAt <= c.endAt)
                    {
                        var results = await RunQuery(c.type, c.key, c.startAt,
                            c.endAt,
                            c.maxBatch);
                        if (results.Count > 0)
                        {
                            // Next page starts right after the last row seen.
                            var next = new spoolCapt(c.type, c.key,
                                results[results.Count - 1].Sequence + 1,
                                c.endAt,
                                c.maxBatch);
                            return Akka.Util.Option<(spoolCapt, Carrier<List<T>>)>
                                .Create((next, new Carrier<List<T>>(results)));
                        }
                    }
                }
                catch (Exception e)
                {
                    // Emit the error, with a state whose range is empty so
                    // the next unfold step ends the stream.
                    return Akka.Util
                        .Option<(spoolCapt, Carrier<List<T>>)>.Create((
                            new spoolCapt(c.type, c.key, long.MaxValue,
                                long.MinValue, 0),
                            new Carrier<List<T>>(e)));
                }

                return Akka.Util.Option<(spoolCapt, Carrier<List<T>>)>.None;
            });

        var reader = pages
            .SelectMany(page =>
            {
                if ((page.CarrierType & CarrierType.Event) != 0)
                {
                    return page.Value.Select(row => new Carrier<T>(row));
                }
                else
                {
                    return new List<Carrier<T>>()
                        { new Carrier<T>(page.Error) };
                }
            })
            // Errors always pass; events only when the filter accepts them.
            .Where(item =>
                (item.CarrierType.HasFlag(CarrierType.Event) && filter(item.Value))
                || item.CarrierType.HasFlag(CarrierType.Error))
            .RunWith(
                ChannelSink.AsReader<Carrier<T>>(32, false,
                    BoundedChannelFullMode.Wait),
                _mat);

        return new SequenceTrackingChannelReader<T, U>(reader, map);
    }

    /// <summary>Unfold state of the spooler: what to read and where the next page starts.</summary>
    private readonly struct spoolCapt
    {
        public readonly string type;
        public readonly string key;
        public readonly long startAt;
        public readonly long? endAt;
        public readonly int maxBatch;

        public spoolCapt(string type, string key, long startAt, long? endAt, int maxBatch)
        {
            this.type = type;
            this.key = key;
            this.startAt = startAt;
            this.endAt = endAt;
            this.maxBatch = maxBatch;
        }
    }
}
/// <summary>
/// Batch of pending sequence lookups for a single type, indexed by key. Each
/// key maps to every completion source that is waiting for that key.
/// </summary>
public class SequenceRequestGroup
{
    private readonly Dictionary<string, List<TaskCompletionSource<long>>>
        _reqSet;

    /// <summary>The row type shared by every request in the group.</summary>
    public readonly string Type;

    /// <summary>Starts a group with a single waiting request.</summary>
    public SequenceRequestGroup(string type, string key, TaskCompletionSource<long> req)
    {
        Type = type;
        _reqSet = new Dictionary<string, List<TaskCompletionSource<long>>>
        {
            { key, new List<TaskCompletionSource<long>> { req } }
        };
    }

    /// <summary>True when at least one request for <paramref name="key"/> is in the group.</summary>
    public bool Has(string key) => _reqSet.ContainsKey(key);

    /// <summary>The distinct keys currently in the group.</summary>
    public IEnumerable<string> Keys => _reqSet.Keys;

    /// <summary>Every key together with its waiting requests.</summary>
    public IEnumerable<KeyValuePair<string, List<TaskCompletionSource<long>>>>
        Sets => _reqSet;

    /// <summary>Number of distinct keys currently in the group.</summary>
    public int Count => _reqSet.Count;

    /// <summary>
    /// Completes every request waiting on <paramref name="key"/> with
    /// <paramref name="item"/>; does nothing for an unknown key.
    /// </summary>
    internal void SetResultsForKey(string key, long item)
    {
        if (!_reqSet.TryGetValue(key, out var waiters))
        {
            return;
        }

        CompleteAll(waiters, item);
    }

    /// <summary>
    /// Completes requests from a result set that may not cover every key.
    /// Keys found in <paramref name="set"/> get their sequence and are removed
    /// from the group; every key left over is then completed with 0.
    /// </summary>
    internal void SetResultsSlow(List<KeyAndSequenceFor> set)
    {
        foreach (var found in set)
        {
            if (!_reqSet.TryGetValue(found.Key, out var waiters))
            {
                continue;
            }

            CompleteAll(waiters, found.Seq);
            _reqSet.Remove(found.Key);
        }

        foreach (var unresolved in _reqSet)
        {
            CompleteAll(unresolved.Value, 0);
        }
    }

    /// <summary>Adds a waiting request under <paramref name="key"/>, creating the key's list if needed.</summary>
    /// <returns>This instance, so the call can be used as an aggregate step.</returns>
    public SequenceRequestGroup Add(string key,
        TaskCompletionSource<long> req)
    {
        if (!_reqSet.TryGetValue(key, out var waiters))
        {
            waiters = new List<TaskCompletionSource<long>>();
            _reqSet.Add(key, waiters);
        }

        waiters.Add(req);
        return this;
    }

    // Tries to complete each waiting request with the same value.
    private static void CompleteAll(List<TaskCompletionSource<long>> waiters, long value)
    {
        foreach (var waiter in waiters)
        {
            waiter.TrySetResult(value);
        }
    }
}
/// <summary>
/// DSL entry points that attach a <see cref="BatchWeightWithContext{TIn,TOut}"/>
/// stage to a source or a sub-flow.
/// </summary>
public static class StreamDsl
{
    /// <summary>
    /// Batches elements of <paramref name="flow"/> up to a weight of
    /// <paramref name="max"/>. The cost function receives the element and the
    /// current batch (default value when no batch is open).
    /// </summary>
    /// <param name="flow">Source to extend.</param>
    /// <param name="max">Weight budget of one batch.</param>
    /// <param name="seed">Creates a new batch from its first element.</param>
    /// <param name="costFunction">Weight of an element given the current batch.</param>
    /// <param name="aggregate">Adds an element to an open batch.</param>
    public static Source<TOut2, TMat>
        BatchWeightedWithContext<TOut, TOut2, TMat>(
            this Source<TOut, TMat> flow, long max, Func<TOut, TOut2> seed,
            Func<TOut, TOut2?, long> costFunction,
            Func<TOut2, TOut, TOut2> aggregate)
    {
        var stage = new BatchWeightWithContext<TOut, TOut2>(max, costFunction,
            seed, aggregate);
        return (Source<TOut2, TMat>)flow.Via(stage);
    }

    /// <summary>
    /// Sub-flow variant of the overload above; the arguments have the same
    /// meaning.
    /// </summary>
    public static SubFlow<TOut2, TMat, TClosed> BatchWeightedWithContext<TOut, TOut2, TMat, TClosed>(
        this SubFlow<TOut, TMat, TClosed> flow, long max, Func<TOut, TOut2> seed,
        Func<TOut, TOut2?, long> costFunction,
        Func<TOut2, TOut, TOut2> aggregate)
    {
        var stage = new BatchWeightWithContext<TOut, TOut2>(max, costFunction,
            seed, aggregate);
        return (SubFlow<TOut2, TMat, TClosed>)flow.Via(stage);
    }
}
/// <summary>
/// Flow stage that folds incoming elements into a batch until a weight budget
/// is used up. Unlike a plain weighted batch, the cost function also receives
/// the batch built so far, so the weight of an element can depend on what is
/// already in the batch.
/// </summary>
public sealed class
    BatchWeightWithContext<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
{
    #region internal classes

    private sealed class Logic : InAndOutGraphStageLogic
    {
        private readonly FlowShape<TIn, TOut> _shape;
        private readonly BatchWeightWithContext<TIn, TOut> _stage;
        private readonly Akka.Streams.Supervision.Decider _decider;

        // Batch currently being built (None when no batch is open).
        private Akka.Util.Option<TOut> _aggregate;

        // Weight budget still available to the open batch.
        private long _left;

        // Element that did not fit into the open batch; it seeds the next one.
        private Akka.Util.Option<TIn> _pending;

        public Logic(Attributes inheritedAttributes,
            BatchWeightWithContext<TIn, TOut> stage) : base(stage.Shape)
        {
            _shape = stage.Shape;
            _stage = stage;

            var supervision = inheritedAttributes
                .GetAttribute<ActorAttributes.SupervisionStrategy>(null);
            _decider = supervision == null
                ? Deciders.StoppingDecider
                : supervision.Decider;

            _left = stage._max;
            SetHandlers(_shape.Inlet, _shape.Outlet, this);
        }

        public override void PreStart() => Pull(_shape.Inlet);

        public override void OnPush()
        {
            var element = Grab(_shape.Inlet);
            var cost =
                _stage._costFunc(element, _aggregate.GetOrElse(default));

            if (!_aggregate.HasValue)
            {
                // No open batch: this element seeds a new one.
                try
                {
                    _aggregate = _stage._seed(element);
                    _left -= cost;
                }
                catch (Exception ex)
                {
                    Supervise(ex);
                }
            }
            else if (_left < cost)
            {
                // Does not fit: park it until the open batch is emitted.
                _pending = element;
            }
            else
            {
                try
                {
                    _aggregate =
                        _stage._aggregate(_aggregate.Value, element);
                    _left -= cost;
                }
                catch (Exception ex)
                {
                    Supervise(ex);
                }
            }

            if (IsAvailable(_shape.Outlet))
                Flush();
            if (!_pending.HasValue)
                Pull(_shape.Inlet);
        }

        public override void OnUpstreamFinish()
        {
            // With an open batch we stay alive until downstream pulls it.
            if (!_aggregate.HasValue)
                CompleteStage();
        }

        public override void OnPull()
        {
            if (!_aggregate.HasValue)
            {
                if (IsClosed(_shape.Inlet))
                    CompleteStage();
                else if (!HasBeenPulled(_shape.Inlet))
                    Pull(_shape.Inlet);
                return;
            }

            if (!IsClosed(_shape.Inlet))
            {
                Flush();
                if (!HasBeenPulled(_shape.Inlet))
                    Pull(_shape.Inlet);
                return;
            }

            // Upstream is finished: emit what we have ...
            Push(_shape.Outlet, _aggregate.Value);
            if (!_pending.HasValue)
            {
                CompleteStage();
                return;
            }

            // ... and turn the parked element into the final batch.
            try
            {
                _aggregate = _stage._seed(_pending.Value);
            }
            catch (Exception ex)
            {
                var restarted = Supervise(ex) == Directive.Restart;
                if (restarted && !HasBeenPulled(_shape.Inlet))
                    Pull(_shape.Inlet);
            }

            _pending = Akka.Util.Option<TIn>.None;
        }

        // Emits the open batch (if any) and re-seeds from the parked element.
        private void Flush()
        {
            if (_aggregate.HasValue)
            {
                Push(_shape.Outlet, _aggregate.Value);
                _left = _stage._max;
            }

            if (!_pending.HasValue)
            {
                _aggregate = Akka.Util.Option<TOut>.None;
                return;
            }

            try
            {
                _aggregate = _stage._seed(_pending.Value);
                _left -= _stage._costFunc(_pending.Value, default);
                _pending = Akka.Util.Option<TIn>.None;
            }
            catch (Exception ex)
            {
                // On Resume the failing element is dropped.
                if (Supervise(ex) == Directive.Resume)
                    _pending = Akka.Util.Option<TIn>.None;
            }
        }

        // Asks the decider what to do with a user-function failure and
        // applies the part common to every call site: Stop fails the stage,
        // Restart clears the state. The directive is returned so a call site
        // can add its own handling.
        private Directive Supervise(Exception ex)
        {
            var directive = _decider(ex);
            switch (directive)
            {
                case Directive.Stop:
                    FailStage(ex);
                    break;
                case Directive.Restart:
                    RestartState();
                    break;
            }

            return directive;
        }

        private void RestartState()
        {
            _aggregate = Akka.Util.Option<TOut>.None;
            _left = _stage._max;
            _pending = Akka.Util.Option<TIn>.None;
        }
    }

    #endregion

    private readonly long _max;
    private readonly Func<TIn, TOut?, long> _costFunc;
    private readonly Func<TIn, TOut> _seed;
    private readonly Func<TOut, TIn, TOut> _aggregate;

    /// <summary>
    /// Creates the stage with an inlet named "Batch.in" and an outlet named
    /// "Batch.out".
    /// </summary>
    /// <param name="max">Weight budget of one batch.</param>
    /// <param name="costFunc">
    /// Weight of an element; the second argument is the open batch, or the
    /// default value when no batch is open.
    /// </param>
    /// <param name="seed">Creates a new batch from its first element.</param>
    /// <param name="aggregate">Adds an element to an open batch.</param>
    public BatchWeightWithContext(long max, Func<TIn, TOut?, long> costFunc,
        Func<TIn, TOut> seed, Func<TOut, TIn, TOut> aggregate)
    {
        _max = max;
        _costFunc = costFunc;
        _seed = seed;
        _aggregate = aggregate;

        Shape = new FlowShape<TIn, TOut>(
            new Inlet<TIn>("Batch.in"),
            new Outlet<TOut>("Batch.out"));
    }

    /// <summary>
    /// The single-inlet, single-outlet shape of this stage.
    /// </summary>
    public override FlowShape<TIn, TOut> Shape { get; }

    /// <summary>
    /// Creates the per-materialization logic of the stage.
    /// </summary>
    /// <param name="inheritedAttributes">
    /// Attributes of the surrounding graph; a supervision strategy found here
    /// decides how failures of the user functions are handled.
    /// </param>
    /// <returns>A new logic instance bound to this stage.</returns>
    protected override GraphStageLogic CreateLogic(
        Attributes inheritedAttributes)
        => new Logic(inheritedAttributes, this);
}
/// <summary>
/// A single "highest sequence for type/key" lookup together with the
/// completion source its answer is delivered on.
/// </summary>
public sealed class SequenceRequest
{
    /// <summary>Row type to look up.</summary>
    public string Type { get; }

    /// <summary>Row key to look up.</summary>
    public string Key { get; }

    /// <summary>
    /// Receives the answer. Created with RunContinuationsAsynchronously so
    /// continuations do not run inline on the thread that completes it.
    /// </summary>
    public TaskCompletionSource<long> Response { get; }

    /// <summary>Creates a request with a fresh, uncompleted <see cref="Response"/>.</summary>
    public SequenceRequest(string type, string key)
    {
        Type = type;
        Key = key;
        Response = new TaskCompletionSource<long>(
            TaskCreationOptions.RunContinuationsAsynchronously);
    }
}
/// <summary>
/// Write queue: callers enqueue row lists, the stream groups them into
/// batches bounded by row count, and each batch is written on one connection
/// with a configurable degree of parallelism.
/// </summary>
public class BatchQueue<T> : IAsyncDisposable
    where T : TypedKeyedSequencedRow
{
    private readonly ChannelWriter<WriteQueueEntry<T>> WriteQueue;
    private readonly IDataConnectionFactory _factory;
    private readonly string? TableName;
    private readonly IMaterializer Materializer;
    private readonly Task<Done> Completion;

    /// <summary>
    /// Builds and runs the write stream.
    /// </summary>
    /// <param name="factory">Source of database connections.</param>
    /// <param name="bufferSize">Capacity of the request channel; writers wait when it is full.</param>
    /// <param name="batchSize">Row-count budget of one batch.</param>
    /// <param name="maxDop">Number of batches that may be written concurrently.</param>
    /// <param name="materializer">Materializer the stream runs on.</param>
    /// <param name="tableName">
    /// Optional table name used for the inserts; null keeps the mapped table.
    /// Previously the table name field was never assigned.
    /// </param>
    public BatchQueue(IDataConnectionFactory factory, int bufferSize,
        int batchSize, int maxDop, IMaterializer materializer,
        string? tableName = null)
    {
        Materializer = materializer;
        _factory = factory;
        TableName = tableName;
        (WriteQueue, Completion) = Source
            .Channel<WriteQueueEntry<T>>(bufferSize, false,
                BoundedChannelFullMode.Wait)
            .BatchWeighted(
                batchSize,
                cf => cf.Rows.Count,
                r => r.UnPooled(),
                (oldRows, newRows) =>
                    oldRows.Add(newRows))
            .SelectAsync(
                maxDop,
                promisesAndRows => WriteBatchAsync(promisesAndRows))
            .ToMaterialized(
                Sink.Ignore<NotUsed>(),
                Keep.Both).Run(Materializer);
    }

    /// <summary>
    /// Writes <paramref name="rowset"/> on a fresh connection: a single row
    /// uses a plain insert, several rows use a transactional bulk copy. An
    /// empty list is a no-op.
    /// </summary>
    public async ValueTask WriteJournalRowsAsync(List<T> rowset,
        CancellationToken token = default)
    {
        if (rowset.Count == 0)
        {
            // Nothing to write; do not open a connection or transaction.
            return;
        }

        using (var ctx = _factory.GetConnection())
        {
            if (rowset.Count == 1)
            {
                await ctx.InsertAsync(rowset[0], TableName, token: token);
            }
            else
            {
                await InsertMulti(rowset, token, ctx);
            }
        }
    }

    // Bulk-inserts the rows inside a transaction. On failure the transaction
    // is rolled back and the original exception is rethrown; if the rollback
    // fails as well, both exceptions are reported together.
    private async Task InsertMulti(List<T> rowset, CancellationToken token,
        DataConnection ctx)
    {
        using (var tx = await ctx.BeginTransactionAsync(token))
        {
            try
            {
                await ctx.GetTable<T>()
                    .BulkCopyAsync(
                        new BulkCopyOptions(
                            TableName: this.TableName,
                            BulkCopyType: BulkCopyType
                                .MultipleRows), rowset, token);
                await tx.CommitAsync(token);
            }
            catch (Exception e)
            {
                try
                {
                    // We should always try to rollback even if cancelled.
                    // ReSharper disable once MethodSupportsCancellation
                    await tx.RollbackAsync();
                }
                catch (Exception exception)
                {
                    throw new AggregateException(e, exception);
                }

                // The write failed: callers must see it. Previously the
                // exception was swallowed after a successful rollback, so
                // failed writes were reported as successful.
                throw;
            }
        }
    }

    // Handles one batch: cancels entries whose token has fired, writes the
    // rows of all remaining entries in enqueue order, then completes them.
    // Any failure faults every entry of the batch that is not yet completed.
    private async Task<NotUsed> WriteBatchAsync(WriteQueueSet<T> batch)
    {
        //Hack: We use a Yield here to guarantee parallelism.
        //Otherwise,
        //Grabbing a connection may block but not yield
        //(In some cases)
        await Task.Yield();
        try
        {
            var liveEntries = new List<WriteQueueEntry<T>>(batch.Entries.Count);
            var writeSet = new List<T>();

            // Every entry is visited, including the first one. The previous
            // backwards loop stopped before index 0, so the first entry's
            // rows were never written although it was completed as Done.
            foreach (var entry in batch.Entries)
            {
                if (entry.Token.IsCancellationRequested)
                {
                    entry.Completion.TrySetCanceled(entry.Token);
                }
                else
                {
                    liveEntries.Add(entry);
                    writeSet.AddRange(entry.Rows);
                }
            }

            if (writeSet.Count > 0)
            {
                await WriteJournalRowsAsync(writeSet);
            }

            foreach (var entry in liveEntries)
            {
                entry.Completion.TrySetResult(Done.Instance);
            }
        }
        catch (Exception e)
        {
            // TrySetException is a no-op for entries already cancelled above.
            foreach (var entry in batch.Entries)
            {
                entry.Completion.TrySetException(e);
            }
        }

        return NotUsed.Instance;
    }

    /// <summary>Enqueues a single row; see the list overload.</summary>
    public ValueTask WriteAsync(T item,
        CancellationToken token = default)
    {
        return WriteAsync(new List<T>(1) { item }, token);
    }

    /// <summary>
    /// Enqueues <paramref name="items"/> and completes once the batch that
    /// contains them has been written (or faults / is cancelled with it).
    /// </summary>
    /// <remarks>
    /// We use ValueTask here despite a TCS,
    /// with the intent of allowing pooling in future.
    /// </remarks>
    public async ValueTask WriteAsync(List<T> items,
        CancellationToken token = default)
    {
        var entry = new WriteQueueEntry<T>(items,
            new TaskCompletionSource<Done>(TaskCreationOptions
                .RunContinuationsAsynchronously), token);
        // Fast path first; fall back to the awaiting write when the channel is full.
        if (!WriteQueue.TryWrite(entry))
        {
            await WriteQueue.WriteAsync(entry, token);
        }

        await entry.Completion.Task;
    }

    /// <summary>
    /// Waits for the stream to finish and reports the outcome as a value
    /// instead of throwing.
    /// </summary>
    public async Task<Akka.Util.Try<Done>> Closed()
    {
        try
        {
            await Completion.ConfigureAwait(false);
            return new Akka.Util.Try<Done>(Done.Instance);
        }
        catch (Exception e)
        {
            return new Akka.Util.Try<Done>(e);
        }
    }

    /// <summary>
    /// Completes the request channel and waits for the stream to drain.
    /// Failures are deliberately ignored so disposal never throws.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            WriteQueue.TryComplete();
        }
        catch
        {
            //Intentional
        }

        try
        {
            await Completion;
        }
        catch
        {
            //Intentional
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment