Skip to content

Instantly share code, notes, and snippets.

@2garryn
Created July 30, 2020 17:36
Show Gist options
  • Save 2garryn/685801c3b3f4434bebe6c296d7ec2112 to your computer and use it in GitHub Desktop.
Save 2garryn/685801c3b3f4434bebe6c296d7ec2112 to your computer and use it in GitHub Desktop.
/// <summary>
/// Runs a fixed number of publisher workers. Each worker creates its own
/// model from the shared connection, reads <see cref="PublishMessage"/> items
/// from a shared channel reader and publishes them.
/// </summary>
public class ChannelPool
{
    private readonly IConnection _connection;
    private readonly int _size;
    private readonly ChannelReader<PublishMessage> _chan;

    /// <summary>
    /// Creates a pool of <paramref name="size"/> workers.
    /// </summary>
    /// <param name="connection">Connection used by every worker to create its own model.</param>
    /// <param name="size">Number of workers started by <see cref="Run"/>.</param>
    /// <param name="chan">Reader that supplies the messages to publish.</param>
    public ChannelPool(IConnection connection, int size, ChannelReader<PublishMessage> chan)
    {
        _size = size;
        _connection = connection;
        _chan = chan;
    }

    /// <summary>
    /// Starts all workers and completes only when every worker has finished,
    /// i.e. when the reader reports that no more data will arrive or a worker fails.
    /// </summary>
    /// <returns>A task that completes (or faults) together with the workers.</returns>
    public async Task Run()
    {
        var workers = new Task[_size];
        for (var i = 0; i < _size; i++)
        {
            // Task.Run(Func<Task>) returns a proxy for the worker's own task, so
            // WhenAll really waits for the worker loop and observes its exceptions.
            // (Passing an async lambda to `new Task(...)` turns it into `async void`:
            // the task would complete at the worker's first await.)
            workers[i] = Task.Run(() => RunWorker());
        }

        await Task.WhenAll(workers);
    }

    /// <summary>
    /// Worker loop: declares each exchange the first time this worker sees it,
    /// then publishes every message read from the channel reader.
    /// </summary>
    /// <returns>A task that completes when the reader has no more data.</returns>
    public async Task RunWorker()
    {
        // Exchanges already declared on this worker's model.
        var declaredExchanges = new HashSet<string>();

        // Dispose the model when the loop ends or throws so it is not leaked.
        using (var model = _connection.CreateModel())
        {
            while (await _chan.WaitToReadAsync())
            {
                // Drain everything currently available before waiting again;
                // TryRead may fail if another worker took the item first.
                while (_chan.TryRead(out var message))
                {
                    // Add returns true only the first time the name is seen.
                    if (declaredExchanges.Add(message.ExchangeName))
                    {
                        model.ExchangeDeclare(message.ExchangeName, message.ExchangeType, true, false);
                    }

                    var props = model.CreateBasicProperties();
                    message.Properties(props);
                    model.BasicPublish(message.ExchangeName, message.RoutingKey, true, props, message.Body);
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment