Created
March 30, 2024 23:08
-
-
Save to11mtm/043edae54477f505e0335771c06b1826 to your computer and use it in GitHub Desktop.
Just some playing around with making a 'toy' NATS app for Chat and Tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Buffers; | |
using LanguageExt; | |
using MessagePack; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.KeyValueStore; | |
using static LanguageExt.Prelude; | |
namespace NatsSandBox.Client; | |
/// <summary>
/// Owns the NATS connection used by the chat/task client and exposes thin
/// publish/subscribe helpers on top of it.
/// </summary>
public class ClientBootstrap : IAsyncDisposable
{
    // Null until Startclient has been called (and again after disposal).
    private NatsConnection? _connection;

    // Fails with a descriptive error instead of a NullReferenceException when
    // the client is used before Startclient.
    private NatsConnection Connection =>
        _connection ?? throw new InvalidOperationException(
            $"{nameof(Startclient)} must be called before the client is used.");

    /// <summary>Creates the NATS connection using the default options.</summary>
    public void Startclient()
    {
        _connection = new NatsConnection(NatsOpts.Default with
        {
        });
        // JetStream / key-value contexts are not wired up yet:
        //var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default));
        //new NatsKVContext()
    }

    /// <summary>
    /// Publishes <paramref name="command"/> on the subject it reports through
    /// <c>GetSubject()</c>, serialized as an <see cref="IChatTaskClient"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The client has not been started.</exception>
    public async ValueTask SendMessage(IChatTaskClient command)
    {
        await Connection.PublishAsync(command.GetSubject(), command,
            serializer: ChatTaskClientSerializer<IChatTaskClient>.Default);
    }

    /// <summary>
    /// Subscribes to <paramref name="subject"/> and returns the subscription whose
    /// messages are deserialized as <typeparamref name="T"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The client has not been started.</exception>
    public async ValueTask<INatsSub<T>> Sub<T>(string subject,
        CancellationToken token) where T : IChatTaskClient
    {
        return await Connection.SubscribeCoreAsync<T>(subject,
            null, ChatTaskClientSerializer<T>.Default, default, token);
    }

    /// <summary>Disposes the underlying connection, if one was created.</summary>
    public async ValueTask DisposeAsync()
    {
        if (_connection is { } connection)
        {
            _connection = null;
            await connection.DisposeAsync();
        }

        GC.SuppressFinalize(this);
    }
}
/// <summary>Immutable list of the broadcast messages seen so far.</summary>
public record ChatHistory(Lst<BroadcastMessage> Messages)
{
    /// <summary>
    /// Returns a history that includes <paramref name="message"/>. When the
    /// message is already present the current instance is returned as-is.
    /// </summary>
    public ChatHistory AddMsg(BroadcastMessage message) =>
        Messages.Contains(message)
            ? this
            : this with { Messages = Messages.Add(message) };
}
public class TaskSetCoordinator | |
{ | |
public string UserId { get; } | |
/// <summary>
/// Creates a coordinator for <paramref name="userId"/> that sends through
/// <paramref name="clientBootstrap"/> and keeps its state in the three
/// supplied task maps.
/// </summary>
public TaskSetCoordinator(
    string userId,
    ClientBootstrap clientBootstrap,
    AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> assignedTasks,
    AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> requestedTasks,
    AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> unassignedTasks)
{
    (UserId, _client) = (userId, clientBootstrap);
    (_assignedTasks, _requestedTasks, _unassignedTasks) =
        (assignedTasks, requestedTasks, unassignedTasks);
}
// Task histories keyed by task type, then by task id. The maps are supplied
// by the constructor and never re-assigned, hence readonly.
private readonly AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> _assignedTasks;
private readonly AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> _requestedTasks;
private readonly AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> _unassignedTasks;
private readonly ClientBootstrap _client;

// Background message pump; stays null until Init has been called.
private Task? _mainTask;
/// <summary>
/// Publishes a new <see cref="TaskRequest"/> made by this user. A fresh task
/// id is generated and the request time is the current UTC time.
/// </summary>
/// <param name="taskType">Type of the task.</param>
/// <param name="text">Task text.</param>
/// <param name="assignedToUser">User to assign the task to, or null for none.</param>
/// <param name="expectedBy">When the task is expected to be done.</param>
public async ValueTask CreateTask(string taskType, string text,
    string? assignedToUser, DateTimeOffset expectedBy)
{
    var request = new TaskRequest(
        RequestingUserId: UserId,
        AssignedUserId: assignedToUser,
        TaskId: Ulid.NewUlid(),
        TaskType: taskType,
        Text: text,
        RequestedAt: DateTimeOffset.UtcNow,
        ExpectedBy: expectedBy);

    await _client.SendMessage(request);
}
/// <summary>
/// Publishes a <see cref="TaskUpdate"/> carrying <paramref name="notes"/> for
/// the task described by <paramref name="origReq"/>, authored by this user.
/// </summary>
public async ValueTask AddTaskNote(TaskRequest origReq,
    string notes)
{
    var update = new TaskUpdate(
        TaskId: origReq.TaskId,
        TaskType: origReq.TaskType,
        UserId: UserId,
        Notes: notes);

    await _client.SendMessage(update);
}
/// <summary>
/// Publishes a <see cref="TaskCanceled"/> for <paramref name="origReq"/>, but
/// only when this user is the one who requested the task.
/// </summary>
/// <returns>True when the cancellation was sent; false when this user is not the requester.</returns>
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes)
{
    // Only the requesting user may cancel.
    if (origReq.RequestingUserId != UserId)
    {
        return false;
    }

    var canceled = new TaskCanceled(
        TaskId: origReq.TaskId,
        TaskType: origReq.TaskType,
        CanceledByUser: UserId,
        CancelReason: notes,
        CancelTime: DateTimeOffset.Now);

    await _client.SendMessage(canceled);
    return true;
}
/// <summary>
/// Looks the task up in the assigned tasks first and then in the unassigned
/// pool, and publishes a <see cref="TaskAssigned"/> that hands it to
/// <paramref name="newUser"/>, carrying the locally known history.
/// </summary>
/// <param name="taskId">Id of the task to reassign.</param>
/// <param name="newUser">User the task is assigned to.</param>
/// <param name="notes">Free-text notes attached to the assignment.</param>
/// <returns>
/// A successful result when a non-empty history for the task was found and the
/// assignment was sent; otherwise a failed result.
/// </returns>
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId,
    string newUser, string notes)
{
    foreach (var tasks in new[] { _assignedTasks, _unassignedTasks })
    {
        foreach (var tasksOfType in tasks)
        {
            var history = tasksOfType.Value.Find(taskId, found => found,
                static () => Lst<ITaskEvent>.Empty);
            if (history == default)
            {
                continue;
            }

            // Named arguments on purpose: TaskType and UserId are both strings
            // and were previously passed in swapped positions.
            await _client.SendMessage(new TaskAssigned(
                TaskId: taskId,
                TaskType: tasksOfType.Key,
                UserId: newUser,
                AssignedByUserId: UserId,
                Notes: notes,
                History: history,
                AssignedTime: DateTimeOffset.Now));

            // Local state is not touched here; it is updated when the
            // TaskAssigned event comes back through the subscription.
            return new ReassignResult(taskId, true, "Reassigned");
        }
    }

    return new ReassignResult(taskId, false, "Could not find task!");
}
/// <summary>
/// Subscribes to the task subject for this user and starts a background task
/// that feeds every received message to <c>RunEventHandlers</c>.
/// </summary>
/// <param name="token">Cancels both the subscription and the background read loop.</param>
public async ValueTask Init(CancellationToken token)
{
    var userEvents =
        await _client.Sub<ITaskEvent>(
            SubjectHelpers.TaskUserSub(UserId), token);

    _mainTask = Task.Run(async () =>
    {
        // Release the subscription once the pump stops, whatever the reason.
        await using (userEvents)
        {
            try
            {
                await foreach (var entry in userEvents.Msgs.ReadAllAsync(token))
                {
                    RunEventHandlers(entry);
                }
            }
            catch (Exception e)
            {
                // Logged once, then rethrown so the failure stays observable
                // on _mainTask. A handler failure ends the pump, as before.
                Console.WriteLine(e);
                throw;
            }
        }
    });
}
/// <summary>
/// Dispatches the payload of <paramref name="entry"/> to the handler for its
/// concrete event type. Payloads of any other type (including null) are ignored.
/// </summary>
private void RunEventHandlers(NatsMsg<ITaskEvent> entry)
{
    switch (entry.Data)
    {
        case TaskAssigned assigned:
            HandleTaskAssigned(assigned);
            return;
        case TaskBegan began:
            HandleTaskBegan(began);
            return;
        case TaskCompleted completed:
            HandleTaskCompleted(completed);
            return;
        case TaskRequest request:
            HandleTaskRequest(request);
            return;
        case TaskUpdate update:
            HandleTaskUpdate(update);
            return;
        case TaskCanceled canceled:
            HandleTaskCanceled(canceled);
            return;
    }
}
/// <summary>
/// Removes the task from the assigned, requested and unassigned maps wherever
/// its history is in a closed state.
/// </summary>
/// <param name="taskId">Id of the task to clear.</param>
/// <returns>
/// Success with "no instances found" when the task is in none of the maps;
/// success with "cleared" when every instance found was closed and removed;
/// failure with "Tasks not in closed state" when at least one instance was
/// found that is still open (closed instances are removed regardless).
/// </returns>
public ValueTask<ClearResult> ClearItem(Ulid taskId)
{
    var found = false;
    var allCleared = true;

    foreach (var tasks in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        if (RemoveIfClosed(tasks, taskId) is { } cleared)
        {
            found = true;
            allCleared &= cleared;
        }
    }

    ClearResult result;
    if (!found)
    {
        result = new ClearResult(true, "no instances found");
    }
    else if (allCleared)
    {
        result = new ClearResult(true, "cleared");
    }
    else
    {
        result = new ClearResult(false, "Tasks not in closed state");
    }

    // Nothing here is asynchronous; the ValueTask return type is kept for callers.
    return new ValueTask<ClearResult>(result);
}

// Removes taskId from every type bucket of one map where its history is closed.
// Returns null when the task is not in the map, true when every occurrence was
// removed, false when an open occurrence was left in place.
private static bool? RemoveIfClosed(
    AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> tasks, Ulid taskId)
{
    bool? outcome = null;
    tasks.Swap(current =>
    {
        // The swap function may be invoked more than once, so start each
        // attempt from a clean slate.
        outcome = null;
        var updated = current;
        foreach (var tasksOfType in current)
        {
            bool? closed = tasksOfType.Value.Find(taskId,
                history => (bool?)IsClosed(history),
                () => (bool?)null);
            if (closed is null)
            {
                continue;
            }

            if (closed.Value)
            {
                updated = updated.AddOrUpdate(tasksOfType.Key,
                    byId => byId.Remove(taskId),
                    () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
            }

            outcome = (outcome ?? true) && closed.Value;
        }

        return updated;
    });
    return outcome;
}

// A history counts as closed when its last event is a cancellation or a
// completion, or when it does not start with the original request (this
// includes an empty history, which previously threw).
private static bool IsClosed(Lst<ITaskEvent> history)
{
    var first = history.FirstOrDefault();
    var last = history.Reverse().FirstOrDefault();
    return last is TaskCanceled or TaskCompleted || first is not TaskRequest;
}
/// <summary>
/// Starts tracking a newly requested task: in the requested map when this user
/// made the request, and in the assigned map when it is assigned to this user
/// or in the unassigned map when it has no assignee.
/// </summary>
private void HandleTaskRequest(TaskRequest tr)
{
    if (tr.RequestingUserId == UserId)
    {
        TrackRequest(_requestedTasks, tr);
    }

    if (tr.AssignedUserId == UserId)
    {
        TrackRequest(_assignedTasks, tr);
    }
    // CreateTask sends null when no assignee is given; "unassigned" is only
    // what TaskRequest.GetSubject puts into the subject. Accept both.
    else if (tr.AssignedUserId is null or "unassigned")
    {
        TrackRequest(_unassignedTasks, tr);
    }
}

// Adds the request as the first history entry of its task; a task id that is
// already tracked in the map is left untouched (TryAdd).
private static void TrackRequest(
    AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> tasks, TaskRequest request)
{
    var history = Lst<ITaskEvent>.Empty.Add(request);
    tasks.AddOrUpdate(request.TaskType,
        byId => byId.TryAdd(request.TaskId, history),
        () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty
            .Add(request.TaskId, history));
}
/// <summary>Appends an update note to the task's history wherever the task is tracked.</summary>
private void HandleTaskUpdate(TaskUpdate tu) => AppendToTrackedTask(tu);

/// <summary>Appends a cancellation to the task's history wherever the task is tracked.</summary>
private void HandleTaskCanceled(TaskCanceled taskCanceled) =>
    AppendToTrackedTask(taskCanceled);

/// <summary>Appends a completion to the task's history wherever the task is tracked.</summary>
private void HandleTaskCompleted(TaskCompleted taskCompleted) =>
    AppendToTrackedTask(taskCompleted);

// Appends taskEvent to the history of its task in each of the three maps, but
// only where that task id is already present. Previously the event created a
// fresh one-element history in every map that merely had a bucket for the task
// type, so tasks belonging to other users showed up as assigned, requested and
// unassigned at the same time.
private void AppendToTrackedTask(ITaskEvent taskEvent)
{
    foreach (var tasks in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        // Skip maps without a bucket for this type so no empty bucket is created.
        if (!tasks.Find(taskEvent.TaskType, _ => true, () => false))
        {
            continue;
        }

        tasks.AddOrUpdate(taskEvent.TaskType,
            byId => byId.ContainsKey(taskEvent.TaskId)
                ? byId.AddOrUpdate(taskEvent.TaskId,
                    history => history.Add(taskEvent),
                    () => Lst<ITaskEvent>.Empty.Add(taskEvent))
                : byId,
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
    }
}
/// <summary>
/// Applies an assignment: the task leaves the unassigned pool, and it is added
/// to (or removed from) this user's assigned tasks depending on who the new
/// assignee is.
/// </summary>
private void HandleTaskAssigned(TaskAssigned ta)
{
    // The task has an assignee now, so it no longer belongs in the unassigned
    // pool. Only touch the pool when it has a bucket for this type.
    if (_unassignedTasks.Find(ta.TaskType, _ => true, () => false))
    {
        _unassignedTasks.AddOrUpdate(ta.TaskType,
            byId => byId.Remove(ta.TaskId),
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
    }

    if (ta.UserId == UserId)
    {
        // Every path stores the same thing: what was already known locally,
        // plus carried-over events not yet present, plus the assignment itself.
        // (Previously the three paths stored three different histories.)
        Lst<ITaskEvent> Merge(Lst<ITaskEvent> existing)
        {
            var merged = existing;
            foreach (var carried in ta.History)
            {
                if (!merged.Contains(carried))
                {
                    merged = merged.Add(carried);
                }
            }

            return merged.Contains(ta) ? merged : merged.Add(ta);
        }

        _assignedTasks.AddOrUpdate(ta.TaskType,
            byId => byId.AddOrUpdate(ta.TaskId,
                existing => Merge(existing),
                () => Merge(Lst<ITaskEvent>.Empty)),
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty
                .Add(ta.TaskId, Merge(Lst<ITaskEvent>.Empty)));
    }
    else
    {
        //TODO: get rid of task types.
        // Assigned to somebody else: make sure it is not listed as ours.
        _assignedTasks.AddOrUpdate(ta.TaskType,
            byId => byId.Remove(ta.TaskId),
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
    }
}
/// <summary>
/// Appends a "task began" event to the task's history in every map where the
/// task is already tracked.
/// </summary>
private void HandleTaskBegan(TaskBegan tb)
{
    // All three maps are updated, matching the update/cancel/complete handlers;
    // previously only the requested map was, so the assignee never saw the event.
    foreach (var tasks in new[] { _assignedTasks, _requestedTasks, _unassignedTasks })
    {
        // Skip maps without a bucket for this type so no empty bucket is created.
        if (!tasks.Find(tb.TaskType, _ => true, () => false))
        {
            continue;
        }

        tasks.AddOrUpdate(tb.TaskType,
            // Only append when the task id is known; an event for an untracked
            // task must not create a new entry.
            byId => byId.ContainsKey(tb.TaskId)
                ? byId.AddOrUpdate(tb.TaskId,
                    history => history.Add(tb),
                    () => Lst<ITaskEvent>.Empty.Add(tb))
                : byId,
            () => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
    }
}
} | |
/// <summary>
/// Result returned by <c>TaskSetCoordinator.ReassignTask</c>: the task id that
/// was asked for, whether the reassignment was sent, and a short message.
/// </summary>
public record ReassignResult(Ulid TaskId, bool Success, string Message); | |
/// <summary>
/// Result returned by <c>TaskSetCoordinator.ClearItem</c>: a success flag and
/// an optional human-readable reason.
/// </summary>
public record ClearResult(bool Success, string? Reason); | |
/// <summary>
/// Immutable set of direct-message conversations, keyed by a user id.
/// </summary>
public record DmConversationSet(HashMap<string, Lst<DirectMessage>> User)
{
    /// <summary>
    /// Returns a set in which <paramref name="directMessage"/> is part of the
    /// conversation stored under <paramref name="arg1FromUserId"/>. A message
    /// that is already in that conversation is not added a second time.
    /// </summary>
    public DmConversationSet AddMsg(string arg1FromUserId,
        DirectMessage directMessage)
    {
        var conversations = User.AddOrUpdate(
            arg1FromUserId,
            existing => existing.Contains(directMessage)
                ? existing
                : existing.Add(directMessage),
            () => new Lst<DirectMessage>().Add(directMessage));

        return this with { User = conversations };
    }

    /// <summary>
    /// Currently identical to <see cref="AddMsg"/> with
    /// <paramref name="arg1ToUserId"/> as the conversation key.
    /// </summary>
    public DmConversationSet AddOrMarkMessageReceived(string arg1ToUserId,
        DirectMessage directMessage) =>
        AddMsg(arg1ToUserId, directMessage);
}
/// <summary>
/// NATS serializer that encodes and decodes <typeparamref name="T"/> with
/// MessagePack using <c>MessagePackSerializerOptions.Standard</c>.
/// </summary>
public class ChatTaskClientSerializer<T> : INatsSerializer<T>
{
    /// <summary>Shared instance; readonly so it cannot be swapped out at runtime.</summary>
    public static readonly ChatTaskClientSerializer<T> Default =
        new ChatTaskClientSerializer<T>();

    /// <summary>Writes <paramref name="value"/> to <paramref name="bufferWriter"/> as MessagePack.</summary>
    public void Serialize(IBufferWriter<byte> bufferWriter, T value)
    {
        MessagePackSerializer.Serialize(bufferWriter, value,
            MessagePackSerializerOptions.Standard);
    }

    /// <summary>Reads a <typeparamref name="T"/> from the MessagePack bytes in <paramref name="buffer"/>.</summary>
    public T? Deserialize(in ReadOnlySequence<byte> buffer)
    {
        return MessagePackSerializer.Deserialize<T>(buffer,
            MessagePackSerializerOptions.Standard);
    }
}
/// <summary>
/// Implemented by messages that know their own subject;
/// <c>ClientBootstrap.SendMessage</c> publishes a message on the subject it returns.
/// </summary>
public interface ISubjectFactory | |
{ | |
/// <summary>Returns the subject string for this instance.</summary>
string GetSubject(); | |
} | |
/// <summary>
/// An event in the life of a task. Every event names the task it belongs to
/// and that task's type.
/// </summary>
/// <remarks>
/// This interface is deserialized directly (the task subscription reads
/// <c>ITaskEvent</c>) and is nested inside <c>Lst&lt;ITaskEvent&gt;</c> histories,
/// so MessagePack needs a union declaration here as well. Senders serialize as
/// <see cref="IChatTaskClient"/>, so each key must equal the key that interface
/// uses for the same type. TaskCanceled takes key 6, the first key not used there.
/// </remarks>
[Union(1, typeof(TaskRequest))]
[Union(2, typeof(TaskCompleted))]
[Union(3, typeof(TaskBegan))]
[Union(4, typeof(TaskUpdate))]
[Union(5, typeof(TaskAssigned))]
[Union(6, typeof(TaskCanceled))]
public interface ITaskEvent : IChatTaskClient
{
    /// <summary>Id of the task the event belongs to.</summary>
    Ulid TaskId { get; init; }

    /// <summary>Type of the task the event belongs to.</summary>
    string TaskType { get; init; }
}
/// <summary>
/// Marker for every message the client can publish. The union attributes list
/// the concrete types MessagePack may write or read when a value is handled
/// through this interface.
/// </summary>
/// <remarks>
/// Existing keys must never be renumbered because they are part of the wire
/// format. TaskCanceled and DirectMessage were missing from the list although
/// both implement this interface and are sent through it.
/// </remarks>
[Union(0, typeof(BroadcastMessage))]
[Union(1, typeof(TaskRequest))]
[Union(2, typeof(TaskCompleted))]
[Union(3, typeof(TaskBegan))]
[Union(4, typeof(TaskUpdate))]
[Union(5, typeof(TaskAssigned))]
[Union(6, typeof(TaskCanceled))]
[Union(7, typeof(DirectMessage))]
public interface IChatTaskClient : ISubjectFactory
{
}
/// <summary>A chat message sent to everybody.</summary>
[MessagePackObject(true)]
public record BroadcastMessage(string FromUserId, string Text)
    : IChatTaskClient
{
    /// <summary>Time the message was sent; settable after construction.</summary>
    public DateTimeOffset TimeSent { get; set; }

    /// <summary>Broadcast messages always use the shared broadcast subject.</summary>
    public string GetSubject() => SubjectHelpers.BroadcastSubject;
}
/// <summary>Builds the NATS subject strings used by the chat and task messages.</summary>
public static class SubjectHelpers
{
    /// <summary>Subject shared by all broadcast chat messages.</summary>
    public static string BroadcastSubject
    {
        get { return "sample.chat.message.broadcast"; }
    }

    /// <summary>Subject of a direct message from <paramref name="from"/> to <paramref name="to"/>.</summary>
    public static string DirectMessage(string from, string to)
    {
        return $"sample.chat.message.direct.{from}.{to}";
    }

    /// <summary>Subject of a task message that names both a requesting and an assigned user.</summary>
    // NOTE(review): type and task id are joined by a comma, not a dot, so they
    // form one subject token. Looks like a typo, but changing it alters the
    // subject on the wire, so confirm before touching it.
    public static string Tasks(string scope, string type, Ulid taskId,
        string requested, string assigned)
    {
        return $"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}";
    }

    /// <summary>Subject of a task message that names the single acting user.</summary>
    // NOTE(review): same comma between type and task id as the overload above.
    public static string Tasks(string scope, string type, Ulid taskId,
        string doneByUser) =>
        $"sample.task.{scope}.{type},{taskId}.{doneByUser}";

    /// <summary>Subscription subject for task messages; matches everything below "sample.task".</summary>
    // NOTE(review): userId is not used, so every user receives every task message.
    public static string TaskUserSub(string userId) => "sample.task.>";
}
/// <summary>A chat message from one user to another.</summary>
[MessagePackObject(true)]
public record DirectMessage(string FromUserId, string ToUserId, string Text)
    : IChatTaskClient
{
    /// <summary>Direct-message subject built from sender and recipient.</summary>
    public string GetSubject() =>
        SubjectHelpers.DirectMessage(from: FromUserId, to: ToUserId);
}
/// <summary>Request that creates a task, optionally assigned to a user right away.</summary>
[MessagePackObject(true)]
public record TaskRequest(
    string RequestingUserId,
    string? AssignedUserId,
    Ulid TaskId,
    string TaskType,
    string Text,
    DateTimeOffset RequestedAt,
    DateTimeOffset ExpectedBy) : IChatTaskClient, ITaskEvent
{
    /// <summary>
    /// "request" task subject; a missing assignee appears as "unassigned".
    /// </summary>
    public string GetSubject()
    {
        var assignee = AssignedUserId ?? "unassigned";
        return SubjectHelpers.Tasks("request", TaskType, TaskId,
            RequestingUserId, assignee);
    }
}
/// <summary>Event recording that a task was canceled, by whom, why and when.</summary>
[MessagePackObject(true)]
public record TaskCanceled(
    Ulid TaskId,
    string TaskType,
    string CanceledByUser,
    string CancelReason,
    DateTimeOffset CancelTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>"canceled" task subject for the canceling user.</summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks("canceled", TaskType, TaskId, CanceledByUser);
}
/// <summary>Event recording that a user began work on a task.</summary>
[MessagePackObject(true)]
public record TaskBegan(
    Ulid TaskId,
    string TaskType,
    string UserId,
    string Text,
    DateTimeOffset BeginTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>"begun" task subject for the acting user.</summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks("begun", TaskType, TaskId, UserId);
}
/// <summary>Event carrying a note that a user added to a task.</summary>
[MessagePackObject(true)]
public record TaskUpdate(
    Ulid TaskId,
    string TaskType,
    string UserId,
    string Notes) : IChatTaskClient, ITaskEvent
{
    /// <summary>"update" task subject for the acting user.</summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks("update", TaskType, TaskId, UserId);
}
/// <summary>Event recording that a user completed a task.</summary>
[MessagePackObject(true)]
public record TaskCompleted(
    Ulid TaskId,
    string TaskType,
    string UserId,
    string Notes,
    DateTimeOffset CompletedTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>"completed" task subject for the acting user.</summary>
    public string GetSubject() =>
        SubjectHelpers.Tasks("completed", TaskType, TaskId, UserId);
}
/// <summary>
/// Event recording that a task was assigned to <c>UserId</c> by
/// <c>AssignedByUserId</c>, carrying the task history known to the sender.
/// </summary>
[MessagePackObject(true)]
public record TaskAssigned(
    Ulid TaskId,
    string TaskType,
    string UserId,
    string AssignedByUserId,
    string Notes,
    Lst<ITaskEvent> History,
    DateTimeOffset AssignedTime) : IChatTaskClient, ITaskEvent
{
    /// <summary>"assigned" task subject.</summary>
    // NOTE(review): the assignee is passed in the helper's "requested" slot and
    // the assigning user in its "assigned" slot — confirm this order is intended.
    public string GetSubject() =>
        SubjectHelpers.Tasks("assigned", TaskType, TaskId, UserId,
            AssignedByUserId);
}
/// <summary>
/// Keeps the chat state of one user: the broadcast history and the
/// direct-message conversations, both fed from NATS subscriptions.
/// </summary>
public class ClientCoordinator
{
    public string UserId { get; }
    private readonly Atom<ChatHistory> _broadcastHistory;
    private readonly Atom<DmConversationSet> _directMessageHistory;
    private readonly ClientBootstrap _client;

    /// <summary>Creates a coordinator for <paramref name="userId"/> with empty histories.</summary>
    public ClientCoordinator(string userId, ClientBootstrap bootstrap)
    {
        UserId = userId;
        _client = bootstrap;
        _broadcastHistory =
            Atom(new ChatHistory(default));
        _directMessageHistory =
            Atom(new DmConversationSet(LanguageExt
                .HashMap<string, Lst<DirectMessage>>.Empty));
    }

    /// <summary>
    /// Subscribes to broadcast messages and to direct messages to and from this
    /// user, then starts one background reader per subscription. Returns as
    /// soon as the subscriptions exist; the readers run until
    /// <paramref name="token"/> is cancelled or a reader fails.
    /// </summary>
    public async ValueTask Begin(CancellationToken token)
    {
        // Subjects come from SubjectHelpers so they cannot drift from the ones
        // the messages publish on; the resulting strings are unchanged.
        var broadcastSub =
            await _client.Sub<BroadcastMessage>(
                SubjectHelpers.BroadcastSubject, token);
        var dmSub =
            await _client.Sub<DirectMessage>(
                SubjectHelpers.DirectMessage("*", UserId), token);
        var outboundDmSub =
            await _client.Sub<DirectMessage>(
                SubjectHelpers.DirectMessage(UserId, "*"), token);

        // The readers are deliberately not awaited.
        _ = Pump(broadcastSub, token, message =>
            _broadcastHistory.Swap(message, (m, h) => h.AddMsg(m)));

        // Messages sent by this user are filed under the recipient.
        _ = Pump(outboundDmSub, token, message =>
            _directMessageHistory.Swap(message,
                (m, h) => h.AddOrMarkMessageReceived(m.ToUserId, m)));

        // Messages received by this user are filed under the other party.
        _ = Pump(dmSub, token, message =>
            _directMessageHistory.Swap(message, UserId,
                (m, self, conversations) => m.FromUserId != self
                    ? conversations.AddMsg(m.FromUserId, m)
                    : conversations.AddMsg(m.ToUserId, m)));
    }

    // Reads the subscription until cancellation and hands every non-null payload
    // to handle. Still best-effort (a failure ends this reader only), but the
    // failure is now written to the console instead of being dropped silently.
    private static Task Pump<T>(INatsSub<T> subscription,
        CancellationToken token, Action<T> handle) =>
        Task.Run(async () =>
        {
            try
            {
                await foreach (var msg in subscription.Msgs.ReadAllAsync(token))
                {
                    // A message without a payload carries nothing to record.
                    if (msg.Data is { } payload)
                    {
                        handle(payload);
                    }
                }
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                // Normal shutdown.
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment