Skip to content

Instantly share code, notes, and snippets.

@TheCloudlessSky
Created June 3, 2016 18:30
Show Gist options
  • Save TheCloudlessSky/7c60dd6657caea79dc5764ee8779df9e to your computer and use it in GitHub Desktop.
Save TheCloudlessSky/7c60dd6657caea79dc5764ee8779df9e to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using System.Collections.Generic;
using System.Globalization;
using System.Threading.Tasks;
using NHibernate.Cache;
using NHibernate.Util;
using System.Net.Sockets;
using StackExchange.Redis;
using System.Runtime.Caching;
namespace NHibernate.Caches.Redis
{
public class RedisCache : ICache
{
/// <summary>Prefix combined with the region name to build this cache's Redis namespace.</summary>
private const string cacheNamespacePrefix = "NHibernate-Cache:";
private static readonly IInternalLogger log = LoggerProvider.LoggerFor(typeof(RedisCache));
// The acquired locks do not need to be distributed into Redis because
// the same ISession will lock/unlock an object.
private readonly MemoryCache acquiredLocks = new MemoryCache("NHibernate.Caches.Redis.RedisCache");
// Returns the value stored at @key only while @key is a member of the
// set of active keys. A member whose value is gone is removed from the
// set; a key that is not a member is deleted and reported as nil.
private static readonly LuaScript getScript = LuaScript.Prepare(@"
if redis.call('sismember', @setOfActiveKeysKey, @key) == 1 then
local result = redis.call('get', @key)
if not result then
redis.call('srem', @setOfActiveKeysKey, @key)
end
return result
else
redis.call('del', @key)
return nil
end
");
// Resets the time-to-live of @key to @expiration once its remaining
// time-to-live has dropped to @slidingExpiration or below.
private static readonly LuaScript slidingExpirationScript = LuaScript.Prepare(@"
local pttl = redis.call('pttl', @key)
if pttl <= tonumber(@slidingExpiration) then
redis.call('pexpire', @key, @expiration)
return true
else
return false
end
");
// Registers @key in the set of active keys and stores @value with a
// time-to-live of @expiration milliseconds.
private static readonly LuaScript putScript = LuaScript.Prepare(@"
redis.call('sadd', @setOfActiveKeysKey, @key)
redis.call('set', @key, @value, 'PX', @expiration)
");
// Unregisters @key from the set of active keys and deletes its value.
private static readonly LuaScript removeScript = LuaScript.Prepare(@"
redis.call('srem', @setOfActiveKeysKey, @key)
redis.call('del', @key)
");
// Deletes the lock only when it still holds the value written by the
// caller. Declared static readonly like the other prepared scripts so
// it is prepared once and can never be reassigned.
private static readonly LuaScript unlockScript = LuaScript.Prepare(@"
if redis.call('get', @lockKey) == @lockValue then
return redis.call('del', @lockKey)
else
return 0
end
");
// Help with debugging scripts since exceptions are swallowed with FireAndForget.
// NOTE: the conditional compilation below is commented out, so the
// flags are currently CommandFlags.None in every build.
//#if DEBUG
private const CommandFlags fireAndForgetFlags = CommandFlags.None;
//#else
// private const CommandFlags fireAndForgetFlags = CommandFlags.FireAndForget;
//#endif
// True when entries are tracked in a per-region set of active keys
// (script based operations); false when plain key commands are used.
private readonly bool useIndex;
private readonly ConnectionMultiplexer connectionMultiplexer;
private readonly RedisCacheProviderOptions options;
private readonly TimeSpan expiration;
private readonly TimeSpan slidingExpiration;
private readonly TimeSpan lockTimeout;
private readonly TimeSpan acquireLockTimeout;
/// <summary>Name of the cache region, taken from the configuration.</summary>
public string RegionName { get; private set; }
/// <summary>Key namespace built from <c>cacheNamespacePrefix</c> and the region name.</summary>
internal RedisNamespace CacheNamespace { get; private set; }
/// <summary>The lock timeout in whole milliseconds, scaled by <c>Timestamper.OneMs</c>.</summary>
public int Timeout { get { return Timestamper.OneMs * (int)lockTimeout.TotalMilliseconds; } }
/// <summary>
/// Immutable record of one lock attempt: the cache key as text, the
/// Redis key holding the lock and the value written to that key.
/// </summary>
private class LockData
{
    private readonly string key;
    private readonly string lockKey;
    private readonly string lockValue;

    public LockData(string key, string lockKey, string lockValue)
    {
        this.key = key;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
    }

    /// <summary>Cache key the lock protects, as text.</summary>
    public string Key { get { return key; } }

    /// <summary>Redis key under which the lock is stored.</summary>
    public string LockKey { get { return lockKey; } }

    /// <summary>Value written to the lock key for this attempt.</summary>
    public string LockValue { get { return lockValue; } }

    /// <summary>Renders the three members for log messages.</summary>
    public override string ToString()
    {
        // Doubled braces are literal braces in a composite format string.
        return string.Format("{{ Key='{0}', LockKey='{1}', LockValue='{2}' }}", key, lockKey, lockValue);
    }
}
/// <summary>
/// Creates a cache for <paramref name="regionName"/> using a configuration
/// built from the region name alone.
/// </summary>
public RedisCache(string regionName, ConnectionMultiplexer connectionMultiplexer, RedisCacheProviderOptions options, bool useIndex = true)
    : this(new RedisCacheConfiguration(regionName), connectionMultiplexer, options, useIndex)
{
}

/// <summary>
/// Creates a cache from an explicit region configuration.
/// </summary>
/// <param name="configuration">Region settings; checked with ThrowIfNull and Validate.</param>
/// <param name="connectionMultiplexer">Redis connection; checked with ThrowIfNull.</param>
/// <param name="options">Provider options; checked with ThrowIfNull, then a validated shallow clone is kept.</param>
/// <param name="useIndex">Whether entries are tracked in the set of active keys.</param>
public RedisCache(RedisCacheConfiguration configuration, ConnectionMultiplexer connectionMultiplexer, RedisCacheProviderOptions options, bool useIndex = true)
{
    // Check the collaborators first, in declaration order, then copy state.
    configuration.ThrowIfNull("configuration").Validate();
    var checkedMultiplexer = connectionMultiplexer.ThrowIfNull("connectionMultiplexer");
    var checkedOptions = options.ThrowIfNull("options").ShallowCloneAndValidate();

    this.connectionMultiplexer = checkedMultiplexer;
    this.options = checkedOptions;
    this.useIndex = useIndex;

    RegionName = configuration.RegionName;
    expiration = configuration.Expiration;
    slidingExpiration = configuration.SlidingExpiration;
    lockTimeout = configuration.LockTimeout;
    acquireLockTimeout = configuration.AcquireLockTimeout;

    log.DebugFormat(
        "creating cache: regionName='{0}', expiration='{1}', lockTimeout='{2}', acquireLockTimeout='{3}'",
        RegionName, expiration, lockTimeout, acquireLockTimeout);

    CacheNamespace = new RedisNamespace(cacheNamespacePrefix + RegionName);
}
/// <summary>Returns the next value produced by <c>Timestamper.Next()</c>.</summary>
public long NextTimestamp()
{
    var timestamp = Timestamper.Next();
    return timestamp;
}
/// <summary>
/// Serializes <paramref name="value"/> and stores it under the region
/// scoped key with the configured expiration.
/// </summary>
/// <param name="key">Cache key; checked with ThrowIfNull.</param>
/// <param name="value">Value to store; checked with ThrowIfNull.</param>
/// <exception cref="RedisCacheException">
/// Thrown when the operation fails and the OnException handler leaves
/// <c>Throw</c> set on the event arguments.
/// </exception>
public virtual void Put(object key, object value)
{
    key.ThrowIfNull("key");
    value.ThrowIfNull("value");
    log.DebugFormat("put in cache: regionName='{0}', key='{1}'", RegionName, key);
    try
    {
        var serializedValue = options.Serializer.Serialize(value);
        var cacheKey = CacheNamespace.GetKey(key);
        var setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey();
        var db = GetDatabase();
        if (useIndex)
        {
            db.ScriptEvaluate(putScript, new
            {
                key = cacheKey,
                setOfActiveKeysKey = setOfActiveKeysKey,
                value = serializedValue,
                // SET ... PX only accepts an integer. TotalMilliseconds is a
                // double and is fractional for any TimeSpan with a
                // sub-millisecond part, so truncate to whole milliseconds.
                expiration = (long)expiration.TotalMilliseconds
            }, fireAndForgetFlags);
        }
        else
        {
            db.StringSet(cacheKey, serializedValue, expiration, flags: fireAndForgetFlags);
        }
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not put in cache: regionName='{0}', key='{1}'", RegionName, key);
        // The handler decides whether the failure is rethrown or swallowed.
        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Put, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to put item in cache. See inner exception.", e);
        }
    }
}
/// <summary>
/// Reads and deserializes the value stored for <paramref name="key"/>,
/// refreshing its time-to-live when sliding expiration is configured.
/// </summary>
/// <param name="key">Cache key; checked with ThrowIfNull.</param>
/// <returns>
/// The deserialized value, or null on a cache miss or when a failure is
/// swallowed by the OnException handler.
/// </returns>
/// <exception cref="RedisCacheException">
/// Thrown when the operation fails and the OnException handler leaves
/// <c>Throw</c> set on the event arguments.
/// </exception>
public virtual object Get(object key)
{
    key.ThrowIfNull();
    log.DebugFormat("get from cache: regionName='{0}', key='{1}'", RegionName, key);
    try
    {
        var cacheKey = CacheNamespace.GetKey(key);
        var setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey();
        var db = GetDatabase();
        RedisValue serializedResult;
        if (useIndex)
        {
            // getScript returns one value (or nil), so read it as a single
            // RedisValue instead of wrapping it in an array.
            serializedResult = (RedisValue)db.ScriptEvaluate(getScript, new
            {
                key = cacheKey,
                setOfActiveKeysKey = setOfActiveKeysKey
            });
        }
        else
        {
            // A read needs its reply, so it must never use the
            // fire-and-forget flags: with FireAndForget the call would not
            // wait for the value and every lookup would look like a miss.
            serializedResult = db.StringGet(cacheKey, CommandFlags.None);
        }
        if (serializedResult.IsNullOrEmpty)
        {
            log.DebugFormat("cache miss: regionName='{0}', key='{1}'", RegionName, key);
            return null;
        }
        var deserializedValue = options.Serializer.Deserialize(serializedResult);
        if (deserializedValue != null && slidingExpiration != RedisCacheConfiguration.NoSlidingExpiration)
        {
            db.ScriptEvaluate(slidingExpirationScript, new
            {
                key = cacheKey,
                // PEXPIRE only accepts an integer; truncate to whole
                // milliseconds so a fractional TimeSpan cannot fail the script.
                expiration = (long)expiration.TotalMilliseconds,
                slidingExpiration = slidingExpiration.TotalMilliseconds
            }, fireAndForgetFlags);
        }
        return deserializedValue;
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not get from cache: regionName='{0}', key='{1}'", RegionName, key);
        // The handler decides whether the failure is rethrown or treated as a miss.
        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Get, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to get item from cache. See inner exception.", e);
        }
        return null;
    }
}
/// <summary>
/// Deletes the entry stored for <paramref name="key"/>; with the index
/// enabled the key is also removed from the set of active keys.
/// </summary>
/// <param name="key">Cache key; checked with ThrowIfNull.</param>
/// <exception cref="RedisCacheException">
/// Thrown when the operation fails and the OnException handler leaves
/// <c>Throw</c> set on the event arguments.
/// </exception>
public virtual void Remove(object key)
{
    key.ThrowIfNull();
    log.DebugFormat("remove from cache: regionName='{0}', key='{1}'", RegionName, key);
    try
    {
        var redisKey = CacheNamespace.GetKey(key);
        var activeKeysKey = CacheNamespace.GetSetOfActiveKeysKey();
        var database = GetDatabase();

        if (!useIndex)
        {
            database.KeyDelete(redisKey, fireAndForgetFlags);
            return;
        }

        var scriptArguments = new
        {
            key = redisKey,
            setOfActiveKeysKey = activeKeysKey
        };
        database.ScriptEvaluate(removeScript, scriptArguments, fireAndForgetFlags);
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not remove from cache: regionName='{0}', key='{1}'", RegionName, key);
        var failure = new ExceptionEventArgs(RegionName, RedisCacheMethod.Remove, e);
        options.OnException(this, failure);
        if (!failure.Throw)
        {
            return;
        }
        throw new RedisCacheException(RegionName, "Failed to remove item from cache. See inner exception.", e);
    }
}
/// <summary>
/// Removes every entry of this region. With the index enabled only the
/// set of active keys is deleted; otherwise the region's keys are scanned
/// and deleted in batches.
/// </summary>
/// <exception cref="RedisCacheException">
/// Thrown when the operation fails and the OnException handler leaves
/// <c>Throw</c> set on the event arguments.
/// </exception>
public virtual void Clear()
{
    log.DebugFormat("clear cache: regionName='{0}'", RegionName);
    try
    {
        var db = GetDatabase();
        if (useIndex)
        {
            var setOfActiveKeysKey = CacheNamespace.GetSetOfActiveKeysKey();
            db.KeyDelete(setOfActiveKeysKey, fireAndForgetFlags);
        }
        else
        {
            // Scan on a connected server, preferring a slave when one exists.
            var scanServer = connectionMultiplexer.GetEndPoints()
                .Select(x => connectionMultiplexer.GetServer(x))
                .Where(x => x.IsConnected)
                .OrderByDescending(x => x.IsSlave)
                .First();
            var pattern = CacheNamespace.GetKey("") + "*";

            // Delete one scan page at a time instead of collecting every key
            // into a single array and issuing one huge DEL: memory use stays
            // bounded and no single command grows with the region size.
            const int batchSize = 1000;
            var batch = new List<RedisKey>(batchSize);
            foreach (var keyToDelete in scanServer.Keys(options.Database, pattern, pageSize: batchSize))
            {
                batch.Add(keyToDelete);
                if (batch.Count == batchSize)
                {
                    db.KeyDelete(batch.ToArray(), fireAndForgetFlags);
                    batch.Clear();
                }
            }
            if (batch.Count > 0)
            {
                db.KeyDelete(batch.ToArray(), fireAndForgetFlags);
            }
        }
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not clear cache: regionName='{0}'", RegionName);
        // The handler decides whether the failure is rethrown or swallowed.
        var evtArg = new ExceptionEventArgs(RegionName, RedisCacheMethod.Clear, e);
        options.OnException(this, evtArg);
        if (evtArg.Throw)
        {
            throw new RedisCacheException(RegionName, "Failed to clear cache. See inner exception.", e);
        }
    }
}
/// <summary>Logs the request and does nothing else.</summary>
public virtual void Destroy()
{
    // Deliberately empty apart from the log entry: Redis is distributed.
    const string message = "destroying cache: regionName='{0}'";
    log.DebugFormat(message, RegionName);
}
/// <summary>
/// Tries to take the Redis lock for <paramref name="key"/>, retrying for
/// as long as the configured retry strategy asks for another attempt.
/// When the strategy gives up, OnLockFailed is raised.
/// </summary>
/// <param name="key">Cache key to lock.</param>
/// <exception cref="RedisCacheException">
/// Thrown when an exception occurs and the OnException handler leaves
/// <c>Throw</c> set on the event arguments.
/// </exception>
public virtual void Lock(object key)
{
    log.DebugFormat("acquiring cache lock: regionName='{0}', key='{1}'", RegionName, key);
    try
    {
        var lockKey = CacheNamespace.GetLockKey(key);
        var shouldRetry = options.AcquireLockRetryStrategy.GetShouldRetry();

        bool keepTrying;
        do
        {
            // A fresh lock value is requested for every attempt so each one is unique.
            var attempt = new LockData(
                Convert.ToString(key),
                lockKey,
                options.LockValueFactory.GetLockValue());

            if (TryAcquireLock(attempt))
            {
                return;
            }

            var retryArgs = new ShouldRetryAcquireLockArgs(
                RegionName, attempt.Key, attempt.LockKey,
                attempt.LockValue, lockTimeout, acquireLockTimeout);
            keepTrying = shouldRetry(retryArgs);
        }
        while (keepTrying);

        // Only reached when the strategy stopped retrying without a lock.
        var failedArgs = new LockFailedEventArgs(
            RegionName, key, lockKey,
            lockTimeout, acquireLockTimeout);
        options.OnLockFailed(this, failedArgs);
    }
    catch (Exception e)
    {
        log.ErrorFormat("could not acquire cache lock: regionName='{0}', key='{1}'", RegionName, key);
        var failure = new ExceptionEventArgs(RegionName, RedisCacheMethod.Lock, e);
        options.OnException(this, failure);
        if (!failure.Throw)
        {
            return;
        }
        throw new RedisCacheException(RegionName, "Failed to lock item in cache. See inner exception.", e);
    }
}
/// <summary>
/// Makes a single attempt to create the lock key in Redis and, on
/// success, remembers the lock locally until the lock timeout elapses.
/// </summary>
/// <returns>True when the lock key was created by this call.</returns>
private bool TryAcquireLock(LockData lockData)
{
    // IDatabase.LockTake() is avoided on purpose: the matching
    // LockRelease() is not used here, and LockTake() just performs
    // this same conditional set anyway.
    var database = GetDatabase();
    var created = database.StringSet(lockData.LockKey, lockData.LockValue, lockTimeout, When.NotExists);
    if (!created)
    {
        return false;
    }

    // Set() rather than Add() is fine: the lock held in Redis makes
    // other clients wait.
    acquiredLocks.Set(lockData.Key, lockData, absoluteExpiration: DateTime.UtcNow.Add(lockTimeout));
    return true;
}
/// <summary>
/// Releases the lock previously taken for <paramref name="key"/>.
/// OnUnlockFailed is raised when no local lock record exists or when
/// the unlock script reports that nothing was released.
/// </summary>
/// <param name="key">Cache key to unlock.</param>
/// <exception cref="RedisCacheException">
/// Thrown when the release fails with an exception and the OnException
/// handler leaves <c>Throw</c> set on the event arguments.
/// </exception>
public virtual void Unlock(object key)
{
    // Remove() doubles as the lookup because the record is being
    // discarded either way.
    var removed = acquiredLocks.Remove(Convert.ToString(key));
    var held = removed as LockData;
    if (held == null)
    {
        log.WarnFormat("attempted to unlock '{0}' but a previous lock was not acquired or timed out", key);
        options.OnUnlockFailed(this, new UnlockFailedEventArgs(
            RegionName, key, lockKey: null, lockValue: null));
        return;
    }

    log.DebugFormat(
        "releasing cache lock: regionName='{0}', key='{1}', lockKey='{2}', lockValue='{3}'",
        RegionName, held.Key, held.LockKey, held.LockValue);
    try
    {
        // IDatabase.LockRelease() relies on watch/unwatch; the script
        // performs the compare-and-delete as one atomic operation.
        var database = GetDatabase();
        var scriptArguments = new
        {
            lockKey = held.LockKey,
            lockValue = held.LockValue
        };
        var released = (bool)database.ScriptEvaluate(unlockScript, scriptArguments);
        if (released)
        {
            return;
        }

        log.WarnFormat("attempted to unlock '{0}' but it could not be released (it maybe timed out or was cleared in Redis)", held);
        options.OnUnlockFailed(this, new UnlockFailedEventArgs(
            RegionName, key, held.LockKey, held.LockValue));
    }
    catch (Exception e)
    {
        log.ErrorFormat(
            "could not release cache lock: regionName='{0}', key='{1}', lockKey='{2}', lockValue='{3}'",
            RegionName, held.Key, held.LockKey, held.LockValue);
        var failure = new ExceptionEventArgs(RegionName, RedisCacheMethod.Unlock, e);
        options.OnException(this, failure);
        if (!failure.Throw)
        {
            return;
        }
        throw new RedisCacheException(RegionName, "Failed to unlock item in cache. See inner exception.", e);
    }
}
/// <summary>Returns the database selected by <c>options.Database</c> from the multiplexer.</summary>
private IDatabase GetDatabase()
{
    var databaseIndex = options.Database;
    return connectionMultiplexer.GetDatabase(databaseIndex);
}
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NHibernate.Cache;
using StackExchange.Redis;
using Xunit;
namespace NHibernate.Caches.Redis.Tests
{
public class RedisCacheLoadTests
{
// Redis database index used by the test; Run flushes it before starting.
private const int testDb = 15;
private const string testHost = "localhost";
// Operations slower than this many milliseconds are written to the log.
private const int slowOperationThresholdMilliseconds = 10;
// Number of entries written by Setup before the timed phase starts.
private const int numberOfKeys = 500000;
// Messages collected from the worker tasks and printed at the end of Run.
private readonly ConcurrentQueue<string> logQueue = new ConcurrentQueue<string>();

/// <summary>Connects to the test Redis server with admin commands enabled.</summary>
private static ConnectionMultiplexer CreateConnectionMultiplexer()
{
    const int testPort = 6379;
    var connectionString = $"{testHost}:{testPort},allowAdmin=true,abortConnect=false,syncTimeout=5000";
    return ConnectionMultiplexer.Connect(connectionString);
}
/// <summary>Appends <paramref name="message"/> to the queued test log.</summary>
private void Log(string message) => logQueue.Enqueue(message);
/// <summary>Runs the load scenario with the set of active keys enabled.</summary>
[Fact]
void has_index() => Run(useIndex: true);
/// <summary>Runs the load scenario without the set of active keys.</summary>
[Fact]
void no_index() => Run(useIndex: false);
/// <summary>
/// Fills region1, then runs two read/write workers per region for five
/// seconds while region1 is cleared one second in, and finally prints
/// the collected log.
/// </summary>
/// <param name="useIndex">Passed to both caches to select the indexed or non-indexed code paths.</param>
private void Run(bool useIndex)
{
    // Dispose the multiplexer so each test releases its Redis connection.
    using (var mux = CreateConnectionMultiplexer())
    {
        try
        {
            var masterServer = mux.GetServer(mux.GetEndPoints().First());
            masterServer.FlushDatabase(testDb);
            var cache1 = new RedisCache("region1", mux, new RedisCacheProviderOptions()
            {
                Database = testDb,
                CacheConfigurations = new[] { new RedisCacheConfiguration("region1") { Expiration = TimeSpan.FromSeconds(1) } }
            }, useIndex);
            var cache2 = new RedisCache("region2", mux, new RedisCacheProviderOptions()
            {
                Database = testDb,
                CacheConfigurations = new[] { new RedisCacheConfiguration("region2") { Expiration = TimeSpan.FromSeconds(60) } }
            }, useIndex);
            Setup(cache1, numberOfKeys);
            var infoKeyspace = masterServer.Info("Keyspace");
            Log($"Keyspace: {infoKeyspace.FirstOrDefault(x => x.Key == "Keyspace")?.FirstOrDefault(x => x.Key == "db" + testDb)}");
            var taskReadWrite1_A = Task.Run(() => CacheReadWrite(cache1, duration: TimeSpan.FromSeconds(5)));
            var taskReadWrite1_B = Task.Run(() => CacheReadWrite(cache1, duration: TimeSpan.FromSeconds(5)));
            var taskReadWrite2_A = Task.Run(() => CacheReadWrite(cache2, duration: TimeSpan.FromSeconds(5)));
            var taskReadWrite2_B = Task.Run(() => CacheReadWrite(cache2, duration: TimeSpan.FromSeconds(5)));
            var taskClear = Task.Run(() => CacheClear(cache1, delay: TimeSpan.FromSeconds(1)));
            Task.WaitAll(taskReadWrite1_A, taskReadWrite1_B, taskReadWrite2_A, taskReadWrite2_B, taskClear);
        }
        finally
        {
            // Print whatever was collected even when a worker faulted and
            // Task.WaitAll threw, so the timings leading up to it are not lost.
            Console.WriteLine(String.Join("\n", logQueue));
        }
    }
}
/// <summary>
/// Writes <paramref name="numberOfKeys"/> entries with random Guid keys
/// and values into <paramref name="cache"/> and logs how long it took.
/// </summary>
private void Setup(ICache cache, int numberOfKeys)
{
    var timer = Stopwatch.StartNew();
    for (var remaining = numberOfKeys; remaining > 0; remaining--)
    {
        var entryKey = Guid.NewGuid();
        var entryValue = Guid.NewGuid();
        cache.Put(entryKey, entryValue);
    }
    timer.Stop();

    var elapsedSeconds = timer.Elapsed.TotalSeconds;
    Log($"Setup took {elapsedSeconds:F}s");
}
/// <summary>
/// Repeatedly gets and then puts a fresh random key against
/// <paramref name="cache"/> until <paramref name="duration"/> has
/// elapsed, logging every call slower than the slow-operation threshold.
/// </summary>
private void CacheReadWrite(ICache cache, TimeSpan duration)
{
    var total = Stopwatch.StartNew();
    while (total.Elapsed < duration)
    {
        // The same new Guid is used as both the key and the value.
        var id = Guid.NewGuid();

        var getTimer = Stopwatch.StartNew();
        cache.Get(id);
        getTimer.Stop();
        var getWasSlow = getTimer.ElapsedMilliseconds > slowOperationThresholdMilliseconds;
        if (getWasSlow)
        {
            Log($"An ICache.Get for '{cache.RegionName}' was slow ({getTimer.Elapsed.TotalMilliseconds:F}ms)");
        }

        var putTimer = Stopwatch.StartNew();
        cache.Put(id, id);
        putTimer.Stop();
        var putWasSlow = putTimer.ElapsedMilliseconds > slowOperationThresholdMilliseconds;
        if (putWasSlow)
        {
            Log($"An ICache.Put for '{cache.RegionName}' was slow ({putTimer.Elapsed.TotalMilliseconds:F}ms)");
        }
    }
    total.Stop();
    Log($"ReadWrite for {cache.RegionName} took {total.Elapsed.TotalSeconds:F}s");
}
/// <summary>
/// Waits for <paramref name="delay"/>, clears <paramref name="cache"/>
/// and logs how long the clear took.
/// </summary>
private void CacheClear(ICache cache, TimeSpan delay)
{
    Thread.Sleep(delay);
    Log("Starting clear");

    var timer = Stopwatch.StartNew();
    cache.Clear();
    timer.Stop();

    var elapsedMilliseconds = timer.Elapsed.TotalMilliseconds;
    Log($"Clear took {elapsedMilliseconds:F}ms");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment