Skip to content

Instantly share code, notes, and snippets.

@shawnweisfeld
Created July 1, 2020 18:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shawnweisfeld/2665ad17cae0b2c2482d84fbb286cca2 to your computer and use it in GitHub Desktop.
Save shawnweisfeld/2665ad17cae0b2c2482d84fbb286cca2 to your computer and use it in GitHub Desktop.
/// <summary>
/// Serializes <see cref="Replicatable"/> items to JSON and sends them as queue messages,
/// creating each target queue on first use and reusing one <see cref="QueueClient"/> per queue name.
/// </summary>
/// <remarks>
/// The client cache is a plain <see cref="Dictionary{TKey,TValue}"/> with no synchronization.
/// Queue lookup/creation runs synchronously before the first <c>await</c> of each send.
/// </remarks>
public class StorageQueueSender : ISendMessages
{
    // NOTE(review): a time-to-live of -1 second is understood to be the Storage Queue
    // marker for "message never expires" - confirm against the SDK documentation.
    private static readonly TimeSpan NeverExpires = TimeSpan.FromSeconds(-1);

    private readonly IQueueConfiguration _configuration;
    private readonly Dictionary<string, QueueClient> _queueClients = new Dictionary<string, QueueClient>();

    /// <summary>
    /// Creates the sender and eagerly ensures the default queue exists.
    /// </summary>
    /// <param name="configuration">Supplies the connection string and the default queue name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
    public StorageQueueSender(IQueueConfiguration configuration)
    {
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));

        // Resolve (and create, if needed) the default queue up front.
        GetOrCreateQueueClientIfNotExists();
    }

    /// <summary>
    /// Sends every item in <paramref name="replicatables"/> to the queue and completes when all sends have completed.
    /// </summary>
    /// <param name="replicatables">Items to send; each becomes one queue message.</param>
    /// <param name="queueName">Target queue; null or empty selects the configured/default queue.</param>
    /// <param name="cancellationToken">Token forwarded to every underlying send.</param>
    /// <exception cref="ArgumentNullException"><paramref name="replicatables"/> is null.</exception>
    public async Task SendAsync(IEnumerable<Replicatable> replicatables, string queueName = null, CancellationToken cancellationToken = default)
    {
        if (replicatables == null)
        {
            throw new ArgumentNullException(nameof(replicatables));
        }

        await Task.WhenAll(replicatables.Select(r => SendAsync(r, queueName, cancellationToken)));
    }

    /// <summary>
    /// Serializes <paramref name="replicatable"/> to JSON and sends it as a single queue message.
    /// </summary>
    /// <param name="replicatable">Item to send.</param>
    /// <param name="queueName">Target queue; null or empty selects the configured/default queue.</param>
    /// <param name="cancellationToken">Token forwarded to queue creation and to the send call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="replicatable"/> is null.</exception>
    public async Task SendAsync(Replicatable replicatable, string queueName, CancellationToken cancellationToken = default)
    {
        // Reject null up front; otherwise the literal JSON text "null" would be enqueued.
        if (replicatable == null)
        {
            throw new ArgumentNullException(nameof(replicatable));
        }

        var queueClient = GetOrCreateQueueClientIfNotExists(queueName, cancellationToken);
        var serializedMessage = JsonConvert.SerializeObject(replicatable);

        await queueClient.SendMessageAsync(serializedMessage, timeToLive: NeverExpires, cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Returns the cached client for <paramref name="queueName"/>, creating the client and
    /// ensuring the queue exists on first use.
    /// </summary>
    /// <param name="queueName">
    /// Queue to resolve; when null or empty, falls back to the configured queue name and then
    /// to <c>ReplicationConfigurationDefaults.ReplicationQueueName</c>.
    /// </param>
    /// <param name="cancellationToken">Token forwarded to the queue-creation call.</param>
    /// <returns>A client bound to the resolved queue name.</returns>
    private QueueClient GetOrCreateQueueClientIfNotExists(string queueName = null, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(queueName))
        {
            queueName = _configuration.QueueName ?? ReplicationConfigurationDefaults.ReplicationQueueName;
        }

        if (_queueClients.TryGetValue(queueName, out var cachedClient))
        {
            return cachedClient;
        }

        var queueClient = new QueueClient(_configuration.QueueConnectionString, queueName);
        queueClient.CreateIfNotExists(cancellationToken: cancellationToken);

        // Cache only after creation succeeded, so a failed or cancelled attempt is retried
        // on the next call instead of leaving a client for a queue that may not exist.
        _queueClients[queueName] = queueClient;
        return queueClient;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment