Skip to content

Instantly share code, notes, and snippets.

@ColinSullivan1
Created May 18, 2018 18:36
Show Gist options
  • Save ColinSullivan1/76a22429653bd7f58a5bbf86654219f9 to your computer and use it in GitHub Desktop.
Save ColinSullivan1/76a22429653bd7f58a5bbf86654219f9 to your computer and use it in GitHub Desktop.
C# Application, reconnect issue
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;
namespace ConsoleApp1
{
public class Program
{
private static IAsyncSubscription subscription;
private static IConnection subscriptionConnection;
private static IConnection publishConnection;
public static void Main(string[] args)
{
var cancellationTokenSource = new CancellationTokenSource();
var connectionFactory = new ConnectionFactory();
var options = ConnectionFactory.GetDefaultOptions();
options.Url = "nats://127.0.0.1:4222";
options.AllowReconnect = true;
options.DisconnectedEventHandler += (sender, eventArgs) =>
{
Console.Error.WriteLine("Server disconnected: " + eventArgs.Conn.LastError?.Message);
};
options.ReconnectedEventHandler += (sender, eventArgs) =>
{
Console.WriteLine("Server reconnected for connection: " + eventArgs.Conn.Opts.Name);
};
options.AsyncErrorEventHandler += (sender, eventArgs) =>
{
Console.Error.WriteLine("Async error: " + eventArgs.Error);
};
Task.Factory.StartNew(() =>
{
try
{
// Not threadsafe, but this is an example
options.Name = "subconn";
using (subscriptionConnection = connectionFactory.CreateConnection(options))
{
subscription = subscriptionConnection.SubscribeAsync("*.two", (sender, eventArgs) =>
{
var message = Encoding.UTF8.GetString(eventArgs.Message.Data);
Console.WriteLine($"Received Message: {message}, Subject: {eventArgs.Message.Subject}");
});
cancellationTokenSource.Token.WaitHandle.WaitOne();
subscription.Dispose();
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}, cancellationTokenSource.Token);
Task.Factory.StartNew(() =>
{
// Not threadsafe, but this is an example
options.Name = "pubconn";
using (publishConnection = connectionFactory.CreateConnection(options))
{
var i = 0;
while (!cancellationTokenSource.IsCancellationRequested)
{
var message = $"Message {i}";
try
{
publishConnection.Publish($"{i}.two", Encoding.UTF8.GetBytes(message));
Console.WriteLine("Message published: " + message);
}
catch
{
Console.Error.WriteLine("Failed to publish message: " + message);
}
i++;
Thread.Sleep(500);
}
}
}, cancellationTokenSource.Token);
Task.Factory.StartNew(() =>
{
while (!cancellationTokenSource.IsCancellationRequested)
{
if (publishConnection != null && subscriptionConnection != null && subscription != null)
{
Console.WriteLine("Subscription Connection State: " + subscriptionConnection.State);
Console.WriteLine("Publish Connection State: " + subscriptionConnection.State);
Console.WriteLine("Subscription Is Valid: " + subscription.IsValid);
}
Thread.Sleep(1000);
}
}, cancellationTokenSource.Token);
Console.ReadKey();
cancellationTokenSource.Cancel();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment