Last active
December 13, 2015 17:19
-
-
Save jlew-cs/4946839 to your computer and use it in GitHub Desktop.
RedisMessageBus, copied from SignalR 1.0-rc2 and modified to attempt to reconnect when the connection is closed or an error occurs. Initial connection failures will retry until successful. Likewise, when a connection errors or closes unexpectedly after a successful initial connection, it will retry until successful. There is an exponentially-inc…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Scale-out message bus that relays messages through Redis pub/sub.
/// Outgoing messages are grouped by source, stamped with an id obtained by
/// incrementing a Redis key, and published on that key; incoming messages on
/// the subscribed keys are queued and handed to <c>OnReceived</c>.
/// When the connection reports a close or an error, the cached connect task
/// is cleared so the next <see cref="Send"/> opens a fresh connection.
/// </summary>
public class RedisMessageBus : ScaleoutMessageBus
{
    private readonly string _server;
    private readonly int _port;
    private readonly string _password;
    private readonly int _db;
    private readonly string[] _keys;
    private RedisConnection _connection;
    private RedisSubscriberConnection _channel;

    // Task representing the current (or in-flight) connection attempt;
    // null means "no usable connection, reconnect on next request".
    private Task _connectTask;
    private readonly TaskQueue _publishQueue = new TaskQueue();

    /// <summary>
    /// Creates the bus and immediately starts connecting to Redis.
    /// </summary>
    /// <param name="server">Redis host name.</param>
    /// <param name="port">Redis port.</param>
    /// <param name="password">Redis password.</param>
    /// <param name="db">Redis database index used for the id counters.</param>
    /// <param name="keys">Keys used both as id counters and as pub/sub channels.</param>
    /// <param name="resolver">Dependency resolver passed to the base bus.</param>
    public RedisMessageBus(string server, int port, string password, int db, IEnumerable<string> keys, IDependencyResolver resolver)
        : base(resolver)
    {
        _server = server;
        _port = port;
        _password = password;
        _db = db;
        _keys = keys.ToArray();

        // Start the connection
        _connectTask = NewConnection();
    }

    /// <summary>
    /// Publishes the given messages once a connection is available.
    /// </summary>
    /// <param name="messages">Messages to publish.</param>
    /// <returns>A task that completes when every group has been published.</returns>
    protected override Task Send(Message[] messages)
    {
        return EnsureConnection().Then(msgs =>
        {
            var taskCompletionSource = new TaskCompletionSource<object>();

            // Group messages by source (connection id)
            var messagesBySource = msgs.GroupBy(m => m.Source);

            SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

            return taskCompletionSource.Task;
        },
        messages);
    }

    /// <summary>
    /// Publishes the enumerator's next group, then recurses for the remaining
    /// groups; completes <paramref name="taskCompletionSource"/> when the
    /// enumerator is exhausted.
    /// </summary>
    private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
    {
        if (!enumerator.MoveNext())
        {
            taskCompletionSource.TrySetResult(null);
            return;
        }

        // The connection field is cleared when a connection is torn down, so
        // fail the send explicitly instead of dereferencing null.
        RedisConnection connection = _connection;
        if (connection == null)
        {
            taskCompletionSource.TrySetException(new InvalidOperationException("The redis connection is not available."));
            return;
        }

        IGrouping<string, Message> group = enumerator.Current;

        // Get the channel index we're going to use for this message.
        // Widen to long before Abs: Math.Abs(int.MinValue) throws OverflowException.
        int index = (int)(Math.Abs((long)group.Key.GetHashCode()) % _keys.Length);
        string key = _keys[index];

        // Increment the channel number
        connection.Strings.Increment(_db, key)
            .Then((id, k) =>
            {
                var message = new RedisMessage(id, group.ToArray());
                return connection.Publish(k, message.GetBytes());
            }, key)
            .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
            .ContinueWithNotComplete(taskCompletionSource);
    }

    /// <summary>
    /// Returns the cached connect task, starting a new connection attempt when
    /// there is none or when the cached attempt faulted or was cancelled.
    /// </summary>
    private Task EnsureConnection()
    {
        Task current = _connectTask;

        // A failed attempt must not stay cached, otherwise every later send
        // would fail without ever retrying.
        if (current == null || current.IsFaulted || current.IsCanceled)
        {
            current = NewConnection();
            _connectTask = current;
        }

        return current;
    }

    /// <summary>
    /// Tears down any existing connection, opens a new one and subscribes to
    /// the configured keys. On failure the error is logged, the connection is
    /// destroyed and the exception is rethrown.
    /// </summary>
    private async Task NewConnection()
    {
        DestroyConnection();

        var connection = new RedisConnection(host: _server, port: _port, password: _password);
        connection.Closed += OnConnectionClosed;
        connection.Error += OnConnectionError;
        _connection = connection;

        try
        {
            await connection.Open().ConfigureAwait(false);

            // Create a subscription channel in redis
            RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();
            _channel = channel;

            // Subscribe to the registered connections
            channel.Subscribe(_keys, OnMessage);

            // Dirty hack but it seems like subscribe returns before the actual
            // subscription is properly setup in some cases.
            // Poll without blocking a thread while waiting.
            while (channel.SubscriptionCount == 0)
            {
                await Task.Delay(500).ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            this.GetLogger().Error("Failed to open redis connection or subscribe to pubsub channel",ex);
            DestroyConnection();
            throw;
        }
    }

    /// <summary>
    /// Disposes the current connection and subscriber channel (if any) and
    /// clears the cached connect task.
    /// </summary>
    private void DestroyConnection()
    {
        RedisConnection connection = _connection;
        if (connection != null)
        {
            // Unhook first so events raised by the discarded connection cannot
            // reset the connect task of its replacement.
            connection.Closed -= OnConnectionClosed;
            connection.Error -= OnConnectionError;
            connection.Dispose();
            _connection = null;
        }

        RedisSubscriberConnection channel = _channel;
        if (channel != null)
        {
            channel.Dispose();
            _channel = null;
        }

        _connectTask = null;
    }

    /// <summary>
    /// Handles the connection's Closed event by logging and clearing the
    /// connect task.
    /// </summary>
    private void OnConnectionClosed(object sender, EventArgs e)
    {
        this.GetLogger().Info("Redis connection closed");

        // Restart the connection on next request
        _connectTask = null;
    }

    /// <summary>
    /// Handles the connection's Error event by logging the exception and
    /// clearing the connect task.
    /// </summary>
    private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
    {
        this.GetLogger().Error("Redis connection error", e.Exception);

        // Restart the connection on next request
        _connectTask = null;
    }

    /// <summary>
    /// Subscription callback: deserializes the payload and queues it for
    /// delivery through <c>OnReceived</c>.
    /// </summary>
    /// <param name="key">The channel the message arrived on, used as the stream id.</param>
    /// <param name="data">Serialized <c>RedisMessage</c> bytes.</param>
    private void OnMessage(string key, byte[] data)
    {
        // The key is the stream id (channel)
        var message = RedisMessage.Deserialize(data);

        _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
    }

    /// <summary>
    /// Unsubscribes and closes the subscriber channel and the connection when
    /// disposing, then disposes the base bus.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_channel != null)
            {
                _channel.Unsubscribe(_keys);
                _channel.Close(abort: true);
            }

            if (_connection != null)
            {
                // A deliberate shutdown should not trigger reconnect handling.
                _connection.Closed -= OnConnectionClosed;
                _connection.Error -= OnConnectionError;
                _connection.Close(abort: true);
            }
        }

        base.Dispose(disposing);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment