Skip to content

Instantly share code, notes, and snippets.

Last active March 5, 2024 16:06
Show Gist options
  • Save itn3000/822691f05b67a95ca7d08891e21d0cc7 to your computer and use it in GitHub Desktop.
Save itn3000/822691f05b67a95ca7d08891e21d0cc7 to your computer and use it in GitHub Desktop.
NATS .NET Client-v2 + R3
using NATS.Client.Core;
using R3;
using System.Diagnostics;
await using var con = new NatsConnection();
await using var subscription = await con.SubscribeCoreAsync<string>("hoge");
using var cts = new CancellationTokenSource();
using var _ = Observable.CreateFrom<NatsMsg<string>, INatsSub<string>>(subscription, Generator)
.Subscribe(msg =>
Console.WriteLine($"{msg.Subject}, {msg.Size}, {msg.Data}");
await Task.WhenAll(
Task.Run(() =>
Task.Run(async () =>
var sw = new Stopwatch();
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (await timer.WaitForNextTickAsync(cts.Token).ConfigureAwait(false))
await con.PublishAsync<string>("hoge", $"piyo: {sw.Elapsed}").ConfigureAwait(false);
catch (OperationCanceledException) { }
static IAsyncEnumerable<NatsMsg<string>> Generator(CancellationToken ct, INatsSub<string> subscription)
return subscription.Msgs.ReadAllAsync(ct);
using System.Threading.Tasks;
using Stride.Engine;
using NATS.Client.Core;
using R3;
namespace natsr3test
public class NatsPublisher : AsyncScript
// Declared public member fields and properties will show in the game studio
public override async Task Execute()
await using var con = new NatsConnection();
using var subscription = Observable.EveryUpdate()
.SubscribeAwait(async (v, ct) =>
await con.PublishAsync<int>("hoge", (int)v.FrameCount, cancellationToken: ct);
// Do stuff every new frame
await Script.NextFrame();
using System.Threading.Tasks;
using Stride.Engine;
using R3;
using NATS.Client.Core;
namespace natsr3test
public class NatsSubscriber : AsyncScript
// Declared public member fields and properties will show in the game studio
public override async Task Execute()
await using var con = new NatsConnection();
var subscription = await con.SubscribeCoreAsync<int>("hoge");
using var _ = Observable.CreateFrom(subscription, (ct, state) => state.Msgs.ReadAllAsync(ct))
.Subscribe(msg =>
Log.Info($"{msg.Subject}, {msg.Size}, {msg.Data}");
// Do stuff every new frame
await Script.NextFrame();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment