Skip to content

Instantly share code, notes, and snippets.

@bboyle1234
Last active August 20, 2018 09:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
using Apex.LoggingUtils;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using Orleans;
using Orleans.Runtime;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static Apex.TaskUtilities.Tasks;
namespace Apex.Grains {
/// <summary>
/// Class recreates and connects a cluster client after a disconnection, raising events to inform consumers.
/// An example of this might be the interval while a cluster being restarted or upgraded.
/// </summary>
public abstract class ReconnectingClusterClient : IClusterClient {
public event Action<ReconnectingClusterClient> Connected;
public event Action<ReconnectingClusterClient> Disconnected;
public bool IsConnected { get; set; }
public Exception Error { get; private set; }
readonly ILogger Log;
readonly CancellationTokenSource Disposed = new CancellationTokenSource();
// This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other.
// The trick is to make sure we manually reset it when disconnected.
readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false);
IClusterClient _client;
public ReconnectingClusterClient(ILoggerFactory loggerFactory) {
// Because we're an abstract class we don't use the generic method.
Log = loggerFactory.CreateLogger(GetType());
FireAndForget(Connect);
}
public async Task WaitForConnection(TimeSpan waitTime) {
using (var cts = new CancellationTokenSource(waitTime)) {
await WaitForConnection(cts.Token);
}
}
public async Task WaitForConnection(CancellationToken ct) {
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct, Disposed.Token)) {
await ConnectedEvent.WaitAsync(cts.Token);
}
}
async Task Connect() {
await CreateAndConnect();
if (Disposed.IsCancellationRequested) return; // We never managed to connect before we were disposed.
IsConnected = true;
ConnectedEvent.Set();
try {
Connected?.Invoke(this);
} catch (Exception x) {
Log.LogError(x, $"Exception thrown while executing the 'Connected' event handler.");
}
}
async Task CreateAndConnect() {
/// The orleans client has the ability to keep retrying connections before its first connection. (It won't retry after a disconnection)
/// In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because
/// we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt.
int connectionFailCount = 0;
while (true) {
if (Disposed.IsCancellationRequested) return;
try {
var builder = new ClientBuilder();
Configure(builder);
builder.AddClusterConnectionLostHandler(OnDisconnected);
_client = builder.Build();
} catch (Exception x) {
Error = x;
Log.LogError(x, $"Exception thrown while configuring client.");
throw;
}
try {
/// This overload does not allow the client to reattempt connection if it fails the first time.
/// Tests showed that stream subscriptions didn't work so well when re-attempted connections were made,
/// so it was decided that on failed connection attempt, the client would be disposed, and a new one created.
await _client.Connect();
Log.LogInformation("Connected.");
return;
} catch (Exception x) {
Error = x;
_client.Dispose();
if (connectionFailCount++ < 5)
Log.LogWarning(x, $"Failed to connect.");
}
await Task.Delay(1000); // Attempt reconnection once per second.
}
}
protected abstract void Configure(ClientBuilder builder);
void OnDisconnected(object sender, EventArgs e) {
IsConnected = false;
ConnectedEvent.Reset();
_client?.Dispose();
if (Disposed.IsCancellationRequested) return;
try {
Disconnected?.Invoke(this);
} catch (Exception x) {
Log.LogError(x, $"Exception thrown while executing the 'Disconnected' event handler.");
}
FireAndForget(Connect);
}
public void Dispose() {
Disposed.Cancel();
_client?.Close(); // required for graceful (informing silo) disconnection
_client?.Dispose(); // does not inform silo
}
#region IClusterClient
// There is potential for null reference / object disposed exceptions here if these properties/methods are called while the _client is not properly connected.
// For code simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.
public bool IsInitialized => _client.IsInitialized;
public IServiceProvider ServiceProvider => _client.ServiceProvider;
// uhhh --- yep, code smell, I know.
public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("This client starts connection attempts immediately. Try using 'WaitForConnection()' instead.");
public Task Close() => throw new NotImplementedException("Use Dispose() instead.");
public void Abort() => throw new NotImplementedException("Use Dispose() instead.");
public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);
public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
=> _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
=> _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
=> _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
=> _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
=> _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
=> _client.CreateObjectReference<TGrainObserverInterface>(obj);
public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
=> _client.DeleteObjectReference<TGrainObserverInterface>(obj);
public void BindGrainReference(IAddressable grain)
=> _client.BindGrainReference(grain);
#endregion
}
}
@martinothamar
Copy link

This is nice, thanks for sharing!

@bboyle1234
Copy link
Author

@martinothamar, I've updated it. In this new version, you inherit and override the "Configure()" method

@bboyle1234
Copy link
Author

@martinothamar, updated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment