Skip to content

Instantly share code, notes, and snippets.

@yapaxi
Last active June 14, 2021 04:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yapaxi/82d01b5fca6aed61f3dc0d9527d0d612 to your computer and use it in GitHub Desktop.
Save yapaxi/82d01b5fca6aed61f3dc0d9527d0d612 to your computer and use it in GitHub Desktop.
One-time Redis Connection
using StackExchange.Redis;
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp41
{
class Program
{
static async Task Main(string[] args)
{
var con = new Connection(
getMultiplexerTimeout: TimeSpan.FromSeconds(5),
redisConnectionString: "CONNECTION STRING"
);
await Task.WhenAll(Enumerable.Range(0, 100).Select(q => con.GetConnectionMultiplexer()));
}
}
public class Connection
{
private class Stats
{
public volatile int ConnectionFailedTimes;
public volatile string ConnectingSinceUtc;
public volatile string ConnectedAtUtc;
public volatile int SecondsSinceConnecting;
public override string ToString()
{
return $"ConnectionFailedTimes='{ConnectionFailedTimes}', Elapsed='{TimeSpan.FromSeconds(SecondsSinceConnecting)}', ConnectingSinceUtc='{ConnectingSinceUtc}', ConnectedAtUtc='{ConnectedAtUtc}'";
}
}
private readonly TaskCompletionSource<ConnectionMultiplexer> _completion;
private readonly Task _connectTask;
private readonly TimeSpan _getMultiplexerTimeout;
private readonly CancellationTokenSource _connectCancellation;
private readonly Stats _stats;
public Connection(
TimeSpan getMultiplexerTimeout,
string redisConnectionString
)
{
if (string.IsNullOrWhiteSpace(redisConnectionString))
{
throw new ArgumentException($"'{nameof(redisConnectionString)}' cannot be null or whitespace.", nameof(redisConnectionString));
}
_connectCancellation = new CancellationTokenSource();
_completion = new TaskCompletionSource<ConnectionMultiplexer>();
_getMultiplexerTimeout = getMultiplexerTimeout;
_stats = new Stats();
_connectTask = Task.Run(() => StartInitialConnectionLoop(
redisConnectionString,
_connectCancellation,
_completion,
_stats
));
}
private static async Task StartInitialConnectionLoop(
string connectionString,
CancellationTokenSource cancellation,
TaskCompletionSource<ConnectionMultiplexer> completion,
Stats stats
)
{
var since = DateTime.UtcNow;
stats.ConnectingSinceUtc = since.ToString("o");
while (!cancellation.IsCancellationRequested)
{
try
{
var mx = await ConnectionMultiplexer.ConnectAsync(connectionString).ConfigureAwait(false);
var utc = DateTime.UtcNow;
completion.SetResult(mx);
stats.ConnectedAtUtc = utc.ToString("o");
stats.SecondsSinceConnecting = (int)(utc - since).TotalSeconds;
return;
}
catch (Exception e)
{
//
// LOGGER
//
stats.ConnectionFailedTimes++;
}
try
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellation.Token);
}
catch (OperationCanceledException)
{
}
}
completion.SetCanceled();
}
public async Task<ConnectionMultiplexer> GetConnectionMultiplexer(CancellationToken cancellationToken = default)
{
if (_completion.Task.IsCompletedSuccessfully)
{
return _completion.Task.Result;
}
else
{
var timeoutTask = Task.Delay(_getMultiplexerTimeout, cancellationToken);
var winner = await Task.WhenAny(
_completion.Task,
timeoutTask
).ConfigureAwait(false);
if (winner == timeoutTask)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException($"Timeout on getting connection multiplexer: {_stats}");
}
return _completion.Task.Result;
}
}
public bool TryGetConnectionMultiplexer(out ConnectionMultiplexer multiplexer)
{
if (_completion.Task.IsCompletedSuccessfully)
{
multiplexer = _completion.Task.Result;
return true;
}
else
{
multiplexer = null;
return false;
}
}
public void Close()
{
// implementation depends on service type
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment