Last active
April 17, 2024 03:41
-
-
Save to11mtm/5660a2f3469a637f9ec5988bae5b14ab to your computer and use it in GitHub Desktop.
Nats Toy Task App, Round 4(+3), semi working with(out) replication/etc and a simple console interface!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
using System.Collections.Concurrent; | |
using System.Diagnostics; | |
using System.Diagnostics.CodeAnalysis; | |
using System.Net.Sockets; | |
using System.Text; | |
using System.Threading.Channels; | |
using CommunityToolkit.HighPerformance.Buffers; | |
using Cysharp.Serialization.MessagePack; | |
using LanguageExt; | |
using LinqToDB; | |
using LinqToDB.Data; | |
using LinqToDB.Mapping; | |
using MessagePack; | |
using MessagePack.Resolvers; | |
using Microsoft.Data.Sqlite; | |
using Microsoft.Extensions.Logging; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.KeyValueStore; | |
using ValueTaskSupplement; | |
using static LanguageExt.Prelude; | |
using CancellationTokenSource = System.Threading.CancellationTokenSource; | |
using Seq = LanguageExt.Seq; | |
using Ulid = System.Ulid; | |
namespace NatsSandBox.Client; | |
internal class Program | |
{ | |
public static async Task Main(string[] args) | |
{ | |
DataConnection.DefaultOnTraceConnection = info => | |
{ | |
if ((info.Operation == TraceOperation.ExecuteReader | |
|| info.Operation == TraceOperation.ExecuteNonQuery) | |
&& info.TraceInfoStep == TraceInfoStep.BeforeExecute) | |
{ | |
Console.WriteLine( | |
$"{info.Operation} {Environment.NewLine} {info.SqlText}"); | |
} | |
}; | |
DataConnection.TraceSwitch.Level = TraceLevel.Verbose; | |
var memReplConnectionFactory = SqliteInMemDataConnectionFactory.Create(); | |
using (var ctx = memReplConnectionFactory.GetConnection()) | |
{ | |
ctx.CreateTable<ServerReplicationRow>(); | |
ctx.CreateTable<ServerReplicationSnapshotRow>(); | |
} | |
var cb = new ClientBootstrap(); | |
await cb.Startclient(); | |
var c = new TaskSetCoordinatorHashMap("test",cb,false); | |
var tcs = | |
new TaskCompletionSource(TaskCreationOptions | |
.RunContinuationsAsynchronously); | |
tcs.SetResult(); | |
var cts = new CancellationTokenSource(); | |
var repl = new ServerReplicator(cb, memReplConnectionFactory, new ServerReplicatorOptions(true)); | |
await repl.StartReplicator(cts.Token); | |
await c.Init(tcs,cts.Token); | |
await c.CreateTask("idk", "wat", "bar", DateTimeOffset.Now.AddDays(1)); | |
var t= Task.Run(async () => | |
{ | |
while (cts.IsCancellationRequested == false) | |
{ | |
try | |
{ | |
//await c.CreateTask("idk", "wat", "bar", DateTimeOffset.Now.AddDays(1)); | |
await Task.Delay(100); | |
//Console.WriteLine(MessagePackSerializer.SerializeToJson(c.GetTaskState(), | |
// ChatTaskClientSerializer<TaskSetCoordinatorHashMap.TaskState> | |
// .DefaultSerializerOptions)); | |
await Task.Delay(5000); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
}); | |
while (cts.IsCancellationRequested == false) | |
{ | |
var x = Console.ReadLine(); | |
if (x == "exit") | |
{ | |
cts.Cancel(); | |
} | |
else if (x.StartsWith("create")) | |
{ | |
var tokens = x.Split(' ', StringSplitOptions.RemoveEmptyEntries); | |
if (tokens.Length > 3) | |
{ | |
var type = tokens[1]; | |
var assignedTo = tokens[2]; | |
var text = string.Join(' ', tokens.Skip(3)); | |
var newTaskId = await c.CreateTask(type, text, assignedTo, | |
DateTimeOffset.Now.AddDays(7)); | |
Console.WriteLine($"Created Task with ID {newTaskId.ToString()}"); | |
} | |
} | |
else if (x.StartsWith("print")) | |
{ | |
var ts = c.GetTaskState(); | |
foreach (var valueTuplese in Seq.create( | |
("Assigned : ", ts.AssignedTasks), | |
("Requested : ", ts.RequestedTasks), | |
("Unassigned : ", ts.UnassignedTasks))) | |
{ | |
Console.WriteLine(valueTuplese.Item1); | |
Console.WriteLine(MessagePackSerializer.SerializeToJson(valueTuplese.Item2, | |
ChatTaskClientSerializer<HashMap<string,HashMap<string,Lst<BaseTaskEvent>>>> | |
.DefaultSerializerOptions)); | |
Console.WriteLine("================================"); | |
} | |
} | |
else if (x.StartsWith("db printrows")) | |
{ | |
using (var ctx = memReplConnectionFactory.GetConnection()) | |
{ | |
Console.WriteLine((await ctx.GetTable<ServerReplicationRow>() | |
.ToListAsync()).Count); | |
} | |
} | |
} | |
await t; | |
await Task.WhenAll(repl.ReadLoop, repl.WriteLoop); | |
cts.Dispose(); | |
Console.WriteLine("Goodbye, World!"); | |
} | |
} | |
#region wip stuff | |
public class SqliteInMemDataConnectionFactory : DataConnectionFactory | |
{ | |
private static int counter = 0; | |
private static string genCs() | |
{ | |
return new SqliteConnectionStringBuilder() | |
{ DataSource = $"memdb-{Interlocked.Increment(ref counter)}", | |
Mode = SqliteOpenMode.Memory, Cache = SqliteCacheMode.Shared, Pooling = true | |
}.ToString(); | |
} | |
private SqliteInMemDataConnectionFactory() | |
{ | |
_connectionString = genCs(); | |
Options = new DataOptions( | |
new ConnectionOptions(ProviderName: ProviderName.SQLiteMS, | |
ConnectionString: _connectionString)); | |
_heldConnection = new SqliteConnection(_connectionString); | |
_heldConnection.Open(); | |
} | |
public readonly DataOptions Options; | |
private readonly string _connectionString; | |
private readonly SqliteConnection _heldConnection; | |
public override DataConnection GetConnection() | |
{ | |
return new DataConnection(Options); | |
} | |
private static readonly Atom<Lst<SqliteInMemDataConnectionFactory>> _atom = | |
Atom(Lst<SqliteInMemDataConnectionFactory>.Empty); | |
public static SqliteInMemDataConnectionFactory Create() | |
{ | |
var fact = new SqliteInMemDataConnectionFactory(); | |
_atom.Swap(fact, (a, b) => b.Add(a)); | |
return fact; | |
} | |
} | |
public class ServerReplicationRow | |
{ | |
//public string Category { get; set; } | |
public string TaskType { get; set; } | |
public Guid EventId { get; set; } | |
public Guid TaskId { get; set; } | |
public byte[] TaskEventData { get; set; } | |
public string OriginalSubject { get; set; } | |
[Column(Configuration = ProviderName.SQLite, DbType = "INTEGER", IsPrimaryKey = true,IsIdentity = true)] | |
[Column(IsPrimaryKey = true, IsIdentity = true)] | |
public long Ordering { get; set; } | |
} | |
public class ServerReplicationSnapshotRow | |
{ | |
public long LastOrdering { get; set; } | |
public byte[] TaskSnapshot { get; set; } | |
} | |
public class ReplicationRowHolder | |
{ | |
public readonly List<ServerReplicationRow> Rows; | |
public readonly TaskCompletionSource<long>? Tcs; | |
public ReplicationRowHolder(List<ServerReplicationRow> rows, bool b) | |
{ | |
Rows = rows; | |
Tcs = b? new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously) : null; | |
} | |
} | |
public class ReplicationSqlReader | |
{ | |
private readonly DataConnectionFactory _ctxFactory; | |
public ReplicationSqlReader(DataConnectionFactory ctxFactory) | |
{ | |
_ctxFactory = ctxFactory; | |
} | |
public async ValueTask<ServerReplicationSnapshotRow?> GetNewestSnapshot() | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
var row = await ctx.GetTable<ServerReplicationSnapshotRow>() | |
.OrderByDescending(a => a.LastOrdering).FirstOrDefaultAsync(); | |
return row; | |
} | |
} | |
public async IAsyncEnumerable<ServerReplicationRow> BeginReadAtOrdering(long ordering) | |
{ | |
long? curr = ordering; | |
List<ServerReplicationRow> currSet = null; | |
while (curr != null) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
currSet = await GrabOrderingSet(ctx, curr.Value); | |
if (currSet.Count < 100) | |
{ | |
curr = null; | |
} | |
else | |
{ | |
curr = currSet.LastOrDefault()!.Ordering + 1; | |
} | |
} | |
foreach (var row in currSet) | |
{ | |
yield return row; | |
} | |
} | |
} | |
private static async Task<List<ServerReplicationRow>> GrabOrderingSet(DataConnection ctx, [DisallowNull] long curr) | |
{ | |
return await ctx.GetTable<ServerReplicationRow>().Where(a => a.Ordering >= curr) | |
.OrderBy(a=>a.Ordering) | |
.Take(100) | |
.ToListAsync(); | |
} | |
} | |
public class ReplicatorSqlWriter | |
{ | |
private readonly DataConnectionFactory _ctxFactory; | |
private static readonly BulkCopyOptions DefaultCopyOptions = | |
new BulkCopyOptions( | |
BulkCopyType: BulkCopyType.MultipleRows); | |
public ReplicatorSqlWriter(DataConnectionFactory dataConnectionFactory) | |
{ | |
_ctxFactory = dataConnectionFactory; | |
} | |
public async ValueTask<long?> WriteTasks(List<ServerReplicationRow> rows, | |
bool includeMaxOrderInReturn) | |
{ | |
try | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
using (var tx = await ctx.BeginTransactionAsync()) | |
{ | |
long? ret = null; | |
await ctx.GetTable<ServerReplicationRow>() | |
.BulkCopyAsync(DefaultCopyOptions, rows); | |
if (includeMaxOrderInReturn) | |
{ | |
var lastEventId = rows.Last().EventId; | |
ret = await ctx.GetTable<ServerReplicationRow>() | |
.Where(a => a.EventId == lastEventId) | |
.Select(a => a.Ordering).OrderByDescending(a => a) | |
.FirstOrDefaultAsync(); | |
} | |
await tx.CommitAsync(); | |
return ret; | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
public async ValueTask WriteSnapshot(ServerReplicationSnapshotRow row) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
await ctx.GetTable<ServerReplicationSnapshotRow>() | |
.InsertOrUpdateAsync(() => new ServerReplicationSnapshotRow() | |
{ | |
LastOrdering = row.LastOrdering, | |
TaskSnapshot = row.TaskSnapshot | |
}, e => new ServerReplicationSnapshotRow() | |
{ | |
TaskSnapshot = row.TaskSnapshot | |
}, | |
() => new ServerReplicationSnapshotRow() | |
{ LastOrdering = row.LastOrdering }); | |
} | |
} | |
} | |
public record ServerReplicatorOptions(bool ForceWriteDispatchViaTaskScheduler); | |
public class ServerReplicator | |
{ | |
private readonly ClientBootstrap _client; | |
private readonly DataConnectionFactory _ctxFactory; | |
public ServerReplicator(ClientBootstrap bootstrap, | |
DataConnectionFactory dataConnectionFactory, ServerReplicatorOptions options) | |
{ | |
_client = bootstrap; | |
_replicationSqlReader = new ReplicationSqlReader(dataConnectionFactory); | |
_replicatorSqlWriter = new ReplicatorSqlWriter(dataConnectionFactory); | |
_options = options; | |
} | |
private readonly Channel<ReplicationRowHolder> _serverRows = | |
Channel.CreateUnbounded<ReplicationRowHolder>(); | |
private readonly ReplicationSqlReader _replicationSqlReader; | |
public async ValueTask StartReplicator(CancellationToken token) | |
{ | |
var wStart = | |
new TaskCompletionSource(TaskCreationOptions | |
.RunContinuationsAsynchronously); | |
var rStart = | |
new TaskCompletionSource(TaskCreationOptions | |
.RunContinuationsAsynchronously); | |
WriteLoop = Task.Run(async () => | |
{ | |
try | |
{ | |
Task lastWrite = Task.CompletedTask; | |
// TODO: Pooling | |
List<ServerReplicationRow> pendingSet = | |
new List<ServerReplicationRow>(); | |
List<TaskCompletionSource<long>> pendingRange = | |
new List<TaskCompletionSource<long>>(); | |
// ReSharper disable once MethodSupportsCancellation | |
wStart.SetResult(); | |
async Task CycleFlush(List<ServerReplicationRow> serverReplicationRows, List<TaskCompletionSource<long>> taskCompletionSources) | |
{ | |
await lastWrite; | |
if (_options.ForceWriteDispatchViaTaskScheduler) | |
{ | |
lastWrite = | |
Task.Run(async () => | |
{ | |
await FlushMainWriteBufferAndClearPendings( | |
serverReplicationRows, | |
taskCompletionSources); | |
}); | |
} | |
else | |
{ | |
lastWrite = | |
FlushMainWriteBufferAndClearPendings(serverReplicationRows, | |
taskCompletionSources); | |
} | |
pendingSet = new List<ServerReplicationRow>(); | |
pendingRange = new List<TaskCompletionSource<long>>(); | |
} | |
while (await _serverRows.Reader.WaitToReadAsync()) | |
{ | |
while (_serverRows.Reader.TryRead(out var a)) | |
{ | |
pendingSet.AddRange(a.Rows); | |
if (a.Tcs != null) | |
{ | |
pendingRange.Add(a.Tcs); | |
} | |
//If close to a flush, cycle. | |
if (maxTillSnap - pendingSet.Count - maxTillFlush <= 0) | |
{ | |
await CycleFlush(pendingSet, pendingRange); | |
} | |
} | |
// ensure last batch is cycled, just in case it's a while | |
// until the next event. | |
if (pendingSet.Count > 0) | |
{ | |
await CycleFlush(pendingSet, pendingRange); | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
}); | |
ReadLoop = Task.Run(async () => | |
{ | |
try | |
{ | |
// TODO: SetResult/SetException based on sub call. | |
rStart.SetResult(); | |
await using (var sub = | |
await _client.Sub<BaseTaskEvent>( | |
SubjectHelpers.TaskReplication, | |
token)) | |
{ | |
var snapImpl = new ReplicatorSnapshotTracker(); | |
await RecoverImpl(snapImpl); | |
var tillSnap = maxTillSnap; | |
var thisList = new List<ServerReplicationRow>(); | |
ReplicationRowHolder holder = null; | |
while (token.IsCancellationRequested == false) | |
{ | |
var tillFlush = maxTillFlush; | |
while (sub.Msgs.TryRead(out var msg)) | |
{ | |
await snapImpl.ConsumeEvent(msg.Data!); | |
thisList.Add(ToReplicationRow(msg)); | |
tillFlush--; | |
if (tillFlush == 0) | |
{ | |
tillSnap = | |
await FlushAndSnapBarrier(token, snapImpl, | |
tillSnap, thisList); | |
thisList = new List<ServerReplicationRow>(); | |
} | |
} | |
if (tillFlush != maxTillFlush) | |
{ | |
tillSnap = await FlushAndSnapBarrier( | |
token, | |
snapImpl, tillSnap, thisList); | |
thisList = new List<ServerReplicationRow>(); | |
} | |
if (await sub.Msgs.WaitToReadAsync(token) == false) | |
{ | |
break; | |
} | |
} | |
if (thisList.Count > 0) | |
{ | |
await _serverRows.Writer.WriteAsync( | |
new ReplicationRowHolder(thisList, false), token); | |
//this is shutdown, no need to realloc list here. | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
}); | |
await Task.WhenAll(wStart.Task, rStart.Task); | |
} | |
private async Task FlushMainWriteBufferAndClearPendings( | |
List<ServerReplicationRow> pendingSet, | |
List<TaskCompletionSource<long>> pendingRange) | |
{ | |
var r = await _replicatorSqlWriter.WriteTasks(pendingSet, true); | |
if (r != null) | |
{ | |
foreach (var taskCompletionSource in pendingRange) | |
{ | |
taskCompletionSource.TrySetResult(r.Value); | |
} | |
} | |
} | |
const int maxTillFlush = 100; | |
const int maxTillSnap = 2000; | |
private const int maxTillMainFlush = 500; | |
private async Task<int> | |
FlushAndSnapBarrier(CancellationToken token, | |
ReplicatorSnapshotTracker snapImpl, | |
int tillSnap, List<ServerReplicationRow> thisList) | |
{ | |
tillSnap = tillSnap - thisList.Count; | |
var shouldSnap = (tillSnap <= 0); | |
ReplicationRowHolder holder = | |
holder = new ReplicationRowHolder(thisList, shouldSnap); | |
await _serverRows.Writer.WriteAsync(holder, token); | |
if (shouldSnap) | |
{ | |
var replRow = snapImpl.GetSnapState(); | |
_ = Task.Run(async () => | |
{ | |
await _replicatorSqlWriter.WriteSnapshot( | |
new ServerReplicationSnapshotRow() | |
{ | |
LastOrdering = await holder.Tcs.Task, | |
TaskSnapshot = | |
ReplicationSerializer.Instance | |
.SerializeTaskSnapshot(replRow) | |
}); | |
tillSnap = maxTillSnap; | |
}, token); | |
tillSnap = maxTillSnap; | |
} | |
return tillSnap; | |
} | |
private static void ThrowClosedChannelException() | |
{ | |
throw new ChannelClosedException("Row Buffer Channel was closed!"); | |
} | |
private ServerReplicationRow ToReplicationRow(NatsMsg<BaseTaskEvent> item) | |
{ | |
var msg = item.Data; | |
var tt = msg.TaskType; | |
var eid = msg.EventId; | |
var tid = msg.TaskId; | |
byte[] bytes = default; | |
using (var buf = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<BaseTaskEvent>.Default.Serialize(buf, msg); | |
bytes = buf.WrittenMemory.ToArray(); | |
} | |
return new ServerReplicationRow() | |
{ | |
//Ordering gets filled in by SQL, for now... | |
EventId = eid.ToGuid(), OriginalSubject = item.Subject, | |
TaskEventData = bytes, TaskId = tid.ToGuid(), TaskType = tt | |
}; | |
} | |
private readonly ReplicatorSqlWriter _replicatorSqlWriter; | |
private readonly ServerReplicatorOptions _options; | |
public Task WriteLoop { get; private set; } = Task.CompletedTask; | |
public Task ReadLoop { get; private set; } = Task.CompletedTask; | |
private async ValueTask RecoverImpl(ReplicatorSnapshotTracker consumer) | |
{ | |
var s = await _replicationSqlReader.GetNewestSnapshot(); | |
if (s != null) | |
{ | |
await consumer.ConsumeSnap( | |
ReplicationSerializer.Instance.DeSerializeTaskSnapshot( | |
s!.TaskSnapshot)); | |
} | |
var o = s?.LastOrdering ?? 0; | |
await foreach (var a in _replicationSqlReader.BeginReadAtOrdering(o)) | |
{ | |
await consumer.ConsumeEvent( | |
ReplicationSerializer.Instance.GetTaskEvent(a.TaskEventData)); | |
} | |
} | |
} | |
public class ReplicationReader | |
{ | |
} | |
public class ReplicationSerializer | |
{ | |
public static readonly ReplicationSerializer Instance = | |
new ReplicationSerializer(); | |
public BaseTaskEvent GetTaskEvent(byte[] bytes) | |
{ | |
return ChatTaskClientSerializer<BaseTaskEvent>.Default.Deserialize( | |
new ReadOnlySequence<byte>(bytes))!; | |
} | |
public byte[] SerializeTaskEvent(BaseTaskEvent taskEvt) | |
{ | |
using (var apbw = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<BaseTaskEvent>.Default.Serialize(apbw, | |
taskEvt); | |
return apbw.WrittenMemory.ToArray(); | |
} | |
} | |
public TaskSerializationState DeSerializeTaskSnapshot(byte[] bytes) | |
{ | |
return ChatTaskClientSerializer<TaskSerializationState>.Default.Deserialize( | |
new ReadOnlySequence<byte>(bytes))!; | |
} | |
public byte[] SerializeTaskSnapshot(TaskSerializationState snapState) | |
{ | |
using (var apbw = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<TaskSerializationState>.Default.Serialize(apbw, | |
snapState); | |
return apbw.WrittenMemory.ToArray(); | |
} | |
} | |
} | |
public record TaskSerializationState( | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> Assigned, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> Unassigned); | |
public interface ISnapshotAndEventConsumer | |
{ | |
IAsyncEnumerable<BaseTaskEvent> BeginNewEventConsumer( | |
CancellationToken token = default); | |
ValueTask ConsumeSnap(TaskSerializationState row); | |
ValueTask ConsumeEvent(BaseTaskEvent row); | |
} | |
public abstract class DataConnectionFactory | |
{ | |
public abstract DataConnection GetConnection(); | |
} | |
#endregion | |
public class ClientBootstrap | |
{ | |
public ClientBootstrap(NatsOpts opts = null) | |
{ | |
_opts = opts ?? NatsOpts.Default; | |
} | |
private NatsConnection _connection; | |
private readonly NatsOpts _opts; | |
public async ValueTask Startclient() | |
{ | |
//TODO: At some point, do jetstream instead. | |
// And after that, do snapshots via KV! | |
_connection = new NatsConnection(_opts); | |
await _connection.ConnectAsync(); | |
//var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default)); | |
//new NatsKVContext() | |
} | |
public async ValueTask SendMessage(ISubjectFactory command) | |
{ | |
switch (command) | |
{ | |
case IChatTaskClient ctc: | |
await _connection.PublishAsync(command.GetSubject(), ctc, | |
serializer: ChatTaskClientSerializer<IChatTaskClient> | |
.Default); | |
break; | |
case BaseTaskEvent te: | |
await _connection.PublishAsync(command.GetSubject(), te, | |
serializer: ChatTaskClientSerializer<BaseTaskEvent>.Default); | |
break; | |
} | |
} | |
public async ValueTask<INatsSub<T>> SubChat<T>(string subject, | |
CancellationToken token) where T : IChatTaskClient | |
{ | |
return await _connection.SubscribeCoreAsync<T>(subject, | |
null, ChatTaskClientSerializer<T>.Default, default, token); | |
} | |
public async ValueTask<INatsSub<T>> Sub<T>(string subject, | |
CancellationToken token) where T : BaseTaskEvent | |
{ | |
return await _connection.SubscribeCoreAsync<T>(subject, | |
null, ChatTaskClientSerializer<T>.Default, default, token); | |
} | |
} | |
public record ChatHistory(Lst<BroadcastMessage> Messages) | |
{ | |
public ChatHistory AddMsg(BroadcastMessage message) | |
{ | |
if (Messages.Contains(message)) | |
{ | |
return this; | |
} | |
return this with { Messages = Messages.Add(message) }; | |
} | |
} | |
public record ReplicatorStateProjectionData(HashMap<Ulid, Lst<BaseTaskEvent>> Map); | |
public record ReplicatorStateProjectionSet( | |
ReplicatorStateProjectionData Active); | |
public class ReplicatorSnapshotTracker | |
{ | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _tasks; | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _assigned; | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _unassigned; | |
public async ValueTask ConsumeSnap(TaskSerializationState row) | |
{ | |
_tasks = row.Assigned.Union(row.Unassigned, (a, b) => b, (a, b) => b, | |
(k, a, b) => a.Union(b)); | |
} | |
public async ValueTask ConsumeEvent(BaseTaskEvent row) | |
{ | |
if (row is TaskCompleted) | |
{ | |
RemoveFromSets(); | |
} | |
else if (row is TaskCanceled) | |
{ | |
RemoveFromSets(); | |
} | |
else | |
{ | |
_tasks = | |
_tasks.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
_assigned = _assigned.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
_unassigned = _unassigned.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
} | |
void RemoveFromSets() | |
{ | |
_tasks = | |
_tasks.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
_assigned = _assigned.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
_unassigned = | |
_unassigned.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
} | |
} | |
public TaskSerializationState GetSnapState() | |
{ | |
return new TaskSerializationState(_assigned, _unassigned); | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
/// <remarks> | |
/// Ok. So the way this works, is we use a series of <see cref="Atom{A}"/> | |
/// Hashmaps of Maps. Atom lets us Swap via interlocked, | |
/// such that if the values being swapped are 'comparable' and the actions are | |
/// 'safely repeatable', it can repeat the action if a concurrent update happens | |
/// until the interlocked operation succeeds. | |
/// </remarks> | |
public class TaskSetCoordinatorHashMap : ISnapshotAndEventConsumer | |
{ | |
public string UserId { get; } | |
public TaskSetCoordinatorHashMap(string userId, | |
ClientBootstrap clientBootstrap, bool trackChanges) | |
{ | |
UserId = userId; | |
_client = clientBootstrap; | |
_trackChanges = trackChanges; | |
_assignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
_requestedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
_unassignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
} | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _assignedTasks; | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _requestedTasks; | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _unassignedTasks; | |
private readonly ClientBootstrap _client; | |
private Task _mainTask; | |
public async Task<Ulid> CreateTask(string taskType, string text, | |
string? assignedToUser, DateTimeOffset expectedBy) | |
{ | |
var taskId = Ulid.NewUlid(); | |
await _client.SendMessage(new TaskRequest(Ulid.NewUlid(), UserId, assignedToUser, | |
taskId, taskType, text, DateTimeOffset.UtcNow, | |
expectedBy)); | |
return taskId; | |
} | |
public async ValueTask AddTaskNote(TaskRequest origReq, | |
string notes) | |
{ | |
await _client.SendMessage(new TaskUpdate(Ulid.NewUlid(),origReq.TaskId, | |
origReq.TaskType, UserId, notes)); | |
} | |
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes) | |
{ | |
if (origReq.RequestingUserId == UserId) | |
{ | |
await _client.SendMessage(new TaskCanceled(Ulid.NewUlid(),origReq.TaskId, | |
origReq.TaskType, UserId, notes, DateTimeOffset.Now)); | |
return true; | |
} | |
return false; | |
} | |
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId, | |
string newUser, string notes) | |
{ | |
bool foundTask = false; | |
foreach (var requestedTask in _assignedTasks.Value) | |
{ | |
var item = | |
requestedTask.Value.Find(taskId, a => a, | |
static () => Lst<BaseTaskEvent>.Empty); | |
if (item != default) | |
{ | |
foundTask = true; | |
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser, | |
requestedTask.Key, UserId, notes, item, | |
DateTimeOffset.Now)); | |
//_assignedTasks.AddOrUpdate(requestedTask.Key, | |
// e => e.Remove(taskId), | |
// () => HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
return new ReassignResult(taskId, true, "Reassigned"); | |
} | |
} | |
foreach (var requestedTask in _unassignedTasks.Value) | |
{ | |
var item = | |
requestedTask.Value.Find(taskId, a => a, | |
static () => Lst<BaseTaskEvent>.Empty); | |
if (item != default) | |
{ | |
foundTask = true; | |
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser, | |
requestedTask.Key, UserId, notes, item, | |
DateTimeOffset.Now)); | |
//_assignedTasks.AddOrUpdate(requestedTask.Key, | |
// e => e.Remove(taskId), | |
// () => HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
return new ReassignResult(taskId, true, "Reassigned"); | |
} | |
} | |
return new ReassignResult(taskId, false, "Could not find task!"); | |
} | |
public async ValueTask Init(TaskCompletionSource initLock, | |
CancellationToken token) | |
{ | |
var userEvents = | |
await _client.Sub<BaseTaskEvent>( | |
SubjectHelpers.TaskUserSub(UserId), token); | |
CancellationTokenSource cts = null; | |
_mainTask = Task.Run(async () => | |
{ | |
await initLock.Task; | |
try | |
{ | |
await foreach (var entry in userEvents.Msgs.ReadAllAsync( | |
token)) | |
{ | |
try | |
{ | |
RunEventHandlers(entry); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
finally | |
{ | |
try | |
{ | |
cts?.Cancel(false); | |
} | |
finally | |
{ | |
try | |
{ | |
cts?.Dispose(); | |
} | |
catch | |
{ | |
// ignored | |
} | |
} | |
} | |
}); | |
} | |
private void RunEventHandlers(NatsMsg<BaseTaskEvent> entry) | |
{ | |
ProcessTaskEventSwitch(entry.Data); | |
} | |
private void ProcessTaskEventSwitch(BaseTaskEvent? msg) | |
{ | |
switch (msg) | |
{ | |
case TaskAssigned ta: | |
{ | |
HandleTaskAssigned(ta); | |
break; | |
} | |
case TaskBegan tb: | |
{ | |
HandleTaskBegan(tb); | |
break; | |
} | |
case TaskCompleted tc: | |
{ | |
HandleTaskCompleted(tc); | |
break; | |
} | |
case TaskRequest tr: | |
{ | |
HandleTaskRequest(tr); | |
break; | |
} | |
case TaskUpdate tu: | |
{ | |
HandleTaskUpdate(tu); | |
break; | |
} | |
case TaskCanceled tcan: | |
{ | |
HandleTaskCanceled(tcan); | |
break; | |
} | |
} | |
} | |
public async ValueTask<ClearResult> ClearItem(Ulid taskId) | |
{ | |
bool? foundAndCleared = null; | |
_requestedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
_requestedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
_unassignedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
if (foundAndCleared.HasValue == false) | |
{ | |
return new ClearResult(true, "no instances found"); | |
} | |
else | |
{ | |
if (foundAndCleared.Value) | |
{ | |
return new ClearResult(true, "cleared"); | |
} | |
else | |
{ | |
return new ClearResult(false, "Tasks not in closed state"); | |
} | |
} | |
} | |
private void HandleTaskRequest(TaskRequest trb) | |
{ | |
void doSwap( | |
Atom<HashMap<string, | |
HashMap<Ulid, Lst<BaseTaskEvent>>>> taskSet, | |
TaskRequest taskRequest) | |
{ | |
taskSet.Swap(taskRequest, (tr,b)=>b.AddOrUpdate(tr.TaskType, | |
(hm) => hm.TryAdd(tr.TaskId, Lst<BaseTaskEvent>.Empty.Add(tr)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty.Add(tr.TaskId, | |
Lst<BaseTaskEvent>.Empty.Add(tr)))); | |
} | |
if (trb.RequestingUserId == UserId) | |
{ | |
doSwap(_requestedTasks, trb); | |
} | |
if (trb.AssignedUserId == UserId) | |
{ | |
doSwap(_assignedTasks,trb); | |
} | |
else if (trb.AssignedUserId == "unassigned") | |
{ | |
doSwap(_unassignedTasks,trb); | |
} | |
} | |
private static void TUSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskUpdate tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private static void TCanSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskCanceled tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private static void TComSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskCompleted tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private readonly Channel<bool> _changeLocker = | |
Channel.CreateBounded<bool>(1); | |
private readonly bool _trackChanges; | |
public TaskState GetTaskState() | |
{ | |
return new( | |
_assignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap(), | |
_requestedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap(), | |
_unassignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap()); | |
} | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record TaskState( | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> AssignedTasks, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> RequestedTasks, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> UnassignedTasks); | |
private static ( | |
HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg, | |
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm, | |
HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg) | |
MakeSnapshot<TKeyOuter,TKeyInner,TLst>( | |
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm) | |
{ | |
//TODO: This is trasshhhhhhh. | |
var chg = thm.Changes; | |
var preMap = thm.Map(a => | |
{ | |
var (k, v) = a; | |
var c = v.Changes; | |
return (k, (c, v.Snapshot())); | |
}).ToHashMap(); | |
thm = thm.SetItems(preMap.Select(a => (a.Item1, a.Item2.Item2))); | |
var innerChg = preMap.Select(a => (a.Item1, a.Item2.Item1)) | |
.ToHashMap(); | |
return (chg, thm, innerChg); | |
} | |
private void HandleTaskUpdate(TaskUpdate tu) | |
{ | |
TUSwap(_assignedTasks,tu); | |
TUSwap(_requestedTasks,tu); | |
TUSwap(_unassignedTasks,tu); | |
} | |
private void HandleTaskCanceled(TaskCanceled taskCanceled) | |
{ | |
TCanSwap(_assignedTasks,taskCanceled); | |
TCanSwap(_requestedTasks,taskCanceled); | |
TCanSwap(_unassignedTasks,taskCanceled); | |
} | |
private void HandleTaskCompleted(TaskCompleted taskCompleted) | |
{ | |
TComSwap(_assignedTasks, taskCompleted); | |
TComSwap(_unassignedTasks, taskCompleted); | |
TComSwap(_requestedTasks, taskCompleted); | |
} | |
private void HandleTaskAssigned(TaskAssigned ta) | |
{ | |
if (ta.UserId == UserId) | |
{ | |
_assignedTasks.Swap(ta, (tai, thm) => | |
{ | |
return thm.AddOrUpdate(tai.TaskType, tai.TaskId, | |
e => e.Add(tai), | |
() => Lst<BaseTaskEvent>.Empty.Add(tai)); | |
}); | |
} | |
else | |
{ | |
_assignedTasks.Swap(ta, (tai, thm) => | |
{ | |
return thm.Remove(tai.TaskType, tai.TaskId); | |
}); | |
} | |
} | |
private void HandleTaskBegan(TaskBegan tb) | |
{ | |
_requestedTasks.Swap(tb, (tbi, thm) => | |
{ | |
return thm.AddOrUpdate(tbi.TaskType, tbi.TaskId, t => | |
t.Add(tbi), () => | |
Lst<BaseTaskEvent>.Empty.Add(tbi)); | |
}); | |
} | |
public async IAsyncEnumerable<BaseTaskEvent> BeginNewEventConsumer(CancellationToken token = default) | |
{ | |
await using (var userEvents = | |
await _client.Sub<BaseTaskEvent>( | |
SubjectHelpers.TaskUserSub(UserId), token)) | |
{ | |
await foreach (var msg in userEvents.Msgs.ReadAllAsync(token) | |
.WithCancellation(token)) | |
{ | |
yield return msg.Data!; | |
} | |
} | |
} | |
public async ValueTask ConsumeSnap(TaskSerializationState row) | |
{ | |
var newAMap = HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>(); | |
var newRMap = HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>(); | |
var newUaMap = HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>(); | |
foreach (var valueTuple in row.Assigned) | |
{ | |
var catAMap = HashMap<Ulid, Lst<BaseTaskEvent>>(); | |
var catRMap = HashMap<Ulid, Lst<BaseTaskEvent>>(); | |
foreach (var inner in valueTuple.Value) | |
{ | |
var state = GetTaskState(inner.Value); | |
if (state.IsAssignedToUser) | |
{ | |
catAMap = catAMap.Add(inner.Key, inner.Value); | |
} | |
if (state.IsRequestedByUser) | |
{ | |
catRMap = catRMap.Add(inner.Key, inner.Value); | |
} | |
} | |
newAMap = newAMap.Add(valueTuple.Key, catAMap); | |
newRMap = newRMap.Add(valueTuple.Key, catRMap); | |
} | |
foreach (var valueTuple in row.Unassigned) | |
{ | |
var catRMap = newRMap.Find(valueTuple.Key, a => a, () => default); | |
var catUaMap = HashMap<Ulid, Lst<BaseTaskEvent>>(); | |
foreach (var inner in valueTuple.Value) | |
{ | |
var state = GetTaskState(inner.Value); | |
if (state.IsRequestedByUser) | |
{ | |
catRMap = catRMap.Add(inner.Key, inner.Value); | |
} | |
catUaMap = catUaMap.Add(inner.Key, inner.Value); | |
} | |
newRMap = newAMap.SetItem(valueTuple.Key, catRMap); | |
newUaMap = newUaMap.SetItem(valueTuple.Key, catUaMap); | |
} | |
_assignedTasks.Swap(newAMap, (arg, ex) => arg); | |
_unassignedTasks.Swap(newUaMap, (arg, ex) => arg); | |
_requestedTasks.Swap(newRMap, (arg, ex) => arg); | |
//row.Assigned.Where(a=>a.) | |
} | |
private RebuiltTaskState GetTaskState(Lst<BaseTaskEvent> row) | |
{ | |
string assignedUserId = ""; | |
string requestedUserId = ""; | |
foreach (var baseTaskEvent in row) | |
{ | |
if (baseTaskEvent is TaskRequest taskRequest) | |
{ | |
requestedUserId = taskRequest.RequestingUserId; | |
} | |
else if (baseTaskEvent is TaskAssigned ta) | |
{ | |
assignedUserId = ta.UserId; | |
} | |
} | |
return new(assignedUserId == UserId, requestedUserId == UserId); | |
} | |
public async ValueTask ConsumeEvent(BaseTaskEvent row) | |
{ | |
this.ProcessTaskEventSwitch(row); | |
} | |
} | |
public record RebuiltTaskState(bool IsAssignedToUser, bool IsRequestedByUser); | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record ReassignResult(Ulid TaskId, bool Success, string Message); | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record ClearResult(bool Success, string? Reason); | |
public class ChatTaskClientSerializer<T> : INatsSerializer<T> | |
{ | |
public static readonly ChatTaskClientSerializer<T> Default = | |
new ChatTaskClientSerializer<T>(); | |
private static readonly IFormatterResolver DefaultFormatterResolver = | |
CompositeResolver.Create(new[] | |
{ | |
UlidMessagePackResolver.Instance, | |
MessagePackSerializerOptions.Standard.Resolver | |
}); | |
public static readonly MessagePackSerializerOptions DefaultSerializerOptions = | |
MessagePackSerializerOptions.Standard.WithResolver( | |
DefaultFormatterResolver); | |
public void Serialize(IBufferWriter<byte> bufferWriter, T value) | |
{ | |
MessagePackSerializer.Serialize(bufferWriter, value, | |
DefaultSerializerOptions); | |
} | |
public T? Deserialize(in ReadOnlySequence<byte> buffer) | |
{ | |
return MessagePackSerializer.Deserialize<T>(buffer, | |
DefaultSerializerOptions); | |
} | |
} | |
public interface ISubjectFactory | |
{ | |
string GetSubject(); | |
} | |
[Union(2, typeof(TaskRequest))] | |
[Union(3, typeof(TaskCompleted))] | |
[Union(4, typeof(TaskBegan))] | |
[Union(5, typeof(TaskUpdate))] | |
[Union(6, typeof(TaskAssigned))] | |
[Union(7, typeof(TaskCanceled))] | |
[MessagePackObject(keyAsPropertyName:true)] | |
public abstract record BaseTaskEvent(Ulid EventId, Ulid TaskId, string TaskType) | |
: ISubjectFactory, IEquatable<BaseTaskEvent> | |
{ | |
bool IEquatable<BaseTaskEvent>.Equals(BaseTaskEvent? other) | |
{ | |
return other != null && other.EventId == EventId; | |
} | |
public abstract string GetSubject(); | |
} | |
public static class SubjectHelpers | |
{ | |
public static string BroadcastSubject => | |
$"sample.chat.message.broadcast"; | |
public static string DirectMessage(string from, string to) => | |
$"sample.chat.message.direct.{from}.{to}"; | |
public static string | |
Tasks(string scope, string type, Ulid taskId, string requested, | |
string assigned) => | |
$"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}"; | |
public static string Tasks(string scope, string type, Ulid taskId, | |
string doneByUser) | |
{ | |
return $"sample.task.{scope}.{type},{taskId}.{doneByUser}"; | |
} | |
public static string TaskUserSub(string userId) | |
{ | |
return $"sample.task.>"; | |
} | |
public static string TaskReplication => $"sample.task.>"; | |
} | |
[MessagePackObject(true)] | |
public record TaskRequest( | |
Ulid EventId, | |
string RequestingUserId, | |
string? AssignedUserId, | |
Ulid TaskId, | |
string TaskType, | |
string Text, | |
DateTimeOffset RequestedAt, | |
DateTimeOffset ExpectedBy) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Request, TaskType, TaskId, | |
RequestingUserId, | |
AssignedUserId != null ? AssignedUserId : "unassigned"); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskCanceled( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string CanceledByUser, | |
string CancelReason, | |
DateTimeOffset CancelTime) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Canceled, TaskType, TaskId, | |
CanceledByUser); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskBegan( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Text, | |
DateTimeOffset BeginTime) : BaseTaskEvent(EventId, TaskId, TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Begun, TaskType, TaskId, UserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskUpdate( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Notes) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Update, TaskType, TaskId, UserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskCompleted( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Notes, | |
DateTimeOffset CompletedTime) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Completed, TaskType, TaskId, UserId); | |
} | |
} | |
public static class TaskScopeSubjects | |
{ | |
public static readonly string Completed = "completed"; | |
public static readonly string Assigned = "assigned"; | |
public static readonly string Begun = "begun"; | |
public static readonly string Update = "update"; | |
public static readonly string Canceled = "canceled"; | |
public static readonly string Request = "request"; | |
} | |
[MessagePackObject(true)] | |
public record TaskAssigned( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string AssignedByUserId, | |
string Notes, | |
Lst<BaseTaskEvent> History, | |
DateTimeOffset AssignedTime | |
) : BaseTaskEvent(EventId, TaskId, TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Assigned, TaskType, TaskId, UserId, | |
AssignedByUserId); | |
} | |
} | |
public class ClientCoordinator | |
{ | |
public string UserId { get; } | |
private readonly Atom<ChatHistory> _broadcastHistory; | |
private readonly Atom<DmConversationSet> _directMessageHistory; | |
private readonly ClientBootstrap _client; | |
public ClientCoordinator(string userId, ClientBootstrap bootstrap) | |
{ | |
UserId = userId; | |
_client = bootstrap; | |
// OK so the way Atoms work, is they use a 'swap' function, | |
// and then check for equality/etc. | |
// If your type does equality (i.e. records) | |
// and your underlying types are immutable, | |
// and your actions are able to be applied... | |
// magic! in case of a conflict, the operation is re-attempted, | |
// all in a thread safe fashion (as long as your swap is thread safe) | |
_broadcastHistory = | |
Atom(new ChatHistory(default)); | |
_directMessageHistory = | |
Atom(new DmConversationSet(LanguageExt | |
.HashMap<string, Lst<DirectMessage>>.Empty)); | |
} | |
public async ValueTask Begin(CancellationToken token) | |
{ | |
var (broadcastSub, dmSub, outboundDmSub) = await ValueTaskEx.WhenAll( | |
_client.SubChat<BroadcastMessage>( | |
SubjectHelpers.BroadcastSubject, | |
token), | |
_client.SubChat<DirectMessage>( | |
SubjectHelpers.DirectMessage("*",UserId), | |
token), | |
_client.SubChat<DirectMessage>( | |
SubjectHelpers.DirectMessage(UserId,"*"), token) | |
); | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await foreach (var reader in broadcastSub.Msgs.ReadAllAsync( | |
token)) | |
{ | |
_broadcastHistory.Swap(reader.Data!, | |
(m, h) => h.AddMsg(m)); | |
} | |
} | |
catch | |
{ | |
} | |
}); | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await foreach | |
(var reader in outboundDmSub.Msgs.ReadAllAsync( | |
token)) | |
{ | |
_directMessageHistory.Swap(reader.Data!, | |
(m, h) => | |
{ | |
return h.AddOrMarkMessageReceived(m.ToUserId, | |
m); | |
}); | |
} | |
} | |
catch | |
{ | |
} | |
}); | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await foreach (var reader in dmSub.Msgs.ReadAllAsync( | |
token)) | |
{ | |
_directMessageHistory.Swap(reader.Data!, UserId, | |
(m, h, v) => | |
{ | |
if (m.FromUserId != h) | |
{ | |
return v.AddMsg(m.FromUserId, m); | |
} | |
else | |
{ | |
return v.AddMsg(m.ToUserId, m); | |
} | |
}); | |
} | |
} | |
catch | |
{ | |
} | |
}); | |
} | |
} | |
[Union(0, typeof(BroadcastMessage))] | |
[Union(1, typeof(DirectMessage))] | |
public interface IChatTaskClient : ISubjectFactory | |
{ | |
} | |
[MessagePackObject(true)] | |
public record BroadcastMessage( | |
string FromUserId, | |
string Text) : IChatTaskClient | |
{ | |
public DateTimeOffset TimeSent { get; set; } | |
public string GetSubject() | |
{ | |
return SubjectHelpers.BroadcastSubject; | |
} | |
} | |
public record DmConversationSet(HashMap<string, Lst<DirectMessage>> User) | |
{ | |
public DmConversationSet AddMsg(string arg1FromUserId, | |
DirectMessage directMessage) | |
{ | |
return this with | |
{ | |
User = User.AddOrUpdate(arg1FromUserId, | |
(a) => a.Contains(directMessage) ? a : a.Add(directMessage), | |
() => new Lst<DirectMessage>().Add(directMessage)) | |
}; | |
} | |
public DmConversationSet AddOrMarkMessageReceived(string arg1ToUserId, | |
DirectMessage directMessage) | |
{ | |
return AddMsg(arg1ToUserId, directMessage); | |
} | |
} | |
[MessagePackObject(true)] | |
public record DirectMessage( | |
string FromUserId, | |
string ToUserId, | |
string Text) : IChatTaskClient | |
{ | |
public string GetSubject() | |
{ | |
return SubjectHelpers.DirectMessage(FromUserId, ToUserId); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
using System.Collections.Concurrent; | |
using System.Diagnostics.CodeAnalysis; | |
using System.Net.Sockets; | |
using System.Text; | |
using System.Threading.Channels; | |
using CommunityToolkit.HighPerformance.Buffers; | |
using Cysharp.Serialization.MessagePack; | |
using LanguageExt; | |
using LinqToDB; | |
using LinqToDB.Data; | |
using LinqToDB.Mapping; | |
using MessagePack; | |
using MessagePack.Resolvers; | |
using Microsoft.Extensions.Logging; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.KeyValueStore; | |
using ValueTaskSupplement; | |
using static LanguageExt.Prelude; | |
using CancellationTokenSource = System.Threading.CancellationTokenSource; | |
using Seq = LanguageExt.Seq; | |
using Ulid = System.Ulid; | |
namespace NatsSandBox.Client; | |
internal class Program | |
{ | |
public static async Task Main(string[] args) | |
{ | |
var cb = new ClientBootstrap(); | |
await cb.Startclient(); | |
var c = new TaskSetCoordinatorHashMap("test",cb,false); | |
var tcs = new TaskCompletionSource(); | |
tcs.SetResult(); | |
var cts = new CancellationTokenSource(); | |
await c.Init(tcs,cts.Token); | |
await c.CreateTask("idk", "wat", "bar", DateTimeOffset.Now.AddDays(1)); | |
var t= Task.Run(async () => | |
{ | |
while (cts.IsCancellationRequested == false) | |
{ | |
try | |
{ | |
//await c.CreateTask("idk", "wat", "bar", DateTimeOffset.Now.AddDays(1)); | |
await Task.Delay(100); | |
//Console.WriteLine(MessagePackSerializer.SerializeToJson(c.GetTaskState(), | |
// ChatTaskClientSerializer<TaskSetCoordinatorHashMap.TaskState> | |
// .DefaultSerializerOptions)); | |
await Task.Delay(5000); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
}); | |
while (cts.IsCancellationRequested == false) | |
{ | |
var x = Console.ReadLine(); | |
if (x == "exit") | |
{ | |
cts.Cancel(); | |
} | |
else if (x.StartsWith("create")) | |
{ | |
var tokens = x.Split(' ', StringSplitOptions.RemoveEmptyEntries); | |
if (tokens.Length > 3) | |
{ | |
var type = tokens[1]; | |
var assignedTo = tokens[2]; | |
var text = string.Join(' ', tokens.Skip(3)); | |
var newTaskId = await c.CreateTask(type, text, assignedTo, | |
DateTimeOffset.Now.AddDays(7)); | |
Console.WriteLine($"Created Task with ID {newTaskId.ToString()}"); | |
} | |
} | |
else if (x.StartsWith("print")) | |
{ | |
var ts = c.GetTaskState(); | |
foreach (var valueTuplese in Seq.create( | |
("Assigned : ", ts.AssignedTasks), | |
("Requested : ", ts.RequestedTasks), | |
("Unassigned : ", ts.UnassignedTasks))) | |
{ | |
Console.WriteLine(valueTuplese.Item1); | |
Console.WriteLine(MessagePackSerializer.SerializeToJson(valueTuplese.Item2, | |
ChatTaskClientSerializer<HashMap<string,HashMap<string,Lst<BaseTaskEvent>>>> | |
.DefaultSerializerOptions)); | |
Console.WriteLine("================================"); | |
} | |
} | |
} | |
await t; | |
cts.Dispose(); | |
Console.WriteLine("Goodbye, World!"); | |
} | |
} | |
#region wip stuff | |
public class ServerReplicationRow | |
{ | |
//public string Category { get; set; } | |
public string TaskType { get; set; } | |
public Guid EventId { get; set; } | |
public Guid TaskId { get; set; } | |
public byte[] TaskEventData { get; set; } | |
public string OriginalSubject { get; set; } | |
[Column(IsPrimaryKey = true, IsIdentity = true)] | |
public long Ordering { get; set; } | |
} | |
public class ServerReplicationSnapshotRow | |
{ | |
public long LastOrdering { get; set; } | |
public byte[] TaskSnapshot { get; set; } | |
} | |
public class ReplicationRowHolder | |
{ | |
public readonly List<ServerReplicationRow> Rows; | |
public readonly TaskCompletionSource<long> Tcs; | |
public ReplicationRowHolder(List<ServerReplicationRow> rows) | |
{ | |
Rows = rows; | |
Tcs = new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously); | |
} | |
} | |
public class ServerReplicator | |
{ | |
private readonly ClientBootstrap _client; | |
private readonly DataConnectionFactory _ctxFactory; | |
private readonly SemaphoreSuperSlim _snapshotGrabbingLocker = | |
new SemaphoreSuperSlim(); | |
public ServerReplicator(ClientBootstrap bootstrap, DataConnectionFactory dataConnectionFactory) | |
{ | |
_client = bootstrap; | |
_ctxFactory = dataConnectionFactory; | |
} | |
private async ValueTask<long?> WriteTasks(List<ServerReplicationRow> rows, bool includeMaxOrderInReturn) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
using (var tx = await ctx.BeginTransactionAsync()) | |
{ | |
long? ret = null; | |
await ctx.GetTable<ServerReplicationRow>() | |
.BulkCopyAsync(rows); | |
if (includeMaxOrderInReturn) | |
{ | |
var lastEventId = rows.Last().EventId; | |
ret = await ctx.GetTable<ServerReplicationRow>() | |
.Where(a => a.EventId == lastEventId) | |
.Select(a => a.Ordering).OrderDescending() | |
.FirstOrDefaultAsync(); | |
} | |
await tx.CommitAsync(); | |
return ret; | |
} | |
} | |
} | |
private readonly Channel<ReplicationRowHolder> _serverRows = Channel.CreateUnbounded<ReplicationRowHolder>(); | |
public async ValueTask StartReplicator(TaskSetCoordinator coordinator, CancellationToken token) | |
{ | |
Task _writeLoop = Task.Run(async () => | |
{ | |
var pending = 500; | |
List<ServerReplicationRow> pendingSet = | |
new List<ServerReplicationRow>(512); | |
List<TaskCompletionSource<long>> pendingRange = | |
new List<TaskCompletionSource<long>>(); | |
while (await _serverRows.Reader.WaitToReadAsync()) | |
{ | |
while (_serverRows.Reader.TryRead(out var a)) | |
{ | |
pendingSet.AddRange(a.Rows); | |
pendingRange.Add(a.Tcs); | |
pending = pending - a.Rows.Count; | |
if (pending - 100 < 0) | |
{ | |
var r= await WriteTasks(pendingSet, true); | |
if (r != null) | |
{ | |
foreach (var taskCompletionSource in pendingRange) | |
{ | |
taskCompletionSource.TrySetResult(r.Value); | |
} | |
} | |
pendingRange.Clear(); | |
pendingSet.Clear(); | |
} | |
} | |
} | |
}); | |
Task _readLoop = Task.Run(async () => | |
{ | |
await using (var sub = | |
await _client.Sub<BaseTaskEvent>( | |
SubjectHelpers.TaskReplication, | |
token)) | |
{ | |
var snapImpl = new ReplicatorSnapshotTracker(); | |
await RecoverImpl(snapImpl); | |
var tillSnap = maxTillSnap; | |
var thisList = new List<ServerReplicationRow>(); | |
ReplicationRowHolder holder = null; | |
while (token.IsCancellationRequested == false) | |
{ | |
var tillFlush = maxTillFlush; | |
while (sub.Msgs.TryRead(out var msg)) | |
{ | |
await snapImpl.ConsumeEvent(msg.Data!); | |
thisList.Add(ToReplicationRow(msg)); | |
tillFlush--; | |
if (tillFlush == 0) | |
{ | |
(tillSnap, thisList) = await FlushAndSnapBarrier(token, snapImpl, tillSnap, thisList); | |
} | |
} | |
if (tillFlush != maxTillFlush) | |
{ | |
(tillSnap, thisList) = await FlushAndSnapBarrier(token, snapImpl, tillSnap, thisList); | |
} | |
if (await sub.Msgs.WaitToReadAsync(token) == false) | |
{ | |
return; | |
} | |
} | |
if (thisList.Count > 0) | |
{ | |
_serverRows.Writer.TryWrite(new ReplicationRowHolder(thisList)); | |
} | |
} | |
}); | |
} | |
const int maxTillFlush = 100; | |
const int maxTillSnap = 2000; | |
private async Task<(int tillSnap, List<ServerReplicationRow> thisList)> FlushAndSnapBarrier(CancellationToken token, | |
ReplicatorSnapshotTracker snapImpl, | |
int tillSnap, List<ServerReplicationRow> thisList) | |
{ | |
ReplicationRowHolder? holder; | |
tillSnap = tillSnap - maxTillFlush; | |
holder = new ReplicationRowHolder(thisList); | |
if (_serverRows.Writer.TryWrite(holder) == false) | |
{ | |
ThrowClosedChannelException(); | |
} | |
thisList = new List<ServerReplicationRow>(); | |
if (tillSnap <= 0 && holder != null) | |
{ | |
var replRow = snapImpl.GetSnapState(); | |
Task.Run(async () => | |
{ | |
await this.WriteSnapshot( | |
new ServerReplicationSnapshotRow() | |
{ | |
LastOrdering = await holder.Tcs.Task, | |
TaskSnapshot = | |
ReplicationSerializer.Instance | |
.SerializeTaskSnapshot(replRow) | |
}); | |
tillSnap = maxTillSnap; | |
}); | |
} | |
return (tillSnap, thisList); | |
} | |
private static void ThrowClosedChannelException() | |
{ | |
throw new ChannelClosedException("Row Buffer Channel was closed!"); | |
} | |
private ServerReplicationRow ToReplicationRow(NatsMsg<BaseTaskEvent> item) | |
{ | |
var msg = item.Data; | |
var tt = msg.TaskType; | |
var eid = msg.EventId; | |
var tid = msg.TaskId; | |
byte[] bytes = default; | |
using (var buf = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<BaseTaskEvent>.Default.Serialize(buf, msg); | |
bytes = buf.WrittenMemory.ToArray(); | |
} | |
return new ServerReplicationRow() | |
{ | |
//Category = ???, | |
EventId = eid.ToGuid(), OriginalSubject = item.Subject, | |
TaskEventData = bytes, TaskId = tid.ToGuid(), TaskType = tt | |
}; | |
} | |
public async ValueTask WriteSnapshot(ServerReplicationSnapshotRow row) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
await ctx.GetTable<ServerReplicationSnapshotRow>() | |
.InsertOrUpdateAsync(() => new ServerReplicationSnapshotRow() | |
{ | |
LastOrdering = row.LastOrdering, | |
TaskSnapshot = row.TaskSnapshot | |
}, e => new ServerReplicationSnapshotRow() | |
{ | |
TaskSnapshot = row.TaskSnapshot | |
}, | |
() => new ServerReplicationSnapshotRow() | |
{ LastOrdering = row.LastOrdering }); | |
} | |
} | |
public async ValueTask<ServerReplicationSnapshotRow?> GetNewestSnapshot() | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
var row = await ctx.GetTable<ServerReplicationSnapshotRow>() | |
.OrderByDescending(a => a.LastOrdering).FirstOrDefaultAsync(); | |
return row; | |
} | |
} | |
public async IAsyncEnumerable<ServerReplicationRow> BeginReadAtOrdering(long ordering) | |
{ | |
long? curr = ordering; | |
List<ServerReplicationRow> currSet = null; | |
while (curr != null) | |
{ | |
using (var ctx = _ctxFactory.GetConnection()) | |
{ | |
currSet = await GrabOrderingSet(ctx, curr); | |
if (currSet.Count < 100) | |
{ | |
curr = null; | |
} | |
else | |
{ | |
curr = currSet.LastOrDefault()?.Ordering; | |
} | |
} | |
foreach (var row in currSet) | |
{ | |
yield return row; | |
} | |
} | |
} | |
private static async Task<List<ServerReplicationRow>> GrabOrderingSet(DataConnection ctx, [DisallowNull] long? curr) | |
{ | |
return await ctx.GetTable<ServerReplicationRow>().Where(a => a.Ordering >= curr) | |
.OrderBy(a=>a.Ordering) | |
.Take(100) | |
.ToListAsync(); | |
} | |
public async ValueTask StartAndApplyFromSnapshot(ISnapshotAndEventConsumer consumer) | |
{ | |
var c = consumer.BeginNewEventConsumer(); | |
await RecoverImpl(consumer); | |
await foreach (var a in c) | |
{ | |
try | |
{ | |
using (var ctx = | |
await _snapshotGrabbingLocker.WaitAndAcquireSafe( | |
(CancellationToken)default)) | |
{ | |
await consumer.ConsumeEvent(a); | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
} | |
private async ValueTask RecoverImpl(ISnapshotAndEventConsumer consumer) | |
{ | |
var s = await GetNewestSnapshot(); | |
if (s != null) | |
{ | |
await consumer.ConsumeSnap(ReplicationSerializer.Instance.DeSerializeTaskSnapshot(s!.TaskSnapshot)); | |
} | |
var o = s?.LastOrdering??0; | |
await foreach (var a in BeginReadAtOrdering(o)) | |
{ | |
await consumer.ConsumeEvent(ReplicationSerializer.Instance.GetTaskEvent(a.TaskEventData)); | |
} | |
} | |
private async ValueTask RecoverImpl(ReplicatorSnapshotTracker consumer) | |
{ | |
var s = await GetNewestSnapshot(); | |
if (s != null) | |
{ | |
await consumer.ConsumeSnap(ReplicationSerializer.Instance.DeSerializeTaskSnapshot(s!.TaskSnapshot)); | |
} | |
var o = s?.LastOrdering??0; | |
await foreach (var a in BeginReadAtOrdering(o)) | |
{ | |
await consumer.ConsumeEvent(ReplicationSerializer.Instance.GetTaskEvent(a.TaskEventData)); | |
} | |
} | |
} | |
public readonly struct SemaphoreSuperSlimContext : IDisposable | |
{ | |
private readonly SemaphoreSuperSlim _semaphoreSuperSlim; | |
public SemaphoreSuperSlimContext(SemaphoreSuperSlim parent) | |
{ | |
_semaphoreSuperSlim = parent; | |
} | |
public void Dispose() | |
{ | |
_semaphoreSuperSlim.TryRelease(); | |
} | |
} | |
/// <summary> | |
/// We use this because, unlike <see cref="SemaphoreSlim"/>, | |
/// A Bounded <see cref="Channel{T}"/> will avoid allocations | |
/// when there is only one waiter, and that is our common case. | |
/// </summary> | |
public class SemaphoreSuperSlim | |
{ | |
private readonly Channel<int> lockChannel = Channel.CreateBounded<int>(1); | |
/// <summary> | |
/// If -true-, you can continue. | |
/// <para/> | |
/// If -false-, you are done. | |
/// <para/> | |
/// If Throws, a hard-fault was signalled. | |
/// <para/> | |
/// If you are using a retry loop on this, try to re-use a single waiter | |
/// Instead of calling it repeatedly. :) | |
/// </summary> | |
public async ValueTask<bool> WaitForNext(CancellationToken token = default) | |
{ | |
return await lockChannel.Reader.WaitToReadAsync(token); | |
} | |
public bool IsAcquired() | |
{ | |
return lockChannel.Reader.TryPeek(out _); | |
} | |
/// <summary> | |
/// Waits for and acquired a | |
/// </summary> | |
/// <param name="token"></param> | |
/// <returns></returns> | |
public async ValueTask<bool> WaitAndAcquire(CancellationToken token = default) | |
{ | |
await lockChannel.Writer.WriteAsync(0, token); | |
return true; | |
} | |
public async ValueTask<SemaphoreSuperSlimContext> WaitAndAcquireSafe(CancellationToken token = default) | |
{ | |
await lockChannel.Writer.WriteAsync(0, token); | |
return new SemaphoreSuperSlimContext(this); | |
} | |
/// <summary> | |
/// Tries to set the waiter as no longer waiting. | |
/// </summary> | |
/// <returns>If -false-, the waiter was already set to not-waiting</returns> | |
public bool TryAcquire() | |
{ | |
return lockChannel.Writer.TryWrite(0); | |
} | |
/// <summary> | |
/// Tries to set the waiter as unused. | |
/// </summary> | |
/// <returns>If -false-, the waiter was already set as waiting</returns> | |
public bool TryRelease() | |
{ | |
return lockChannel.Reader.TryRead(out _); | |
} | |
/// <summary> | |
/// Marks the waiter as -completed-, i.e. no further work should be done. | |
/// </summary> | |
/// <returns></returns> | |
public bool MarkCompleted() | |
{ | |
var didComplete = lockChannel.Writer.TryComplete(); | |
lockChannel.Reader.TryRead(out _); | |
return didComplete; | |
} | |
/// <summary> | |
/// Marks the waiter in a hard failure state. | |
/// </summary> | |
public bool MarkHardFailure(Exception ex) | |
{ | |
var didComplete = lockChannel.Writer.TryComplete(ex); | |
lockChannel.Reader.TryRead(out _); | |
return didComplete; | |
} | |
} | |
/// <summary> | |
/// We use this because, unlike <see cref="SemaphoreSlim"/>, | |
/// A Bounded <see cref="Channel{T}"/> will avoid allocations | |
/// when there is only one waiter, and that is our common case. | |
/// </summary> | |
public class WaitOrCompletionLock | |
{ | |
private readonly Channel<int> lockChannel = Channel.CreateBounded<int>(1); | |
/// <summary> | |
/// If -true-, you can continue. | |
/// <para/> | |
/// If -false-, you are done. | |
/// <para/> | |
/// If Throws, a hard-fault was signalled. | |
/// <para/> | |
/// If you are using a retry loop on this, try to re-use a single waiter | |
/// Instead of calling it repeatedly. :) | |
/// </summary> | |
public async Task<bool> WaitForNext(CancellationToken token = default) | |
{ | |
return await lockChannel.Reader.WaitToReadAsync(token); | |
} | |
/// <summary> | |
/// Tries to set the waiter as no longer waiting. | |
/// </summary> | |
/// <returns>If -false-, the waiter was already set to not-waiting</returns> | |
public bool TrySetNotWaiting() | |
{ | |
return lockChannel.Writer.TryWrite(0); | |
} | |
/// <summary> | |
/// Tries to set the waiter as waiting. | |
/// </summary> | |
/// <returns>If -false-, the waiter was already set as waiting</returns> | |
public bool TrySetWaiting() | |
{ | |
return lockChannel.Reader.TryRead(out _); | |
} | |
/// <summary> | |
/// Marks the waiter as -completed-, i.e. no further work should be done. | |
/// </summary> | |
/// <returns></returns> | |
public bool MarkCompleted() | |
{ | |
var didComplete = lockChannel.Writer.TryComplete(); | |
lockChannel.Reader.TryRead(out _); | |
return didComplete; | |
} | |
/// <summary> | |
/// Marks the waiter in a hard failure state. | |
/// </summary> | |
public bool MarkHardFailure(Exception ex) | |
{ | |
var didComplete = lockChannel.Writer.TryComplete(ex); | |
lockChannel.Reader.TryRead(out _); | |
return didComplete; | |
} | |
} | |
public class ReplicationSerializer | |
{ | |
public static readonly ReplicationSerializer Instance = | |
new ReplicationSerializer(); | |
public BaseTaskEvent GetTaskEvent(byte[] bytes) | |
{ | |
return ChatTaskClientSerializer<BaseTaskEvent>.Default.Deserialize( | |
new ReadOnlySequence<byte>(bytes))!; | |
} | |
public byte[] SerializeTaskEvent(BaseTaskEvent taskEvt) | |
{ | |
using (var apbw = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<BaseTaskEvent>.Default.Serialize(apbw, | |
taskEvt); | |
return apbw.WrittenMemory.ToArray(); | |
} | |
} | |
public TaskSerializationState DeSerializeTaskSnapshot(byte[] bytes) | |
{ | |
return ChatTaskClientSerializer<TaskSerializationState>.Default.Deserialize( | |
new ReadOnlySequence<byte>(bytes))!; | |
} | |
public byte[] SerializeTaskSnapshot(TaskSerializationState snapState) | |
{ | |
using (var apbw = new ArrayPoolBufferWriter<byte>()) | |
{ | |
ChatTaskClientSerializer<TaskSerializationState>.Default.Serialize(apbw, | |
snapState); | |
return apbw.WrittenMemory.ToArray(); | |
} | |
} | |
} | |
public record TaskSerializationState( | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> Assigned, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> Unassigned); | |
public interface ISnapshotAndEventConsumer | |
{ | |
IAsyncEnumerable<BaseTaskEvent> BeginNewEventConsumer( | |
CancellationToken token = default); | |
ValueTask ConsumeSnap(TaskSerializationState row); | |
ValueTask ConsumeEvent(BaseTaskEvent row); | |
} | |
public class DataConnectionFactory | |
{ | |
public DataConnection GetConnection() | |
{ | |
return null; | |
} | |
} | |
#endregion | |
public class ClientBootstrap | |
{ | |
public ClientBootstrap(NatsOpts opts = null) | |
{ | |
_opts = opts ?? NatsOpts.Default; | |
} | |
private NatsConnection _connection; | |
private readonly NatsOpts _opts; | |
public async ValueTask Startclient() | |
{ | |
//TODO: At some point, do jetstream instead. | |
// And after that, do snapshots via KV! | |
_connection = new NatsConnection(_opts); | |
await _connection.ConnectAsync(); | |
//var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default)); | |
//new NatsKVContext() | |
} | |
public async ValueTask SendMessage(ISubjectFactory command) | |
{ | |
switch (command) | |
{ | |
case IChatTaskClient ctc: | |
await _connection.PublishAsync(command.GetSubject(), ctc, | |
serializer: ChatTaskClientSerializer<IChatTaskClient> | |
.Default); | |
break; | |
case BaseTaskEvent te: | |
await _connection.PublishAsync(command.GetSubject(), te, | |
serializer: ChatTaskClientSerializer<BaseTaskEvent>.Default); | |
break; | |
} | |
} | |
public async ValueTask<INatsSub<T>> SubChat<T>(string subject, | |
CancellationToken token) where T : IChatTaskClient | |
{ | |
return await _connection.SubscribeCoreAsync<T>(subject, | |
null, ChatTaskClientSerializer<T>.Default, default, token); | |
} | |
public async ValueTask<INatsSub<T>> Sub<T>(string subject, | |
CancellationToken token) where T : BaseTaskEvent | |
{ | |
return await _connection.SubscribeCoreAsync<T>(subject, | |
null, ChatTaskClientSerializer<T>.Default, default, token); | |
} | |
} | |
public record ChatHistory(Lst<BroadcastMessage> Messages) | |
{ | |
public ChatHistory AddMsg(BroadcastMessage message) | |
{ | |
if (Messages.Contains(message)) | |
{ | |
return this; | |
} | |
return this with { Messages = Messages.Add(message) }; | |
} | |
} | |
public record ReplicatorStateProjectionData(HashMap<Ulid, Lst<BaseTaskEvent>> Map); | |
public record ReplicatorStateProjectionSet( | |
ReplicatorStateProjectionData Active); | |
public class ReplicatorSnapshotTracker | |
{ | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _tasks; | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _assigned; | |
private HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> _unassigned; | |
public async ValueTask ConsumeSnap(TaskSerializationState row) | |
{ | |
_tasks = row.Assigned.Union(row.Unassigned, (a, b) => b, (a, b) => b, | |
(k, a, b) => a.Union(b)); | |
} | |
public async ValueTask ConsumeEvent(BaseTaskEvent row) | |
{ | |
if (row is TaskCompleted) | |
{ | |
RemoveFromSets(); | |
} | |
else if (row is TaskCanceled) | |
{ | |
RemoveFromSets(); | |
} | |
else | |
{ | |
_tasks = | |
_tasks.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
_assigned = _assigned.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
_unassigned = _unassigned.TrySetItem(row.TaskType, | |
(s) => s.TrySetItem(row.TaskId, a => a.Add(row))); | |
} | |
void RemoveFromSets() | |
{ | |
_tasks = | |
_tasks.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
_assigned = _assigned.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
_unassigned = | |
_unassigned.TrySetItem(row.TaskType, (s) => s.Remove(row.TaskId)); | |
} | |
} | |
public TaskSerializationState GetSnapState() | |
{ | |
return new TaskSerializationState(_assigned, _unassigned); | |
} | |
} | |
/// <summary> | |
/// | |
/// </summary> | |
/// <remarks> | |
/// Ok. So the way this works, is we use a series of <see cref="Atom{A}"/> | |
/// Hashmaps of Maps. Atom lets us Swap via interlocked, | |
/// such that if the values being swapped are 'comparable' and the actions are | |
/// 'safely repeatable', it can repeat the action if a concurrent update happens | |
/// until the interlocked operation succeeds. | |
/// </remarks> | |
public class TaskSetCoordinatorHashMap : ISnapshotAndEventConsumer | |
{ | |
public string UserId { get; } | |
public TaskSetCoordinatorHashMap(string userId, | |
ClientBootstrap clientBootstrap, bool trackChanges) | |
{ | |
UserId = userId; | |
_client = clientBootstrap; | |
_trackChanges = trackChanges; | |
_assignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
_requestedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
_unassignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>()); | |
} | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _assignedTasks; | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _requestedTasks; | |
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> _unassignedTasks; | |
private readonly ClientBootstrap _client; | |
private Task _mainTask; | |
public async Task<Ulid> CreateTask(string taskType, string text, | |
string? assignedToUser, DateTimeOffset expectedBy) | |
{ | |
var taskId = Ulid.NewUlid(); | |
await _client.SendMessage(new TaskRequest(Ulid.NewUlid(), UserId, assignedToUser, | |
taskId, taskType, text, DateTimeOffset.UtcNow, | |
expectedBy)); | |
return taskId; | |
} | |
public async ValueTask AddTaskNote(TaskRequest origReq, | |
string notes) | |
{ | |
await _client.SendMessage(new TaskUpdate(Ulid.NewUlid(),origReq.TaskId, | |
origReq.TaskType, UserId, notes)); | |
} | |
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes) | |
{ | |
if (origReq.RequestingUserId == UserId) | |
{ | |
await _client.SendMessage(new TaskCanceled(Ulid.NewUlid(),origReq.TaskId, | |
origReq.TaskType, UserId, notes, DateTimeOffset.Now)); | |
return true; | |
} | |
return false; | |
} | |
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId, | |
string newUser, string notes) | |
{ | |
bool foundTask = false; | |
foreach (var requestedTask in _assignedTasks.Value) | |
{ | |
var item = | |
requestedTask.Value.Find(taskId, a => a, | |
static () => Lst<BaseTaskEvent>.Empty); | |
if (item != default) | |
{ | |
foundTask = true; | |
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser, | |
requestedTask.Key, UserId, notes, item, | |
DateTimeOffset.Now)); | |
//_assignedTasks.AddOrUpdate(requestedTask.Key, | |
// e => e.Remove(taskId), | |
// () => HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
return new ReassignResult(taskId, true, "Reassigned"); | |
} | |
} | |
foreach (var requestedTask in _unassignedTasks.Value) | |
{ | |
var item = | |
requestedTask.Value.Find(taskId, a => a, | |
static () => Lst<BaseTaskEvent>.Empty); | |
if (item != default) | |
{ | |
foundTask = true; | |
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser, | |
requestedTask.Key, UserId, notes, item, | |
DateTimeOffset.Now)); | |
//_assignedTasks.AddOrUpdate(requestedTask.Key, | |
// e => e.Remove(taskId), | |
// () => HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
return new ReassignResult(taskId, true, "Reassigned"); | |
} | |
} | |
return new ReassignResult(taskId, false, "Could not find task!"); | |
} | |
public async ValueTask Init(TaskCompletionSource initLock, | |
CancellationToken token) | |
{ | |
var userEvents = | |
await _client.Sub<BaseTaskEvent>( | |
SubjectHelpers.TaskUserSub(UserId), token); | |
CancellationTokenSource cts = null; | |
_mainTask = Task.Run(async () => | |
{ | |
await initLock.Task; | |
try | |
{ | |
await foreach (var entry in userEvents.Msgs.ReadAllAsync( | |
token)) | |
{ | |
try | |
{ | |
RunEventHandlers(entry); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
throw; | |
} | |
finally | |
{ | |
try | |
{ | |
cts?.Cancel(false); | |
} | |
finally | |
{ | |
try | |
{ | |
cts?.Dispose(); | |
} | |
catch | |
{ | |
// ignored | |
} | |
} | |
} | |
}); | |
} | |
private void RunEventHandlers(NatsMsg<BaseTaskEvent> entry) | |
{ | |
ProcessTaskEventSwitch(entry.Data); | |
} | |
private void ProcessTaskEventSwitch(BaseTaskEvent? msg) | |
{ | |
switch (msg) | |
{ | |
case TaskAssigned ta: | |
{ | |
HandleTaskAssigned(ta); | |
break; | |
} | |
case TaskBegan tb: | |
{ | |
HandleTaskBegan(tb); | |
break; | |
} | |
case TaskCompleted tc: | |
{ | |
HandleTaskCompleted(tc); | |
break; | |
} | |
case TaskRequest tr: | |
{ | |
HandleTaskRequest(tr); | |
break; | |
} | |
case TaskUpdate tu: | |
{ | |
HandleTaskUpdate(tu); | |
break; | |
} | |
case TaskCanceled tcan: | |
{ | |
HandleTaskCanceled(tcan); | |
break; | |
} | |
} | |
} | |
public async ValueTask<ClearResult> ClearItem(Ulid taskId) | |
{ | |
bool? foundAndCleared = null; | |
_requestedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
_requestedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
_unassignedTasks.Swap(thm => | |
{ | |
var innerThm = thm; | |
foreach (var valueTuple in thm) | |
{ | |
valueTuple.Value.Find(taskId, s => | |
{ | |
var f = s.Head(); | |
var c = s.Reverse().FirstOrDefault(); | |
if (c is TaskCanceled or TaskCompleted || | |
f is not TaskRequest) | |
{ | |
innerThm = innerThm.AddOrUpdate(valueTuple.Key, | |
thmi => thmi.Remove(taskId), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty); | |
} | |
return true; | |
}, () => false); | |
} | |
return innerThm; | |
}); | |
if (foundAndCleared.HasValue == false) | |
{ | |
return new ClearResult(true, "no instances found"); | |
} | |
else | |
{ | |
if (foundAndCleared.Value) | |
{ | |
return new ClearResult(true, "cleared"); | |
} | |
else | |
{ | |
return new ClearResult(false, "Tasks not in closed state"); | |
} | |
} | |
} | |
private void HandleTaskRequest(TaskRequest trb) | |
{ | |
void doSwap( | |
Atom<HashMap<string, | |
HashMap<Ulid, Lst<BaseTaskEvent>>>> taskSet, | |
TaskRequest taskRequest) | |
{ | |
taskSet.Swap(taskRequest, (tr,b)=>b.AddOrUpdate(tr.TaskType, | |
(hm) => hm.TryAdd(tr.TaskId, Lst<BaseTaskEvent>.Empty.Add(tr)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty.Add(tr.TaskId, | |
Lst<BaseTaskEvent>.Empty.Add(tr)))); | |
} | |
if (trb.RequestingUserId == UserId) | |
{ | |
doSwap(_requestedTasks, trb); | |
} | |
if (trb.AssignedUserId == UserId) | |
{ | |
doSwap(_assignedTasks,trb); | |
} | |
else if (trb.AssignedUserId == "unassigned") | |
{ | |
doSwap(_unassignedTasks,trb); | |
} | |
} | |
private static void TUSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskUpdate tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private static void TCanSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskCanceled tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private static void TComSwap( | |
Atom<HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>>> | |
atom, TaskCompleted tu) | |
{ | |
atom.Value.Find(tu.TaskType, hm => | |
{ | |
atom.Swap(b=>b.AddOrUpdate(tu.TaskType, | |
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu), | |
() => Lst<BaseTaskEvent>.Empty.Add(tu)), | |
() => LanguageExt.HashMap<Ulid, Lst<BaseTaskEvent>>.Empty)); | |
return Unit.Default; | |
}, () => Unit.Default); | |
} | |
private readonly Channel<bool> _changeLocker = | |
Channel.CreateBounded<bool>(1); | |
private readonly bool _trackChanges; | |
public TaskState GetTaskState() | |
{ | |
return new( | |
_assignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap(), | |
_requestedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap(), | |
_unassignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap())) | |
.ToHashMap()); | |
} | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record TaskState( | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> AssignedTasks, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> RequestedTasks, | |
HashMap<string, HashMap<Ulid, Lst<BaseTaskEvent>>> UnassignedTasks); | |
private void Snap() | |
{ | |
var oldState = _oldTaskState; | |
var newState = GetTaskState(); | |
} | |
private TaskState _oldTaskState; | |
private static ( | |
HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg, | |
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm, | |
HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg) | |
MakeSnapshot<TKeyOuter,TKeyInner,TLst>( | |
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm) | |
{ | |
//TODO: This is trasshhhhhhh. | |
var chg = thm.Changes; | |
var preMap = thm.Map(a => | |
{ | |
var (k, v) = a; | |
var c = v.Changes; | |
return (k, (c, v.Snapshot())); | |
}).ToHashMap(); | |
thm = thm.SetItems(preMap.Select(a => (a.Item1, a.Item2.Item2))); | |
var innerChg = preMap.Select(a => (a.Item1, a.Item2.Item1)) | |
.ToHashMap(); | |
return (chg, thm, innerChg); | |
} | |
private void HandleTaskUpdate(TaskUpdate tu) | |
{ | |
TUSwap(_assignedTasks,tu); | |
TUSwap(_requestedTasks,tu); | |
TUSwap(_unassignedTasks,tu); | |
} | |
private void HandleTaskCanceled(TaskCanceled taskCanceled) | |
{ | |
TCanSwap(_assignedTasks,taskCanceled); | |
TCanSwap(_requestedTasks,taskCanceled); | |
TCanSwap(_unassignedTasks,taskCanceled); | |
} | |
private void HandleTaskCompleted(TaskCompleted taskCompleted) | |
{ | |
TComSwap(_assignedTasks, taskCompleted); | |
TComSwap(_unassignedTasks, taskCompleted); | |
TComSwap(_requestedTasks, taskCompleted); | |
} | |
private void HandleTaskAssigned(TaskAssigned ta) | |
{ | |
if (ta.UserId == UserId) | |
{ | |
_assignedTasks.Swap(ta, (tai, thm) => | |
{ | |
return thm.AddOrUpdate(tai.TaskType, tai.TaskId, | |
e => e.Add(tai), | |
() => Lst<BaseTaskEvent>.Empty.Add(tai)); | |
}); | |
} | |
else | |
{ | |
_assignedTasks.Swap(ta, (tai, thm) => | |
{ | |
return thm.Remove(tai.TaskType, tai.TaskId); | |
}); | |
} | |
} | |
private void HandleTaskBegan(TaskBegan tb) | |
{ | |
_requestedTasks.Swap(tb, (tbi, thm) => | |
{ | |
return thm.AddOrUpdate(tbi.TaskType, tbi.TaskId, t => | |
t.Add(tbi), () => | |
Lst<BaseTaskEvent>.Empty.Add(tbi)); | |
}); | |
} | |
public async IAsyncEnumerable<BaseTaskEvent> BeginNewEventConsumer(CancellationToken token = default) | |
{ | |
await using (var userEvents = | |
await _client.Sub<BaseTaskEvent>( | |
SubjectHelpers.TaskUserSub(UserId), token)) | |
{ | |
await foreach (var msg in userEvents.Msgs.ReadAllAsync(token) | |
.WithCancellation(token)) | |
{ | |
yield return msg.Data!; | |
} | |
} | |
} | |
public async ValueTask ConsumeSnap(TaskSerializationState row) | |
{ | |
throw new NotImplementedException(); | |
} | |
public async ValueTask ConsumeEvent(BaseTaskEvent row) | |
{ | |
this.ProcessTaskEventSwitch(row); | |
} | |
} | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record ReassignResult(Ulid TaskId, bool Success, string Message); | |
[MessagePackObject(keyAsPropertyName:true)] | |
public record ClearResult(bool Success, string? Reason); | |
public record DmConversationSet(HashMap<string, Lst<DirectMessage>> User) | |
{ | |
public DmConversationSet AddMsg(string arg1FromUserId, | |
DirectMessage directMessage) | |
{ | |
return this with | |
{ | |
User = User.AddOrUpdate(arg1FromUserId, | |
(a) => a.Contains(directMessage) ? a : a.Add(directMessage), | |
() => new Lst<DirectMessage>().Add(directMessage)) | |
}; | |
} | |
public DmConversationSet AddOrMarkMessageReceived(string arg1ToUserId, | |
DirectMessage directMessage) | |
{ | |
return AddMsg(arg1ToUserId, directMessage); | |
} | |
} | |
public class ChatTaskClientSerializer<T> : INatsSerializer<T> | |
{ | |
public static readonly ChatTaskClientSerializer<T> Default = | |
new ChatTaskClientSerializer<T>(); | |
private static readonly IFormatterResolver DefaultFormatterResolver = | |
CompositeResolver.Create(new[] | |
{ | |
UlidMessagePackResolver.Instance, | |
MessagePackSerializerOptions.Standard.Resolver | |
}); | |
public static readonly MessagePackSerializerOptions DefaultSerializerOptions = | |
MessagePackSerializerOptions.Standard.WithResolver( | |
DefaultFormatterResolver); | |
public void Serialize(IBufferWriter<byte> bufferWriter, T value) | |
{ | |
MessagePackSerializer.Serialize(bufferWriter, value, | |
DefaultSerializerOptions); | |
} | |
public T? Deserialize(in ReadOnlySequence<byte> buffer) | |
{ | |
return MessagePackSerializer.Deserialize<T>(buffer, | |
DefaultSerializerOptions); | |
} | |
} | |
public interface ISubjectFactory | |
{ | |
string GetSubject(); | |
} | |
[Union(2, typeof(TaskRequest))] | |
[Union(3, typeof(TaskCompleted))] | |
[Union(4, typeof(TaskBegan))] | |
[Union(5, typeof(TaskUpdate))] | |
[Union(6, typeof(TaskAssigned))] | |
[Union(7, typeof(TaskCanceled))] | |
[MessagePackObject(keyAsPropertyName:true)] | |
public abstract record BaseTaskEvent(Ulid EventId, Ulid TaskId, string TaskType) | |
: ISubjectFactory, IEquatable<BaseTaskEvent> | |
{ | |
bool IEquatable<BaseTaskEvent>.Equals(BaseTaskEvent? other) | |
{ | |
return other != null && other.EventId == EventId; | |
} | |
public abstract string GetSubject(); | |
} | |
[Union(0, typeof(BroadcastMessage))] | |
[Union(1, typeof(DirectMessage))] | |
public interface IChatTaskClient : ISubjectFactory | |
{ | |
} | |
[MessagePackObject(true)] | |
public record BroadcastMessage( | |
string FromUserId, | |
string Text) : IChatTaskClient | |
{ | |
public DateTimeOffset TimeSent { get; set; } | |
public string GetSubject() | |
{ | |
return SubjectHelpers.BroadcastSubject; | |
} | |
} | |
public static class SubjectHelpers | |
{ | |
public static string BroadcastSubject => | |
$"sample.chat.message.broadcast"; | |
public static string DirectMessage(string from, string to) => | |
$"sample.chat.message.direct.{from}.{to}"; | |
public static string | |
Tasks(string scope, string type, Ulid taskId, string requested, | |
string assigned) => | |
$"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}"; | |
public static string Tasks(string scope, string type, Ulid taskId, | |
string doneByUser) | |
{ | |
return $"sample.task.{scope}.{type},{taskId}.{doneByUser}"; | |
} | |
public static string TaskUserSub(string userId) | |
{ | |
return $"sample.task.>"; | |
} | |
public static string TaskReplication => $"sample.task.>"; | |
} | |
[MessagePackObject(true)] | |
public record DirectMessage( | |
string FromUserId, | |
string ToUserId, | |
string Text) : IChatTaskClient | |
{ | |
public string GetSubject() | |
{ | |
return SubjectHelpers.DirectMessage(FromUserId, ToUserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskRequest( | |
Ulid EventId, | |
string RequestingUserId, | |
string? AssignedUserId, | |
Ulid TaskId, | |
string TaskType, | |
string Text, | |
DateTimeOffset RequestedAt, | |
DateTimeOffset ExpectedBy) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Request, TaskType, TaskId, | |
RequestingUserId, | |
AssignedUserId != null ? AssignedUserId : "unassigned"); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskCanceled( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string CanceledByUser, | |
string CancelReason, | |
DateTimeOffset CancelTime) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Canceled, TaskType, TaskId, | |
CanceledByUser); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskBegan( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Text, | |
DateTimeOffset BeginTime) : BaseTaskEvent(EventId, TaskId, TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Begun, TaskType, TaskId, UserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskUpdate( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Notes) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Update, TaskType, TaskId, UserId); | |
} | |
} | |
[MessagePackObject(true)] | |
public record TaskCompleted( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string Notes, | |
DateTimeOffset CompletedTime) : BaseTaskEvent(EventId,TaskId,TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Completed, TaskType, TaskId, UserId); | |
} | |
} | |
public static class TaskScopeSubjects | |
{ | |
public static readonly string Completed = "completed"; | |
public static readonly string Assigned = "assigned"; | |
public static readonly string Begun = "begun"; | |
public static readonly string Update = "update"; | |
public static readonly string Canceled = "canceled"; | |
public static readonly string Request = "request"; | |
} | |
[MessagePackObject(true)] | |
public record TaskAssigned( | |
Ulid EventId, | |
Ulid TaskId, | |
string TaskType, | |
string UserId, | |
string AssignedByUserId, | |
string Notes, | |
Lst<BaseTaskEvent> History, | |
DateTimeOffset AssignedTime | |
) : BaseTaskEvent(EventId, TaskId, TaskType) | |
{ | |
public override string GetSubject() | |
{ | |
return SubjectHelpers.Tasks(TaskScopeSubjects.Assigned, TaskType, TaskId, UserId, | |
AssignedByUserId); | |
} | |
} | |
public class ClientCoordinator | |
{ | |
public string UserId { get; } | |
private readonly Atom<ChatHistory> _broadcastHistory; | |
private readonly Atom<DmConversationSet> _directMessageHistory; | |
private readonly ClientBootstrap _client; | |
public ClientCoordinator(string userId, ClientBootstrap bootstrap) | |
{ | |
UserId = userId; | |
_client = bootstrap; | |
// OK so the way Atoms work, is they use a 'swap' function, | |
// and then check for equality/etc. | |
// If your type does equality (i.e. records) | |
// and your underlying types are immutable, | |
// and your actions are able to be applied... | |
// magic! in case of a conflict, the operation is re-attempted, | |
// all in a thread safe fashion (as long as your swap is thread safe) | |
_broadcastHistory = | |
Atom(new ChatHistory(default)); | |
_directMessageHistory = | |
Atom(new DmConversationSet(LanguageExt | |
.HashMap<string, Lst<DirectMessage>>.Empty)); | |
} | |
public async ValueTask Begin(CancellationToken token) | |
{ | |
var (broadcastSub, dmSub, outboundDmSub) = await ValueTaskEx.WhenAll( | |
_client.SubChat<BroadcastMessage>( | |
SubjectHelpers.BroadcastSubject, | |
token), | |
_client.SubChat<DirectMessage>( | |
SubjectHelpers.DirectMessage("*",UserId), | |
token), | |
_client.SubChat<DirectMessage>( | |
SubjectHelpers.DirectMessage(UserId,"*"), token) | |
); | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await foreach (var reader in broadcastSub.Msgs.ReadAllAsync( | |
token)) | |
{ | |
_broadcastHistory.Swap(reader.Data!, | |
(m, h) => h.AddMsg(m)); | |
} | |
} | |
catch | |
{ | |
} | |
}); | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await foreach | |
(var reader in outboundDmSub.Msgs.ReadAllAsync( | |
token)) | |
{ | |
_directMessageHistory.Swap(reader.Data!, | |
(m, h) => | |
{ | |
return h.AddOrMarkMessageReceived(m.ToUserId, | |
m); | |
}); | |
} | |
} | |
catch | |
{ | |
} | |
}); | |
Task.Run(async () => | |
{ | |
try | |
{ | |
await foreach (var reader in dmSub.Msgs.ReadAllAsync( | |
token)) | |
{ | |
_directMessageHistory.Swap(reader.Data!, UserId, | |
(m, h, v) => | |
{ | |
if (m.FromUserId != h) | |
{ | |
return v.AddMsg(m.FromUserId, m); | |
} | |
else | |
{ | |
return v.AddMsg(m.ToUserId, m); | |
} | |
}); | |
} | |
} | |
catch | |
{ | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment