Created
May 10, 2024 16:36
-
-
Save to11mtm/f3ec221f5b7499000668d54c46671da1 to your computer and use it in GitHub Desktop.
Nats Toy App, Jetstream Version
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
using System.Collections.Concurrent; | |
using System.Diagnostics; | |
using System.Diagnostics.CodeAnalysis; | |
using System.Net.Sockets; | |
using System.Text; | |
using System.Threading.Channels; | |
using CommunityToolkit.HighPerformance.Buffers; | |
using Cysharp.Serialization.MessagePack; | |
using LanguageExt; | |
using LinqToDB; | |
using LinqToDB.Data; | |
using LinqToDB.Mapping; | |
using MessagePack; | |
using MessagePack.Resolvers; | |
using Microsoft.Data.Sqlite; | |
using Microsoft.Extensions.Logging; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.JetStream.Models; | |
using NATS.Client.KeyValueStore; | |
using ValueTaskSupplement; | |
using static LanguageExt.Prelude; | |
using CancellationTokenSource = System.Threading.CancellationTokenSource; | |
using Seq = LanguageExt.Seq; | |
using Ulid = System.Ulid; | |
namespace NatsSandBox.Client.JetStream; | |
public class Program | |
{ | |
public static async Task Main(string[] args) | |
{ | |
DataConnection.DefaultOnTraceConnection = info => | |
{ | |
if ((info.Operation == TraceOperation.ExecuteReader | |
|| info.Operation == TraceOperation.ExecuteNonQuery) | |
&& info.TraceInfoStep == TraceInfoStep.BeforeExecute) | |
{ | |
Console.WriteLine( | |
$"{info.Operation} {Environment.NewLine} {info.SqlText}"); | |
} | |
}; | |
DataConnection.TraceSwitch.Level = TraceLevel.Verbose; | |
var memReplConnectionFactory = SqliteInMemDataConnectionFactory.Create(); | |
using (var ctx = memReplConnectionFactory.GetConnection()) | |
{ | |
ctx.CreateTable<ServerReplicationRow>(); | |
ctx.CreateTable<ServerReplicationSnapshotRow>(); | |
} | |
//var cb = new ClientBootstrap(); | |
//var provider = new StandardClientProvider(cb); | |
var cb = new JetStreamClientBootstrap(); | |
var provider = new JetStreamClientProvider(cb); | |
await provider.EnsureStarted(); | |
var c = new TaskSetCoordinatorHashMap("test",provider,false); | |
var tcs = | |
new TaskCompletionSource(TaskCreationOptions | |
.RunContinuationsAsynchronously); | |
tcs.SetResult(); | |
var cts = new CancellationTokenSource(); | |
var repl = new ServerReplicatorV2(provider, memReplConnectionFactory, new ServerReplicatorOptions(true)); | |
await repl.StartReplicator(cts.Token); | |
await c.Init(tcs,cts.Token); | |
await c.CreateTask("idk", "wat", "bar", DateTimeOffset.Now.AddDays(1)); | |
var t= Task.Run(async () => | |
{ | |
while (cts.IsCancellationRequested == false) | |
{ | |
try | |
{ | |
//await c.CreateTask("idk", "wat", "bar", DateTimeOffset.Now.AddDays(1)); | |
await Task.Delay(100); | |
//Console.WriteLine(MessagePackSerializer.SerializeToJson(c.GetTaskState(), | |
// ChatTaskClientSerializer<TaskSetCoordinatorHashMap.TaskState> | |
// .DefaultSerializerOptions)); | |
await Task.Delay(5000); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
}); | |
while (cts.IsCancellationRequested == false) | |
{ | |
var x = Console.ReadLine(); | |
if (x == "exit") | |
{ | |
cts.Cancel(); | |
} | |
else if (x.StartsWith("create")) | |
{ | |
var tokens = x.Split(' ', StringSplitOptions.RemoveEmptyEntries); | |
if (tokens.Length > 3) | |
{ | |
var type = tokens[1]; | |
var assignedTo = tokens[2]; | |
var text = string.Join(' ', tokens.Skip(3)); | |
var newTaskId = await c.CreateTask(type, text, assignedTo, | |
DateTimeOffset.Now.AddDays(7)); | |
Console.WriteLine($"Created Task with ID {newTaskId.ToString()}"); | |
} | |
} | |
else if (x.StartsWith("print")) | |
{ | |
var ts = c.GetTaskState(); | |
foreach (var valueTuplese in Seq.create( | |
("Assigned : ", ts.AssignedTasks), | |
("Requested : ", ts.RequestedTasks), | |
("Unassigned : ", ts.UnassignedTasks))) | |
{ | |
Console.WriteLine(valueTuplese.Item1); | |
Console.WriteLine(MessagePackSerializer.SerializeToJson(valueTuplese.Item2, | |
ChatTaskClientSerializer<HashMap<string,HashMap<string,Lst<BaseTaskEvent>>>> | |
.DefaultSerializerOptions)); | |
Console.WriteLine("================================"); | |
} | |
} | |
else if (x.StartsWith("db printrows")) | |
{ | |
using (var ctx = memReplConnectionFactory.GetConnection()) | |
{ | |
Console.WriteLine((await ctx.GetTable<ServerReplicationRow>() | |
.ToListAsync()).Count); | |
} | |
} | |
} | |
await t; | |
await Task.WhenAll(repl.ReadLoop, repl.WriteLoop); | |
cts.Dispose(); | |
Console.WriteLine("Goodbye, World!"); | |
} | |
} | |
#region wip stuff | |
public class JetStreamClientBootstrap | |
{ | |
public JetStreamClientBootstrap(NatsOpts? opts = null) | |
{ | |
_opts = opts ?? NatsOpts.Default; | |
} | |
private NatsConnection _connection; | |
private readonly NatsOpts _opts; | |
private NatsJSContext _jsCtx; | |
public async ValueTask StartClient() | |
{ | |
//TODO: At some point, do jetstream instead. | |
// And after that, do snapshots via KV! | |
_connection = new NatsConnection(_opts); | |
await _connection.ConnectAsync(); | |
var jsi = new JetstreamInitiator(); | |
_jsCtx = await jsi.EnsureJetstreams(_connection); | |
//var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default)); | |
//new NatsKVContext() | |
} | |
public async ValueTask SendMessage(ISubjectFactory command, CancellationToken token) | |
{ | |
switch (command) | |
{ | |
case BaseTaskEvent te: | |
await _jsCtx.PublishAsync(command.GetSubject(), te, | |
serializer: ChatTaskClientSerializer<BaseTaskEvent>.Default, | |
new NatsJSPubOpts(), cancellationToken: token); | |
//await _connection.PublishAsync(command.GetSubject(), te, | |
// serializer: ChatTaskClientSerializer<BaseTaskEvent>.Default); | |
break; | |
} | |
} | |
public async ValueTask<IAsyncEnumerable<NatsJSMsg<T>>> Sub<T>(string[] subject, ulong startAt = 0, | |
CancellationToken token = default) where T : BaseTaskEvent | |
{ | |
var consumer = await _jsCtx.CreateOrderedConsumerAsync(SubjectHelpers.TaskEventsJetStreamReadName, | |
new NatsJSOrderedConsumerOpts() with { OptStartSeq = startAt, FilterSubjects = subject, }, token); | |
async IAsyncEnumerable<NatsJSMsg<T>> spoolConsumerAsync() | |
{ | |
//TODO: This may be wrong lol | |
while (token.IsCancellationRequested == false) | |
{ | |
var c = consumer.ConsumeAsync(ChatTaskClientSerializer<T>.Default, new NatsJSConsumeOpts(){ MaxMsgs = 100}, token); | |
await foreach (var a in c) | |
{ | |
yield return a; | |
} | |
} | |
} | |
return spoolConsumerAsync(); | |
} | |
} | |
public class JetstreamInitiator | |
{ | |
public async Task<NatsJSContext> EnsureJetstreams(NatsConnection connection) | |
{ | |
var ctx = new NatsJSContext(connection); | |
var str = await ctx.CreateStreamAsync(new StreamConfig( | |
SubjectHelpers.TaskEventsJetStreamWriteName, | |
new[] { "sample.task.>" }) with | |
{ | |
Discard = StreamConfigDiscard.New, | |
Retention = StreamConfigRetention.Interest, | |
Storage = StreamConfigStorage.Memory, | |
//Republish = new Republish {Src = "sample.task.>", Dest = "sample.task.events.read"} | |
}); | |
var replicaStr = await ctx.CreateStreamAsync( | |
new StreamConfig(SubjectHelpers.TaskEventsJetStreamReadName, | |
//new[] { "sample.task.>" }) | |
System.Array.Empty<string>()) | |
{ | |
Sources = new List<StreamSource>(){new StreamSource(){Name = SubjectHelpers.TaskEventsJetStreamWriteName,}}, | |
Discard = StreamConfigDiscard.New, Retention = StreamConfigRetention.Limits, | |
Storage = StreamConfigStorage.Memory, | |
}); | |
return ctx; | |
} | |
} | |
#endregion | |
public class SqliteInMemDataConnectionFactory : DataConnectionFactory | |
{ | |
private static int counter = 0; | |
private static string genCs() | |
{ | |
return new SqliteConnectionStringBuilder() | |
{ DataSource = $"memdb-{Interlocked.Increment(ref counter)}", | |
Mode = SqliteOpenMode.Memory, Cache = SqliteCacheMode.Shared, Pooling = true | |
}.ToString(); | |
} | |
private SqliteInMemDataConnectionFactory() | |
{ | |
_connectionString = genCs(); | |
Options = new DataOptions( | |
new ConnectionOptions(ProviderName: ProviderName.SQLiteMS, | |
ConnectionString: _connectionString)); | |
_heldConnection = new SqliteConnection(_connectionString); | |
_heldConnection.Open(); | |
} | |
public readonly DataOptions Options; | |
private readonly string _connectionString; | |
private readonly SqliteConnection _heldConnection; | |
public override DataConnection GetConnection() | |
{ | |
return new DataConnection(Options); | |
} | |
private static readonly Atom<Lst<SqliteInMemDataConnectionFactory>> _atom = | |
Atom(Lst<SqliteInMemDataConnectionFactory>.Empty); | |
public static SqliteInMemDataConnectionFactory Create() | |
{ | |
var fact = new SqliteInMemDataConnectionFactory(); | |
_atom.Swap(fact, (a, b) => b.Add(a)); | |
return fact; | |
} | |
} | |
public class ServerReplicationRow | |
{ | |
//public string Category { get; set; } | |
public string TaskType { get; set; } | |
public Guid EventId { get; set; } | |
public Guid TaskId { get; set; } | |
public byte[] TaskEventData { get; set; } | |
public string OriginalSubject { get; set; } | |
public ulong? SourceStreamOrdering { get; set; } | |
[Column(Configuration = ProviderName.SQLite, DbType = "INTEGER", IsPrimaryKey = true,IsIdentity = true)] | |
[Column(IsPrimaryKey = true, IsIdentity = true)] | |
public long Ordering { get; set; } | |
} | |
public class ServerReplicationSnapshotRow | |
{ | |
public long LastDbOrdering { get; set; } | |
public byte[] TaskSnapshot { get; set; } | |
} | |
public class ReplicationRowHolder | |
{ | |
public readonly List<ServerReplicationRow> Rows; | |
public readonly TaskCompletionSource<long>? Tcs; | |
public ReplicationRowHolder(List<ServerReplicationRow> rows, bool b) | |
{ | |
Rows = rows; | |
Tcs = b? new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously) : null; | |
} | |
} | |
public class ReplicationSqlReader | |
{ | |
private readonly DataConnectionFactory _ctxFactory; | |
public ReplicationSqlReader(DataConnectionFactory ctxFactory) | |
{ | |
_ctxFactory = ctxFactory; | |
} | |
public async ValueTask<ServerReplicationSnapshotRow?> GetNewestSnapshot() | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
var row = await ctx.GetTable<ServerReplicationSnapshotRow>() | |
.OrderByDescending(a => a.LastDbOrdering).FirstOrDefaultAsync(); | |
return row; | |
} | |
} | |
public async IAsyncEnumerable<ServerReplicationRow> BeginReadAtOrdering(long ordering) | |
{ | |
long? curr = ordering; | |
List<ServerReplicationRow> currSet = null; | |
while (curr != null) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
currSet = await GrabOrderingSet(ctx, curr.Value); | |
if (currSet.Count < 100) | |
{ | |
curr = null; | |
} | |
else | |
{ | |
curr = currSet.LastOrDefault()!.Ordering + 1; | |
} | |
} | |
foreach (var row in currSet) | |
{ | |
yield return row; | |
} | |
} | |
} | |
private static async Task<List<ServerReplicationRow>> GrabOrderingSet(DataConnection ctx, [DisallowNull] long curr) | |
{ | |
return await ctx.GetTable<ServerReplicationRow>().Where(a => a.Ordering >= curr) | |
.OrderBy(a=>a.Ordering) | |
.Take(100) | |
.ToListAsync(); | |
} | |
} | |
public class ReplicatorSqlWriter | |
{ | |
private readonly DataConnectionFactory _ctxFactory; | |
private static readonly BulkCopyOptions DefaultCopyOptions = | |
new BulkCopyOptions( | |
BulkCopyType: BulkCopyType.MultipleRows); | |
public ReplicatorSqlWriter(DataConnectionFactory dataConnectionFactory) | |
{ | |
_ctxFactory = dataConnectionFactory; | |
} | |
public async ValueTask<long?> WriteTasks(List<ServerReplicationRow> rows, | |
bool includeMaxOrderInReturn) | |
{ | |
try | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
using (var tx = await ctx.BeginTransactionAsync()) | |
{ | |
long? ret = null; | |
await ctx.GetTable<ServerReplicationRow>() | |
.BulkCopyAsync(DefaultCopyOptions, rows); | |
if (includeMaxOrderInReturn) | |
{ | |
var lastEventId = rows.Last().EventId; | |
ret = await ctx.GetTable<ServerReplicationRow>() | |
.Where(a => a.EventId == lastEventId) | |
.Select(a => a.Ordering).OrderByDescending(a => a) | |
.FirstOrDefaultAsync(); | |
} | |
await tx.CommitAsync(); | |
return ret; | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
public async ValueTask WriteSnapshot(ServerReplicationSnapshotRow row) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
await ctx.GetTable<ServerReplicationSnapshotRow>() | |
.InsertOrUpdateAsync(() => new ServerReplicationSnapshotRow() | |
{ | |
LastDbOrdering = row.LastDbOrdering, | |
TaskSnapshot = row.TaskSnapshot | |
}, e => new ServerReplicationSnapshotRow() | |
{ | |
TaskSnapshot = row.TaskSnapshot | |
}, | |
() => new ServerReplicationSnapshotRow() | |
{ LastDbOrdering = row.LastDbOrdering }); | |
} | |
} | |
} | |
public abstract class ClientProvider | |
{ | |
public abstract ValueTask EnsureStarted(CancellationToken token = default); | |
public abstract ValueTask SendMessage<T>(T taskRequest, CancellationToken token = default) where T:BaseTaskEvent; | |
public abstract IAsyncEnumerable<NatsBaseEvent<T>> GetConsumer<T>(string subject,ulong startAt = 0, CancellationToken token = default) where T : BaseTaskEvent; | |
} | |
public sealed class StandardClientProvider : ClientProvider | |
{ | |
private readonly ClientBootstrap _bootstrap; | |
public StandardClientProvider(ClientBootstrap bootstrap) | |
{ | |
_bootstrap = bootstrap; | |
} | |
public override async ValueTask EnsureStarted(CancellationToken token = default) | |
{ | |
await _bootstrap.StartClient(); | |
} | |
public override async ValueTask SendMessage<T>(T taskRequest, CancellationToken token = default) | |
{ | |
await _bootstrap.SendMessage(taskRequest, token); | |
} | |
public override async IAsyncEnumerable<NatsBaseEvent<T>> GetConsumer<T>(string subject, ulong startAt = 0, CancellationToken token = default) | |
{ | |
await foreach(var item in (await _bootstrap.Sub<T>( subject,token)).Msgs.ReadAllAsync(token)) | |
{ | |
yield return new NatsEvent<T>(item); | |
} | |
} | |
} | |
public sealed class JetStreamClientProvider : ClientProvider | |
{ | |
private readonly JetStreamClientBootstrap _bootstrap; | |
public JetStreamClientProvider(JetStreamClientBootstrap bootstrap) | |
{ | |
_bootstrap = bootstrap; | |
} | |
public override async ValueTask EnsureStarted(CancellationToken token = default) | |
{ | |
await _bootstrap.StartClient(); | |
} | |
public override async ValueTask SendMessage<T>(T taskRequest, CancellationToken token = default) | |
{ | |
await _bootstrap.SendMessage(taskRequest, token); | |
} | |
public override async IAsyncEnumerable<NatsBaseEvent<T>> GetConsumer<T>(string subject, ulong startAt = 0, CancellationToken token = default) | |
{ | |
await foreach(var item in ((await _bootstrap.Sub<T>(new[] { subject }, startAt, token)).WithCancellation(token))) | |
{ | |
yield return new NatsBaseJsEvent<T>(item); | |
} | |
} | |
} | |
public record ServerReplicatorOptions(bool ForceWriteDispatchViaTaskScheduler); | |
public class ServerReplicatorV2 | |
{ | |
private readonly ClientProvider _client; | |
private readonly DataConnectionFactory _ctxFactory; | |
public ServerReplicatorV2(ClientProvider bootstrap, | |
DataConnectionFactory dataConnectionFactory, ServerReplicatorOptions options) | |
{ | |
_client = bootstrap; | |
_replicationSqlReader = new ReplicationSqlReader(dataConnectionFactory); | |
_replicatorSqlWriter = new ReplicatorSqlWriter(dataConnectionFactory); | |
_options = options; | |
} | |
private readonly Channel<ReplicationRowHolder> _serverRows = | |
Channel.CreateUnbounded<ReplicationRowHolder>(); | |
private readonly ReplicationSqlReader _replicationSqlReader; | |
public async ValueTask StartReplicator(CancellationToken token) | |
{ | |
var wStart = | |
new TaskCompletionSource(TaskCreationOptions | |
.RunContinuationsAsynchronously); | |
var rStart = | |
new TaskCompletionSource(TaskCreationOptions | |
.RunContinuationsAsynchronously); | |
WriteLoop = Task.Run(async () => | |
{ | |
try | |
{ | |
Task lastWrite = Task.CompletedTask; | |
// TODO: Pooling | |
List<ServerReplicationRow> pendingSet = | |
new List<ServerReplicationRow>(); | |
List<TaskCompletionSource<long>> pendingRange = | |
new List<TaskCompletionSource<long>>(); | |
// ReSharper disable once MethodSupportsCancellation | |
wStart.SetResult(); | |
async Task CycleFlush(List<ServerReplicationRow> serverReplicationRows, List<TaskCompletionSource<long>> taskCompletionSources) | |
{ | |
await lastWrite; | |
if (_options.ForceWriteDispatchViaTaskScheduler) | |
{ | |
lastWrite = | |
Task.Run(async () => | |
{ | |
await FlushMainWriteBufferAndClearPendings( | |
serverReplicationRows, | |
taskCompletionSources); | |
}); | |
} | |
else | |
{ | |
lastWrite = | |
FlushMainWriteBufferAndClearPendings(serverReplicationRows, | |
taskCompletionSources); | |
} | |
pendingSet = new List<ServerReplicationRow>(); | |
pendingRange = new List<TaskCompletionSource<long>>(); | |
} | |
while (await _serverRows.Reader.WaitToReadAsync()) | |
{ | |
while (_serverRows.Reader.TryRead(out var a)) | |
{ | |
pendingSet.AddRange(a.Rows); | |
if (a.Tcs != null) | |
{ | |
pendingRange.Add(a.Tcs); | |
} | |
//If close to a flush, cycle. | |
if (maxTillMainFlush - pendingSet.Count - maxTillFlush <= 0) | |
{ | |
await CycleFlush(pendingSet, pendingRange); | |
} | |
} | |
// ensure last batch is cycled, just in case it's a while | |
// until the next event. | |
if (pendingSet.Count > 0) | |
{ | |
await CycleFlush(pendingSet, pendingRange); | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
}); | |
ReadLoop = Task.Run(async () => | |
{ | |
try | |
{ | |
// TODO: SetResult/SetException based on sub call. | |
rStart.SetResult(); | |
var passingCh = Channel.CreateBounded<NatsBaseEvent<BaseTaskEvent>>(8); | |
var snapImpl = new ReplicatorSnapshotTracker(); | |
var lastRead = await RecoverImpl(snapImpl); | |
Task.Run(async () => | |
{ | |
var w = passingCh.Writer; | |
await foreach (var item in _client.GetConsumer<BaseTaskEvent>(SubjectHelpers.TaskReplication, lastRead.GetValueOrDefault(0), token)) | |
{ | |
await w.WriteAsync(item, token); | |
} | |
}); | |
try | |
{ | |
var r = passingCh.Reader; | |
var tillSnap = maxTillSnap; | |
var thisList = new List<ServerReplicationRow>(); | |
ReplicationRowHolder holder = null; | |
while (token.IsCancellationRequested == false) | |
{ | |
var tillFlush = maxTillFlush; | |
NatsBaseEvent<BaseTaskEvent>? msg = default; | |
while (r.TryRead(out msg)) | |
{ | |
await snapImpl.ConsumeEvent(msg.Data!); | |
thisList.Add(ToReplicationRow(msg)); | |
tillFlush--; | |
if (tillFlush == 0) | |
{ | |
tillSnap = | |
await FlushAndSnapBarrier(token, snapImpl, | |
tillSnap, thisList, msg); | |
thisList = new List<ServerReplicationRow>(); | |
} | |
} | |
if (tillFlush != maxTillFlush) | |
{ | |
tillSnap = await FlushAndSnapBarrier( | |
token, | |
snapImpl, tillSnap, thisList, msg); | |
thisList = new List<ServerReplicationRow>(); | |
} | |
if (await r.WaitToReadAsync(token) == false) | |
{ | |
break; | |
} | |
} | |
if (thisList.Count > 0) | |
{ | |
await _serverRows.Writer.WriteAsync( | |
new ReplicationRowHolder(thisList, false), token); | |
//this is shutdown, no need to realloc list here. | |
} | |
} | |
finally | |
{ | |
passingCh.Writer.TryComplete(); | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
}); | |
await Task.WhenAll(wStart.Task, rStart.Task); | |
} | |
private async Task FlushMainWriteBufferAndClearPendings( | |
List<ServerReplicationRow> pendingSet, | |
List<TaskCompletionSource<long>> pendingRange) | |
{ | |
var r = await _replicatorSqlWriter.WriteTasks(pendingSet, true); | |
if (r != null) | |
{ | |
foreach (var taskCompletionSource in pendingRange) | |
{ | |
taskCompletionSource.TrySetResult(r.Value); | |
} | |
} | |
} | |
const int maxTillFlush = 100; | |
const int maxTillSnap = 2000; | |
private const int maxTillMainFlush = 500; | |
private async Task<int> FlushAndSnapBarrier(CancellationToken token, | |
ReplicatorSnapshotTracker snapImpl, | |
int tillSnap, List<ServerReplicationRow> thisList, NatsBaseEvent<BaseTaskEvent>? msg) | |
{ | |
tillSnap = tillSnap - thisList.Count; | |
var shouldSnap = (tillSnap <= 0); | |
ReplicationRowHolder holder = | |
holder = new ReplicationRowHolder(thisList, shouldSnap); | |
await _serverRows.Writer.WriteAsync(holder, token); | |
if (shouldSnap) | |
{ | |
var replRow = snapImpl.GetSnapState(); | |
_ = Task.Run(async () => | |
{ | |
await _replicatorSqlWriter.WriteSnapshot( | |
new ServerReplicationSnapshotRow() | |
{ | |
LastDbOrdering = await holder.Tcs.Task, | |
TaskSnapshot = | |
ReplicationSerializer.Instance | |
.SerializeTaskSnapshot(replRow) | |
}); | |
tillSnap = maxTillSnap; | |
}, token); | |
tillSnap = maxTillSnap; | |
} | |
if (msg != null) | |
{ | |
await msg.AckAsync(token); | |
} | |
return tillSnap; | |
} | |
private static void ThrowClosedChannelException() | |
{ | |
throw new ChannelClosedException("Row Buffer Channel was closed!"); | |
} | |
private ServerReplicationRow ToReplicationRow(NatsBaseEvent<BaseTaskEvent> item) | |
{ | |
var msg = item.Data; | |
var tt = msg.TaskType; | |
var eid = msg.EventId; | |
var tid = msg.TaskId; | |
byte[] bytes = default!; | |
using (var buf = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<BaseTaskEvent>.Default.Serialize(buf, msg); | |
bytes = buf.WrittenMemory.ToArray(); | |
} | |
return new ServerReplicationRow() | |
{ | |
SourceStreamOrdering = item.Sequence, | |
EventId = eid.ToGuid(), OriginalSubject = item.Subject, | |
TaskEventData = bytes, TaskId = tid.ToGuid(), TaskType = tt | |
}; | |
} | |
private readonly ReplicatorSqlWriter _replicatorSqlWriter; | |
private readonly ServerReplicatorOptions _options; | |
public Task WriteLoop { get; private set; } = Task.CompletedTask; | |
public Task ReadLoop { get; private set; } = Task.CompletedTask; | |
private async ValueTask<ulong?> RecoverImpl(ReplicatorSnapshotTracker consumer) | |
{ | |
var s = await _replicationSqlReader.GetNewestSnapshot(); | |
if (s != null) | |
{ | |
await consumer.ConsumeSnap( | |
ReplicationSerializer.Instance.DeSerializeTaskSnapshot( | |
s!.TaskSnapshot)); | |
} | |
var o = s?.LastDbOrdering ?? 0; | |
ulong? lastRead = default; | |
await foreach (var a in _replicationSqlReader.BeginReadAtOrdering(o)) | |
{ | |
await consumer.ConsumeEvent( | |
ReplicationSerializer.Instance.GetTaskEvent(a.TaskEventData)); | |
lastRead = a.SourceStreamOrdering; | |
} | |
return lastRead; | |
} | |
} | |
public abstract record NatsBaseEvent<T> | |
{ | |
public abstract T? Data { get; } | |
public abstract string Subject { get; } | |
public abstract NatsHeaders? NatsHeaders { get; } | |
public abstract ulong? Sequence { get; } | |
public abstract ValueTask AckAsync(CancellationToken token = default); | |
public abstract ValueTask NackAsync(CancellationToken token = default); | |
public abstract ValueTask AckTerminateAsync(CancellationToken token = default); | |
} | |
public record NatsEvent<T>(NatsMsg<T> Msg) : NatsBaseEvent<T> | |
{ | |
public override T? Data => Msg.Data; | |
public override string Subject => Msg.Subject; | |
public override NatsHeaders? NatsHeaders => Msg.Headers; | |
public override ulong? Sequence => null; | |
public override ValueTask AckAsync(CancellationToken token = default) | |
{ | |
return ValueTask.CompletedTask; | |
} | |
public override ValueTask NackAsync(CancellationToken token = default) | |
{ | |
return ValueTask.CompletedTask; | |
} | |
public override ValueTask AckTerminateAsync(CancellationToken token = default) | |
{ | |
return ValueTask.CompletedTask; | |
} | |
} | |
public record NatsBaseJsEvent<T>(NatsJSMsg<T> Msg) : NatsBaseEvent<T> | |
{ | |
public override T? Data => Msg.Data; | |
public override string Subject => Msg.Subject; | |
public override NatsHeaders? NatsHeaders => Msg.Headers; | |
public override ulong? Sequence => Msg.Metadata != null? Msg.Metadata.Value.Sequence.Stream:null; | |
public override async ValueTask AckAsync(CancellationToken token = default) | |
{ | |
await Msg.AckAsync(cancellationToken: token); | |
} | |
public override async ValueTask NackAsync(CancellationToken token = default) | |
{ | |
await Msg.NakAsync(cancellationToken: token); | |
} | |
public override async ValueTask AckTerminateAsync(CancellationToken token = default) | |
{ | |
await Msg.AckTerminateAsync(cancellationToken: token); | |
} | |
} | |
public class ReplicationReader | |
{ | |
} | |
public class ReplicationSerializer | |
{ | |
public static readonly ReplicationSerializer Instance = | |
new ReplicationSerializer(); | |
public BaseTaskEvent GetTaskEvent(byte[] bytes) | |
{ | |
return ChatTaskClientSerializer<BaseTaskEvent>.Default.Deserialize( | |
new ReadOnlySequence<byte>(bytes))!; | |
} | |
public byte[] SerializeTaskEvent(BaseTaskEvent taskEvt) | |
{ | |
using (var apbw = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<BaseTaskEvent>.Default.Serialize(apbw, | |
taskEvt); | |
return apbw.WrittenMemory.ToArray(); | |
} | |
} | |
public TaskSerializationState DeSerializeTaskSnapshot(byte[] bytes) | |
{ | |
return ChatTaskClientSerializer<TaskSerializationState>.Default.Deserialize( | |
new ReadOnlySequence<byte>(bytes))!; | |
} | |
public byte[] SerializeTaskSnapshot(TaskSerializationState snapState) | |
{ | |
using (var apbw = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<TaskSerializationState>.Default.Serialize(apbw, | |
snapState); | |
return apbw.WrittenMemory.ToArray(); | |
} | |
} | |
} | |
public record TaskSerializationState( | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> Assigned, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> Unassigned); | |
public interface ISnapshotAndEventConsumer | |
{ | |
IAsyncEnumerable<BaseTaskEvent> BeginNewEventConsumer( | |
CancellationToken token = default); | |
ValueTask ConsumeSnap(TaskSerializationState row); | |
ValueTask ConsumeEvent(BaseTaskEvent row); | |
} | |
public abstract class DataConnectionFactory | |
{ | |
public abstract DataConnection GetConnection(); | |
} | |
public class ClientBootstrap | |
{ | |
public ClientBootstrap(NatsOpts opts = null) | |
{ | |
_opts = opts ?? NatsOpts.Default; | |
} | |
private NatsConnection _connection; | |
private readonly NatsOpts _opts; | |
public async ValueTask StartClient() | |
{ | |
//TODO: At some point, do jetstream instead. | |
// And after that, do snapshots via KV! | |
_connection = new NatsConnection(_opts); | |
await _connection.ConnectAsync(); | |
//var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default)); | |
//new NatsKVContext() | |
} | |
public async ValueTask SendMessage(ISubjectFactory command, CancellationToken token = default) | |
{ | |
switch (command) | |
{ | |
case BaseTaskEvent te: | |
await _connection.PublishAsync(command.GetSubject(), te, | |
serializer: ChatTaskClientSerializer<BaseTaskEvent>.Default, cancellationToken: token); | |
break; | |
} | |
} | |
public async ValueTask<INatsSub<T>> Sub<T>(string subject, | |
CancellationToken token) where T : BaseTaskEvent | |
{ | |
return await _connection.SubscribeCoreAsync<T>(subject, | |
null, ChatTaskClientSerializer<T>.Default, default, token); | |
} | |
} | |
public record ReplicatorStateProjectionData(HashMap<Ulid, Lst<BaseTaskEvent>> Map); | |
public record ReplicatorStateProjectionSet( | |
ReplicatorStateProjectionData Active); | |
public class ReplicatorSnapshotTracker | |
{ | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _tasks; | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _assigned; | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _unassigned; | |
public async ValueTask ConsumeSnap(TaskSerializationState row) | |
{ | |
_tasks = row.Assigned.Union(row.Unassigned, (a, b) => b, (a, b) => b, | |
(k, a, b) => a.Union(b)); | |
} | |
public async ValueTask ConsumeEvent(BaseTaskEvent row) | |
{ | |
if (row is TaskCompleted) | |
{ | |
RemoveFromSets(); | |
} | |
else if (row is TaskCanceled) | |
{ | |
RemoveFromSets(); | |
} | |
else | |
{ | |
_tasks = | |
_tasks.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
_assigned = _assigned.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
_unassigned = _unassigned.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
} | |
void RemoveFromSets() | |
{ | |
_tasks = | |
_tasks.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
_assigned = _assigned.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
_unassigned = | |
_unassigned.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
} | |
} | |
public TaskSerializationState GetSnapState() | |
{ | |
return new TaskSerializationState(_assigned, _unassigned); | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
/// <remarks> | |
/// Ok. So the way this works, is we use a series of <see cref="Atom{A}"/> | |
/// Hashmaps of Maps. Atom lets us Swap via interlocked, | |
/// such that if the values being swapped are 'comparable' and the actions are | |
/// 'safely repeatable', it can repeat the action if a concurrent update happens | |
/// until the interlocked operation succeeds. | |
/// </remarks> | |
public class TaskSetCoordinatorHashMap : ISnapshotAndEventConsumer | |
{ | |
public string UserId { get; } | |
public TaskSetCoordinatorHashMap(string userId, | |
ClientProvider clientBootstrap, bool trackChanges) | |
{ | |
UserId = userId; | |
_client = clientBootstrap; | |
_trackChanges = trackChanges; | |
_assignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
_requestedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
_unassignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
} | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _assignedTasks; | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _requestedTasks; | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _unassignedTasks; | |
private readonly ClientProvider _client; | |
private Task _mainTask; | |
public async Task<Ulid> CreateTask(string taskType, string text, | |
string? assignedToUser, DateTimeOffset expectedBy, CancellationToken token = default) | |
{ | |
var taskId = Ulid.NewUlid(); | |
await _client.SendMessage(new TaskRequest(Ulid.NewUlid(), UserId, assignedToUser, | |
taskId, taskType, text, DateTimeOffset.UtcNow, | |
expectedBy), token); | |
return taskId; | |
} | |
public async ValueTask AddTaskNote(TaskRequest origReq, | |
string notes) | |
{ | |
await _client.SendMessage(new TaskUpdate(Ulid.NewUlid(),origReq.TaskId, | |
origReq.TaskType, UserId, notes)); | |
} | |
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes) | |
{ | |
if (origReq.RequestingUserId == UserId) | |
{ | |
await _client.SendMessage(new TaskCanceled(Ulid.NewUlid(),origReq.TaskId, | |
origReq.TaskType, UserId, notes, DateTimeOffset.Now)); | |
return true; | |
} | |
return false; | |
} | |
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId, | |
string newUser, string notes) | |
{ | |
bool foundTask = false; | |
foreach (var requestedTask in _assignedTasks.Value) | |
{ | |
var item = | |
requestedTask.Value.Find(taskId, a => a, | |
static () => Lst<BaseTaskEvent>.Empty); | |
if (item != default) | |
{ | |
foundTask = true; | |
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser, | |
requestedTask.Key, UserId, notes, item, | |
DateTimeOffset.Now)); | |
//_assignedTasks.AddOrUpdate(requestedTask.Key, | |
// e => e.Remove(taskId), | |
// () => HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
return new ReassignResult(taskId, true, "Reassigned"); | |
} | |
} | |
foreach (var requestedTask in _unassignedTasks.Value) | |
{ | |
var item = | |
requestedTask.Value.Find(taskId, a => a, | |
static () => Lst<BaseTaskEvent>.Empty); | |
if (item != default) | |
{ | |
foundTask = true; | |
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser, | |
requestedTask.Key, UserId, notes, item, | |
DateTimeOffset.Now)); | |
//_assignedTasks.AddOrUpdate(requestedTask.Key, | |
// e => e.Remove(taskId), | |
// () => HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
return new ReassignResult(taskId, true, "Reassigned"); | |
} | |
} | |
return new ReassignResult(taskId, false, "Could not find task!"); | |
} | |
public async ValueTask Init(TaskCompletionSource initLock, | |
CancellationToken token) | |
{ | |
; | |
CancellationTokenSource cts = null; | |
_mainTask = Task.Run(async () => | |
{ | |
await initLock.Task; | |
try | |
{ | |
await foreach (var entry in | |
(_client.GetConsumer<BaseTaskEvent>( | |
SubjectHelpers.TaskUserSub(UserId), token: token)).WithCancellation(cts?.Token??default)) | |
{ | |
try | |
{ | |
await RunEventHandlers(entry); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
finally | |
{ | |
try | |
{ | |
cts?.Cancel(false); | |
} | |
finally | |
{ | |
try | |
{ | |
cts?.Dispose(); | |
} | |
catch | |
{ | |
// ignored | |
} | |
} | |
} | |
}); | |
} | |
private async ValueTask RunEventHandlers(NatsBaseEvent<BaseTaskEvent> entry) | |
{ | |
ProcessTaskEventSwitch(entry.Data); | |
await entry.AckAsync(); | |
} | |
private void ProcessTaskEventSwitch(BaseTaskEvent? msg) | |
{ | |
switch (msg) | |
{ | |
case TaskAssigned ta: | |
{ | |
HandleTaskAssigned(ta); | |
break; | |
} | |
case TaskBegan tb: | |
{ | |
HandleTaskBegan(tb); | |
break; | |
} | |
case TaskCompleted tc: | |
{ | |
HandleTaskCompleted(tc); | |
break; | |
} | |
case TaskRequest tr: | |
{ | |
HandleTaskRequest(tr); | |
break; | |
} | |
case TaskUpdate tu: | |
{ | |
HandleTaskUpdate(tu); | |
break; | |
} | |
case TaskCanceled tcan: | |
{ | |
HandleTaskCanceled(tcan); | |
break; | |
} | |
} | |
} | |
public async ValueTask<ClearResult> ClearItem(Ulid taskId) | |
{ | |
bool? foundAndCleared = null; | |
_requestedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
_requestedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
_unassignedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
if (foundAndCleared.HasValue == false) | |
{ | |
return new ClearResult(true, "no instances found"); | |
} | |
else | |
{ | |
if (foundAndCleared.Value) | |
{ | |
return new ClearResult(true, "cleared"); | |
} | |
else | |
{ | |
return new ClearResult(false, "Tasks not in closed state"); | |
} | |
} | |
} | |
private void HandleTaskRequest(TaskRequest trb) | |
{ | |
void doSwap( | |
Atom<HashMap<string, | |
HashMap<Ulid, Lst<BaseTaskEvent>>>> taskSet, | |
TaskRequest taskRequest) | |
{ | |
taskSet.Swap(taskRequest, (tr,b)=>b.AddOrUpdate(tr.TaskType, | |
(hm) => hm.TryAdd(tr.TaskId, Lst<BaseTaskEvent>.Empty.Add(tr)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty.Add(tr.TaskId, | |
Lst<BaseTaskEvent>.Empty.Add(tr)))); | |
} | |
if (trb.RequestingUserId == UserId) | |
{ | |
doSwap(_requestedTasks, trb); | |
} | |
if (trb.AssignedUserId == UserId) | |
{ | |
doSwap(_assignedTasks,trb); | |
} | |
else if (trb.AssignedUserId == "unassigned") | |
{ | |
doSwap(_unassignedTasks,trb); | |
} | |
} | |
private static void TUSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskUpdate tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private static void TCanSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskCanceled tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private static void TComSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskCompleted tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private readonly Channel<bool> _changeLocker = | |
Channel.CreateBounded<bool>(1); | |
private readonly bool _trackChanges; | |
public TaskState GetTaskState() | |
{ | |
return new( | |
_assignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap(), | |
_requestedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap(), | |
_unassignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap()); | |
} | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record TaskState( | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> AssignedTasks, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> RequestedTasks, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> UnassignedTasks); | |
private static ( | |
HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg, | |
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm, | |
HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg) | |
MakeSnapshot<TKeyOuter,TKeyInner,TLst>( | |
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm) | |
{ | |
//TODO: This is trasshhhhhhh. | |
var chg = thm.Changes; | |
var preMap = thm.Map(a => | |
{ | |
var (k, v) = a; | |
var c = v.Changes; | |
return (k, (c, v.Snapshot())); | |
}).ToHashMap(); | |
thm = thm.SetItems(preMap.Select(a => (a.Item1, a.Item2.Item2))); | |
var innerChg = preMap.Select(a => (a.Item1, a.Item2.Item1)) | |
.ToHashMap(); | |
return (chg, thm, innerChg); | |
} | |
private void HandleTaskUpdate(TaskUpdate tu) | |
{ | |
TUSwap(_assignedTasks,tu); | |
TUSwap(_requestedTasks,tu); | |
TUSwap(_unassignedTasks,tu); | |
} | |
private void HandleTaskCanceled(TaskCanceled taskCanceled) | |
{ | |
TCanSwap(_assignedTasks,taskCanceled); | |
TCanSwap(_requestedTasks,taskCanceled); | |
TCanSwap(_unassignedTasks,taskCanceled); | |
} | |
private void HandleTaskCompleted(TaskCompleted taskCompleted) | |
{ | |
TComSwap(_assignedTasks, taskCompleted); | |
TComSwap(_unassignedTasks, taskCompleted); | |
TComSwap(_requestedTasks, taskCompleted); | |
} | |
private void HandleTaskAssigned(TaskAssigned ta) | |
{ | |
if (ta.UserId == UserId) | |
{ | |
_assignedTasks.Swap(ta, (tai, thm) => | |
{ | |
return thm.AddOrUpdate(tai.TaskType, tai.TaskId, | |
e => e.Add(tai), | |
() => Lst<BaseTaskEvent>.Empty.Add(tai)); | |
}); | |
} | |
else | |
{ | |
_assignedTasks.Swap(ta, (tai, thm) => | |
{ | |
return thm.Remove(tai.TaskType, tai.TaskId); | |
}); | |
} | |
} | |
private void HandleTaskBegan(TaskBegan tb) | |
{ | |
_requestedTasks.Swap(tb, (tbi, thm) => | |
{ | |
return thm.AddOrUpdate(tbi.TaskType, tbi.TaskId, t => | |
t.Add(tbi), () => | |
Lst<BaseTaskEvent>.Empty.Add(tbi)); | |
}); | |
} | |
public async IAsyncEnumerable<BaseTaskEvent> BeginNewEventConsumer(CancellationToken token = default) | |
{ | |
{ | |
await foreach (var msg in (_client.GetConsumer<BaseTaskEvent>( | |
SubjectHelpers.TaskUserSub(UserId),token: token)) | |
.WithCancellation(token)) | |
{ | |
yield return msg.Data!; | |
await msg.AckAsync(token); | |
} | |
} | |
} | |
public async ValueTask ConsumeSnap(TaskSerializationState row) | |
{ | |
var newAMap = HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>(); | |
var newRMap = HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>(); | |
var newUaMap = HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>(); | |
foreach (var valueTuple in row.Assigned) | |
{ | |
var catAMap = HashMap<Ulid, Lst<BaseTaskEvent>>(); | |
var catRMap = HashMap<Ulid, Lst<BaseTaskEvent>>(); | |
foreach (var inner in valueTuple.Value) | |
{ | |
var state = GetTaskState(inner.Value); | |
if (state.IsAssignedToUser) | |
{ | |
catAMap = catAMap.Add(inner.Key, inner.Value); | |
} | |
if (state.IsRequestedByUser) | |
{ | |
catRMap = catRMap.Add(inner.Key, inner.Value); | |
} | |
} | |
newAMap = newAMap.Add(valueTuple.Key, catAMap); | |
newRMap = newRMap.Add(valueTuple.Key, catRMap); | |
} | |
foreach (var valueTuple in row.Unassigned) | |
{ | |
var catRMap = newRMap.Find(valueTuple.Key, a => a, () => default); | |
var catUaMap = HashMap<Ulid, Lst<BaseTaskEvent>>(); | |
foreach (var inner in valueTuple.Value) | |
{ | |
var state = GetTaskState(inner.Value); | |
if (state.IsRequestedByUser) | |
{ | |
catRMap = catRMap.Add(inner.Key, inner.Value); | |
} | |
catUaMap = catUaMap.Add(inner.Key, inner.Value); | |
} | |
newRMap = newAMap.SetItem(valueTuple.Key, catRMap); | |
newUaMap = newUaMap.SetItem(valueTuple.Key, catUaMap); | |
} | |
_assignedTasks.Swap(newAMap, (arg, ex) => arg); | |
_unassignedTasks.Swap(newUaMap, (arg, ex) => arg); | |
_requestedTasks.Swap(newRMap, (arg, ex) => arg); | |
//row.Assigned.Where(a=>a.) | |
} | |
private RebuiltTaskState GetTaskState(Lst<BaseTaskEvent> row) | |
{ | |
string assignedUserId = ""; | |
string requestedUserId = ""; | |
foreach (var baseTaskEvent in row) | |
{ | |
if (baseTaskEvent is TaskRequest taskRequest) | |
{ | |
requestedUserId = taskRequest.RequestingUserId; | |
} | |
else if (baseTaskEvent is TaskAssigned ta) | |
{ | |
assignedUserId = ta.UserId; | |
} | |
} | |
return new(assignedUserId == UserId, requestedUserId == UserId); | |
} | |
public async ValueTask ConsumeEvent(BaseTaskEvent row) | |
{ | |
this.ProcessTaskEventSwitch(row); | |
} | |
} | |
public record RebuiltTaskState(bool IsAssignedToUser, bool IsRequestedByUser); | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record ReassignResult(Ulid TaskId, bool Success, string Message); | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record ClearResult(bool Success, string? Reason); | |
public class ChatTaskClientSerializer<T> : INatsSerializer<T> | |
{ | |
public static readonly ChatTaskClientSerializer<T> Default = | |
new ChatTaskClientSerializer<T>(); | |
private static readonly IFormatterResolver DefaultFormatterResolver = | |
CompositeResolver.Create(new[] | |
{ | |
UlidMessagePackResolver.Instance, | |
MessagePackSerializerOptions.Standard.Resolver | |
}); | |
public static readonly MessagePackSerializerOptions DefaultSerializerOptions = | |
MessagePackSerializerOptions.Standard.WithResolver( | |
DefaultFormatterResolver); | |
public void Serialize(IBufferWriter<byte> bufferWriter, T value) | |
{ | |
MessagePackSerializer.Serialize(bufferWriter, value, | |
DefaultSerializerOptions); | |
} | |
public T? Deserialize(in ReadOnlySequence<byte> buffer) | |
{ | |
return MessagePackSerializer.Deserialize<T>(buffer, | |
DefaultSerializerOptions); | |
} | |
} | |
public interface ISubjectFactory | |
{ | |
string GetSubject(); | |
} | |
[Union(2, typeof(TaskRequest))] | |
[Union(3, typeof(TaskCompleted))] | |
[Union(4, typeof(TaskBegan))] | |
[Union(5, typeof(TaskUpdate))] | |
[Union(6, typeof(TaskAssigned))] | |
[Union(7, typeof(TaskCanceled))] | |
[MessagePackObject(keyAsPropertyName:true)] | |
public abstract record BaseTaskEvent(Ulid EventId, Ulid TaskId, string TaskType) | |
: ISubjectFactory, IEquatable<BaseTaskEvent> | |
{ | |
bool IEquatable<BaseTaskEvent>.Equals(BaseTaskEvent? other) | |
{ | |
return other != null && other.EventId == EventId; | |
} | |
public abstract string GetSubject(); | |
} | |
public static class SubjectHelpers | |
{ | |
public static string | |
Tasks(string scope, string type, Ulid taskId, string requested, | |
string assigned) => | |
$"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}"; | |
public static string Tasks(string scope, string type, Ulid taskId, | |
string doneByUser) | |
{ | |
return $"sample.task.{scope}.{type},{taskId}.{doneByUser}"; | |
} | |
public static string TaskUserSub(string userId) | |
{ | |
return $"sample.task.>"; | |
} | |
public static string TaskReplication => $"sample.task.>"; | |
public const string TaskEventsJetStreamWriteName = "sample-js-task-events-write"; | |
public const string TaskEventsJetStreamReadName = "sample-js-task-events-read"; | |
} | |
[MessagePackObject(true)] | |
public record TaskRequest( | |
Ulid EventId, | |
string RequestingUserId, | |
string? AssignedUserId, | |
Ulid TaskId, | |
string TaskType, | |
string Text, | |
DateTimeOffset RequestedAt, | |
DateTimeOffset ExpectedBy) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Request, TaskType, TaskId, | |
RequestingUserId, | |
AssignedUserId != null ? AssignedUserId : "unassigned"); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskCanceled( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string CanceledByUser, | |
string CancelReason, | |
DateTimeOffset CancelTime) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Canceled, TaskType, TaskId, | |
CanceledByUser); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskBegan( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Text, | |
DateTimeOffset BeginTime) : BaseTaskEvent(EventId, TaskId, TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Begun, TaskType, TaskId, UserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskUpdate( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Notes) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Update, TaskType, TaskId, UserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskCompleted( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Notes, | |
DateTimeOffset CompletedTime) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Completed, TaskType, TaskId, UserId); | |
} | |
} | |
public static class TaskScopeSubjects | |
{ | |
public static readonly string Completed = "completed"; | |
public static readonly string Assigned = "assigned"; | |
public static readonly string Begun = "begun"; | |
public static readonly string Update = "update"; | |
public static readonly string Canceled = "canceled"; | |
public static readonly string Request = "request"; | |
} | |
[MessagePackObject(true)] | |
public record TaskAssigned( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string AssignedByUserId, | |
string Notes, | |
Lst<BaseTaskEvent> History, | |
DateTimeOffset AssignedTime | |
) : BaseTaskEvent(EventId, TaskId, TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Assigned, TaskType, TaskId, UserId, | |
AssignedByUserId); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment