Skip to content

Instantly share code, notes, and snippets.

Last active December 7, 2023 22:23
Show Gist options
  • Save to11mtm/27f88ee13ea2349a64a8653bfd9e6c97 to your computer and use it in GitHub Desktop.
Save to11mtm/27f88ee13ea2349a64a8653bfd9e6c97 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.Mapping;
using LinqToDB.Tools;
using Directive = Akka.Streams.Supervision.Directive;
// ReSharper disable PrivateFieldCanBeConvertedToLocalVariable
// ReSharper disable RedundantArgumentDefaultValue
// ReSharper disable RedundantDefaultMemberInitializer
//So, there's this other project I am working on called 'Venti'.
//It's an event related thing... That's all I will say for now.
//However, I wanted to keep parts of it in the 'GlutenFree' spirit;
// 1. Keep things minimal and hackable
// 2. Prefer Composition of minimal components to a larger whole
// To that end, It's best to start such a project with the storage layer...
// So, this is a 'Small Part' of Venti...
// Thus, the name ShotGlass.
namespace GlutenFree.ShotGlass.Stiff
public abstract class TypedKeyedSequencedRow
[Column(CanBeNull = false, Length = 128)]
public string Type { get; set; } = null!;
[Column(CanBeNull = false, Length = 128)]
public string Key { get; set; } = null!;
// ReSharper disable once UnusedAutoPropertyAccessor.Global
public long Sequence { get; set; }
[Column(SkipOnInsert = true)]
public long Ordering { get; set; }
/// <summary>
/// Meant to be a Unix UTC Timestamp in millis. Sane DBs can do this no problem.
/// </summary>
[Column(SkipOnInsert= true)]
public virtual long InsertTimestamp {get; set; }
public class BatchFlowControl
public class Continue : BatchFlowControl
public static Continue Instance = new Continue();
public class ContinueDelayed : BatchFlowControl
public static ContinueDelayed Instance = new ContinueDelayed();
public class Stop : BatchFlowControl
public static Stop Instance = new Stop();
public class WriteQueueEntry<T> where T: TypedKeyedSequencedRow
public List<T> Rows { get; }
public TaskCompletionSource<Done> Completion { get; }
public CancellationToken Token { get; }
public WriteQueueEntry(List<T> rows,
TaskCompletionSource<Done> completion,
CancellationToken token)
Rows = rows;
Completion = completion;
Token = token;
public static class WriteQueueSet
/// <remarks>
/// It is the consumer's responsibility to NOT return this to the pool,
/// <para/>
/// Or, ensure the pool's return logic (and/or other logic)
/// will discard it.
/// <para/>
/// In our case, The discard logic is tied to our buffer size,
/// And in the seed function, we just see whether the entry is bigger
/// or not. If the entry is bigger, The akka streams Batch stage
/// already is treating it as a snowflake,
/// And the discard will see it is too big.
/// </remarks>
internal static WriteQueueSet<T> UnPooled<T>(
this WriteQueueEntry<T> entry) where T: TypedKeyedSequencedRow
return new WriteQueueSet<T>(entry);
public class Bartender<T> : IAsyncDisposable
where T : TypedKeyedSequencedRow
private readonly ShotGlassRack _rack;
public readonly ReadReader<T> Reader;
public readonly BatchQueue<T> Writer;
public readonly SequenceQueue<T> SequenceGetter;
public Bartender(IDataConnectionFactory connectionFactory,
IMaterializer? materializer = null, string? tableName = null)
if (materializer == null)
_rack = ShotGlassRack.ViaStatic(connectionFactory);
_rack = new ShotGlassRack(connectionFactory, materializer);
//TODO: Confs
Reader = _rack.ReadReader<T>(tableName);
Writer = _rack.WriteQueue<T>(tableName);
SequenceGetter = _rack.SequenceQueue<T>(tableName);
public async ValueTask DisposeAsync()
await Writer.DisposeAsync();
await SequenceGetter.DisposeAsync();
public class ShotGlassRack
public ShotGlassRack(IDataConnectionFactory factory,
IMaterializer materializer)
_factory = factory;
_materializer = materializer;
public SequenceQueue<T> SequenceQueue<T>(string? tableName = null,
int maxBatch = 20, int? maxQueue = null, int? maxDop = null) where T : TypedKeyedSequencedRow
return new SequenceQueue<T>(_factory, _materializer, tableName,
maxBatch, maxQueue, maxDop);
public ReadReader<T> ReadReader<T>(string? tableName = null) where T: TypedKeyedSequencedRow
return new ReadReader<T>(_factory, _materializer, tableName);
public BatchQueue<T> WriteQueue<T>(string? tableName = null, int bufferSize = 8192, int batchSize = 512, int? maxDop = null)
where T : TypedKeyedSequencedRow
return new BatchQueue<T>(_factory, bufferSize, batchSize,
public static ShotGlassRack ViaStatic(IDataConnectionFactory factory)
if (_staticMaterializerFactory == null)
lock (StaticFactoryLockObj)
if (_staticMaterializerFactory == null)
_staticMaterializerFactory = new RackStaticFactory();
return new ShotGlassRack(factory, _staticMaterializerFactory.Materializer);
private static RackStaticFactory? _staticMaterializerFactory;
private static readonly object StaticFactoryLockObj = new();
private readonly IDataConnectionFactory _factory;
private readonly IMaterializer _materializer;
public class RackStaticFactory
private readonly ActorSystem _system;
private readonly ActorMaterializer _materializer;
public RackStaticFactory()
_system = ActorSystem.Create("shotglass-rack-static");
_materializer = _system.Materializer(namePrefix: "shotglass");
public IMaterializer Materializer => _materializer;
public class WriteQueueSet<T> where T: TypedKeyedSequencedRow
public readonly List<WriteQueueEntry<T>> Entries;
public int Count { get; private set; }
internal WriteQueueSet(WriteQueueEntry<T> seedEntry)
Entries = new List<WriteQueueEntry<T>>();
Count = seedEntry.Rows.Count;
public WriteQueueSet<T> Add(WriteQueueEntry<T> newEntry)
Count = newEntry.Rows.Count;
return this;
public interface IDataConnectionFactory
DataConnection GetConnection();
public static class BatchQueue
public static BatchQueue<T>
AsBatchQueue<T>(this T item,
IDataConnectionFactory factory, int bufferSize,
int batchSize, int maxDop, IMaterializer materializer)
where T : TypedKeyedSequencedRow
return new BatchQueue<T>(factory, bufferSize,
batchSize, maxDop, materializer);
public class KeyAndSequenceFor
public KeyAndSequenceFor(string key, long seq)
Key = key;
Seq = seq;
public string Key { get; }
public long Seq { get; }
public static class L2DbExts
public static ITable<T> SafeTableName<T>(this ITable<T> table,
string? tableName) where T : notnull
if (string.IsNullOrWhiteSpace(tableName))
return table;
return table.TableName(tableName);
public class SequenceTrackingChannelReader<TRecord,TOut> : ChannelReader<TOut>
where TRecord : TypedKeyedSequencedRow
private readonly ChannelReader<Carrier<TRecord>> _reader;
// This is useful later...
// ReSharper disable once NotAccessedField.Local
private long _lastSeq;
private readonly Func<TRecord, TOut> _map;
public SequenceTrackingChannelReader(ChannelReader<Carrier<TRecord>> reader, Func<TRecord,TOut> map)
_reader = reader;
_map = map;
public override bool TryRead(out TOut item)
while (_reader.TryRead(out var cItem) && cItem.CarrierType != CarrierType.Ignore)
var tItem = cItem.GetValueOrThrow();
_lastSeq = tItem.Sequence;
item = _map(tItem);
return true;
item = default!;
return false;
public override bool TryPeek(out TOut item)
if (_reader.TryPeek(out var citem) &&
citem.CarrierType == CarrierType.Event)
item = _map(citem.Value);
return true;
item = default;
return false;
public override async ValueTask<bool> WaitToReadAsync(
CancellationToken cancellationToken = new CancellationToken())
return await _reader.WaitToReadAsync(cancellationToken);
public class SequenceQueue<T> : IAsyncDisposable
where T : TypedKeyedSequencedRow
public readonly string? TableName;
private readonly Task<Done> _completion;
private int _shutdownState = 0;
private readonly ChannelWriter<SequenceRequest> _requestQueue;
private static readonly Expression<Func<T, string>> KeySelector = t=>t.Key;
private static readonly Expression<Func<IGrouping<string, T>, KeyAndSequenceFor>> SequenceGroupByExpression = t=>new KeyAndSequenceFor(t.Key,t.Max(i=>i.Sequence));
/// <summary>
/// Returns a completion that will be closed once the Queue is shut down.
/// </summary>
public async Task ClosedCompletion()
await _completion;
/// <summary>
/// Requests completion of the Queue (i.e. for shutdown).
/// Note that until the result of <see cref="ClosedCompletion"/> completes,
/// The queue may still be running entries.
/// </summary>
/// <param name="error">
/// Optional: If provided, closes with an error instead of gracefully.
/// </param>
/// <returns>
/// True if this request caused Completion.
/// <para/>
/// False if a different request caused completion.
/// </returns>
public bool TryFlagCompletion(Exception? error = null)
if (Interlocked.CompareExchange(ref _shutdownState, 1, 0) == 0)
return true;
return false;
public SequenceQueue(IDataConnectionFactory dcf, IMaterializer mat,
string? tableName = null, int maxBatch = 20, int? maxQueue = null,
int? maxDop = null)
TableName = tableName;
var calcedDop =
maxDop.GetValueOrDefault(Environment.ProcessorCount * 2);
(_requestQueue, _completion) = Source
maxQueue ?? calcedDop * maxBatch, false)
.GroupBy(16384, sr=>sr.Type)
sr => new SequenceRequestGroup(sr.Type,sr.Key,
(sr, srg) =>
if (srg != null && srg.Has(sr.Key))
return 0;
return 1;
}, (srg, sr) => srg.Add(sr.Key, sr.Response))
.SelectAsync(calcedDop, async srg =>
using (var ctx = dcf.GetConnection())
var results = await ctx.GetTable<T>()
.Where(t=>t.Type==srg.Type && t.Key.In(srg.Keys))
if (results.Count == srg.Count)
//Fast path; we can set every result easily
//And don't need to filter dictionary entries out.
foreach (var item in results)
srg.SetResultsForKey(item.Key, item.Seq);
catch (Exception e)
foreach (var r in srg.Sets)
foreach (var taskCompletionSource in r.Value)
return NotUsed.Instance;
}).MergeSubstreamsAsSource().ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Both).Run(mat);
public async ValueTask<long> GetSequenceFor(string type, string key,
CancellationToken token = default)
var req = new SequenceRequest(type,key);
await _requestQueue.WriteAsync(req, token);
return await req.Response.Task;
public async ValueTask DisposeAsync()
await _completion;
public static class SubFlowExtensions
public static Source<TOut, TMat> MergeSubstreamsAsSource<TOut, TMat,
TClosed>(this SubFlow<TOut, TMat, TClosed> subFlow)
return (Source<TOut,TMat>)(subFlow.MergeSubstreams());
public readonly struct Carrier<T> where T:class
public Carrier(T value)
CarrierType = CarrierType.Event;
_carrierVal = value;
public Carrier(Exception error)
CarrierType = CarrierType.Error;
_carrierVal = error;
public static readonly Carrier<T> Ignore =
new Carrier<T>(CarrierType.Ignore);
private readonly object _carrierVal;
public readonly CarrierType CarrierType;
private Carrier(CarrierType use)
_carrierVal = default;
CarrierType = use;
public T Value => _carrierVal as T;
public Exception Error => _carrierVal as Exception;
public bool IsIgnore => CarrierType == CarrierType.Ignore;
public T GetValueOrThrow()
if (CarrierType.HasFlag(CarrierType.Event))
return Value;
else if (CarrierType.HasFlag(Stiff.CarrierType.Error))
throw Error;
throw new InvalidOperationException("Invalid state!");
public enum CarrierType
Invalid = 0,
Ignore = 1,
Event = 2,
Error = 4
internal static class CarrierTypeInternals
internal static readonly Exception Event = new Exception("event");
internal static readonly Exception Ignore = new Exception("ignore");
public class ReadReader<T>
where T : TypedKeyedSequencedRow
private readonly IDataConnectionFactory _dataConnectionFactory;
private readonly string? _tableName;
public ReadReader(IDataConnectionFactory dataConnectionFactory,
IMaterializer materializer,
string? tableName)
_dataConnectionFactory = dataConnectionFactory;
_tableName = tableName;
_mat = materializer;
private IMaterializer _mat;
public async ValueTask<List<T>> RunQuery(string type, string key, long startAt,
long? argEndAt, int maxBatch)
using (var ctx = _dataConnectionFactory.GetConnection())
var query = ctx.GetTable<T>().SafeTableName(_tableName)
.Where(t =>
t.Type == type && t.Key == key &&
t.Sequence >= startAt);
if (argEndAt != null)
query = query.Where(t => t.Sequence <= argEndAt);
return await query.Take(maxBatch).ToListAsync();
public ChannelReader<T> GetSpooler(string type, string key, long startAt,
long? endAt, int maxBatch,
Func<T,bool>? filter = null,
CancellationToken token = default)
return GetMappedSpooler(type, key, startAt, endAt, maxBatch, t => t,
public ChannelReader<U> GetMappedSpooler<U>(string type,string key, long startAt,
long? endAt, int maxBatch, Func<T, U> map, Func<T,bool>? baseFilter = null,
CancellationToken token = default)
if (baseFilter == null)
baseFilter = static t => true;
return new SequenceTrackingChannelReader<T,U>(Source.UnfoldAsync<spoolCapt,Carrier<List<T>>>(
new spoolCapt(type,key, startAt, endAt, maxBatch),
async (c) =>
if (c.startAt <= c.endAt)
var results = await RunQuery(c.type,c.key, c.startAt,
if (results.Count > 0)
return Akka.Util.Option<(spoolCapt,Carrier<List<T>>)>.Create((new spoolCapt(c.type,c.key,
c.maxBatch), new Carrier<List<T>>(results)));
catch (Exception e)
return Akka.Util
.Option<(spoolCapt, Carrier<List<T>>)>.Create((
new spoolCapt(c.type, c.key, long.MaxValue,
long.MinValue, 0),
new Carrier<List<T>>(e)));
return Akka.Util.Option<(spoolCapt, Carrier<List<T>>)>.None;
}).SelectMany(a =>
if ((a.CarrierType & CarrierType.Event) != 0)
return a.Value.Select(a=>new Carrier<T>(a));
return new List<Carrier<T>>()
{ new Carrier<T>(a.Error) };
}).Where(a=> (a.CarrierType.HasFlag(CarrierType.Event) && baseFilter(a.Value)) || a.CarrierType.HasFlag(CarrierType.Error))
ChannelSink.AsReader<Carrier<T>>(32, false,
private readonly struct spoolCapt
public readonly string type;
public readonly string key;
public readonly long startAt;
public readonly long? endAt;
public readonly int maxBatch;
public spoolCapt(string type, string key, long startAt, long? endAt, int maxBatch)
this.type = type;
this.key = key;
this.startAt = startAt;
this.endAt = endAt;
this.maxBatch = maxBatch;
public class SequenceRequestGroup
private readonly Dictionary<string, List<TaskCompletionSource<long>>>
public readonly string Type;
public SequenceRequestGroup(string type, string key, TaskCompletionSource<long> req)
Type = type;
_reqSet = new Dictionary<string, List<TaskCompletionSource<long>>>();
_reqSet.Add(key, new List<TaskCompletionSource<long>>() { req });
public bool Has(string key)
return _reqSet.ContainsKey(key);
public IEnumerable<string> Keys => _reqSet.Keys;
public IEnumerable<KeyValuePair<string, List<TaskCompletionSource<long>>>>
Sets => _reqSet;
public int Count => _reqSet.Count;
internal void SetResultsForKey(string key, long item)
if (_reqSet.TryGetValue(key, out var _set))
foreach (var taskCompletionSource in _set)
internal void SetResultsSlow(List<KeyAndSequenceFor> set)
foreach (var item in set)
if (_reqSet.TryGetValue(item.Key, out var entry))
foreach (var tcs in entry)
foreach (var keyValuePair in _reqSet)
foreach (var tcs in keyValuePair.Value)
public SequenceRequestGroup Add(string key,
TaskCompletionSource<long> req)
if (_reqSet.TryGetValue(key, out var set))
_reqSet.Add(key, new List<TaskCompletionSource<long>>() { req });
return this;
public static class StreamDsl
public static Source<TOut2, TMat>
BatchWeightedWithContext<TOut, TOut2, TMat>(
this Source<TOut, TMat> flow, long max, Func<TOut, TOut2> seed,
Func<TOut, TOut2?, long> costFunction,
Func<TOut2, TOut, TOut2> aggregate)
return (Source<TOut2, TMat>)flow.Via(
new BatchWeightWithContext<TOut, TOut2>(max, costFunction,
seed, aggregate));
public static SubFlow<TOut2, TMat, TClosed> BatchWeightedWithContext<TOut, TOut2, TMat,TClosed>(
this SubFlow<TOut, TMat,TClosed> flow, long max, Func<TOut, TOut2> seed,
Func<TOut, TOut2?, long> costFunction,
Func<TOut2, TOut, TOut2> aggregate)
return (SubFlow<TOut2, TMat,TClosed>)flow.Via(
new BatchWeightWithContext<TOut, TOut2>(max, costFunction,
seed, aggregate));
public sealed class
BatchWeightWithContext<TIn, TOut> : GraphStage<FlowShape<TIn, TOut>>
#region internal classes
private sealed class Logic : InAndOutGraphStageLogic
private readonly FlowShape<TIn, TOut> _shape;
private readonly BatchWeightWithContext<TIn, TOut> _stage;
private readonly Akka.Streams.Supervision.Decider _decider;
private Akka.Util.Option<TOut> _aggregate;
private long _left;
private Akka.Util.Option<TIn> _pending;
public Logic(Attributes inheritedAttributes,
BatchWeightWithContext<TIn, TOut> stage) : base(stage.Shape)
_shape = stage.Shape;
_stage = stage;
var attr = inheritedAttributes
_decider = attr != null
? attr.Decider
: Deciders.StoppingDecider;
_left = stage._max;
SetHandlers(_shape.Inlet, _shape.Outlet, this);
public override void OnPush()
var element = Grab(_shape.Inlet);
var cost =
_stage._costFunc(element, _aggregate.GetOrElse(default));
if (!_aggregate.HasValue)
_aggregate = _stage._seed(element);
_left -= cost;
catch (Exception ex)
switch (_decider(ex))
case Directive.Stop:
case Directive.Restart:
case Directive.Resume:
else if (_left < cost)
_pending = element;
_aggregate =
_stage._aggregate(_aggregate.Value, element);
_left -= cost;
catch (Exception ex)
switch (_decider(ex))
case Directive.Stop:
case Directive.Restart:
case Directive.Resume:
if (IsAvailable(_shape.Outlet))
if (!_pending.HasValue)
public override void OnUpstreamFinish()
if (!_aggregate.HasValue)
public override void OnPull()
if (!_aggregate.HasValue)
if (IsClosed(_shape.Inlet))
else if (!HasBeenPulled(_shape.Inlet))
else if (IsClosed(_shape.Inlet))
Push(_shape.Outlet, _aggregate.Value);
if (!_pending.HasValue)
_aggregate = _stage._seed(_pending.Value);
catch (Exception ex)
switch (_decider(ex))
case Directive.Stop:
case Directive.Restart:
if (!HasBeenPulled(_shape.Inlet))
case Directive.Resume:
_pending = Akka.Util.Option<TIn>.None;
if (!HasBeenPulled(_shape.Inlet))
private void Flush()
if (_aggregate.HasValue)
Push(_shape.Outlet, _aggregate.Value);
_left = _stage._max;
if (_pending.HasValue)
_aggregate = _stage._seed(_pending.Value);
_left -= _stage._costFunc(_pending.Value, default);
_pending = Akka.Util.Option<TIn>.None;
catch (Exception ex)
switch (_decider(ex))
case Directive.Stop:
case Directive.Restart:
case Directive.Resume:
_pending = Akka.Util.Option<TIn>.None;
_aggregate = Akka.Util.Option<TOut>.None;
public override void PreStart() => Pull(_shape.Inlet);
private void RestartState()
_aggregate = Akka.Util.Option<TOut>.None;
_left = _stage._max;
_pending = Akka.Util.Option<TIn>.None;
private readonly long _max;
private readonly Func<TIn, TOut?, long> _costFunc;
private readonly Func<TIn, TOut> _seed;
private readonly Func<TOut, TIn, TOut> _aggregate;
/// <summary>
/// TBD
/// </summary>
/// <param name="max">TBD</param>
/// <param name="costFunc">TBD</param>
/// <param name="seed">TBD</param>
/// <param name="aggregate">TBD</param>
public BatchWeightWithContext(long max, Func<TIn, TOut?, long> costFunc,
Func<TIn, TOut> seed, Func<TOut, TIn, TOut> aggregate)
_max = max;
_costFunc = costFunc;
_seed = seed;
_aggregate = aggregate;
var inlet = new Inlet<TIn>("");
var outlet = new Outlet<TOut>("Batch.out");
Shape = new FlowShape<TIn, TOut>(inlet, outlet);
/// <summary>
/// TBD
/// </summary>
public override FlowShape<TIn, TOut> Shape { get; }
/// <summary>
/// TBD
/// </summary>
/// <param name="inheritedAttributes">TBD</param>
/// <returns>TBD</returns>
protected override GraphStageLogic CreateLogic(
Attributes inheritedAttributes)
=> new Logic(inheritedAttributes, this);
public sealed class SequenceRequest
public SequenceRequest(string type, string key)
Type = type;
Key = key;
Response = new TaskCompletionSource<long>(TaskCreationOptions
public TaskCompletionSource<long> Response { get; }
public string Type { get; }
public string Key { get; }
public class BatchQueue<T> : IAsyncDisposable
where T : TypedKeyedSequencedRow
private readonly ChannelWriter<WriteQueueEntry<T>> WriteQueue;
private readonly IDataConnectionFactory _factory;
private readonly string TableName;
private readonly IMaterializer Materializer;
private Task<Done> Completion;
public async ValueTask WriteJournalRowsAsync(List<T> rowset,
CancellationToken token = default)
using (var ctx = _factory.GetConnection())
if (rowset.Count == 1)
await ctx.InsertAsync(rowset[0], TableName, token: token);
await InsertMulti(rowset, token, ctx);
private async Task InsertMulti(List<T> rowset, CancellationToken token,
DataConnection ctx)
using (var tx = await ctx.BeginTransactionAsync(token))
await ctx.GetTable<T>()
new BulkCopyOptions(
TableName: this.TableName,
BulkCopyType: BulkCopyType
.MultipleRows), rowset, token);
await tx.CommitAsync(token);
catch (Exception e)
// We should always try to rollback even if cancelled.
// ReSharper disable once MethodSupportsCancellation
await tx.RollbackAsync();
catch (Exception exception)
throw new AggregateException(e, exception);
public BatchQueue(IDataConnectionFactory factory, int bufferSize,
int batchSize, int maxDop, IMaterializer materializer)
Materializer = materializer;
_factory = factory;
(WriteQueue,Completion) = Source
.Channel<WriteQueueEntry<T>>(bufferSize, false,
cf => cf.Rows.Count,
r => r.UnPooled(),
(oldRows, newRows) =>
async promisesAndRows =>
//Hack: We use a Yield here to guarantee parallelism.
//Grabbing a connection may block but not yield
//(In some cases)
await Task.Yield();
List<T> writeSet = new List<T>();
var entrySet = promisesAndRows.Entries;
//Go -backwards- here,
//Otherwise Remove has pathological copy behavior.
for (var i = entrySet.Count-1;
i > 0;
var t = entrySet[i];
if (t.Token.IsCancellationRequested)
await WriteJournalRowsAsync(writeSet);
foreach (var e in entrySet)
catch (Exception e)
foreach (var errSet in promisesAndRows.Entries)
return NotUsed.Instance;
public ValueTask WriteAsync(T item,
CancellationToken token = default)
return WriteAsync(new List<T>(1) { item }, token);
/// <remarks>
/// We use ValueTask here despite a TCS,
/// with the intent of allowing pooling in future.
/// </remarks>
public async ValueTask WriteAsync(List<T> items,
CancellationToken token = default)
var entry = new WriteQueueEntry<T>(items,
new TaskCompletionSource<Done>(TaskCreationOptions
if (!WriteQueue.TryWrite(entry))
await WriteQueue.WriteAsync(entry, token);
await entry.Completion.Task;
public async Task<Akka.Util.Try<Done>> Closed()
await Completion.ConfigureAwait(false);
return new Akka.Util.Try<Done>(Done.Instance);
catch (Exception e)
return new Akka.Util.Try<Done>(e);
public async ValueTask DisposeAsync()
await Completion;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment