Created
July 30, 2020 17:36
-
-
Save 2garryn/685801c3b3f4434bebe6c296d7ec2112 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Runs a fixed number of publisher workers. Each worker opens its own model
/// on the shared connection and publishes every <c>PublishMessage</c> it
/// reads from the supplied channel reader.
/// </summary>
public class ChannelPool
{
    private readonly IConnection _connection;
    private readonly int _size;
    private readonly ChannelReader<PublishMessage> _chan;

    /// <summary>
    /// Creates a pool of <paramref name="size"/> workers reading from <paramref name="chan"/>.
    /// </summary>
    /// <param name="connection">Connection used to create one model per worker.</param>
    /// <param name="size">Number of workers started by <see cref="Run"/>; must not be negative.</param>
    /// <param name="chan">Reader the workers pull messages from.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connection"/> or <paramref name="chan"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is negative.</exception>
    public ChannelPool(IConnection connection, int size, ChannelReader<PublishMessage> chan)
    {
        if (connection == null)
        {
            throw new ArgumentNullException(nameof(connection));
        }

        if (chan == null)
        {
            throw new ArgumentNullException(nameof(chan));
        }

        if (size < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(size), size, "Pool size must not be negative.");
        }

        _size = size;
        _connection = connection;
        _chan = chan;
    }

    /// <summary>
    /// Starts all workers and completes once every worker has finished.
    /// </summary>
    /// <returns>
    /// A task that completes when all workers have returned, and faults if any
    /// worker throws.
    /// </returns>
    public async Task Run()
    {
        var workers = new Task[_size];
        for (var i = 0; i < workers.Length; i++)
        {
            // Task.Run(Func<Task>) returns a proxy for the whole async worker.
            // The previous `new Task(async () => ...)` bound the lambda to an
            // Action (async void): each task finished at the worker's first
            // await, so WhenAll returned early and worker exceptions were lost.
            workers[i] = Task.Run(() => RunWorker());
        }

        await Task.WhenAll(workers);
    }

    /// <summary>
    /// Single worker loop: reads messages until the reader reports that no
    /// more data will arrive, declaring each exchange the first time this
    /// worker sees it and then publishing the message.
    /// </summary>
    /// <returns>A task that completes when <c>WaitToReadAsync</c> returns false.</returns>
    public async Task RunWorker()
    {
        // Exchanges this worker has already declared on its own model.
        var declaredExchanges = new HashSet<string>();

        // Dispose the model when the worker ends so it is not leaked.
        using (var model = _connection.CreateModel())
        {
            while (await _chan.WaitToReadAsync())
            {
                // Other workers share the reader, so an item may already be
                // gone by the time we get here; drain whatever is available.
                while (_chan.TryRead(out var message))
                {
                    // HashSet.Add returns true only for a name not seen before.
                    if (declaredExchanges.Add(message.ExchangeName))
                    {
                        // Positional flags are presumably durable: true,
                        // autoDelete: false -- confirm against the client API.
                        model.ExchangeDeclare(message.ExchangeName, message.ExchangeType, true, false);
                    }

                    var props = model.CreateBasicProperties();

                    // The message fills in the properties object itself.
                    message.Properties(props);

                    // Third argument is presumably the `mandatory` flag -- confirm.
                    model.BasicPublish(message.ExchangeName, message.RoutingKey, true, props, message.Body);
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment