Skip to content

Instantly share code, notes, and snippets.

@stevenxi
Created May 28, 2019 07:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stevenxi/34821c465eb09f72b5b1d3cc8023ffde to your computer and use it in GitHub Desktop.
Save stevenxi/34821c465eb09f72b5b1d3cc8023ffde to your computer and use it in GitHub Desktop.
Wrapper for auto-delete queue for EasyNetQ
public class SafeBusWrapper : IDisposable
{
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
private readonly IAdvancedBus _advancedBus;
private readonly IBus _bus;
public SafeBusWrapper(string connecitonString, Action<IServiceRegister> registerServices)
{
_bus = RabbitHutch.CreateBus(connecitonString, registerServices);
_advancedBus = _bus.Advanced;
_advancedBus.Disconnected += OnDisconnectedPrivate;
_advancedBus.Connected += OnConnectPrivate;
}
public bool IsConnected => _bus.Advanced.IsConnected;
#region Auto Delete
private readonly ConcurrentDictionary<Guid, BindingWrapper> _liveAutoDeleteWrappers = new ConcurrentDictionary<Guid, BindingWrapper>();
private IQueue DeclareNamedAutoDeleteQueue(string queueName)
{
return string.IsNullOrEmpty(queueName)
? _advancedBus.QueueDeclare($"AD:{Guid.NewGuid()}", autoDelete: true, exclusive: true, durable: false)
: _advancedBus.QueueDeclare(queueName, passive: false, durable: false, exclusive: false, autoDelete: true);
}
public IDisposable ConsumeAutoDeleteQueue<T>(string queueName, TenaciousLazy<IExchange> exchange, Func<IMessage<T>, MessageReceivedInfo, Task> consumer, Action<IConsumerConfiguration> configure, string routeKey, params string[] otherRouteKeys) where T : class
{
Func<IDisposable> createBindConsume = () =>
{
var queue = DeclareNamedAutoDeleteQueue(queueName);
_advancedBus.Bind(exchange.Value, queue, routeKey);
if (otherRouteKeys != null && otherRouteKeys.Length > 0)
{
foreach (var k in otherRouteKeys)
{
_advancedBus.Bind(exchange.Value, queue, k);
}
}
return _advancedBus.Consume(queue, consumer, configure);
};
var w = new BindingWrapper(createBindConsume);
_liveAutoDeleteWrappers.TryAdd(w.Key, w);
w.Disposed += BindingWrapperDisposed;
return w;
}
public IDisposable ConsumeAutoDeleteQueue<T>(string queueName, TenaciousLazy<IExchange> exchange, Action<IMessage<T>, MessageReceivedInfo> consumer, string routeKey, params string[] otherRouteKeys) where T : class
{
Func<IDisposable> createBindConsume = () =>
{
var queue = DeclareNamedAutoDeleteQueue(queueName);
_advancedBus.Bind(exchange.Value, queue, routeKey);
if (otherRouteKeys != null && otherRouteKeys.Length > 0)
{
foreach (var k in otherRouteKeys)
{
_advancedBus.Bind(exchange.Value, queue, k);
}
}
return _advancedBus.Consume(queue, consumer);
};
var w = new BindingWrapper(createBindConsume);
_liveAutoDeleteWrappers.TryAdd(w.Key, w);
w.Disposed += BindingWrapperDisposed;
return w;
}
private void BindingWrapperDisposed(Guid key)
{
BindingWrapper w;
_liveAutoDeleteWrappers.TryRemove(key, out w);
}
#endregion
private void OnDisconnectedPrivate(object s, EventArgs e)
{
Log.Warn("* Bus disconnected, remove live auto-delete-queue consume");
Disconnected?.Invoke();
}
private void OnConnectPrivate(object s, EventArgs e)
{
Log.Info("* Bus re-connected, adding live auto-delete-queue consume");
var liveBindings = _liveAutoDeleteWrappers.ToArray();
foreach (var b in liveBindings)
{
b.Value.ReCreateBindConsume();
}
var pendingBindings = _pendingConsumer.ToArray();
foreach (var b in pendingBindings)
{
b.Value.CreateBindConsume();
}
OnConnect?.Invoke();
}
public event Action OnConnect;
public event Action Disconnected;
#region IDispose
public void Dispose()
{
Dispose(true);
}
~SafeBusWrapper()
{
Dispose(false);
}
private bool _disposed;
private void Dispose(bool disposing)
{
if (_disposed)
return;
_disposed = true;
_advancedBus.Disconnected -= OnDisconnectedPrivate;
_advancedBus.Disconnected -= OnDisconnectedPrivate;
if (disposing)
_bus.Dispose();
}
#endregion
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment