Skip to content

Instantly share code, notes, and snippets.

@yapaxi
Last active January 12, 2023 19:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yapaxi/9526ca734c8cc30173276c616324d33b to your computer and use it in GitHub Desktop.
Save yapaxi/9526ca734c8cc30173276c616324d33b to your computer and use it in GitHub Desktop.
Possible fix for starvation in redis force reconnection code
using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace ConsoleApp38
{
/// <summary>
/// Demo driver: opens a connection through <see cref="Redis"/>, then runs two background
/// loops until either a reconnect is observed or Ctrl+C is pressed.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Starts a loop that keeps calling <c>ConsiderReconnect</c> and a loop that
    /// keeps calling <c>GetConnection</c>, then waits for both loops to finish.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        // DEMO CODE
        // NOT FOR PRODUCTION PURPOSES
        var redis = new Redis("CONNECTION STRING");

        // Baseline instance; the connection loop compares later results against it.
        var connection = await redis.GetConnection(CancellationToken.None);

        var cnsl = new CancellationTokenSource();
        Console.CancelKeyPress += (a, b) =>
        {
            cnsl.Cancel();
            // Keep the process alive so both loops can observe the cancellation and exit.
            b.Cancel = true;
        };

        Console.WriteLine("Running heath-loop");
        var heathLoop = Task.Run(async () =>
        {
            while (!cnsl.IsCancellationRequested)
            {
                try
                {
                    // Stop as soon as a reconnect has actually been triggered.
                    if (await redis.ConsiderReconnect(cnsl.Token))
                    {
                        return;
                    }
                }
                catch (OperationCanceledException) when (cnsl.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception e)
                {
                    // WriteLine (not Write) so consecutive failures do not run together.
                    Console.WriteLine($"Failure in heath loop: {e}");
                }
            }

            Console.WriteLine("Heath loop aborted");
        });

        Console.WriteLine("Running connection-loop");
        var connectionLoop = Task.Run(async () =>
        {
            while (!cnsl.IsCancellationRequested)
            {
                try
                {
                    var newConnection = await redis.GetConnection(cnsl.Token);

                    // A different instance means the old one was dropped and replaced.
                    if (connection != newConnection)
                    {
                        Console.WriteLine("Reconnected");
                        return;
                    }
                }
                catch (OperationCanceledException) when (cnsl.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception e)
                {
                    // WriteLine (not Write) so consecutive failures do not run together.
                    Console.WriteLine($"Failure in connection loop: {e}");
                }
            }

            Console.WriteLine("Did not reconnect");
        });

        await heathLoop;
        await connectionLoop;
    }
}
/// <summary>
/// Lazily creates and caches a single <see cref="ConnectionMultiplexer"/> and lets callers
/// report errors via <see cref="ConsiderReconnect"/>, which drops the cached instance once
/// the error pattern and rate limits allow, so the next <see cref="GetConnection"/> call
/// connects again.
/// </summary>
public class Redis
{
    // config
    internal static readonly TimeSpan ReconnectMinFrequency = TimeSpan.FromSeconds(10);
    internal static readonly TimeSpan ReconnectErrorThreshold = TimeSpan.FromSeconds(5);
    internal static readonly TimeSpan ConnectionLockTimeout = TimeSpan.FromSeconds(10);
    internal static readonly TimeSpan ConnectionTimeout = TimeSpan.FromSeconds(20);

    private readonly string _connectionString;

    // state
    private volatile ConnectionMultiplexer? _multiplexer;
    private readonly SemaphoreSlim _connectionLock;
    private readonly SemaphoreSlim _reconnectNoWaitLock;
    private long _lastReconnectTicks;      // UTC ticks; accessed through Interlocked
    private DateTimeOffset _firstError;    // only touched while holding _reconnectNoWaitLock
    private DateTimeOffset _previousError; // only touched while holding _reconnectNoWaitLock

    /// <summary>Creates a wrapper that connects on first use.</summary>
    /// <param name="connectionString">Connection string passed to <c>ConnectionMultiplexer.ConnectAsync</c>.</param>
    /// <exception cref="ArgumentException"><paramref name="connectionString"/> is null, empty or whitespace.</exception>
    public Redis(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException($"'{nameof(connectionString)}' cannot be null or whitespace.", nameof(connectionString));
        }

        _connectionString = connectionString;
        _connectionLock = new SemaphoreSlim(1, 1);
        _reconnectNoWaitLock = new SemaphoreSlim(1, 1);
        _lastReconnectTicks = DateTimeOffset.MinValue.UtcTicks;
        _firstError = DateTimeOffset.MinValue;
        _previousError = DateTimeOffset.MinValue;
        _multiplexer = null;
    }

    /// <summary>
    /// Returns the cached multiplexer, connecting first if there is none.
    /// </summary>
    /// <param name="token">Cancels waiting for the connection lock and for the connect attempt.</param>
    /// <returns>The cached or newly connected multiplexer.</returns>
    /// <exception cref="TimeoutException">
    /// The connection lock was not obtained within <see cref="ConnectionLockTimeout"/>, or the
    /// connect attempt did not finish within <see cref="ConnectionTimeout"/>.
    /// </exception>
    /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
    public async Task<ConnectionMultiplexer> GetConnection(CancellationToken token)
    {
        // Fast path: no lock when a multiplexer is already cached.
        var mr = _multiplexer;
        if (mr is not null)
        {
            return mr;
        }

        bool taken = false;
        try
        {
            taken = await _connectionLock.WaitAsync(ConnectionLockTimeout, token).ConfigureAwait(false);
            if (!taken)
            {
                throw new TimeoutException("Failed to get connection-lock: lock timeout");
            }

            // Another caller may have connected while this one waited for the lock.
            mr = _multiplexer;
            if (mr is not null)
            {
                return mr;
            }

            var connectionTask = ConnectionMultiplexer.ConnectAsync(_connectionString);
            if (!await CompletesWithin(connectionTask, ConnectionTimeout, token).ConfigureAwait(false))
            {
                // The attempt is being abandoned; make sure a late result is cleaned up.
                DisposeWhenCompleted(connectionTask);

                // Report cancellation as cancellation rather than as a timeout.
                token.ThrowIfCancellationRequested();
                throw new TimeoutException("Failed to connect: connection timeout");
            }

            mr = await connectionTask.ConfigureAwait(false);
            _multiplexer = mr;
            return mr;
        }
        finally
        {
            if (taken)
            {
                _connectionLock.Release();
            }
        }
    }

    /// <summary>
    /// Records an error observation and, when the error pattern and rate limits allow,
    /// closes the cached multiplexer and clears it.
    /// </summary>
    /// <remarks>
    /// A reconnect is triggered only when all of the following hold: a multiplexer is cached;
    /// at least <see cref="ReconnectMinFrequency"/> has passed since the last reconnect; no
    /// other call is currently inside this method's critical section; the first recorded error
    /// is at least <see cref="ReconnectErrorThreshold"/> old; and the previous recorded error
    /// is at most <see cref="ReconnectErrorThreshold"/> old.
    /// </remarks>
    /// <param name="token">Cuts short the bounded wait for the old multiplexer to close.</param>
    /// <returns><c>true</c> if the cached multiplexer was dropped; otherwise <c>false</c>.</returns>
    public async Task<bool> ConsiderReconnect(CancellationToken token)
    {
        if (_multiplexer is null)
        {
            return false;
        }

        // Cheap rate-limit check before touching the lock.
        if (ElapsedSinceLastReconnect(DateTimeOffset.UtcNow) < ReconnectMinFrequency)
        {
            return false;
        }

        bool taken = false;
        try
        {
            // only tests if reconnect is running
            // but does not wait for it
            taken = await _reconnectNoWaitLock.WaitAsync(TimeSpan.Zero).ConfigureAwait(false);
            if (!taken)
            {
                return false;
            }

            var utcNow = DateTimeOffset.UtcNow;

            if (_firstError == DateTimeOffset.MinValue)
            {
                // First error of a new window: just start tracking.
                _firstError = utcNow;
                _previousError = utcNow;
                return false;
            }

            // Re-read the timestamp under the lock: a reconnect may have completed after the
            // unlocked check above, and a value captured before the lock would never show it.
            if (ElapsedSinceLastReconnect(utcNow) < ReconnectMinFrequency)
            {
                return false;
            }

            var elapsedSinceFirstError = utcNow - _firstError;
            var elapsedSinceMostRecentError = utcNow - _previousError;
            var shouldReconnect =
                elapsedSinceFirstError >= ReconnectErrorThreshold
                && elapsedSinceMostRecentError <= ReconnectErrorThreshold;

            _previousError = utcNow;

            if (!shouldReconnect)
            {
                return false;
            }

            _firstError = DateTimeOffset.MinValue;
            _previousError = DateTimeOffset.MinValue;

            var oldMultiplexer = _multiplexer;
            if (oldMultiplexer is not null)
            {
                try
                {
                    var closeTask = oldMultiplexer.CloseAsync();

                    // If the wait below gives up first, a later fault must still be observed.
                    _ = closeTask.ContinueWith(
                        t => { _ = t.Exception; },
                        CancellationToken.None,
                        TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                        TaskScheduler.Default);

                    await CompletesWithin(closeTask, ConnectionTimeout, token).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    // Deliberately best-effort: a failure to close the old multiplexer
                    // must not prevent it from being dropped below.
                }
            }

            _multiplexer = null;
            Interlocked.Exchange(ref _lastReconnectTicks, utcNow.UtcTicks);
            return true;
        }
        finally
        {
            if (taken)
            {
                _reconnectNoWaitLock.Release();
            }
        }
    }

    // Time between the last recorded reconnect and the given instant.
    private TimeSpan ElapsedSinceLastReconnect(DateTimeOffset utcNow)
    {
        var lastTicks = Interlocked.Read(ref _lastReconnectTicks);
        return utcNow - new DateTimeOffset(lastTicks, TimeSpan.Zero);
    }

    // Waits for `work` for at most `timeout` (or until `token` is cancelled) and returns true
    // when `work` finished first. Does not rethrow a failure of `work`.
    private static async Task<bool> CompletesWithin(Task work, TimeSpan timeout, CancellationToken token)
    {
        using var delayCancellation = CancellationTokenSource.CreateLinkedTokenSource(token);
        var delay = Task.Delay(timeout, delayCancellation.Token);
        var winner = await Task.WhenAny(work, delay).ConfigureAwait(false);

        // Stop the timer instead of leaving it pending for the rest of the timeout.
        delayCancellation.Cancel();
        return winner == work;
    }

    // Cleans up after a connect attempt nobody waits for any more: disposes the multiplexer
    // if one eventually arrives, otherwise observes the failure.
    private static void DisposeWhenCompleted(Task<ConnectionMultiplexer> abandoned)
    {
        _ = abandoned.ContinueWith(
            t =>
            {
                if (t.Status == TaskStatus.RanToCompletion)
                {
                    t.Result.Dispose();
                }
                else
                {
                    _ = t.Exception;
                }
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment