Skip to content

Instantly share code, notes, and snippets.

@micdenny
Last active February 14, 2017 13:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save micdenny/100f2945785e879919ff951baf1cf40d to your computer and use it in GitHub Desktop.
Save micdenny/100f2945785e879919ff951baf1cf40d to your computer and use it in GitHub Desktop.
ChannelPoolingClientCommandDispatcher
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Producer;
using RabbitMQ.Client;
namespace EasyNetQSlowerThanRabbitMQ
{
/// <summary>
/// Factory that produces <see cref="ChannelPoolingClientCommandDispatcher"/> instances,
/// handing each one the connection configuration and persistent channel factory
/// supplied at construction time.
/// </summary>
public class ChannelPoolingClientCommandDispatcherFactory : IClientCommandDispatcherFactory
{
    private readonly ConnectionConfiguration _connectionConfiguration;
    private readonly IPersistentChannelFactory _channelFactory;

    /// <summary>
    /// Stores the dependencies that every created dispatcher will receive.
    /// </summary>
    /// <param name="configuration">Connection configuration forwarded to each dispatcher.</param>
    /// <param name="persistentChannelFactory">Channel factory forwarded to each dispatcher.</param>
    public ChannelPoolingClientCommandDispatcherFactory(ConnectionConfiguration configuration, IPersistentChannelFactory persistentChannelFactory)
    {
        _connectionConfiguration = configuration;
        _channelFactory = persistentChannelFactory;
    }

    /// <summary>
    /// Creates a new channel-pooling dispatcher bound to <paramref name="connection"/>.
    /// </summary>
    /// <param name="connection">The persistent connection the dispatcher's channels are created on.</param>
    /// <returns>A freshly constructed <see cref="ChannelPoolingClientCommandDispatcher"/>.</returns>
    public IClientCommandDispatcher GetClientCommandDispatcher(IPersistentConnection connection)
        => new ChannelPoolingClientCommandDispatcher(_connectionConfiguration, connection, _channelFactory);
}
/// <summary>
/// Client command dispatcher that runs channel actions on a set of dedicated
/// dispatcher threads. Actions are handed over through a bounded queue; each
/// action borrows a persistent channel from a pool, runs, and returns the
/// channel to the pool afterwards.
/// </summary>
public class ChannelPoolingClientCommandDispatcher : IClientCommandDispatcher
{
    // Capacity of the hand-over queue; producers block in InvokeAsync while it is full.
    private const int queueSize = 1;

    // Number of dispatcher threads started by the constructor.
    private readonly int dispatcherCount = Environment.ProcessorCount * 2;

    private readonly CancellationTokenSource cancellation = new CancellationTokenSource();
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>(queueSize);
    private readonly ObjectPool<IPersistentChannel> _persistentChannels;

    // Every channel the pool's generator has ever created, so Dispose can release
    // exactly those channels without asking the pool (which would create new ones).
    private readonly ConcurrentBag<IPersistentChannel> createdChannels = new ConcurrentBag<IPersistentChannel>();

    /// <summary>
    /// Creates the channel pool and starts the dispatcher threads.
    /// </summary>
    /// <param name="configuration">Supplies <c>UseBackgroundThreads</c> for the dispatcher threads.</param>
    /// <param name="connection">Connection passed to the channel factory whenever a new channel is needed.</param>
    /// <param name="persistentChannelFactory">Factory used to create pooled channels on demand.</param>
    public ChannelPoolingClientCommandDispatcher(
        ConnectionConfiguration configuration,
        IPersistentConnection connection,
        IPersistentChannelFactory persistentChannelFactory)
    {
        _persistentChannels = new ObjectPool<IPersistentChannel>(() =>
        {
            var channel = persistentChannelFactory.CreatePersistentChannel(connection);
            createdChannels.Add(channel);
            return channel;
        });

        for (int i = 0; i < dispatcherCount; i++)
        {
            StartDispatcherThread(configuration);
        }
    }

    /// <summary>
    /// Runs <paramref name="channelAction"/> on a dispatcher thread and blocks until it completes.
    /// </summary>
    /// <returns>The value returned by <paramref name="channelAction"/>.</returns>
    /// <remarks>
    /// Failures surface as the original exception (not wrapped in an
    /// <see cref="AggregateException"/>) with its stack trace preserved; cancellation
    /// surfaces as an <see cref="OperationCanceledException"/>.
    /// </remarks>
    public T Invoke<T>(Func<IModel, T> channelAction)
    {
        // GetAwaiter().GetResult() unwraps the task's exception without resetting its stack trace.
        return InvokeAsync(channelAction).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Runs <paramref name="channelAction"/> on a dispatcher thread and blocks until it completes.
    /// </summary>
    /// <remarks>
    /// Failures surface as the original exception (not wrapped in an
    /// <see cref="AggregateException"/>) with its stack trace preserved; cancellation
    /// surfaces as an <see cref="OperationCanceledException"/>.
    /// </remarks>
    public void Invoke(Action<IModel> channelAction)
    {
        InvokeAsync(channelAction).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Queues <paramref name="channelAction"/> for execution on a dispatcher thread.
    /// Blocks the caller while the queue is full.
    /// </summary>
    /// <returns>
    /// A task that completes with the action's result, faults with the exception thrown
    /// while obtaining a channel or running the action, or is canceled once the
    /// dispatcher has been disposed.
    /// </returns>
    public Task<T> InvokeAsync<T>(Func<IModel, T> channelAction)
    {
        var tcs = new TaskCompletionSource<T>();
        try
        {
            queue.Add(() =>
            {
                if (cancellation.IsCancellationRequested)
                {
                    tcs.TrySetCanceled();
                    return;
                }

                IPersistentChannel persistentChannel = null;
                try
                {
                    // Acquire inside the try: a failing channel factory must fault the task
                    // instead of escaping onto (and terminating) the dispatcher thread.
                    persistentChannel = _persistentChannels.GetObject();
                    persistentChannel.InvokeChannelAction(channel => tcs.TrySetResult(channelAction(channel)));
                }
                catch (Exception e)
                {
                    tcs.TrySetException(e);
                }
                finally
                {
                    // Only return a channel that was actually obtained.
                    if (persistentChannel != null)
                    {
                        _persistentChannels.PutObject(persistentChannel);
                    }
                }
            }, cancellation.Token);
        }
        catch (OperationCanceledException)
        {
            // Dispose was called before the action could be queued.
            tcs.TrySetCanceled();
        }
        return tcs.Task;
    }

    /// <summary>
    /// Queues <paramref name="channelAction"/> for execution on a dispatcher thread.
    /// </summary>
    /// <returns>A task that completes when the action has run; see the generic overload.</returns>
    public Task InvokeAsync(Action<IModel> channelAction)
    {
        return InvokeAsync(x =>
        {
            channelAction(x);
            return new NoContentStruct();
        });
    }

    /// <summary>
    /// Signals cancellation to the dispatcher threads and pending producers, then
    /// disposes every channel this dispatcher has created. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        cancellation.Cancel();

        // Drain the tracking bag rather than the pool: the pool's GetObject() creates a
        // new channel whenever it is empty and therefore never signals "no more channels".
        IPersistentChannel persistentChannel;
        while (createdChannels.TryTake(out persistentChannel))
        {
            persistentChannel.Dispose();
        }
    }

    /// <summary>
    /// Starts one dispatcher thread that takes queued actions and runs them until
    /// cancellation is requested.
    /// </summary>
    private void StartDispatcherThread(ConnectionConfiguration configuration)
    {
        var thread = new Thread(() =>
        {
            while (!cancellation.IsCancellationRequested)
            {
                try
                {
                    var channelAction = queue.Take(cancellation.Token);
                    channelAction();
                }
                catch (OperationCanceledException)
                {
                    // Take() was interrupted by Dispose; stop the thread.
                    break;
                }
            }
        })
        { Name = "Client Command Dispatcher Thread", IsBackground = configuration.UseBackgroundThreads };
        thread.Start();
    }

    /// <summary>
    /// Placeholder result type that lets the non-generic overloads reuse the generic InvokeAsync.
    /// </summary>
    private struct NoContentStruct
    {
    }
}
// TODO: we can replace this with a more robust implementation such as:
// https://www.nuget.org/packages/CodeProject.ObjectPool/ (https://github.com/pomma89/ObjectPool)
// https://www.nuget.org/packages/Microsoft.Extensions.ObjectPool/ (https://github.com/aspnet/Common/tree/master/src/Microsoft.Extensions.ObjectPool)
/// <summary>
/// A pool from which objects can be borrowed and to which they can be returned.
/// </summary>
/// <typeparam name="T">Type of the pooled objects.</typeparam>
public interface IObjectPool<T>
{
    /// <summary>Number of objects currently held by the pool.</summary>
    int Count { get; }

    /// <summary>Borrows an object from the pool.</summary>
    T GetObject();

    /// <summary>Returns an object to the pool.</summary>
    void PutObject(T item);
}

/// <summary>
/// Unbounded object pool backed by a <see cref="ConcurrentBag{T}"/>. When the pool is
/// empty, <see cref="GetObject"/> creates a new object with the generator supplied to
/// the constructor, so it never reports exhaustion.
/// </summary>
/// <typeparam name="T">Type of the pooled objects.</typeparam>
public class ObjectPool<T> : IObjectPool<T>
{
    private readonly ConcurrentBag<T> _objects = new ConcurrentBag<T>();
    private readonly Func<T> _objectGenerator;

    /// <summary>
    /// Creates an empty pool.
    /// </summary>
    /// <param name="objectGenerator">Called to create an object whenever the pool is empty.</param>
    /// <exception cref="ArgumentNullException"><paramref name="objectGenerator"/> is null.</exception>
    public ObjectPool(Func<T> objectGenerator)
    {
        if (objectGenerator == null) throw new ArgumentNullException(nameof(objectGenerator));
        _objectGenerator = objectGenerator;
    }

    /// <summary>Number of objects currently sitting idle in the pool.</summary>
    public int Count => _objects.Count;

    /// <summary>
    /// Takes an idle object from the pool, or creates a new one when none is available.
    /// </summary>
    /// <returns>A pooled object, or the generator's result when the pool is empty.</returns>
    public T GetObject()
    {
        T item;
        return _objects.TryTake(out item) ? item : _objectGenerator();
    }

    /// <summary>
    /// Returns <paramref name="item"/> to the pool for reuse. A null item is ignored.
    /// </summary>
    /// <param name="item">The object to make available to later <see cref="GetObject"/> calls.</param>
    public void PutObject(T item)
    {
        // Never pool null: it would later be handed out by GetObject as if it were a usable object.
        if (item == null) return;
        _objects.Add(item);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment