Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Last active April 8, 2024 00:01
Show Gist options
  • Save to11mtm/4e73aab45a2d43c462a830de2c58da02 to your computer and use it in GitHub Desktop.
Save to11mtm/4e73aab45a2d43c462a830de2c58da02 to your computer and use it in GitHub Desktop.
Nats Sandbox Task tracking Logic, Round 2
using System.Buffers;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading.Channels;
using Cysharp.Serialization.MessagePack;
using LanguageExt;
using LinqToDB;
using LinqToDB.Data;
using MessagePack;
using MessagePack.Resolvers;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
using ValueTaskSupplement;
using static LanguageExt.Prelude;
using CancellationTokenSource = System.Threading.CancellationTokenSource;
using Seq = LanguageExt.Seq;
using Ulid = System.Ulid;
namespace NatsSandBox.Client;
public class ServerReplicationRow
{
public string Category { get; set; }
public string TaskType { get; set; }
public Ulid EventId { get; set; }
public Ulid TaskId { get; set; }
public byte[] TaskEventData { get; set; }
public string OriginalSubject { get; set; }
public long Ordering { get; set; }
}
public class ServerReplicationSnapshotRow
{
public long LastOrdering { get; set; }
public byte[] TaskSnapshot { get; set; }
}
public class ServerReplicator
{
private readonly ClientBootstrap _client;
private readonly DataConnectionFactory _ctxFactory;
public ServerReplicator(ClientBootstrap bootstrap, DataConnectionFactory dataConnectionFactory)
{
_client = bootstrap;
_ctxFactory = dataConnectionFactory;
}
private async ValueTask WriteTasks(List<ServerReplicationRow> rows)
{
using (var ctx = _ctxFactory.GetConnection())
{
using (var tx = await ctx.BeginTransactionAsync())
{
await ctx.GetTable<ServerReplicationRow>()
.BulkCopyAsync(rows);
await tx.CommitAsync();
}
}
}
private readonly Channel<List<ServerReplicationRow>> _serverRows = Channel.CreateUnbounded<List<ServerReplicationRow>>();
public async ValueTask StartReplicator(TaskSetCoordinator coordinator, CancellationToken token)
{
Task _writeLoop = Task.Run(async () =>
{
var pending = 500;
List<ServerReplicationRow> pendingSet =
new List<ServerReplicationRow>(512);
while (await _serverRows.Reader.WaitToReadAsync())
{
while (_serverRows.Reader.TryRead(out var a))
{
pendingSet.AddRange(a);
pending = pending - a.Count;
if (pending - 100 < 0)
{
await WriteTasks(pendingSet);
pendingSet.Clear();
}
}
}
});
await using (var sub =
await _client.Sub<ITaskEvent>(SubjectHelpers.TaskReplication,
token))
{
const int maxTillSnap = 2000;
var tillSnap = maxTillSnap;
var thisList = new List<ServerReplicationRow>();
while (token.IsCancellationRequested == false)
{
const int maxTillFlush = 100;
var tillFlush = maxTillFlush;
while (sub.Msgs.TryRead(out var msg))
{
thisList.Add(ToReplicationRow(msg));
tillFlush--;
if (tillFlush == 0)
{
_serverRows.Writer.TryWrite(thisList);
thisList = new List<ServerReplicationRow>();
}
}
if (tillFlush != maxTillFlush)
{
_serverRows.Writer.TryWrite(thisList);
thisList = new List<ServerReplicationRow>();
}
if (await sub.Msgs.WaitToReadAsync(token) == false)
{
return;
}
}
if (thisList.Count > 0)
{
_serverRows.Writer.TryWrite(thisList);
}
}
}
private ServerReplicationRow ToReplicationRow(NatsMsg<ITaskEvent> item)
{
return default;
}
public async ValueTask WriteSnapshot(ServerReplicationSnapshotRow row)
{
using (var ctx = _ctxFactory.GetConnection())
{
await ctx.GetTable<ServerReplicationSnapshotRow>()
.InsertOrUpdateAsync(() => new ServerReplicationSnapshotRow()
{
LastOrdering = row.LastOrdering,
TaskSnapshot = row.TaskSnapshot
}, e => new ServerReplicationSnapshotRow()
{
TaskSnapshot = row.TaskSnapshot
},
() => new ServerReplicationSnapshotRow()
{ LastOrdering = row.LastOrdering });
}
}
public async ValueTask<ServerReplicationSnapshotRow?> GetNewestSnapshot()
{
using (var ctx = _ctxFactory.GetConnection())
{
var row = await ctx.GetTable<ServerReplicationSnapshotRow>()
.OrderByDescending(a => a.LastOrdering).FirstOrDefaultAsync();
return row;
}
}
public async IAsyncEnumerable<ServerReplicationRow> BeginReadAtOrdering(long ordering)
{
long? curr = ordering;
List<ServerReplicationRow> currSet = null;
while (curr != null)
{
using (var ctx = _ctxFactory.GetConnection())
{
currSet = await ctx.GetTable<ServerReplicationRow>().Where(a => a.Ordering >= curr)
.OrderBy(a=>a.Ordering)
.Take(100)
.ToListAsync();
if (currSet.Count < 100)
{
curr = null;
}
else
{
curr = currSet.LastOrDefault()?.Ordering;
}
}
foreach (var row in currSet)
{
yield return row;
}
}
}
}
public class DataConnectionFactory
{
public DataConnection GetConnection()
{
return null;
}
}
public class ClientBootstrap
{
public ClientBootstrap(NatsOpts opts = null)
{
_opts = opts ?? NatsOpts.Default;
}
private NatsConnection _connection;
private readonly NatsOpts _opts;
public void Startclient()
{
//TODO: At some point, do jetstream instead.
// And after that, do snapshots via KV!
_connection = new NatsConnection(_opts);
//var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default));
//new NatsKVContext()
}
public async ValueTask SendMessage(IChatTaskClient command)
{
await _connection.PublishAsync(command.GetSubject(), command,
serializer: ChatTaskClientSerializer<IChatTaskClient>.Default);
}
public async ValueTask<INatsSub<T>> Sub<T>(string subject,
CancellationToken token) where T : IChatTaskClient
{
return await _connection.SubscribeCoreAsync<T>(subject,
null, ChatTaskClientSerializer<T>.Default, default, token);
}
}
public record ChatHistory(Lst<BroadcastMessage> Messages)
{
public ChatHistory AddMsg(BroadcastMessage message)
{
if (Messages.Contains(message))
{
return this;
}
return this with { Messages = Messages.Add(message) };
}
}
public enum TaskCategory
{
Assigned,
Requested,
Unassigned
}
public record TaskChangeEvent(
TaskCategory Category,
Ulid TaskId,
ITaskEvent Event);
public class TaskSetCoordinatorHashMap
{
public string UserId { get; }
public TaskSetCoordinatorHashMap(string userId,
ClientBootstrap clientBootstrap, bool trackChanges)
{
UserId = userId;
_client = clientBootstrap;
_trackChanges = trackChanges;
_assignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>());
_requestedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>());
_unassignedTasks = Atom(HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>());
}
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> _assignedTasks;
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> _requestedTasks;
private Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> _unassignedTasks;
private readonly ClientBootstrap _client;
private Task _mainTask;
public async ValueTask CreateTask(string taskType, string text,
string? assignedToUser, DateTimeOffset expectedBy)
{
await _client.SendMessage(new TaskRequest(Ulid.NewUlid(), UserId, assignedToUser,
Ulid.NewUlid(), taskType, text, DateTimeOffset.UtcNow,
expectedBy));
}
public async ValueTask AddTaskNote(TaskRequest origReq,
string notes)
{
await _client.SendMessage(new TaskUpdate(Ulid.NewUlid(),origReq.TaskId,
origReq.TaskType, UserId, notes));
}
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes)
{
if (origReq.RequestingUserId == UserId)
{
await _client.SendMessage(new TaskCanceled(Ulid.NewUlid(),origReq.TaskId,
origReq.TaskType, UserId, notes, DateTimeOffset.Now));
return true;
}
return false;
}
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId,
string newUser, string notes)
{
bool foundTask = false;
foreach (var requestedTask in _assignedTasks.Value)
{
var item =
requestedTask.Value.Find(taskId, a => a,
static () => Lst<ITaskEvent>.Empty);
if (item != default)
{
foundTask = true;
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser,
requestedTask.Key, UserId, notes, item,
DateTimeOffset.Now));
//_assignedTasks.AddOrUpdate(requestedTask.Key,
// e => e.Remove(taskId),
// () => HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return new ReassignResult(taskId, true, "Reassigned");
}
}
foreach (var requestedTask in _unassignedTasks.Value)
{
var item =
requestedTask.Value.Find(taskId, a => a,
static () => Lst<ITaskEvent>.Empty);
if (item != default)
{
foundTask = true;
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser,
requestedTask.Key, UserId, notes, item,
DateTimeOffset.Now));
//_assignedTasks.AddOrUpdate(requestedTask.Key,
// e => e.Remove(taskId),
// () => HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return new ReassignResult(taskId, true, "Reassigned");
}
}
return new ReassignResult(taskId, false, "Could not find task!");
}
public async ValueTask Init(CancellationToken token)
{
var userEvents =
await _client.Sub<ITaskEvent>(
SubjectHelpers.TaskUserSub(UserId), token);
CancellationTokenSource cts = null;
_mainTask = Task.Run(async () =>
{
try
{
await foreach (var entry in userEvents.Msgs.ReadAllAsync(
token))
{
try
{
RunEventHandlers(entry);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
finally
{
try
{
cts?.Cancel(false);
}
finally
{
try
{
cts?.Dispose();
}
catch
{
// ignored
}
}
}
});
}
private void RunEventHandlers(NatsMsg<ITaskEvent> entry)
{
var msg = entry.Data;
switch (msg)
{
case TaskAssigned ta:
{
HandleTaskAssigned(ta);
break;
}
case TaskBegan tb:
{
HandleTaskBegan(tb);
break;
}
case TaskCompleted tc:
{
HandleTaskCompleted(tc);
break;
}
case TaskRequest tr:
{
HandleTaskRequest(tr);
break;
}
case TaskUpdate tu:
{
HandleTaskUpdate(tu);
break;
}
case TaskCanceled tcan:
{
HandleTaskCanceled(tcan);
break;
}
}
}
public async ValueTask<ClearResult> ClearItem(Ulid taskId)
{
bool? foundAndCleared = null;
_requestedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thmi => thmi.Remove(taskId),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return true;
}, () => false);
}
return innerThm;
});
_requestedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thmi => thmi.Remove(taskId),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return true;
}, () => false);
}
return innerThm;
});
_unassignedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thmi => thmi.Remove(taskId),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return true;
}, () => false);
}
return innerThm;
});
if (foundAndCleared.HasValue == false)
{
return new ClearResult(true, "no instances found");
}
else
{
if (foundAndCleared.Value)
{
return new ClearResult(true, "cleared");
}
else
{
return new ClearResult(false, "Tasks not in closed state");
}
}
}
private void HandleTaskRequest(TaskRequest trb)
{
void doSwap(
Atom<HashMap<string,
HashMap<Ulid, Lst<ITaskEvent>>>> taskSet,
TaskRequest taskRequest)
{
taskSet.Swap(taskRequest, (tr,b)=>b.AddOrUpdate(tr.TaskType,
(hm) => hm.TryAdd(tr.TaskId, Lst<ITaskEvent>.Empty.Add(tr)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(tr.TaskId,
Lst<ITaskEvent>.Empty.Add(tr))));
}
if (trb.RequestingUserId == UserId)
{
doSwap(_requestedTasks, trb);
}
if (trb.AssignedUserId == UserId)
{
doSwap(_assignedTasks,trb);
}
else if (trb.AssignedUserId == "unassigned")
{
doSwap(_unassignedTasks,trb);
}
}
private static void TUSwap(
Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>>
atom, TaskUpdate tu)
{
atom.Value.Find(tu.TaskType, hm =>
{
atom.Swap(b=>b.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty));
return Unit.Default;
}, () => Unit.Default);
}
private static void TCanSwap(
Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>>
atom, TaskCanceled tu)
{
atom.Value.Find(tu.TaskType, hm =>
{
atom.Swap(b=>b.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty));
return Unit.Default;
}, () => Unit.Default);
}
private static void TComSwap(
Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>>
atom, TaskCompleted tu)
{
atom.Value.Find(tu.TaskType, hm =>
{
atom.Swap(b=>b.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty));
return Unit.Default;
}, () => Unit.Default);
}
private readonly Channel<bool> _changeLocker =
Channel.CreateBounded<bool>(1);
private readonly bool _trackChanges;
public TaskState GetTaskState()
{
return new(
_assignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap()))
.ToHashMap(),
_requestedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap()))
.ToHashMap(),
_unassignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap()))
.ToHashMap());
}
public record TaskState(
HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> AssignedTasks,
HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> RequestedTasks,
HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> UnassignedTasks);
private static (
HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg,
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm,
HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg)
MakeSnapshot<TKeyOuter,TKeyInner,TLst>(
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm)
{
//TODO: This is trasshhhhhhh.
var chg = thm.Changes;
var preMap = thm.Map(a =>
{
var (k, v) = a;
var c = v.Changes;
return (k, (c, v.Snapshot()));
}).ToHashMap();
thm = thm.SetItems(preMap.Select(a => (a.Item1, a.Item2.Item2)));
var innerChg = preMap.Select(a => (a.Item1, a.Item2.Item1))
.ToHashMap();
return (chg, thm, innerChg);
}
private void HandleTaskUpdate(TaskUpdate tu)
{
TUSwap(_assignedTasks,tu);
TUSwap(_requestedTasks,tu);
TUSwap(_unassignedTasks,tu);
}
private void HandleTaskCanceled(TaskCanceled taskCanceled)
{
TCanSwap(_assignedTasks,taskCanceled);
TCanSwap(_requestedTasks,taskCanceled);
TCanSwap(_unassignedTasks,taskCanceled);
}
private void HandleTaskCompleted(TaskCompleted taskCompleted)
{
TComSwap(_assignedTasks, taskCompleted);
TComSwap(_unassignedTasks, taskCompleted);
TComSwap(_requestedTasks, taskCompleted);
}
private void HandleTaskAssigned(TaskAssigned ta)
{
if (ta.UserId == UserId)
{
_assignedTasks.Swap(ta, (tai, thm) =>
{
return thm.AddOrUpdate(tai.TaskType, tai.TaskId,
e => e.Add(tai),
() => Lst<ITaskEvent>.Empty.Add(tai));
});
}
else
{
_assignedTasks.Swap(ta, (tai, thm) =>
{
return thm.Remove(tai.TaskType, tai.TaskId);
});
}
}
private void HandleTaskBegan(TaskBegan tb)
{
_requestedTasks.Swap(tb, (tbi, thm) =>
{
return thm.AddOrUpdate(tbi.TaskType, tbi.TaskId, t =>
t.Add(tbi), () =>
Lst<ITaskEvent>.Empty.Add(tbi));
});
}
}
public class TaskSetCoordinator
{
public string UserId { get; }
public TaskSetCoordinator(string userId,
ClientBootstrap clientBootstrap, bool trackChanges)
{
UserId = userId;
_client = clientBootstrap;
_trackChanges = trackChanges;
_assignedTasks = Atom(TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>());
_requestedTasks = Atom(TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>());
_unassignedTasks = Atom(TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>());
}
private readonly Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> _assignedTasks;
private readonly Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> _requestedTasks;
private Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> _unassignedTasks;
private readonly ClientBootstrap _client;
private Task _mainTask;
public async ValueTask CreateTask(string taskType, string text,
string? assignedToUser, DateTimeOffset expectedBy)
{
await _client.SendMessage(new TaskRequest(Ulid.NewUlid(), UserId, assignedToUser,
Ulid.NewUlid(), taskType, text, DateTimeOffset.UtcNow,
expectedBy));
}
public async ValueTask AddTaskNote(TaskRequest origReq,
string notes)
{
await _client.SendMessage(new TaskUpdate(Ulid.NewUlid(),origReq.TaskId,
origReq.TaskType, UserId, notes));
}
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes)
{
if (origReq.RequestingUserId == UserId)
{
await _client.SendMessage(new TaskCanceled(Ulid.NewUlid(),origReq.TaskId,
origReq.TaskType, UserId, notes, DateTimeOffset.Now));
return true;
}
return false;
}
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId,
string newUser, string notes)
{
bool foundTask = false;
foreach (var requestedTask in _assignedTasks.Value)
{
var item =
requestedTask.Value.Find(taskId, a => a,
static () => Lst<ITaskEvent>.Empty);
if (item != default)
{
foundTask = true;
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser,
requestedTask.Key, UserId, notes, item,
DateTimeOffset.Now));
//_assignedTasks.AddOrUpdate(requestedTask.Key,
// e => e.Remove(taskId),
// () => HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return new ReassignResult(taskId, true, "Reassigned");
}
}
foreach (var requestedTask in _unassignedTasks.Value)
{
var item =
requestedTask.Value.Find(taskId, a => a,
static () => Lst<ITaskEvent>.Empty);
if (item != default)
{
foundTask = true;
await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(),taskId, newUser,
requestedTask.Key, UserId, notes, item,
DateTimeOffset.Now));
//_assignedTasks.AddOrUpdate(requestedTask.Key,
// e => e.Remove(taskId),
// () => HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return new ReassignResult(taskId, true, "Reassigned");
}
}
return new ReassignResult(taskId, false, "Could not find task!");
}
public async ValueTask Init(CancellationToken token)
{
var userEvents =
await _client.Sub<ITaskEvent>(
SubjectHelpers.TaskUserSub(UserId), token);
CancellationTokenSource cts = null;
if (_trackChanges == false)
{
cts = new CancellationTokenSource();
_ = Task.Factory.StartNew(async () =>
{
await foreach (var cl in ChangeLoop().WithCancellation(token))
{
if (cts.IsCancellationRequested)
{
break;
}
await Task.Delay(5000);
}
});
}
_mainTask = Task.Run(async () =>
{
try
{
await foreach (var entry in userEvents.Msgs.ReadAllAsync(
token))
{
try
{
RunEventHandlers(entry);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
finally
{
try
{
cts?.Cancel(false);
}
finally
{
try
{
cts?.Dispose();
}
catch
{
// ignored
}
}
}
});
}
private void RunEventHandlers(NatsMsg<ITaskEvent> entry)
{
var msg = entry.Data;
switch (msg)
{
case TaskAssigned ta:
{
HandleTaskAssigned(ta);
break;
}
case TaskBegan tb:
{
HandleTaskBegan(tb);
break;
}
case TaskCompleted tc:
{
HandleTaskCompleted(tc);
break;
}
case TaskRequest tr:
{
HandleTaskRequest(tr);
break;
}
case TaskUpdate tu:
{
HandleTaskUpdate(tu);
break;
}
case TaskCanceled tcan:
{
HandleTaskCanceled(tcan);
break;
}
}
}
public async ValueTask<ClearResult> ClearItem(Ulid taskId)
{
bool? foundAndCleared = null;
_requestedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thmi => thmi.Remove(taskId),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return true;
}, () => false);
}
return innerThm;
});
_requestedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thmi => thmi.Remove(taskId),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return true;
}, () => false);
}
return innerThm;
});
_unassignedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thmi => thmi.Remove(taskId),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return true;
}, () => false);
}
return innerThm;
});
if (foundAndCleared.HasValue == false)
{
return new ClearResult(true, "no instances found");
}
else
{
if (foundAndCleared.Value)
{
return new ClearResult(true, "cleared");
}
else
{
return new ClearResult(false, "Tasks not in closed state");
}
}
}
private void HandleTaskRequest(TaskRequest trb)
{
void doSwap(
Atom<TrackingHashMap<string,
TrackingHashMap<Ulid, Lst<ITaskEvent>>>> taskSet,
TaskRequest taskRequest)
{
taskSet.Swap(taskRequest, (tr,b)=>b.AddOrUpdate(tr.TaskType,
(hm) => hm.TryAdd(tr.TaskId, Lst<ITaskEvent>.Empty.Add(tr)),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(tr.TaskId,
Lst<ITaskEvent>.Empty.Add(tr))));
}
if (trb.RequestingUserId == UserId)
{
doSwap(_requestedTasks, trb);
}
if (trb.AssignedUserId == UserId)
{
doSwap(_assignedTasks,trb);
}
else if (trb.AssignedUserId == "unassigned")
{
doSwap(_unassignedTasks,trb);
}
}
private static void TUSwap(
Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
atom, TaskUpdate tu)
{
atom.Value.Find(tu.TaskType, hm =>
{
atom.Swap(b=>b.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty));
return Unit.Default;
}, () => Unit.Default);
}
private static void TCanSwap(
Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
atom, TaskCanceled tu)
{
atom.Value.Find(tu.TaskType, hm =>
{
atom.Swap(b=>b.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty));
return Unit.Default;
}, () => Unit.Default);
}
private static void TComSwap(
Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
atom, TaskCompleted tu)
{
atom.Value.Find(tu.TaskType, hm =>
{
atom.Swap(b=>b.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty));
return Unit.Default;
}, () => Unit.Default);
}
private readonly Channel<bool> _changeLocker =
Channel.CreateBounded<bool>(1);
private readonly bool _trackChanges;
public TaskState GetTaskState()
{
return new(
_assignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap()))
.ToHashMap(),
_requestedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap()))
.ToHashMap(),
_unassignedTasks.Value.Select(a => (a.Key, a.Value.ToHashMap()))
.ToHashMap());
}
public record TaskState(
HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> AssignedTasks,
HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> RequestedTasks,
HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> UnassignedTasks);
public async IAsyncEnumerable<TaskChangeSnapshot> ChangeLoop()
{
while (await _changeLocker.Reader.ReadAsync())
{
var snapshotLoop = new SnapshotBuild();
_assignedTasks.Swap(snapshotLoop,(sl,thm) =>
{
(sl.AssignedTasks, thm, sl.AssignedInner) = MakeSnapshot(thm);
return thm.Snapshot();
});
_unassignedTasks.Swap(snapshotLoop,(sl,thm) =>
{
(sl.RequestedTasks, thm, sl.RequestedInner) = MakeSnapshot(thm);
return thm.Snapshot();
});
_requestedTasks.Swap(snapshotLoop,(sl,thm) =>
{
(sl.UnassignedTasks, thm, sl.UnassignedInner) = MakeSnapshot(thm);
return thm.Snapshot();
});
_changeLocker.Writer.TryWrite(true);
yield return snapshotLoop.ToRecord();
}
}
private static (
HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg,
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm,
HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg)
MakeSnapshot<TKeyOuter,TKeyInner,TLst>(
TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm)
{
//TODO: This is trasshhhhhhh.
var chg = thm.Changes;
var preMap = thm.Map(a =>
{
var (k, v) = a;
var c = v.Changes;
return (k, (c, v.Snapshot()));
}).ToHashMap();
thm = thm.SetItems(preMap.Select(a => (a.Item1, a.Item2.Item2)));
var innerChg = preMap.Select(a => (a.Item1, a.Item2.Item1))
.ToHashMap();
return (chg, thm, innerChg);
}
private void HandleTaskUpdate(TaskUpdate tu)
{
TUSwap(_assignedTasks,tu);
TUSwap(_requestedTasks,tu);
TUSwap(_unassignedTasks,tu);
}
private void HandleTaskCanceled(TaskCanceled taskCanceled)
{
TCanSwap(_assignedTasks,taskCanceled);
TCanSwap(_requestedTasks,taskCanceled);
TCanSwap(_unassignedTasks,taskCanceled);
}
private void HandleTaskCompleted(TaskCompleted taskCompleted)
{
TComSwap(_assignedTasks, taskCompleted);
TComSwap(_unassignedTasks, taskCompleted);
TComSwap(_requestedTasks, taskCompleted);
}
private void HandleTaskAssigned(TaskAssigned ta)
{
if (ta.UserId == UserId)
{
_assignedTasks.Swap(ta, (tai, thm) =>
{
return thm.AddOrUpdate(tai.TaskType, tai.TaskId,
e => e.Add(tai),
() => Lst<ITaskEvent>.Empty.Add(tai));
});
}
else
{
_assignedTasks.Swap(ta, (tai, thm) =>
{
return thm.Remove(tai.TaskType, tai.TaskId);
});
}
}
private void HandleTaskBegan(TaskBegan tb)
{
_requestedTasks.Swap(tb, (tbi, thm) =>
{
return thm.AddOrUpdate(tbi.TaskType, tbi.TaskId, t =>
t.Add(tbi), () =>
Lst<ITaskEvent>.Empty.Add(tbi));
});
}
}
public record ReassignResult(Ulid TaskId, bool Success, string Message);
public record ClearResult(bool Success, string? Reason);
public record DmConversationSet(HashMap<string, Lst<DirectMessage>> User)
{
public DmConversationSet AddMsg(string arg1FromUserId,
DirectMessage directMessage)
{
return this with
{
User = User.AddOrUpdate(arg1FromUserId,
(a) => a.Contains(directMessage) ? a : a.Add(directMessage),
() => new Lst<DirectMessage>().Add(directMessage))
};
}
public DmConversationSet AddOrMarkMessageReceived(string arg1ToUserId,
DirectMessage directMessage)
{
return AddMsg(arg1ToUserId, directMessage);
}
}
public class ChatTaskClientSerializer<T> : INatsSerializer<T>
{
public static readonly ChatTaskClientSerializer<T> Default =
new ChatTaskClientSerializer<T>();
private static readonly IFormatterResolver DefaultFormatterResolver =
CompositeResolver.Create(new[]
{
UlidMessagePackResolver.Instance,
MessagePackSerializerOptions.Standard.Resolver
});
private static readonly MessagePackSerializerOptions DefaultSerializerOptions =
MessagePackSerializerOptions.Standard.WithResolver(
DefaultFormatterResolver);
public void Serialize(IBufferWriter<byte> bufferWriter, T value)
{
MessagePackSerializer.Serialize(bufferWriter, value,
DefaultSerializerOptions);
}
public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
return MessagePackSerializer.Deserialize<T>(buffer,
DefaultSerializerOptions);
}
}
public interface ISubjectFactory
{
string GetSubject();
}
public interface ITaskEvent : IChatTaskClient, IEquatable<ITaskEvent>
{
Ulid EventId { get; init; }
Ulid TaskId { get; init; }
string TaskType { get; init; }
bool IEquatable<ITaskEvent>.Equals(ITaskEvent? other)
{
return other != null && other.EventId == EventId;
}
}
[Union(0, typeof(BroadcastMessage))]
[Union(1, typeof(DirectMessage))]
[Union(2, typeof(TaskRequest))]
[Union(3, typeof(TaskCompleted))]
[Union(4, typeof(TaskBegan))]
[Union(5, typeof(TaskUpdate))]
[Union(6, typeof(TaskAssigned))]
[Union(7, typeof(TaskCanceled))]
public interface IChatTaskClient : ISubjectFactory
{
}
[MessagePackObject(true)]
public record BroadcastMessage(
string FromUserId,
string Text) : IChatTaskClient
{
public DateTimeOffset TimeSent { get; set; }
public string GetSubject()
{
return SubjectHelpers.BroadcastSubject;
}
}
public static class SubjectHelpers
{
public static string BroadcastSubject =>
$"sample.chat.message.broadcast";
public static string DirectMessage(string from, string to) =>
$"sample.chat.message.direct.{from}.{to}";
public static string
Tasks(string scope, string type, Ulid taskId, string requested,
string assigned) =>
$"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}";
public static string Tasks(string scope, string type, Ulid taskId,
string doneByUser)
{
return $"sample.task.{scope}.{type},{taskId}.{doneByUser}";
}
public static string TaskUserSub(string userId)
{
return $"sample.task.>";
}
public static string TaskReplication => $"sample.task.>";
}
[MessagePackObject(true)]
public record DirectMessage(
string FromUserId,
string ToUserId,
string Text) : IChatTaskClient
{
public string GetSubject()
{
return SubjectHelpers.DirectMessage(FromUserId, ToUserId);
}
}
[MessagePackObject(true)]
public record TaskRequest(
Ulid EventId,
string RequestingUserId,
string? AssignedUserId,
Ulid TaskId,
string TaskType,
string Text,
DateTimeOffset RequestedAt,
DateTimeOffset ExpectedBy) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks(TaskScopeSubjects.Request, TaskType, TaskId,
RequestingUserId,
AssignedUserId != null ? AssignedUserId : "unassigned");
}
}
[MessagePackObject(true)]
public record TaskCanceled(
Ulid EventId,
Ulid TaskId,
string TaskType,
string CanceledByUser,
string CancelReason,
DateTimeOffset CancelTime) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks(TaskScopeSubjects.Canceled, TaskType, TaskId,
CanceledByUser);
}
}
[MessagePackObject(true)]
public record TaskBegan(
Ulid EventId,
Ulid TaskId,
string TaskType,
string UserId,
string Text,
DateTimeOffset BeginTime) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks(TaskScopeSubjects.Begun, TaskType, TaskId, UserId);
}
}
[MessagePackObject(true)]
public record TaskUpdate(
Ulid EventId,
Ulid TaskId,
string TaskType,
string UserId,
string Notes) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks(TaskScopeSubjects.Update, TaskType, TaskId, UserId);
}
}
[MessagePackObject(true)]
public record TaskCompleted(
Ulid EventId,
Ulid TaskId,
string TaskType,
string UserId,
string Notes,
DateTimeOffset CompletedTime) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks(TaskScopeSubjects.Completed, TaskType, TaskId, UserId);
}
}
public static class TaskScopeSubjects
{
public static readonly string Completed = "completed";
public static readonly string Assigned = "assigned";
public static readonly string Begun = "begun";
public static readonly string Update = "update";
public static readonly string Canceled = "canceled";
public static readonly string Request = "request";
}
[MessagePackObject(true)]
public record TaskAssigned(
Ulid EventId,
Ulid TaskId,
string TaskType,
string UserId,
string AssignedByUserId,
string Notes,
Lst<ITaskEvent> History,
DateTimeOffset AssignedTime
) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks(TaskScopeSubjects.Assigned, TaskType, TaskId, UserId,
AssignedByUserId);
}
}
public class SnapshotBuild
{
internal HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>> AssignedTasks;
internal HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>> RequestedTasks;
internal HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>> UnassignedTasks;
internal HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> AssignedInner;
internal HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> RequestedInner;
internal HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> UnassignedInner;
public TaskChangeSnapshot ToRecord() =>
new TaskChangeSnapshot(AssignedTasks, RequestedTasks, UnassignedTasks, AssignedInner, RequestedInner, UnassignedInner);
}
public class TaskChangeSnapshot(
HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
AssignedTasks,
HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
RequestedTasks,
HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
UnassignedTasks,
HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> AssignedInner,
HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> RequestedInner,
HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> UnassignedInner);
public class ClientCoordinator
{
public string UserId { get; }
private readonly Atom<ChatHistory> _broadcastHistory;
private readonly Atom<DmConversationSet> _directMessageHistory;
private readonly ClientBootstrap _client;
public ClientCoordinator(string userId, ClientBootstrap bootstrap)
{
UserId = userId;
_client = bootstrap;
// OK so the way Atoms work, is they use a 'swap' function,
// and then check for equality/etc.
// If your type does equality (i.e. records)
// and your underlying types are immutable,
// and your actions are able to be applied...
// magic! in case of a conflict, the operation is re-attempted,
// all in a thread safe fashion (as long as your swap is thread safe)
_broadcastHistory =
Atom(new ChatHistory(default));
_directMessageHistory =
Atom(new DmConversationSet(LanguageExt
.HashMap<string, Lst<DirectMessage>>.Empty));
}
public async ValueTask Begin(CancellationToken token)
{
var (broadcastSub, dmSub, outboundDmSub) = await ValueTaskEx.WhenAll(
_client.Sub<BroadcastMessage>(
SubjectHelpers.BroadcastSubject,
token),
_client.Sub<DirectMessage>(
SubjectHelpers.DirectMessage("*",UserId),
token),
_client.Sub<DirectMessage>(
SubjectHelpers.DirectMessage(UserId,"*"), token)
);
Task.Run(async () =>
{
try
{
await foreach (var reader in broadcastSub.Msgs.ReadAllAsync(
token))
{
_broadcastHistory.Swap(reader.Data!,
(m, h) => h.AddMsg(m));
}
}
catch
{
}
});
Task.Run(async () =>
{
try
{
await foreach
(var reader in outboundDmSub.Msgs.ReadAllAsync(
token))
{
_directMessageHistory.Swap(reader.Data!,
(m, h) =>
{
return h.AddOrMarkMessageReceived(m.ToUserId,
m);
});
}
}
catch
{
}
});
Task.Run(async () =>
{
try
{
await foreach (var reader in dmSub.Msgs.ReadAllAsync(
token))
{
_directMessageHistory.Swap(reader.Data!, UserId,
(m, h, v) =>
{
if (m.FromUserId != h)
{
return v.AddMsg(m.FromUserId, m);
}
else
{
return v.AddMsg(m.ToUserId, m);
}
});
}
}
catch
{
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment