Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created March 30, 2024 23:08
Show Gist options
  • Save to11mtm/043edae54477f505e0335771c06b1826 to your computer and use it in GitHub Desktop.
Save to11mtm/043edae54477f505e0335771c06b1826 to your computer and use it in GitHub Desktop.
Just some playing around with making a 'toy' NATS app for Chat and Tasks
using System.Buffers;
using LanguageExt;
using MessagePack;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
using static LanguageExt.Prelude;
namespace NatsSandBox.Client;
public class ClientBootstrap
{
private NatsConnection _connection;
public void Startclient()
{
_connection = new NatsConnection(NatsOpts.Default with
{
});
//var js = new NatsJSContext(_connection, new NatsJSOpts(NatsOpts.Default));
//new NatsKVContext()
}
public async ValueTask SendMessage(IChatTaskClient command)
{
await _connection.PublishAsync(command.GetSubject(), command,
serializer: ChatTaskClientSerializer<IChatTaskClient>.Default);
}
public async ValueTask<INatsSub<T>> Sub<T>(string subject,
CancellationToken token) where T : IChatTaskClient
{
return await _connection.SubscribeCoreAsync<T>(subject,
null, ChatTaskClientSerializer<T>.Default, default, token);
}
}
public record ChatHistory(Lst<BroadcastMessage> Messages)
{
public ChatHistory AddMsg(BroadcastMessage message)
{
if (Messages.Contains(message))
{
return this;
}
return this with { Messages = Messages.Add(message) };
}
}
public class TaskSetCoordinator
{
public string UserId { get; }
public TaskSetCoordinator(string userId,
ClientBootstrap clientBootstrap,
AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> assignedTasks,
AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> requestedTasks,
AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>> unassignedTasks)
{
UserId = userId;
_client = clientBootstrap;
_assignedTasks = assignedTasks;
_requestedTasks = requestedTasks;
_unassignedTasks = unassignedTasks;
}
private AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>
_assignedTasks;
private AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>
_requestedTasks;
private AtomHashMap<string, HashMap<Ulid, Lst<ITaskEvent>>>
_unassignedTasks;
private readonly ClientBootstrap _client;
private Task _mainTask;
public async ValueTask CreateTask(string taskType, string text,
string? assignedToUser, DateTimeOffset expectedBy)
{
await _client.SendMessage(new TaskRequest(UserId, assignedToUser,
Ulid.NewUlid(), taskType, text, DateTimeOffset.UtcNow,
expectedBy));
}
public async ValueTask AddTaskNote(TaskRequest origReq,
string notes)
{
await _client.SendMessage(new TaskUpdate(origReq.TaskId,
origReq.TaskType, UserId, notes));
}
public async ValueTask<bool> AddTaskCanceled(TaskRequest origReq, string notes)
{
if (origReq.RequestingUserId == UserId)
{
await _client.SendMessage(new TaskCanceled(origReq.TaskId,
origReq.TaskType, UserId, notes, DateTimeOffset.Now));
return true;
}
return false;
}
public async ValueTask<ReassignResult> ReassignTask(Ulid taskId,
string newUser, string notes)
{
bool foundTask = false;
foreach (var requestedTask in _assignedTasks)
{
var item =
requestedTask.Value.Find(taskId, a => a,
static () => Lst<ITaskEvent>.Empty);
if (item != default)
{
foundTask = true;
await _client.SendMessage(new TaskAssigned(taskId, newUser,
requestedTask.Key, UserId, notes, item,
DateTimeOffset.Now));
//_assignedTasks.AddOrUpdate(requestedTask.Key,
// e => e.Remove(taskId),
// () => HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return new ReassignResult(taskId, true, "Reassigned");
}
}
foreach (var requestedTask in _unassignedTasks)
{
var item =
requestedTask.Value.Find(taskId, a => a,
static () => Lst<ITaskEvent>.Empty);
if (item != default)
{
foundTask = true;
await _client.SendMessage(new TaskAssigned(taskId, newUser,
requestedTask.Key, UserId, notes, item,
DateTimeOffset.Now));
//_assignedTasks.AddOrUpdate(requestedTask.Key,
// e => e.Remove(taskId),
// () => HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return new ReassignResult(taskId, true, "Reassigned");
}
}
return new ReassignResult(taskId, false, "Could not find task!");
}
public async ValueTask Init(CancellationToken token)
{
var userEvents =
await _client.Sub<ITaskEvent>(
SubjectHelpers.TaskUserSub(UserId), token);
_mainTask = Task.Run(async () =>
{
try
{
await foreach (var entry in userEvents.Msgs.ReadAllAsync(
token))
{
try
{
RunEventHandlers(entry);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
});
}
private void RunEventHandlers(NatsMsg<ITaskEvent> entry)
{
var msg = entry.Data;
switch (msg)
{
case TaskAssigned ta:
{
HandleTaskAssigned(ta);
break;
}
case TaskBegan tb:
{
HandleTaskBegan(tb);
break;
}
case TaskCompleted tc:
{
HandleTaskCompleted(tc);
break;
}
case TaskRequest tr:
{
HandleTaskRequest(tr);
break;
}
case TaskUpdate tu:
{
HandleTaskUpdate(tu);
break;
}
case TaskCanceled tcan:
{
HandleTaskCanceled(tcan);
break;
}
}
}
public async ValueTask<ClearResult> ClearItem(Ulid taskId)
{
bool? foundAndCleared = null;
_requestedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thm => thm.Remove(taskId),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return false;
}, () => (bool?)null);
}
return innerThm;
});
_requestedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thm => thm.Remove(taskId),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return false;
}, () => (bool?)null);
}
return innerThm;
});
_unassignedTasks.Swap(thm =>
{
var innerThm = thm;
foreach (var valueTuple in thm)
{
valueTuple.Value.Find(taskId, s =>
{
var f = s.Head();
var c = s.Reverse().FirstOrDefault();
if (c is TaskCanceled or TaskCompleted ||
f is not TaskRequest)
{
innerThm = innerThm.AddOrUpdate(valueTuple.Key,
thm => thm.Remove(taskId),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
return false;
}, () => (bool?)null);
}
return innerThm;
});
if (foundAndCleared.HasValue == false)
{
return new ClearResult(true, "no instances found");
}
else
{
if (foundAndCleared.Value)
{
return new ClearResult(true, "cleared");
}
else
{
return new ClearResult(false, "Tasks not in closed state");
}
}
}
private void HandleTaskRequest(TaskRequest tr)
{
if (tr.RequestingUserId == UserId)
{
_requestedTasks.AddOrUpdate(tr.TaskType,
(hm) => hm.TryAdd(tr.TaskId, Lst<ITaskEvent>.Empty.Add(tr)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(tr.TaskId,
Lst<ITaskEvent>.Empty.Add(tr)));
}
if (tr.AssignedUserId == UserId)
{
_assignedTasks.AddOrUpdate(tr.TaskType,
(hm) => hm.TryAdd(tr.TaskId, Lst<ITaskEvent>.Empty.Add(tr)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(tr.TaskId,
Lst<ITaskEvent>.Empty.Add(tr)));
}
else if (tr.AssignedUserId == "unassigned")
{
_unassignedTasks.AddOrUpdate(tr.TaskType,
(hm) => hm.TryAdd(tr.TaskId, Lst<ITaskEvent>.Empty.Add(tr)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty.Add(tr.TaskId,
Lst<ITaskEvent>.Empty.Add(tr)));
}
}
private void HandleTaskUpdate(TaskUpdate tu)
{
_assignedTasks.Find(tu.TaskType, hm =>
{
_assignedTasks.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
_requestedTasks.Find(tu.TaskType, hm =>
{
_requestedTasks.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
_unassignedTasks.Find(tu.TaskType, hm =>
{
_unassignedTasks.AddOrUpdate(tu.TaskType,
(e) => e.AddOrUpdate(tu.TaskId, (s) => s.Add(tu),
() => Lst<ITaskEvent>.Empty.Add(tu)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
}
private void HandleTaskCanceled(TaskCanceled taskCanceled)
{
_assignedTasks.Find(taskCanceled.TaskType, hm =>
{
_assignedTasks.AddOrUpdate(taskCanceled.TaskType,
(e) => e.AddOrUpdate(taskCanceled.TaskId,
(s) => s.Add(taskCanceled),
() => Lst<ITaskEvent>.Empty.Add(taskCanceled)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
_requestedTasks.Find(taskCanceled.TaskType, hm =>
{
_requestedTasks.AddOrUpdate(taskCanceled.TaskType,
(e) => e.AddOrUpdate(taskCanceled.TaskId,
(s) => s.Add(taskCanceled),
() => Lst<ITaskEvent>.Empty.Add(taskCanceled)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
_unassignedTasks.Find(taskCanceled.TaskType, hm =>
{
_unassignedTasks.AddOrUpdate(taskCanceled.TaskType,
(e) => e.AddOrUpdate(taskCanceled.TaskId,
(s) => s.Add(taskCanceled),
() => Lst<ITaskEvent>.Empty.Add(taskCanceled)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
}
private void HandleTaskCompleted(TaskCompleted taskCompleted)
{
_assignedTasks.Find(taskCompleted.TaskType, hm =>
{
_assignedTasks.AddOrUpdate(taskCompleted.TaskType,
(e) => e.AddOrUpdate(taskCompleted.TaskId,
(s) => s.Add(taskCompleted),
() => Lst<ITaskEvent>.Empty.Add(taskCompleted)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
_requestedTasks.Find(taskCompleted.TaskType, hm =>
{
_requestedTasks.AddOrUpdate(taskCompleted.TaskType,
(e) => e.AddOrUpdate(taskCompleted.TaskId,
(s) => s.Add(taskCompleted),
() => Lst<ITaskEvent>.Empty.Add(taskCompleted)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
_unassignedTasks.Find(taskCompleted.TaskType, hm =>
{
_unassignedTasks.AddOrUpdate(taskCompleted.TaskType,
(e) => e.AddOrUpdate(taskCompleted.TaskId,
(s) => s.Add(taskCompleted),
() => Lst<ITaskEvent>.Empty.Add(taskCompleted)),
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
return Unit.Default;
}, () => Unit.Default);
}
private void HandleTaskAssigned(TaskAssigned ta)
{
if (ta.UserId == UserId)
{
_assignedTasks.AddOrUpdate(ta.TaskType, (c) =>
{
return c.AddOrUpdate(ta.TaskId,
(l) => l.AddRange(ta.History),
Lst<ITaskEvent>.Empty.Add(ta));
},
() => LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty
.Add(
ta.TaskId, ta.History));
}
else
{
//TODO: get rid of task types.
_assignedTasks.AddOrUpdate(ta.TaskType,
(c) => { return c.Remove(ta.TaskId); }, LanguageExt.HashMap<Ulid, Lst<ITaskEvent>>.Empty);
}
}
private void HandleTaskBegan(TaskBegan tb)
{
_requestedTasks.Find(tb.TaskType,
_ =>
{
_requestedTasks.AddOrUpdate(
tb.TaskType,
(t => t.AddOrUpdate(tb.TaskId,
(e) => e.Add(tb),
Lst<ITaskEvent>.Empty.Add(tb))),
() => { return default; });
return Unit.Default;
}, () => Unit.Default);
return;
}
}
public record ReassignResult(Ulid TaskId, bool Success, string Message);
public record ClearResult(bool Success, string? Reason);
public record DmConversationSet(HashMap<string, Lst<DirectMessage>> User)
{
public DmConversationSet AddMsg(string arg1FromUserId,
DirectMessage directMessage)
{
return this with
{
User = User.AddOrUpdate(arg1FromUserId,
(a) => a.Contains(directMessage) ? a : a.Add(directMessage),
() => new Lst<DirectMessage>().Add(directMessage))
};
}
public DmConversationSet AddOrMarkMessageReceived(string arg1ToUserId,
DirectMessage directMessage)
{
return AddMsg(arg1ToUserId, directMessage);
}
}
public class ChatTaskClientSerializer<T> : INatsSerializer<T>
{
public static ChatTaskClientSerializer<T> Default =
new ChatTaskClientSerializer<T>();
public void Serialize(IBufferWriter<byte> bufferWriter, T value)
{
MessagePackSerializer.Serialize(bufferWriter, value,
MessagePackSerializerOptions.Standard);
}
public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
return MessagePackSerializer.Deserialize<T>(buffer,
MessagePackSerializerOptions.Standard);
}
}
public interface ISubjectFactory
{
string GetSubject();
}
public interface ITaskEvent : IChatTaskClient
{
Ulid TaskId { get; init; }
string TaskType { get; init; }
}
[Union(0, typeof(BroadcastMessage))]
[Union(1, typeof(TaskRequest))]
[Union(2, typeof(TaskCompleted))]
[Union(3, typeof(TaskBegan))]
[Union(4, typeof(TaskUpdate))]
[Union(5, typeof(TaskAssigned))]
public interface IChatTaskClient : ISubjectFactory
{
}
[MessagePackObject(true)]
public record BroadcastMessage(
string FromUserId,
string Text) : IChatTaskClient
{
public DateTimeOffset TimeSent { get; set; }
public string GetSubject()
{
return SubjectHelpers.BroadcastSubject;
}
}
public static class SubjectHelpers
{
public static string BroadcastSubject =>
$"sample.chat.message.broadcast";
public static string DirectMessage(string from, string to) =>
$"sample.chat.message.direct.{from}.{to}";
public static string
Tasks(string scope, string type, Ulid taskId, string requested,
string assigned) =>
$"sample.task.{scope}.{type},{taskId}.{requested}.{assigned}";
public static string Tasks(string scope, string type, Ulid taskId,
string doneByUser)
{
return $"sample.task.{scope}.{type},{taskId}.{doneByUser}";
}
public static string TaskUserSub(string userId)
{
return $"sample.task.>";
}
}
[MessagePackObject(true)]
public record DirectMessage(
string FromUserId,
string ToUserId,
string Text) : IChatTaskClient
{
public string GetSubject()
{
return SubjectHelpers.DirectMessage(FromUserId, ToUserId);
}
}
[MessagePackObject(true)]
public record TaskRequest(
string RequestingUserId,
string? AssignedUserId,
Ulid TaskId,
string TaskType,
string Text,
DateTimeOffset RequestedAt,
DateTimeOffset ExpectedBy) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks("request", TaskType, TaskId,
RequestingUserId,
AssignedUserId != null ? AssignedUserId : "unassigned");
}
}
[MessagePackObject(true)]
public record TaskCanceled(
Ulid TaskId,
string TaskType,
string CanceledByUser,
string CancelReason,
DateTimeOffset CancelTime) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks("canceled", TaskType, TaskId,
CanceledByUser);
}
}
[MessagePackObject(true)]
public record TaskBegan(
Ulid TaskId,
string TaskType,
string UserId,
string Text,
DateTimeOffset BeginTime) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks("begun", TaskType, TaskId, UserId);
}
}
[MessagePackObject(true)]
public record TaskUpdate(
Ulid TaskId,
string TaskType,
string UserId,
string Notes) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks("update", TaskType, TaskId, UserId);
}
}
[MessagePackObject(true)]
public record TaskCompleted(
Ulid TaskId,
string TaskType,
string UserId,
string Notes,
DateTimeOffset CompletedTime) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks("completed", TaskType, TaskId, UserId);
}
}
[MessagePackObject(true)]
public record TaskAssigned(
Ulid TaskId,
string TaskType,
string UserId,
string AssignedByUserId,
string Notes,
Lst<ITaskEvent> History,
DateTimeOffset AssignedTime
) : IChatTaskClient, ITaskEvent
{
public string GetSubject()
{
return SubjectHelpers.Tasks("assigned", TaskType, TaskId, UserId,
AssignedByUserId);
}
}
public class ClientCoordinator
{
public string UserId { get; }
private readonly Atom<ChatHistory> _broadcastHistory;
private readonly Atom<DmConversationSet> _directMessageHistory;
private readonly ClientBootstrap _client;
public ClientCoordinator(string userId, ClientBootstrap bootstrap)
{
UserId = userId;
_client = bootstrap;
_broadcastHistory =
Atom(new ChatHistory(default));
_directMessageHistory =
Atom(new DmConversationSet(LanguageExt
.HashMap<string, Lst<DirectMessage>>.Empty));
}
public async ValueTask Begin(CancellationToken token)
{
var broadcastSub =
await _client.Sub<BroadcastMessage>(
"sample.chat.message.broadcast",
token);
var dmSub =
await _client.Sub<DirectMessage>(
$"sample.chat.message.direct.*.{UserId}",
token);
var outboundDmSub
= await _client.Sub<DirectMessage>(
$"sample.chat.message.direct.{UserId}.*", token);
Task.Run(async () =>
{
try
{
await foreach (var reader in broadcastSub.Msgs.ReadAllAsync(
token))
{
_broadcastHistory.Swap(reader.Data!,
(m, h) => h.AddMsg(m));
}
}
catch
{
}
});
Task.Run(async () =>
{
try
{
await foreach
(var reader in outboundDmSub.Msgs.ReadAllAsync(
token))
{
_directMessageHistory.Swap(reader.Data!,
(m, h) =>
{
return h.AddOrMarkMessageReceived(m.ToUserId,
m);
});
}
}
catch
{
}
});
Task.Run(async () =>
{
try
{
await foreach (var reader in dmSub.Msgs.ReadAllAsync(
token))
{
_directMessageHistory.Swap(reader.Data!, UserId,
(m, h, v) =>
{
if (m.FromUserId != h)
{
return v.AddMsg(m.FromUserId, m);
}
else
{
return v.AddMsg(m.ToUserId, m);
}
});
}
}
catch
{
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment