Skip to content

Instantly share code, notes, and snippets.

@darkl
Created November 6, 2023 15:03
Show Gist options
  • Save darkl/131596b6387e501cd182139de3d3251f to your computer and use it in GitHub Desktop.
Save darkl/131596b6387e501cd182139de3d3251f to your computer and use it in GitHub Desktop.
Reconnecting Pub/Sub
using System.Reactive.Linq;
using System.Reactive.Subjects;
using WampSharp.V2;
using WampSharp.V2.Client;
namespace Publisher
{
public class Program
{
public static async Task Main(string[] args)
{
DefaultWampChannelFactory factory =
new DefaultWampChannelFactory();
const string serverAddress = "ws://127.0.0.1:8080/ws";
IWampChannel channel =
factory.CreateJsonChannel(serverAddress, "realm1");
IDisposable disposable = null;
WampChannelReconnector reconnector =
new WampChannelReconnector(channel, async () =>
{
if (disposable != null)
{
disposable.Dispose();
}
await channel.Open().ConfigureAwait(false);
IWampRealmProxy realm = channel.RealmProxy;
ISubject<int> subject =
realm.Services.GetSubject<int>("com.myapp.topic1");
int counter = 0;
IObservable<long> timer =
Observable.Timer(TimeSpan.FromMilliseconds(0),
TimeSpan.FromMilliseconds(1000));
disposable =
timer.Subscribe(x =>
{
counter++;
Console.WriteLine("Publishing to topic 'com.myapp.topic1': " + counter);
try
{
subject.OnNext(counter);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
});
});
reconnector.Start();
// This line is required in order to release the WebSocket thread, otherwise it will be blocked by the Console.ReadLine() line.
await Task.Yield();
Console.ReadLine();
}
}
}
using WampSharp.V2;
namespace Router
{
class Program
{
static void Main(string[] args)
{
Console.ForegroundColor = ConsoleColor.Green;
const string serverAddress = "http://127.0.0.1:8080/ws";
WampHost host = new DefaultWampHost(serverAddress);
Console.WriteLine("Router running on: " + serverAddress);
host.Open();
Console.ReadLine();
}
}
}
using WampSharp.V2;
using WampSharp.V2.Client;
namespace Subscriber
{
class Program
{
public static async Task Main(string[] args)
{
DefaultWampChannelFactory factory =
new DefaultWampChannelFactory();
const string serverAddress = "ws://127.0.0.1:8080/ws";
IWampChannel channel =
factory.CreateJsonChannel(serverAddress, "realm1");
WampChannelReconnector reconnector =
new WampChannelReconnector(channel,
async () =>
{
await channel.Open().ConfigureAwait(false);
IWampRealmProxy realmProxy = channel.RealmProxy;
int received = 0;
IDisposable subscription = null;
subscription =
realmProxy.Services.GetSubject<int>("com.myapp.topic1")
.Subscribe(x =>
{
Console.WriteLine($"Got Event: {x}");
received++;
if (received > 100)
{
Console.WriteLine("Closing ..");
subscription.Dispose();
}
},
ex =>
{
Console.WriteLine(ex);
});
});
reconnector.Start();
// This line is required in order to release the WebSocket thread, otherwise it will be blocked by the Console.ReadLine() line.
await Task.Yield();
Console.ReadLine();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment