Last active
April 8, 2024 00:01
-
-
Save to11mtm/4e73aab45a2d43c462a830de2c58da02 to your computer and use it in GitHub Desktop.
Nats Sandbox Task tracking Logic, Round 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
using System.Collections.Concurrent; | |
using System.Net.Sockets; | |
using System.Threading.Channels; | |
using Cysharp.Serialization.MessagePack; | |
using LanguageExt; | |
using LinqToDB; | |
using LinqToDB.Data; | |
using MessagePack; | |
using MessagePack.Resolvers; | |
using Microsoft.Extensions.Logging; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.KeyValueStore; | |
using ValueTaskSupplement; | |
using static LanguageExt.Prelude; | |
using CancellationTokenSource = System.Threading.CancellationTokenSource; | |
using Seq = LanguageExt.Seq; | |
using Ulid = System.Ulid; | |
namespace NatsSandBox.Client; | |
/// <summary>
/// Table row holding one replicated task event.
/// </summary>
/// <remarks>
/// <c>ServerReplicator</c> bulk-inserts rows of this type and pages them back
/// out ordered by <see cref="Ordering"/>.
/// </remarks>
public class ServerReplicationRow
{
    // NOTE(review): nothing visible in this file populates these columns yet
    // (the message-to-row conversion is a stub), so the per-column meanings
    // below are inferred from the names - confirm against the real mapper.

    /// <summary>Task category label.</summary>
    public string Category { get; set; }

    /// <summary>Task type label.</summary>
    public string TaskType { get; set; }

    /// <summary>Identifier of the event stored in this row.</summary>
    public Ulid EventId { get; set; }

    /// <summary>Identifier of the task the event belongs to.</summary>
    public Ulid TaskId { get; set; }

    /// <summary>Serialized event payload.</summary>
    public byte[] TaskEventData { get; set; }

    /// <summary>Subject the event was originally received on.</summary>
    public string OriginalSubject { get; set; }

    /// <summary>Sort key used when reading rows back in order.</summary>
    public long Ordering { get; set; }
}
/// <summary>
/// Table row holding a serialized task snapshot together with the ordering
/// value it was taken at.
/// </summary>
public class ServerReplicationSnapshotRow
{
    /// <summary>
    /// Ordering value associated with the snapshot; the newest snapshot is the
    /// one with the highest value.
    /// </summary>
    public long LastOrdering { get; set; }

    /// <summary>Opaque snapshot payload.</summary>
    public byte[] TaskSnapshot { get; set; }
}
/// <summary>
/// Subscribes to the task replication subject, persists the received events
/// through LinqToDB, and offers helpers to write/read snapshots and to replay
/// stored events in <c>Ordering</c> order.
/// </summary>
public class ServerReplicator
{
    /// <summary>Rows gathered from the subscription before they are handed to the writer.</summary>
    private const int MaxRowsPerHandOff = 100;

    /// <summary>Buffered row count above which the writer flushes without waiting for the hand-off channel to drain.</summary>
    private const int WriteThreshold = 400;

    /// <summary>Number of rows fetched per query while replaying.</summary>
    private const int ReadPageSize = 100;

    private readonly ClientBootstrap _client;
    private readonly DataConnectionFactory _ctxFactory;

    /// <summary>Creates a replicator using the given NATS client and connection factory.</summary>
    public ServerReplicator(ClientBootstrap bootstrap, DataConnectionFactory dataConnectionFactory)
    {
        _client = bootstrap;
        _ctxFactory = dataConnectionFactory;
    }

    /// <summary>Bulk-inserts <paramref name="rows"/> inside a single transaction.</summary>
    private async ValueTask WriteTasks(List<ServerReplicationRow> rows)
    {
        if (rows.Count == 0)
        {
            // Nothing to write; avoid opening a connection and transaction for no work.
            return;
        }

        using var ctx = _ctxFactory.GetConnection();
        using var tx = await ctx.BeginTransactionAsync();
        await ctx.GetTable<ServerReplicationRow>().BulkCopyAsync(rows);
        await tx.CommitAsync();
    }

    /// <summary>
    /// Drains <paramref name="reader"/> and writes the rows to the database.
    /// Rows are flushed once more than <see cref="WriteThreshold"/> are buffered
    /// and whenever the channel runs empty, so a quiet period never leaves
    /// rows sitting unwritten in memory.
    /// </summary>
    private async Task RunWriteLoop(ChannelReader<List<ServerReplicationRow>> reader)
    {
        var buffered = new List<ServerReplicationRow>(512);
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var handedOff))
            {
                buffered.AddRange(handedOff);
                if (buffered.Count > WriteThreshold)
                {
                    await WriteTasks(buffered);
                    buffered.Clear();
                }
            }

            if (buffered.Count > 0)
            {
                await WriteTasks(buffered);
                buffered.Clear();
            }
        }
    }

    /// <summary>
    /// Subscribes to <c>SubjectHelpers.TaskReplication</c> and forwards every
    /// received event to a background writer until the subscription ends or
    /// <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="coordinator">Currently unused.</param>
    /// <param name="token">Stops the subscription; cancellation surfaces as an <see cref="OperationCanceledException"/>.</param>
    /// <remarks>
    /// When the subscription ends normally the call waits for the writer to
    /// flush, so a database failure in the writer is reported to the caller
    /// instead of being lost on an unobserved task.
    /// </remarks>
    public async ValueTask StartReplicator(TaskSetCoordinator coordinator, CancellationToken token)
    {
        // One hand-off channel per run: completing it on exit is what lets the writer finish.
        var handOff = Channel.CreateUnbounded<List<ServerReplicationRow>>();
        var writeLoop = Task.Run(() => RunWriteLoop(handOff.Reader));
        try
        {
            await using (var sub =
                         await _client.Sub<ITaskEvent>(SubjectHelpers.TaskReplication, token))
            {
                var batch = new List<ServerReplicationRow>(MaxRowsPerHandOff);
                while (token.IsCancellationRequested == false)
                {
                    while (sub.Msgs.TryRead(out var msg))
                    {
                        batch.Add(ToReplicationRow(msg));
                        if (batch.Count >= MaxRowsPerHandOff)
                        {
                            handOff.Writer.TryWrite(batch);
                            batch = new List<ServerReplicationRow>(MaxRowsPerHandOff);
                        }
                    }

                    // Subscription is momentarily empty: hand over the partial batch.
                    if (batch.Count > 0)
                    {
                        handOff.Writer.TryWrite(batch);
                        batch = new List<ServerReplicationRow>(MaxRowsPerHandOff);
                    }

                    if (await sub.Msgs.WaitToReadAsync(token) == false)
                    {
                        break;
                    }
                }
            }
        }
        finally
        {
            handOff.Writer.TryComplete();
        }

        await writeLoop;
    }

    /// <summary>Converts a received message into its table row.</summary>
    private ServerReplicationRow ToReplicationRow(NatsMsg<ITaskEvent> item)
    {
        // NOTE(review): still a stub - it returns null, so the batches handed to
        // the writer currently contain null entries. Needs a real mapping.
        return default;
    }

    /// <summary>
    /// Inserts <paramref name="row"/>, or updates <c>TaskSnapshot</c> of the
    /// existing row whose <c>LastOrdering</c> matches.
    /// </summary>
    public async ValueTask WriteSnapshot(ServerReplicationSnapshotRow row)
    {
        using var ctx = _ctxFactory.GetConnection();
        await ctx.GetTable<ServerReplicationSnapshotRow>()
            .InsertOrUpdateAsync(
                () => new ServerReplicationSnapshotRow()
                {
                    LastOrdering = row.LastOrdering,
                    TaskSnapshot = row.TaskSnapshot
                },
                e => new ServerReplicationSnapshotRow()
                {
                    TaskSnapshot = row.TaskSnapshot
                },
                () => new ServerReplicationSnapshotRow()
                    { LastOrdering = row.LastOrdering });
    }

    /// <summary>Returns the snapshot with the highest <c>LastOrdering</c>, or <c>null</c> when none exists.</summary>
    public async ValueTask<ServerReplicationSnapshotRow?> GetNewestSnapshot()
    {
        using var ctx = _ctxFactory.GetConnection();
        return await ctx.GetTable<ServerReplicationSnapshotRow>()
            .OrderByDescending(a => a.LastOrdering)
            .FirstOrDefaultAsync();
    }

    /// <summary>
    /// Streams stored rows whose <c>Ordering</c> is at least
    /// <paramref name="ordering"/>, in ascending order, fetching
    /// <see cref="ReadPageSize"/> rows per query.
    /// </summary>
    /// <remarks>
    /// The first page includes <paramref name="ordering"/> itself; later pages
    /// resume strictly after the last row already yielded, so a boundary row is
    /// not returned twice. NOTE(review): this assumes <c>Ordering</c> values
    /// are unique - rows sharing a value across a page boundary would be
    /// skipped; confirm against the table definition.
    /// </remarks>
    public async IAsyncEnumerable<ServerReplicationRow> BeginReadAtOrdering(long ordering)
    {
        var cursor = ordering;
        var includeCursor = true;
        while (true)
        {
            List<ServerReplicationRow> page;
            // The connection is released before yielding so a slow consumer does not hold it open.
            using (var ctx = _ctxFactory.GetConnection())
            {
                var table = ctx.GetTable<ServerReplicationRow>();
                var from = cursor;
                var pending = includeCursor
                    ? table.Where(a => a.Ordering >= from)
                    : table.Where(a => a.Ordering > from);
                page = await pending
                    .OrderBy(a => a.Ordering)
                    .Take(ReadPageSize)
                    .ToListAsync();
            }

            foreach (var row in page)
            {
                yield return row;
            }

            if (page.Count < ReadPageSize)
            {
                yield break;
            }

            cursor = page[page.Count - 1].Ordering;
            includeCursor = false;
        }
    }
}
/// <summary>
/// Hands out LinqToDB connections.
/// </summary>
public class DataConnectionFactory
{
    /// <summary>
    /// Placeholder implementation: always returns <c>null</c>, so callers
    /// cannot use the result until a real connection is produced here.
    /// </summary>
    public DataConnection GetConnection() => null;
}
/// <summary>
/// Owns the NATS connection and wraps publish/subscribe with the
/// MessagePack-based <c>ChatTaskClientSerializer</c>.
/// </summary>
public class ClientBootstrap
{
    private readonly NatsOpts _opts;
    private NatsConnection? _connection;

    /// <summary>Creates the bootstrap; <c>NatsOpts.Default</c> is used when <paramref name="opts"/> is null.</summary>
    public ClientBootstrap(NatsOpts? opts = null)
    {
        _opts = opts ?? NatsOpts.Default;
    }

    /// <summary>The started connection; fails with a descriptive error instead of a bare null dereference.</summary>
    private NatsConnection Connection =>
        _connection ?? throw new InvalidOperationException(
            $"{nameof(Startclient)} must be called before messages can be sent or subscribed to.");

    /// <summary>
    /// Creates the NATS connection. Calling it again keeps the existing
    /// connection rather than replacing (and leaking) it.
    /// </summary>
    public void Startclient()
    {
        //TODO: At some point, do jetstream instead.
        // And after that, do snapshots via KV!
        _connection ??= new NatsConnection(_opts);
    }

    /// <summary>Publishes <paramref name="command"/> on the subject it reports via <c>GetSubject()</c>.</summary>
    /// <exception cref="InvalidOperationException"><see cref="Startclient"/> has not been called.</exception>
    public async ValueTask SendMessage(IChatTaskClient command)
    {
        await Connection.PublishAsync(command.GetSubject(), command,
            serializer: ChatTaskClientSerializer<IChatTaskClient>.Default);
    }

    /// <summary>Opens a core subscription on <paramref name="subject"/> deserializing payloads as <typeparamref name="T"/>.</summary>
    /// <exception cref="InvalidOperationException"><see cref="Startclient"/> has not been called.</exception>
    public async ValueTask<INatsSub<T>> Sub<T>(string subject,
        CancellationToken token) where T : IChatTaskClient
    {
        return await Connection.SubscribeCoreAsync<T>(subject,
            null, ChatTaskClientSerializer<T>.Default, default, token);
    }
}
/// <summary>
/// Immutable list of broadcast messages.
/// </summary>
public record ChatHistory(Lst<BroadcastMessage> Messages)
{
    /// <summary>
    /// Returns a history that contains <paramref name="message"/>; the current
    /// instance is returned unchanged when the message is already present.
    /// </summary>
    public ChatHistory AddMsg(BroadcastMessage message) =>
        Messages.Contains(message)
            ? this
            : this with { Messages = Messages.Add(message) };
}
/// <summary>
/// The task set a change belongs to.
/// </summary>
public enum TaskCategory
{
    /// <summary>Assigned task set.</summary>
    Assigned = 0,

    /// <summary>Requested task set.</summary>
    Requested = 1,

    /// <summary>Unassigned task set.</summary>
    Unassigned = 2
}
/// <summary>
/// A single task event tagged with the task set and task it applies to.
/// </summary>
/// <param name="Category">Task set the change belongs to.</param>
/// <param name="TaskId">Identifier of the affected task.</param>
/// <param name="Event">The event itself.</param>
public record TaskChangeEvent(TaskCategory Category, Ulid TaskId, ITaskEvent Event);
public class TaskSetCoordinatorHashMap | |
{ | |
/// <summary>Identifier of the user whose tasks this coordinator tracks.</summary>
public string UserId { get; }

// Event history per task, grouped as task type -> task id -> events.
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> _assignedTasks;
private readonly Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> _requestedTasks;
private Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> _unassignedTasks;

private readonly ClientBootstrap _client;

// Background task started by Init that consumes the user's event subscription.
private Task _mainTask;

/// <summary>
/// Creates a coordinator for <paramref name="userId"/> with three empty task sets.
/// </summary>
/// <param name="userId">User the coordinator acts for.</param>
/// <param name="clientBootstrap">Client used to publish and subscribe.</param>
/// <param name="trackChanges">Stored in <c>_trackChanges</c>.</param>
public TaskSetCoordinatorHashMap(string userId,
    ClientBootstrap clientBootstrap, bool trackChanges)
{
    static Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> NewTaskSet() =>
        Atom(HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>());

    UserId = userId;
    _client = clientBootstrap;
    _trackChanges = trackChanges;
    _assignedTasks = NewTaskSet();
    _requestedTasks = NewTaskSet();
    _unassignedTasks = NewTaskSet();
}
/// <summary>
/// Publishes a new <c>TaskRequest</c> issued by <c>UserId</c>, with fresh
/// event and task identifiers and the current UTC time as request time.
/// </summary>
/// <param name="taskType">Type of the task.</param>
/// <param name="text">Task text.</param>
/// <param name="assignedToUser">User the task is assigned to; may be null.</param>
/// <param name="expectedBy">When the task is expected to be done.</param>
public async ValueTask CreateTask(string taskType, string text,
    string? assignedToUser, DateTimeOffset expectedBy)
{
    var request = new TaskRequest(
        Ulid.NewUlid(),
        UserId,
        assignedToUser,
        Ulid.NewUlid(),
        taskType,
        text,
        DateTimeOffset.UtcNow,
        expectedBy);

    await _client.SendMessage(request);
}
/// <summary>
/// Publishes a <c>TaskUpdate</c> carrying <paramref name="notes"/> for the
/// task described by <paramref name="origReq"/>, authored by <c>UserId</c>.
/// </summary>
public async ValueTask AddTaskNote(TaskRequest origReq,
    string notes)
{
    var update = new TaskUpdate(
        Ulid.NewUlid(),
        origReq.TaskId,
        origReq.TaskType,
        UserId,
        notes);

    await _client.SendMessage(update);
}
/// <summary>
/// Publishes a <c>TaskCanceled</c> event for <paramref name="origReq"/>, but
/// only when <c>UserId</c> is the user who requested the task.
/// </summary>
/// <returns><c>true</c> when the cancellation was sent; <c>false</c> when the caller is not the requester.</returns>
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes)
{
    if (origReq.RequestingUserId != UserId)
    {
        return false;
    }

    var cancellation = new TaskCanceled(
        Ulid.NewUlid(),
        origReq.TaskId,
        origReq.TaskType,
        UserId,
        notes,
        DateTimeOffset.Now);

    await _client.SendMessage(cancellation);
    return true;
}
/// <summary>
/// Looks <paramref name="taskId"/> up in the assigned set and then the
/// unassigned set; for the first hit a <c>TaskAssigned</c> event naming
/// <paramref name="newUser"/> is published along with the task's recorded
/// history.
/// </summary>
/// <returns>A successful result when the event was sent, otherwise a "Could not find task!" failure.</returns>
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId,
    string newUser, string notes)
{
    // The unassigned set is only read when the assigned set had no match.
    return await TryReassignFrom(_assignedTasks.Value)
           ?? await TryReassignFrom(_unassignedTasks.Value)
           ?? new ReassignResult(taskId, false, "Could not find task!");

    async ValueTask<ReassignResult?> TryReassignFrom(
        HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> tasksByType)
    {
        foreach (var byType in tasksByType)
        {
            var history = byType.Value.Find(taskId, found => found,
                static () => Lst<ITaskEvent>.Empty);
            if (history != default)
            {
                await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(), taskId, newUser,
                    byType.Key, UserId, notes, history,
                    DateTimeOffset.Now));
                return new ReassignResult(taskId, true, "Reassigned");
            }
        }

        return null;
    }
}
/// <summary>
/// Subscribes to the user's task subject and starts the background task
/// (kept in <c>_mainTask</c>) that applies every received event to the
/// in-memory task sets.
/// </summary>
/// <param name="token">Cancels the subscription and the event loop.</param>
/// <remarks>
/// A failure in the loop is written to the console once and then rethrown so
/// that <c>_mainTask</c> ends faulted. The subscription is disposed when the
/// loop ends for any reason.
/// </remarks>
public async ValueTask Init(CancellationToken token)
{
    var userEvents =
        await _client.Sub<ITaskEvent>(
            SubjectHelpers.TaskUserSub(UserId), token);
    _mainTask = Task.Run(async () =>
    {
        try
        {
            await foreach (var entry in userEvents.Msgs.ReadAllAsync(token))
            {
                RunEventHandlers(entry);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
        finally
        {
            await userEvents.DisposeAsync();
        }
    });
}
/// <summary>
/// Routes the payload of <paramref name="entry"/> to the handler for its
/// concrete event type. Payloads of any other type (or null) are ignored.
/// </summary>
private void RunEventHandlers(NatsMsg<ITaskEvent> entry)
{
    switch (entry.Data)
    {
        case TaskAssigned assigned:
            HandleTaskAssigned(assigned);
            return;
        case TaskBegan began:
            HandleTaskBegan(began);
            return;
        case TaskCompleted completed:
            HandleTaskCompleted(completed);
            return;
        case TaskRequest requested:
            HandleTaskRequest(requested);
            return;
        case TaskUpdate updated:
            HandleTaskUpdate(updated);
            return;
        case TaskCanceled canceled:
            HandleTaskCanceled(canceled);
            return;
        default:
            return;
    }
}
/// <summary>
/// Removes <paramref name="taskId"/> from the requested, assigned and
/// unassigned sets wherever its history may be dropped: the last event is a
/// <c>TaskCanceled</c>/<c>TaskCompleted</c>, or the history does not start
/// with a <c>TaskRequest</c>.
/// </summary>
/// <returns>
/// Success with "no instances found" when no set holds the task; success
/// with "cleared" when every occurrence was removed; failure with
/// "Tasks not in closed state" when at least one occurrence had to stay.
/// </returns>
public ValueTask<ClearResult> ClearItem(Ulid taskId)
{
    bool? foundAndCleared = null;
    foreach (var taskSet in new[] { _requestedTasks, _assignedTasks, _unassignedTasks })
    {
        var outcome = RemoveIfClosed(taskSet, taskId);
        if (outcome.HasValue)
        {
            foundAndCleared = (foundAndCleared ?? true) && outcome.Value;
        }
    }

    ClearResult result;
    if (foundAndCleared.HasValue == false)
    {
        result = new ClearResult(true, "no instances found");
    }
    else if (foundAndCleared.Value)
    {
        result = new ClearResult(true, "cleared");
    }
    else
    {
        result = new ClearResult(false, "Tasks not in closed state");
    }

    return new ValueTask<ClearResult>(result);
}

/// <summary>
/// Removes <paramref name="taskId"/> from <paramref name="taskSet"/> wherever
/// its history is removable.
/// </summary>
/// <returns>
/// <c>null</c> when the set does not contain the task, <c>true</c> when every
/// occurrence was removed, <c>false</c> when an occurrence was kept.
/// </returns>
private static bool? RemoveIfClosed(
    Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> taskSet, Ulid taskId)
{
    bool? outcome = null;
    taskSet.Swap(current =>
    {
        // The swap function may be invoked again if the atom changed
        // concurrently, so the outcome is recomputed from scratch each time.
        outcome = null;
        var updated = current;
        foreach (var byType in current)
        {
            byType.Value.Find(taskId, events =>
            {
                var first = events.Head();
                var last = events.Reverse().FirstOrDefault();
                var removable = last is TaskCanceled or TaskCompleted ||
                                first is not TaskRequest;
                if (removable)
                {
                    updated = updated.AddOrUpdate(byType.Key,
                        inner => inner.Remove(taskId),
                        () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
                }

                outcome = (outcome ?? true) && removable;
                return true;
            }, () => false);
        }

        return updated;
    });
    return outcome;
}
/// <summary>
/// Records a new task request in every set it belongs to: the requested set
/// when this user issued it, and either the assigned set (assigned to this
/// user) or the unassigned set (assignee is the literal "unassigned").
/// An already-known task id is left untouched.
/// </summary>
private void HandleTaskRequest(TaskRequest trb)
{
    if (trb.RequestingUserId == UserId)
    {
        Track(_requestedTasks);
    }

    if (trb.AssignedUserId == UserId)
    {
        Track(_assignedTasks);
    }
    else if (trb.AssignedUserId == "unassigned")
    {
        Track(_unassignedTasks);
    }

    // Adds the request as the first event of the task, creating the
    // per-type map when this is the first task of its type.
    void Track(Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> target)
    {
        target.Swap(trb, (request, current) => current.AddOrUpdate(request.TaskType,
            existing => existing.TryAdd(request.TaskId,
                Lst<ITaskEvent>.Empty.Add(request)),
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(request.TaskId,
                Lst<ITaskEvent>.Empty.Add(request))));
    }
}
/// <summary>Appends a task update to <paramref name="atom"/> when its task type is already tracked there.</summary>
private static void TUSwap(
    Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> atom,
    TaskUpdate tu)
{
    AppendWhenTypeTracked(atom, tu);
}

/// <summary>Appends a cancellation to <paramref name="atom"/> when its task type is already tracked there.</summary>
private static void TCanSwap(
    Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> atom,
    TaskCanceled tu)
{
    AppendWhenTypeTracked(atom, tu);
}

/// <summary>Appends a completion to <paramref name="atom"/> when its task type is already tracked there.</summary>
private static void TComSwap(
    Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> atom,
    TaskCompleted tu)
{
    AppendWhenTypeTracked(atom, tu);
}

/// <summary>
/// Shared body of the three swap helpers: when the atom's current value has
/// an entry for the event's task type, the event is appended to the history
/// of its task id (starting a new history when the id is unknown). Atoms
/// without that task type are left alone.
/// </summary>
private static void AppendWhenTypeTracked(
    Atom<HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>> atom,
    ITaskEvent taskEvent)
{
    atom.Value.Find(taskEvent.TaskType, tracked =>
    {
        atom.Swap(current => current.AddOrUpdate(taskEvent.TaskType,
            byId => byId.AddOrUpdate(taskEvent.TaskId,
                history => history.Add(taskEvent),
                () => Lst<ITaskEvent>.Empty.Add(taskEvent)),
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty));
        return Unit.Default;
    }, () => Unit.Default);
}
// Single-slot channel; not referenced by any other member of this class that is visible here.
private readonly Channel<bool> _changeLocker =
    Channel.CreateBounded<bool>(1);

// Value of the constructor's trackChanges argument.
private readonly bool _trackChanges;

/// <summary>
/// Captures the current contents of the assigned, requested and unassigned
/// sets (read in that order) as plain hash maps.
/// </summary>
public TaskState GetTaskState()
{
    static HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> Copy(
        HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> source) =>
        source.Select(a => (a.Key, a.Value.ToHashMap())).ToHashMap();

    return new TaskState(
        Copy(_assignedTasks.Value),
        Copy(_requestedTasks.Value),
        Copy(_unassignedTasks.Value));
}
/// <summary>
/// Point-in-time copy of the three task sets, each grouped as
/// task type -> task id -> event history.
/// </summary>
/// <param name="AssignedTasks">Contents of the assigned set.</param>
/// <param name="RequestedTasks">Contents of the requested set.</param>
/// <param name="UnassignedTasks">Contents of the unassigned set.</param>
public record TaskState(
    HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> AssignedTasks,
    HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> RequestedTasks,
    HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> UnassignedTasks);
/// <summary>
/// Collects the pending change sets of a two-level tracking map and returns
/// them together with a copy of the map whose inner maps were snapshotted.
/// </summary>
/// <returns>
/// <c>chg</c>: changes recorded on the outer map; <c>thm</c>: the outer map
/// with every inner map replaced by its snapshot; <c>innerChg</c>: per outer
/// key, the changes recorded on that key's inner map.
/// </returns>
private static (
    HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg,
    TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm,
    HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg)
    MakeSnapshot<TKeyOuter, TKeyInner, TLst>(
        TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm)
{
    //TODO: This is trasshhhhhhh.
    var outerChanges = thm.Changes;

    // Per outer key: the inner changes (read first) paired with the inner snapshot.
    var perKey = thm.Map(a =>
    {
        var (outerKey, inner) = a;
        var innerChanges = inner.Changes;
        return (outerKey, (innerChanges, inner.Snapshot()));
    }).ToHashMap();

    thm = thm.SetItems(perKey.Select(a => (a.Item1, a.Item2.Item2)));

    var innerChangesByKey = perKey
        .Select(a => (a.Item1, a.Item2.Item1))
        .ToHashMap();

    return (outerChanges, thm, innerChangesByKey);
}
/// <summary>Offers the update to the assigned, requested and unassigned sets, in that order.</summary>
private void HandleTaskUpdate(TaskUpdate tu)
{
    foreach (var taskSet in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        TUSwap(taskSet, tu);
    }
}

/// <summary>Offers the cancellation to the assigned, requested and unassigned sets, in that order.</summary>
private void HandleTaskCanceled(TaskCanceled taskCanceled)
{
    foreach (var taskSet in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        TCanSwap(taskSet, taskCanceled);
    }
}

/// <summary>Offers the completion to the assigned, unassigned and requested sets, in that order.</summary>
private void HandleTaskCompleted(TaskCompleted taskCompleted)
{
    foreach (var taskSet in new[] { _assignedTasks, _unassignedTasks, _requestedTasks })
    {
        TComSwap(taskSet, taskCompleted);
    }
}
/// <summary>
/// Applies an assignment to the assigned set: the event is appended to the
/// task's history when the task now belongs to this user, otherwise the task
/// is removed from the set.
/// </summary>
private void HandleTaskAssigned(TaskAssigned ta)
{
    var assignedToThisUser = ta.UserId == UserId;

    _assignedTasks.Swap(ta, (assignment, current) =>
        assignedToThisUser
            ? current.AddOrUpdate(assignment.TaskType, assignment.TaskId,
                history => history.Add(assignment),
                () => Lst<ITaskEvent>.Empty.Add(assignment))
            : current.Remove(assignment.TaskType, assignment.TaskId));
}
/// <summary>
/// Appends the "began" event to the task's history in the requested set,
/// starting a new history when the task is not present yet.
/// </summary>
private void HandleTaskBegan(TaskBegan tb)
{
    _requestedTasks.Swap(tb, (began, current) =>
        current.AddOrUpdate(began.TaskType, began.TaskId,
            history => history.Add(began),
            () => Lst<ITaskEvent>.Empty.Add(began)));
}
} | |
public class TaskSetCoordinator | |
{ | |
/// <summary>Identifier of the user whose tasks this coordinator tracks.</summary>
public string UserId { get; }

// Event history per task, grouped as task type -> task id -> events, kept in
// change-tracking maps.
private readonly Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> _assignedTasks;
private readonly Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> _requestedTasks;
private Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> _unassignedTasks;

private readonly ClientBootstrap _client;

// Background task started by Init that consumes the user's event subscription.
private Task _mainTask;

/// <summary>
/// Creates a coordinator for <paramref name="userId"/> with three empty task sets.
/// </summary>
/// <param name="userId">User the coordinator acts for.</param>
/// <param name="clientBootstrap">Client used to publish and subscribe.</param>
/// <param name="trackChanges">Stored in <c>_trackChanges</c>.</param>
public TaskSetCoordinator(string userId,
    ClientBootstrap clientBootstrap, bool trackChanges)
{
    static Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> NewTaskSet() =>
        Atom(TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>());

    UserId = userId;
    _client = clientBootstrap;
    _trackChanges = trackChanges;
    _assignedTasks = NewTaskSet();
    _requestedTasks = NewTaskSet();
    _unassignedTasks = NewTaskSet();
}
/// <summary>
/// Publishes a new <c>TaskRequest</c> issued by <c>UserId</c>, with fresh
/// event and task identifiers and the current UTC time as request time.
/// </summary>
/// <param name="taskType">Type of the task.</param>
/// <param name="text">Task text.</param>
/// <param name="assignedToUser">User the task is assigned to; may be null.</param>
/// <param name="expectedBy">When the task is expected to be done.</param>
public async ValueTask CreateTask(string taskType, string text,
    string? assignedToUser, DateTimeOffset expectedBy)
{
    var request = new TaskRequest(
        Ulid.NewUlid(),
        UserId,
        assignedToUser,
        Ulid.NewUlid(),
        taskType,
        text,
        DateTimeOffset.UtcNow,
        expectedBy);

    await _client.SendMessage(request);
}
/// <summary>
/// Publishes a <c>TaskUpdate</c> carrying <paramref name="notes"/> for the
/// task described by <paramref name="origReq"/>, authored by <c>UserId</c>.
/// </summary>
public async ValueTask AddTaskNote(TaskRequest origReq,
    string notes)
{
    var update = new TaskUpdate(
        Ulid.NewUlid(),
        origReq.TaskId,
        origReq.TaskType,
        UserId,
        notes);

    await _client.SendMessage(update);
}
/// <summary>
/// Publishes a <c>TaskCanceled</c> event for <paramref name="origReq"/>, but
/// only when <c>UserId</c> is the user who requested the task.
/// </summary>
/// <returns><c>true</c> when the cancellation was sent; <c>false</c> when the caller is not the requester.</returns>
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes)
{
    if (origReq.RequestingUserId != UserId)
    {
        return false;
    }

    var cancellation = new TaskCanceled(
        Ulid.NewUlid(),
        origReq.TaskId,
        origReq.TaskType,
        UserId,
        notes,
        DateTimeOffset.Now);

    await _client.SendMessage(cancellation);
    return true;
}
/// <summary>
/// Looks <paramref name="taskId"/> up in the assigned set and then the
/// unassigned set; for the first hit a <c>TaskAssigned</c> event naming
/// <paramref name="newUser"/> is published along with the task's recorded
/// history.
/// </summary>
/// <returns>A successful result when the event was sent, otherwise a "Could not find task!" failure.</returns>
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId,
    string newUser, string notes)
{
    // The unassigned set is only read when the assigned set had no match.
    return await TryReassignFrom(_assignedTasks.Value)
           ?? await TryReassignFrom(_unassignedTasks.Value)
           ?? new ReassignResult(taskId, false, "Could not find task!");

    async ValueTask<ReassignResult?> TryReassignFrom(
        TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>> tasksByType)
    {
        foreach (var byType in tasksByType)
        {
            var history = byType.Value.Find(taskId, found => found,
                static () => Lst<ITaskEvent>.Empty);
            if (history != default)
            {
                await _client.SendMessage(new TaskAssigned(Ulid.NewUlid(), taskId, newUser,
                    byType.Key, UserId, notes, history,
                    DateTimeOffset.Now));
                return new ReassignResult(taskId, true, "Reassigned");
            }
        }

        return null;
    }
}
/// <summary>
/// Subscribes to the user's task subject and starts the background task
/// (kept in <c>_mainTask</c>) that applies every received event to the
/// in-memory task sets.
/// </summary>
/// <param name="token">Cancels the subscription and the event loop.</param>
/// <remarks>
/// When change tracking was not requested, a second background loop
/// enumerates <c>ChangeLoop()</c> every five seconds and discards the
/// results. NOTE(review): this presumably exists so recorded changes do not
/// pile up when nobody consumes them - confirm.
/// A failure in the event loop is written to the console once and rethrown
/// so that <c>_mainTask</c> ends faulted; the drain loop is cancelled and the
/// subscription disposed when the event loop ends for any reason.
/// </remarks>
public async ValueTask Init(CancellationToken token)
{
    var userEvents =
        await _client.Sub<ITaskEvent>(
            SubjectHelpers.TaskUserSub(UserId), token);

    CancellationTokenSource? drainCts = null;
    if (_trackChanges == false)
    {
        drainCts = new CancellationTokenSource();
        // Captured up front: the token stays usable after the source is disposed.
        var drainToken = drainCts.Token;
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var discarded in ChangeLoop().WithCancellation(token))
                {
                    if (drainToken.IsCancellationRequested)
                    {
                        break;
                    }

                    // Cancellable so shutdown does not wait out the pause.
                    await Task.Delay(5000, drainToken);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the event loop has ended; nothing left to drain.
            }
        });
    }

    _mainTask = Task.Run(async () =>
    {
        try
        {
            await foreach (var entry in userEvents.Msgs.ReadAllAsync(token))
            {
                RunEventHandlers(entry);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
        finally
        {
            try
            {
                drainCts?.Cancel(false);
            }
            finally
            {
                drainCts?.Dispose();
                await userEvents.DisposeAsync();
            }
        }
    });
}
/// <summary>
/// Routes the payload of <paramref name="entry"/> to the handler for its
/// concrete event type. Payloads of any other type (or null) are ignored.
/// </summary>
private void RunEventHandlers(NatsMsg<ITaskEvent> entry)
{
    switch (entry.Data)
    {
        case TaskAssigned assigned:
            HandleTaskAssigned(assigned);
            return;
        case TaskBegan began:
            HandleTaskBegan(began);
            return;
        case TaskCompleted completed:
            HandleTaskCompleted(completed);
            return;
        case TaskRequest requested:
            HandleTaskRequest(requested);
            return;
        case TaskUpdate updated:
            HandleTaskUpdate(updated);
            return;
        case TaskCanceled canceled:
            HandleTaskCanceled(canceled);
            return;
        default:
            return;
    }
}
/// <summary>
/// Removes <paramref name="taskId"/> from the requested, assigned and
/// unassigned sets wherever its history may be dropped: the last event is a
/// <c>TaskCanceled</c>/<c>TaskCompleted</c>, or the history does not start
/// with a <c>TaskRequest</c>.
/// </summary>
/// <returns>
/// Success with "no instances found" when no set holds the task; success
/// with "cleared" when every occurrence was removed; failure with
/// "Tasks not in closed state" when at least one occurrence had to stay.
/// </returns>
public ValueTask<ClearResult> ClearItem(Ulid taskId)
{
    bool? foundAndCleared = null;
    foreach (var taskSet in new[] { _requestedTasks, _assignedTasks, _unassignedTasks })
    {
        var outcome = RemoveIfClosed(taskSet, taskId);
        if (outcome.HasValue)
        {
            foundAndCleared = (foundAndCleared ?? true) && outcome.Value;
        }
    }

    ClearResult result;
    if (foundAndCleared.HasValue == false)
    {
        result = new ClearResult(true, "no instances found");
    }
    else if (foundAndCleared.Value)
    {
        result = new ClearResult(true, "cleared");
    }
    else
    {
        result = new ClearResult(false, "Tasks not in closed state");
    }

    return new ValueTask<ClearResult>(result);
}

/// <summary>
/// Removes <paramref name="taskId"/> from <paramref name="taskSet"/> wherever
/// its history is removable.
/// </summary>
/// <returns>
/// <c>null</c> when the set does not contain the task, <c>true</c> when every
/// occurrence was removed, <c>false</c> when an occurrence was kept.
/// </returns>
private static bool? RemoveIfClosed(
    Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> taskSet,
    Ulid taskId)
{
    bool? outcome = null;
    taskSet.Swap(current =>
    {
        // The swap function may be invoked again if the atom changed
        // concurrently, so the outcome is recomputed from scratch each time.
        outcome = null;
        var updated = current;
        foreach (var byType in current)
        {
            byType.Value.Find(taskId, events =>
            {
                var first = events.Head();
                var last = events.Reverse().FirstOrDefault();
                var removable = last is TaskCanceled or TaskCompleted ||
                                first is not TaskRequest;
                if (removable)
                {
                    updated = updated.AddOrUpdate(byType.Key,
                        inner => inner.Remove(taskId),
                        () => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty);
                }

                outcome = (outcome ?? true) && removable;
                return true;
            }, () => false);
        }

        return updated;
    });
    return outcome;
}
/// <summary>
/// Records a new task request in every set it belongs to: the requested set
/// when this user issued it, and either the assigned set (assigned to this
/// user) or the unassigned set (assignee is the literal "unassigned").
/// An already-known task id is left untouched.
/// </summary>
private void HandleTaskRequest(TaskRequest trb)
{
    if (trb.RequestingUserId == UserId)
    {
        Track(_requestedTasks);
    }

    if (trb.AssignedUserId == UserId)
    {
        Track(_assignedTasks);
    }
    else if (trb.AssignedUserId == "unassigned")
    {
        Track(_unassignedTasks);
    }

    // Adds the request as the first event of the task, creating the
    // per-type map when this is the first task of its type.
    void Track(Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> target)
    {
        target.Swap(trb, (request, current) => current.AddOrUpdate(request.TaskType,
            existing => existing.TryAdd(request.TaskId,
                Lst<ITaskEvent>.Empty.Add(request)),
            () => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(request.TaskId,
                Lst<ITaskEvent>.Empty.Add(request))));
    }
}
/// <summary>Appends a task update to <paramref name="atom"/> when its task type is already tracked there.</summary>
private static void TUSwap(
    Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> atom,
    TaskUpdate tu)
{
    AppendWhenTypeTracked(atom, tu);
}

/// <summary>Appends a cancellation to <paramref name="atom"/> when its task type is already tracked there.</summary>
private static void TCanSwap(
    Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> atom,
    TaskCanceled tu)
{
    AppendWhenTypeTracked(atom, tu);
}

/// <summary>Appends a completion to <paramref name="atom"/> when its task type is already tracked there.</summary>
private static void TComSwap(
    Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> atom,
    TaskCompleted tu)
{
    AppendWhenTypeTracked(atom, tu);
}

/// <summary>
/// Shared body of the three swap helpers: when the atom's current value has
/// an entry for the event's task type, the event is appended to the history
/// of its task id (starting a new history when the id is unknown). Atoms
/// without that task type are left alone.
/// </summary>
private static void AppendWhenTypeTracked(
    Atom<TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>>> atom,
    ITaskEvent taskEvent)
{
    atom.Value.Find(taskEvent.TaskType, tracked =>
    {
        atom.Swap(current => current.AddOrUpdate(taskEvent.TaskType,
            byId => byId.AddOrUpdate(taskEvent.TaskId,
                history => history.Add(taskEvent),
                () => Lst<ITaskEvent>.Empty.Add(taskEvent)),
            () => LanguageExt.TrackingHashMap<Ulid, Lst<ITaskEvent>>.Empty));
        return Unit.Default;
    }, () => Unit.Default);
}
// Single-slot channel used by ChangeLoop: a value is read before each
// snapshot pass and written back afterwards.
private readonly Channel<bool> _changeLocker =
    Channel.CreateBounded<bool>(1);

// Value of the constructor's trackChanges argument.
private readonly bool _trackChanges;

/// <summary>
/// Captures the current contents of the assigned, requested and unassigned
/// sets (read in that order) as plain hash maps.
/// </summary>
public TaskState GetTaskState()
{
    static HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> Copy(
        TrackingHashMap<string, TrackingHashMap<Ulid, Lst<ITaskEvent>>> source) =>
        source.Select(a => (a.Key, a.Value.ToHashMap())).ToHashMap();

    return new TaskState(
        Copy(_assignedTasks.Value),
        Copy(_requestedTasks.Value),
        Copy(_unassignedTasks.Value));
}
/// <summary>
/// Point-in-time copy of the three task sets, each grouped as
/// task type -> task id -> event history.
/// </summary>
/// <param name="AssignedTasks">Contents of the assigned set.</param>
/// <param name="RequestedTasks">Contents of the requested set.</param>
/// <param name="UnassignedTasks">Contents of the unassigned set.</param>
public record TaskState(
    HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> AssignedTasks,
    HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> RequestedTasks,
    HashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> UnassignedTasks);
/// <summary>
/// Repeatedly collects the changes recorded on the three task sets, resets
/// each set to a fresh snapshot, and yields the collected changes.
/// </summary>
/// <remarks>
/// Each pass first reads a value from <c>_changeLocker</c> and writes one
/// back once the three sets were processed; the loop ends when a
/// <c>false</c> value is read.
/// NOTE(review): nothing visible in this file writes the first value into
/// <c>_changeLocker</c>, so the first read appears to wait indefinitely -
/// confirm where the channel is seeded.
/// </remarks>
public async IAsyncEnumerable<TaskChangeSnapshot> ChangeLoop()
{
    while (await _changeLocker.Reader.ReadAsync())
    {
        var snapshotLoop = new SnapshotBuild();

        // Each set's changes go into the slots of the same name; the
        // assignments happen inside the swap so they match the state that is
        // actually replaced.
        _assignedTasks.Swap(snapshotLoop, (sl, thm) =>
        {
            (sl.AssignedTasks, thm, sl.AssignedInner) = MakeSnapshot(thm);
            return thm.Snapshot();
        });
        _unassignedTasks.Swap(snapshotLoop, (sl, thm) =>
        {
            (sl.UnassignedTasks, thm, sl.UnassignedInner) = MakeSnapshot(thm);
            return thm.Snapshot();
        });
        _requestedTasks.Swap(snapshotLoop, (sl, thm) =>
        {
            (sl.RequestedTasks, thm, sl.RequestedInner) = MakeSnapshot(thm);
            return thm.Snapshot();
        });

        _changeLocker.Writer.TryWrite(true);
        yield return snapshotLoop.ToRecord();
    }
}
/// <summary>
/// Collects the pending change sets of a two-level tracking map and returns
/// them together with a copy of the map whose inner maps were snapshotted.
/// </summary>
/// <returns>
/// <c>chg</c>: changes recorded on the outer map; <c>thm</c>: the outer map
/// with every inner map replaced by its snapshot; <c>innerChg</c>: per outer
/// key, the changes recorded on that key's inner map.
/// </returns>
private static (
    HashMap<TKeyOuter, Change<TrackingHashMap<TKeyInner, Lst<TLst>>>> chg,
    TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm,
    HashMap<TKeyOuter, HashMap<TKeyInner, Change<Lst<TLst>>>> innerChg)
    MakeSnapshot<TKeyOuter, TKeyInner, TLst>(
        TrackingHashMap<TKeyOuter, TrackingHashMap<TKeyInner, Lst<TLst>>> thm)
{
    //TODO: This is trasshhhhhhh.
    var outerChanges = thm.Changes;

    // Per outer key: the inner changes (read first) paired with the inner snapshot.
    var perKey = thm.Map(a =>
    {
        var (outerKey, inner) = a;
        var innerChanges = inner.Changes;
        return (outerKey, (innerChanges, inner.Snapshot()));
    }).ToHashMap();

    thm = thm.SetItems(perKey.Select(a => (a.Item1, a.Item2.Item2)));

    var innerChangesByKey = perKey
        .Select(a => (a.Item1, a.Item2.Item1))
        .ToHashMap();

    return (outerChanges, thm, innerChangesByKey);
}
/// <summary>Offers the update to the assigned, requested and unassigned sets, in that order.</summary>
private void HandleTaskUpdate(TaskUpdate tu)
{
    foreach (var taskSet in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        TUSwap(taskSet, tu);
    }
}

/// <summary>Offers the cancellation to the assigned, requested and unassigned sets, in that order.</summary>
private void HandleTaskCanceled(TaskCanceled taskCanceled)
{
    foreach (var taskSet in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        TCanSwap(taskSet, taskCanceled);
    }
}

/// <summary>Offers the completion to the assigned, unassigned and requested sets, in that order.</summary>
private void HandleTaskCompleted(TaskCompleted taskCompleted)
{
    foreach (var taskSet in new[] { _assignedTasks, _unassignedTasks, _requestedTasks })
    {
        TComSwap(taskSet, taskCompleted);
    }
}
/// <summary>
/// Applies an assignment to the assigned set: the event is appended to the
/// task's history when the task now belongs to this user, otherwise the task
/// is removed from the set.
/// </summary>
private void HandleTaskAssigned(TaskAssigned ta)
{
    var assignedToThisUser = ta.UserId == UserId;

    _assignedTasks.Swap(ta, (assignment, current) =>
        assignedToThisUser
            ? current.AddOrUpdate(assignment.TaskType, assignment.TaskId,
                history => history.Add(assignment),
                () => Lst<ITaskEvent>.Empty.Add(assignment))
            : current.Remove(assignment.TaskType, assignment.TaskId));
}
/// <summary>
/// Appends the "began" event to the task's history in the requested set,
/// starting a new history when the task is not present yet.
/// </summary>
private void HandleTaskBegan(TaskBegan tb)
{
    _requestedTasks.Swap(tb, (began, current) =>
        current.AddOrUpdate(began.TaskType, began.TaskId,
            history => history.Add(began),
            () => Lst<ITaskEvent>.Empty.Add(began)));
}
} | |
/// <summary>Outcome of a reassignment attempt.</summary>
/// <param name="TaskId">Task the attempt was made for.</param>
/// <param name="Success">Whether the reassignment event was sent.</param>
/// <param name="Message">Human-readable outcome text.</param>
public record ReassignResult(
    Ulid TaskId,
    bool Success,
    string Message);
/// <summary>Outcome of an attempt to clear a task from the local task sets.</summary>
/// <param name="Success">Whether the attempt is considered successful.</param>
/// <param name="Reason">Optional explanation of the outcome.</param>
public record ClearResult(
    bool Success,
    string? Reason);
/// <summary>
/// Immutable set of direct-message conversations keyed by user id.
/// </summary>
public record DmConversationSet(HashMap<string, Lst<DirectMessage>> User)
{
    /// <summary>
    /// Returns a set in which the conversation stored under
    /// <paramref name="arg1FromUserId"/> contains
    /// <paramref name="directMessage"/>; a message that is already present is
    /// not added a second time.
    /// </summary>
    public DmConversationSet AddMsg(string arg1FromUserId,
        DirectMessage directMessage)
    {
        var conversations = User.AddOrUpdate(
            arg1FromUserId,
            existing => existing.Contains(directMessage)
                ? existing
                : existing.Add(directMessage),
            () => new Lst<DirectMessage>().Add(directMessage));

        return this with { User = conversations };
    }

    /// <summary>
    /// Currently identical to <see cref="AddMsg"/> keyed by
    /// <paramref name="arg1ToUserId"/>; no separate "received" marker is recorded.
    /// </summary>
    public DmConversationSet AddOrMarkMessageReceived(string arg1ToUserId,
        DirectMessage directMessage) =>
        AddMsg(arg1ToUserId, directMessage);
}
/// <summary>
/// NATS serializer that encodes and decodes <typeparamref name="T"/> with
/// MessagePack, using the Ulid resolver ahead of the standard resolver.
/// </summary>
public class ChatTaskClientSerializer<T> : INatsSerializer<T>
{
    /// <summary>Shared instance.</summary>
    public static readonly ChatTaskClientSerializer<T> Default = new();

    // Ulid resolver first, then the standard MessagePack resolver.
    private static readonly IFormatterResolver DefaultFormatterResolver =
        CompositeResolver.Create(new IFormatterResolver[]
        {
            UlidMessagePackResolver.Instance,
            MessagePackSerializerOptions.Standard.Resolver
        });

    private static readonly MessagePackSerializerOptions DefaultSerializerOptions =
        MessagePackSerializerOptions.Standard.WithResolver(DefaultFormatterResolver);

    /// <summary>Writes <paramref name="value"/> to <paramref name="bufferWriter"/>.</summary>
    public void Serialize(IBufferWriter<byte> bufferWriter, T value) =>
        MessagePackSerializer.Serialize(bufferWriter, value, DefaultSerializerOptions);

    /// <summary>Reads a <typeparamref name="T"/> from <paramref name="buffer"/>.</summary>
    public T? Deserialize(in ReadOnlySequence<byte> buffer) =>
        MessagePackSerializer.Deserialize<T>(buffer, DefaultSerializerOptions);
}
/// <summary>
/// Implemented by messages that know the subject they are published on.
/// </summary>
public interface ISubjectFactory
{
    /// <summary>Returns the subject for this instance.</summary>
    string GetSubject();
}
/// <summary>
/// Common shape of every task event. Two events compare equal through
/// <see cref="IEquatable{T}"/> when their <see cref="EventId"/> values match.
/// </summary>
public interface ITaskEvent : IChatTaskClient, IEquatable<ITaskEvent>
{
    /// <summary>Identifier of this event.</summary>
    Ulid EventId { get; init; }

    /// <summary>Identifier of the task the event belongs to.</summary>
    Ulid TaskId { get; init; }

    /// <summary>Type of the task the event belongs to.</summary>
    string TaskType { get; init; }

    /// <summary>Default equality: a non-null event with the same <see cref="EventId"/>.</summary>
    bool IEquatable<ITaskEvent>.Equals(ITaskEvent? other) =>
        other != null && other.EventId == EventId;
}
/// <summary>
/// Common base interface of all chat and task messages. The
/// <c>Union</c> attributes register each concrete message type with its
/// MessagePack union key; the numeric keys are part of the serialized
/// format and must not be renumbered.
/// </summary>
[Union(0, typeof(BroadcastMessage))]
[Union(1, typeof(DirectMessage))]
[Union(2, typeof(TaskRequest))]
[Union(3, typeof(TaskCompleted))]
[Union(4, typeof(TaskBegan))]
[Union(5, typeof(TaskUpdate))]
[Union(6, typeof(TaskAssigned))]
[Union(7, typeof(TaskCanceled))]
public interface IChatTaskClient : ISubjectFactory
{
    // Marker interface: all members come from ISubjectFactory.
}
/// <summary>
/// Chat message addressed to everyone; always published on
/// <see cref="SubjectHelpers.BroadcastSubject"/>.
/// </summary>
[MessagePackObject(true)]
public record BroadcastMessage(string FromUserId, string Text) : IChatTaskClient
{
    /// <summary>Time the message was sent; settable after construction.</summary>
    public DateTimeOffset TimeSent { get; set; }

    /// <summary>Returns the fixed broadcast subject.</summary>
    public string GetSubject() => SubjectHelpers.BroadcastSubject;
}
/// <summary>
/// Builds the subject strings used for chat messages and task events.
/// </summary>
public static class SubjectHelpers
{
    /// <summary>Subject for broadcast chat messages.</summary>
    public static string BroadcastSubject => "sample.chat.message.broadcast";

    /// <summary>
    /// Subject for a direct message: <c>sample.chat.message.direct.{from}.{to}</c>.
    /// </summary>
    public static string DirectMessage(string from, string to)
    {
        return $"sample.chat.message.direct.{from}.{to}";
    }

    /// <summary>
    /// Task subject carrying two user segments
    /// (<paramref name="requested"/> then <paramref name="assigned"/>).
    /// </summary>
    // NOTE(review): {type} and {taskId} are joined with ',' rather than '.',
    // so they form a single subject token. This looks like a typo, but the
    // format is a wire contract - confirm no consumer parses it before changing.
    public static string Tasks(
        string scope,
        string type,
        Ulid taskId,
        string requested,
        string assigned)
    {
        return $"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}";
    }

    /// <summary>
    /// Task subject carrying a single user segment
    /// (<paramref name="doneByUser"/>).
    /// </summary>
    // NOTE(review): same ',' separator between {type} and {taskId} as the
    // five-argument overload.
    public static string Tasks(
        string scope,
        string type,
        Ulid taskId,
        string doneByUser) =>
        $"sample.task.{scope}.{type},{taskId}.{doneByUser}";

    /// <summary>Subscription subject for task events.</summary>
    // NOTE(review): userId is not used - the result is the same wildcard as
    // TaskReplication. Confirm whether per-user filtering was intended.
    public static string TaskUserSub(string userId) => "sample.task.>";

    /// <summary>Wildcard subject matching every subject under <c>sample.task.</c>.</summary>
    public static string TaskReplication => "sample.task.>";
}
/// <summary>
/// Chat message from one user to another. Its subject is built from the
/// sender and recipient ids.
/// </summary>
[MessagePackObject(true)]
public record DirectMessage(string FromUserId, string ToUserId, string Text)
    : IChatTaskClient
{
    /// <summary>Returns the direct-message subject for this sender/recipient pair.</summary>
    public string GetSubject() =>
        SubjectHelpers.DirectMessage(FromUserId, ToUserId);
}
/// <summary>
/// Task event published on the <see cref="TaskScopeSubjects.Request"/>
/// scope. <see cref="AssignedUserId"/> may be <c>null</c>.
/// </summary>
[MessagePackObject(true)]
public record TaskRequest(
    Ulid EventId,
    string RequestingUserId,
    string? AssignedUserId,
    Ulid TaskId,
    string TaskType,
    string Text,
    DateTimeOffset RequestedAt,
    DateTimeOffset ExpectedBy) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// Builds the request subject from the task type, task id, requesting
    /// user and assignee; a null assignee is written as the literal
    /// <c>unassigned</c>.
    /// </summary>
    public string GetSubject()
    {
        var assignee = AssignedUserId ?? "unassigned";
        return SubjectHelpers.Tasks(
            TaskScopeSubjects.Request,
            TaskType,
            TaskId,
            RequestingUserId,
            assignee);
    }
}
/// <summary>
/// Task event published on the <see cref="TaskScopeSubjects.Canceled"/>
/// scope.
/// </summary>
[MessagePackObject(true)]
public record TaskCanceled(
    Ulid EventId,
    Ulid TaskId,
    string TaskType,
    string CanceledByUser,
    string CancelReason,
    DateTimeOffset CancelTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// Builds the canceled subject from the task type, task id and
    /// <see cref="CanceledByUser"/>.
    /// </summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks(
            TaskScopeSubjects.Canceled,
            TaskType,
            TaskId,
            CanceledByUser);
}
/// <summary>
/// Task event published on the <see cref="TaskScopeSubjects.Begun"/> scope.
/// </summary>
[MessagePackObject(true)]
public record TaskBegan(
    Ulid EventId,
    Ulid TaskId,
    string TaskType,
    string UserId,
    string Text,
    DateTimeOffset BeginTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// Builds the begun subject from the task type, task id and
    /// <see cref="UserId"/>.
    /// </summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks(
            TaskScopeSubjects.Begun,
            TaskType,
            TaskId,
            UserId);
}
/// <summary>
/// Task event published on the <see cref="TaskScopeSubjects.Update"/> scope.
/// </summary>
[MessagePackObject(true)]
public record TaskUpdate(
    Ulid EventId,
    Ulid TaskId,
    string TaskType,
    string UserId,
    string Notes) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// Builds the update subject from the task type, task id and
    /// <see cref="UserId"/>.
    /// </summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks(
            TaskScopeSubjects.Update,
            TaskType,
            TaskId,
            UserId);
}
/// <summary>
/// Task event published on the <see cref="TaskScopeSubjects.Completed"/>
/// scope.
/// </summary>
[MessagePackObject(true)]
public record TaskCompleted(
    Ulid EventId,
    Ulid TaskId,
    string TaskType,
    string UserId,
    string Notes,
    DateTimeOffset CompletedTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// Builds the completed subject from the task type, task id and
    /// <see cref="UserId"/>.
    /// </summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks(
            TaskScopeSubjects.Completed,
            TaskType,
            TaskId,
            UserId);
}
/// <summary>
/// Scope segment values passed as the <c>scope</c> argument of
/// <c>SubjectHelpers.Tasks</c> by the task event records.
/// </summary>
public static class TaskScopeSubjects
{
    /// <summary>Scope segment <c>completed</c>.</summary>
    public static readonly string Completed = "completed";

    /// <summary>Scope segment <c>assigned</c>.</summary>
    public static readonly string Assigned = "assigned";

    /// <summary>Scope segment <c>begun</c>.</summary>
    public static readonly string Begun = "begun";

    /// <summary>Scope segment <c>update</c>.</summary>
    public static readonly string Update = "update";

    /// <summary>Scope segment <c>canceled</c>.</summary>
    public static readonly string Canceled = "canceled";

    /// <summary>Scope segment <c>request</c>.</summary>
    public static readonly string Request = "request";
}
/// <summary>
/// Task event published on the <see cref="TaskScopeSubjects.Assigned"/>
/// scope. Carries a list of task events in <see cref="History"/>.
/// </summary>
[MessagePackObject(true)]
public record TaskAssigned(
    Ulid EventId,
    Ulid TaskId,
    string TaskType,
    string UserId,
    string AssignedByUserId,
    string Notes,
    Lst<ITaskEvent> History,
    DateTimeOffset AssignedTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// Builds the assigned subject from the task type, task id,
    /// <see cref="UserId"/> and <see cref="AssignedByUserId"/>, in that order.
    /// </summary>
    // NOTE(review): the five-argument Tasks overload names its last two
    // parameters (requested, assigned). Here UserId lands in the "requested"
    // slot and AssignedByUserId in the "assigned" slot, which differs from
    // how TaskRequest fills them - confirm the intended segment order.
    public string GetSubject() =>
        SubjectHelpers.Tasks(
            TaskScopeSubjects.Assigned,
            TaskType,
            TaskId,
            UserId,
            AssignedByUserId);
}
/// <summary>
/// Mutable holder for the six change maps that make up a
/// <see cref="TaskChangeSnapshot"/>. Fields are assigned by code in this
/// assembly and then handed to the snapshot via <see cref="ToRecord"/>.
/// </summary>
public class SnapshotBuild
{
    // Per-key change of a whole tracking map (keyed by string, then by Ulid).
    internal HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>> AssignedTasks;
    internal HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>> RequestedTasks;
    internal HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>> UnassignedTasks;

    // Per-key, per-Ulid change of the event list itself.
    internal HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> AssignedInner;
    internal HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> RequestedInner;
    internal HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> UnassignedInner;

    /// <summary>
    /// Creates a <see cref="TaskChangeSnapshot"/> from the current field
    /// values, passed in declaration order.
    /// </summary>
    public TaskChangeSnapshot ToRecord()
    {
        return new TaskChangeSnapshot(
            AssignedTasks,
            RequestedTasks,
            UnassignedTasks,
            AssignedInner,
            RequestedInner,
            UnassignedInner);
    }
}
/// <summary>
/// Immutable snapshot of the six change maps produced by
/// <see cref="SnapshotBuild.ToRecord"/>.
/// </summary>
/// <remarks>
/// On a plain <c>class</c>, primary-constructor parameters do not become
/// members (only <c>record</c> types synthesize properties), so without
/// explicit properties every value passed in would be dropped and the
/// snapshot could not be read. Each parameter is therefore stored in a
/// get-only property of the same name. The constructor signature is
/// unchanged.
/// </remarks>
public class TaskChangeSnapshot(
    HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
        AssignedTasks,
    HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
        RequestedTasks,
    HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
        UnassignedTasks,
    HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> AssignedInner,
    HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> RequestedInner,
    HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>> UnassignedInner)
{
    // In each initializer the simple name on the right-hand side binds to
    // the primary-constructor parameter, not to the property being declared.

    /// <summary>Value passed as the <c>AssignedTasks</c> constructor argument.</summary>
    public HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
        AssignedTasks { get; } = AssignedTasks;

    /// <summary>Value passed as the <c>RequestedTasks</c> constructor argument.</summary>
    public HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
        RequestedTasks { get; } = RequestedTasks;

    /// <summary>Value passed as the <c>UnassignedTasks</c> constructor argument.</summary>
    public HashMap<string, Change<TrackingHashMap<Ulid, Lst<ITaskEvent>>>>
        UnassignedTasks { get; } = UnassignedTasks;

    /// <summary>Value passed as the <c>AssignedInner</c> constructor argument.</summary>
    public HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>>
        AssignedInner { get; } = AssignedInner;

    /// <summary>Value passed as the <c>RequestedInner</c> constructor argument.</summary>
    public HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>>
        RequestedInner { get; } = RequestedInner;

    /// <summary>Value passed as the <c>UnassignedInner</c> constructor argument.</summary>
    public HashMap<string, HashMap<Ulid, Change<Lst<ITaskEvent>>>>
        UnassignedInner { get; } = UnassignedInner;
}
/// <summary>
/// Per-user client state. Subscribes to the broadcast subject and to the
/// inbound and outbound direct-message subjects of <see cref="UserId"/>,
/// and folds every received message into in-memory history atoms.
/// </summary>
public class ClientCoordinator
{
    /// <summary>Id of the user this coordinator acts for.</summary>
    public string UserId { get; }

    private readonly Atom<ChatHistory> _broadcastHistory;
    private readonly Atom<DmConversationSet> _directMessageHistory;
    private readonly ClientBootstrap _client;

    /// <summary>
    /// Creates a coordinator for <paramref name="userId"/> that subscribes
    /// through <paramref name="bootstrap"/>. Both histories start empty.
    /// </summary>
    public ClientCoordinator(string userId, ClientBootstrap bootstrap)
    {
        UserId = userId;
        _client = bootstrap;
        // OK so the way Atoms work, is they use a 'swap' function,
        // and then check for equality/etc.
        // If your type does equality (i.e. records)
        // and your underlying types are immutable,
        // and your actions are able to be applied...
        // magic! in case of a conflict, the operation is re-attempted,
        // all in a thread safe fashion (as long as your swap is thread safe)
        _broadcastHistory =
            Atom(new ChatHistory(default));
        _directMessageHistory =
            Atom(new DmConversationSet(LanguageExt
                .HashMap<string, Lst<DirectMessage>>.Empty));
    }

    /// <summary>
    /// Opens the three subscriptions and starts one background pump per
    /// subscription. The returned task completes once the subscriptions are
    /// established; the pumps keep running until <paramref name="token"/>
    /// is cancelled or their message stream ends or fails.
    /// </summary>
    /// <param name="token">Cancels the subscriptions and the pumps.</param>
    public async ValueTask Begin(CancellationToken token)
    {
        var (broadcastSub, dmSub, outboundDmSub) = await ValueTaskEx.WhenAll(
            _client.Sub<BroadcastMessage>(
                SubjectHelpers.BroadcastSubject,
                token),
            _client.Sub<DirectMessage>(
                SubjectHelpers.DirectMessage("*", UserId),
                token),
            _client.Sub<DirectMessage>(
                SubjectHelpers.DirectMessage(UserId, "*"),
                token));

        // The pumps are intentionally fire-and-forget; the discards make
        // that explicit instead of leaving un-awaited tasks behind.
        _ = RunPump(
            broadcastSub.Msgs.ReadAllAsync(token),
            msg =>
            {
                // A message without a payload carries nothing to record.
                if (msg.Data is { } broadcast)
                {
                    _broadcastHistory.Swap(broadcast,
                        (m, history) => history.AddMsg(m));
                }
            },
            "broadcast");

        _ = RunPump(
            outboundDmSub.Msgs.ReadAllAsync(token),
            msg =>
            {
                if (msg.Data is { } outbound)
                {
                    // Messages sent by this user are filed under the recipient.
                    _directMessageHistory.Swap(outbound,
                        (m, set) => set.AddOrMarkMessageReceived(m.ToUserId, m));
                }
            },
            "outbound-dm");

        _ = RunPump(
            dmSub.Msgs.ReadAllAsync(token),
            msg =>
            {
                if (msg.Data is { } inbound)
                {
                    // File the message under the other party: the sender,
                    // unless the sender is this user, then the recipient.
                    _directMessageHistory.Swap(inbound, UserId,
                        (m, self, set) => set.AddMsg(
                            m.FromUserId != self ? m.FromUserId : m.ToUserId,
                            m));
                }
            },
            "inbound-dm");
    }

    /// <summary>
    /// Runs <paramref name="apply"/> for every item of
    /// <paramref name="messages"/> on a background task. Cancellation ends
    /// the pump quietly; any other failure ends the pump and is written to
    /// the trace output rather than being swallowed without a record.
    /// </summary>
    private static Task RunPump<TMsg>(
        IAsyncEnumerable<TMsg> messages,
        Action<TMsg> apply,
        string pumpName)
    {
        return Task.Run(async () =>
        {
            try
            {
                await foreach (var msg in messages)
                {
                    apply(msg);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the caller's token is cancelled.
            }
            catch (Exception ex)
            {
                // Best effort, as before: the pump stops, but the reason is
                // now visible to whoever listens to trace output.
                System.Diagnostics.Trace.TraceError(
                    "ClientCoordinator {0} pump stopped: {1}", pumpName, ex);
            }
        });
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment