Skip to content

Instantly share code, notes, and snippets.

@itn3000
Last active March 5, 2024 16:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itn3000/822691f05b67a95ca7d08891e21d0cc7 to your computer and use it in GitHub Desktop.
Save itn3000/822691f05b67a95ca7d08891e21d0cc7 to your computer and use it in GitHub Desktop.
NATS .NET Client-v2 + R3
using NATS.Client.Core;
using R3;
using System.Diagnostics;
// Sample: bridges a NATS core subscription on subject "hoge" into an R3 observable
// and prints every received message, while a background loop publishes a message
// to the same subject once per second. Pressing Enter cancels the publisher.
await using var con = new NatsConnection();
await using var subscription = await con.SubscribeCoreAsync<string>("hoge");
using var cts = new CancellationTokenSource();
{
    // Declared inside this nested scope so the R3 subscription is disposed at the
    // closing brace, before the NATS subscription and connection declared above.
    using var _ = Observable.CreateFrom<NatsMsg<string>, INatsSub<string>>(subscription, Generator)
        .Subscribe(msg =>
        {
            Console.WriteLine($"{msg.Subject}, {msg.Size}, {msg.Data}");
        });

    await Task.WhenAll(
        // Blocks on console input; Enter requests shutdown of the publisher loop.
        Task.Run(() =>
        {
            Console.ReadLine();
            cts.Cancel();
        }),
        // Publishes the elapsed time once per timer tick until cancellation is requested.
        Task.Run(async () =>
        {
            var sw = Stopwatch.StartNew();
            using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
            try
            {
                while (await timer.WaitForNextTickAsync(cts.Token).ConfigureAwait(false))
                {
                    // Flow the token into the publish as well, so a stalled publish
                    // does not keep the program alive after Enter has been pressed.
                    await con.PublishAsync<string>("hoge", $"piyo: {sw.Elapsed}", cancellationToken: cts.Token).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when cts is cancelled: this is the normal exit path of the loop.
            }
        })
    );
}

// Factory handed to Observable.CreateFrom: exposes the subscription's message
// reader as an async stream, passing the supplied cancellation token to ReadAllAsync.
static IAsyncEnumerable<NatsMsg<string>> Generator(CancellationToken ct, INatsSub<string> subscription)
{
    return subscription.Msgs.ReadAllAsync(ct);
}
using System.Threading.Tasks;
using Stride.Engine;
using NATS.Client.Core;
using R3;
namespace natsr3test
{
    /// <summary>
    /// Stride async script that publishes frame counts to the NATS subject "hoge"
    /// from a frame-based R3 pipeline while the game is running.
    /// </summary>
    public class NatsPublisher : AsyncScript
    {
        /// <summary>
        /// Opens a NATS connection, subscribes a publishing handler to the
        /// per-update observable, and then idles frame by frame until
        /// <c>Game.IsRunning</c> becomes false. The R3 subscription and the
        /// connection are disposed when the method exits.
        /// </summary>
        public override async Task Execute()
        {
            await using var con = new NatsConnection();

            // Per-update source, passed through ThrottleFirstFrame(24) and tagged with a frame count.
            var frames = Observable.EveryUpdate()
                .ThrottleFirstFrame(24)
                .FrameCount();

            using var publishing = frames.SubscribeAwait(async (frame, token) =>
            {
                // FrameCount is narrowed to int because the subject carries int payloads.
                await con.PublishAsync<int>("hoge", (int)frame.FrameCount, cancellationToken: token);
            });

            // Keep the script (and with it the subscription above) alive while the game runs.
            while (Game.IsRunning)
            {
                await Script.NextFrame();
            }
        }
    }
}
using System.Threading.Tasks;
using Stride.Engine;
using R3;
using NATS.Client.Core;
namespace natsr3test
{
    /// <summary>
    /// Stride async script that subscribes to the NATS subject "hoge" and logs
    /// every received message through an R3 observable while the game is running.
    /// </summary>
    public class NatsSubscriber : AsyncScript
    {
        /// <summary>
        /// Opens a NATS connection and a core subscription, bridges the
        /// subscription's message reader into an R3 observable whose handler logs
        /// subject, size and payload, and then idles frame by frame until
        /// <c>Game.IsRunning</c> becomes false.
        /// </summary>
        public override async Task Execute()
        {
            await using var con = new NatsConnection();

            // Dispose the NATS subscription when the script ends; previously it was
            // never disposed. Declared after the connection so it is disposed first.
            await using var subscription = await con.SubscribeCoreAsync<int>("hoge");

            // The factory receives the subscription as state and exposes its
            // message reader as an async stream bound to the supplied token.
            using var _ = Observable.CreateFrom(subscription, (ct, state) => state.Msgs.ReadAllAsync(ct))
                .Subscribe(msg =>
                {
                    Log.Info($"{msg.Subject}, {msg.Size}, {msg.Data}");
                });

            // Keep the script (and with it the subscriptions above) alive while the game runs.
            while (Game.IsRunning)
            {
                await Script.NextFrame();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment