Skip to content

Instantly share code, notes, and snippets.

@bymyslf
Created April 9, 2021 13:09
Show Gist options
  • Save bymyslf/157e4bb3851c03d712a84f72da9ed42d to your computer and use it in GitHub Desktop.
Save bymyslf/157e4bb3851c03d712a84f72da9ed42d to your computer and use it in GitHub Desktop.
Redis Job Queue
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public interface IJobQueue
{
Task Finish(string key, CancellationToken cancellationToken = default);
Task Requeue(string key, CancellationToken cancellationToken = default);
Task<IReadOnlyCollection<(string Key, string Value)>> PopAll(CancellationToken cancellationToken = default);
Task Enqueue(string value, CancellationToken cancellationToken = default);
Task<IDictionary<string, string>> GetAll(CancellationToken cancellationToken = default);
Task Clear(CancellationToken cancellationToken = default);
}
using StackExchange.Redis;
public class RedisConnection
{
private readonly Lazy<ConnectionMultiplexer> connection;
public RedisConnection(string connectionString)
{
if (string.IsNullOrEmpty(connectionString))
{
throw new ArgumentException(nameof(connectionString));
}
var options = ConfigurationOptions.Parse(connectionString);
this.connection = new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(options));
}
public IDatabase Database
=> this.connection.Value.GetDatabase();
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
public class RedisJobQueue : IJobQueue
{
private readonly RedisConnection connection;
private readonly string failedQueue;
private readonly string processingQueue;
private IDatabase Database => connection.Database;
public RedisJobQueue(RedisConnection connection, string queueName)
{
this.connection = connection;
this.failedQueue = $"{queueName}:failedtasks";
this.processingQueue = $"{queueName}:processingtasks";
}
/// <summary>
/// When a job is finished, remove it from the processing queue and from the cache database.
/// </summary>
/// <param name="key"></param>
public async Task Finish(string key, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
await Database.ListRemoveAsync(processingQueue, key);
cancellationToken.ThrowIfCancellationRequested();
await Database.KeyDeleteAsync(key);
}
/// <summary>
/// When a job fails the execution, we want to add it to the failed queue and remove it from the processing queue.
/// </summary>
/// <param name="key"></param>
public async Task Requeue(string key, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
await Database.ListRemoveAsync(processingQueue, key);
cancellationToken.ThrowIfCancellationRequested();
Database.HashDelete(key, "active");
Database.HashIncrement(key, "failedcount");
Database.ListRightPush(failedQueue, key);
}
/// <summary>
/// Move key from FailedQueue to ProcessingQueue, get key value from cache.
///
/// Also set the active field. Indicates when job was retrieved so we can monitor its time.
/// </summary>
public async Task<IReadOnlyCollection<(string Key, string Value)>> PopAll(CancellationToken cancellationToken = default)
{
var result = new List<(string, string)>();
while (!cancellationToken.IsCancellationRequested)
{
string key = await Database.ListRightPopLeftPushAsync(failedQueue, processingQueue);
if (string.IsNullOrEmpty(key))
{
break;
}
cancellationToken.ThrowIfCancellationRequested();
var hashEntries = (await Database.HashGetAllAsync(key));
await Database.HashSetAsync(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
foreach (var entry in hashEntries)
{
cancellationToken.ThrowIfCancellationRequested();
if (entry.Name.IsNullOrEmpty || entry.Value.IsNullOrEmpty)
{
await Database.ListRemoveAsync(processingQueue, key);
continue;
}
result.Add((entry.Name, entry.Value));
}
}
return result;
}
/// <summary>
/// Add a failed job to the queue
/// </summary>
/// <param name="value">Value to be inserted</param>
public async Task Enqueue(string value, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(value))
{
return;
}
cancellationToken.ThrowIfCancellationRequested();
var id = await Database.StringIncrementAsync($"{failedQueue}:jobid");
var key = $"{failedQueue}:{id}";
cancellationToken.ThrowIfCancellationRequested();
await Database.HashSetAsync(key, key, value);
cancellationToken.ThrowIfCancellationRequested();
await Database.ListLeftPushAsync(failedQueue, key);
}
/// <summary>
/// Get all values in the failed queue
/// </summary>
public async Task<IDictionary<string, string>> GetAll(CancellationToken cancellationToken = default)
{
var result = new Dictionary<string, string>();
while (!cancellationToken.IsCancellationRequested)
{
string key = await Database.ListRightPopLeftPushAsync(failedQueue, failedQueue);
if (string.IsNullOrEmpty(key) || result.ContainsKey(key))
{
break;
}
var entries = (await Database.HashGetAllAsync(key));
foreach (var entry in entries)
{
if (entry.Name.IsNullOrEmpty || entry.Value.IsNullOrEmpty)
{
continue;
}
result.Add(entry.Name, entry.Value);
}
}
return result;
}
/// <summary>
/// Delete all values from queue
/// </summary>
public async Task Clear(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
await ClearQueue(processingQueue);
cancellationToken.ThrowIfCancellationRequested();
await DeleteKey($"{failedQueue}:jobid");
await ClearQueue(failedQueue);
async Task ClearQueue(string queueName)
{
string redisValue = await Database.ListLeftPopAsync(queueName);
while (!string.IsNullOrEmpty(redisValue))
{
await Database.KeyDeleteAsync(redisValue);
await DeleteKey(redisValue);
redisValue = await Database.ListLeftPopAsync(queueName);
}
await DeleteKey(queueName);
}
async Task DeleteKey(string key)
{
await Database.KeyDeleteAsync(key);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment