Skip to content

Instantly share code, notes, and snippets.

@pmunin
Last active January 22, 2016 01:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pmunin/2e48f13dc8f6673752a2 to your computer and use it in GitHub Desktop.
Save pmunin/2e48f13dc8f6673752a2 to your computer and use it in GitHub Desktop.
ReaderWriterLockByKey - adds thread-safe read/write/upgrade locking by key to any dictionary-like object. Internally it uses ConcurrentDictionary and ReaderWriterLock (not ReaderWriterLockSlim yet). Does not need disposing, unlike ReaderWriterLockSlim.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
    /// <summary>
    /// Console demo for <c>ReaderWriterLockByKey</c>: three tasks take overlapping read locks
    /// on the same key ("1") and the third one upgrades to a write lock while the others
    /// still hold their read locks.
    /// </summary>
    class Program
    {
        // Reference instant for the "Offset" column printed by WriteLine.
        private static DateTime Start;

        /// <summary>
        /// Starts the three tasks, waits for all of them and then waits for a key press.
        /// </summary>
        /// <param name="args">Command line arguments (not used).</param>
        static void Main(string[] args)
        {
            var locker = new ReaderWriterLockByKey<string>();

            // Sleep until just past the next whole second of the wall clock before taking
            // the reference time.
            Thread.Sleep(1001 - DateTime.Now.Millisecond);
            Start = DateTime.Now;

            Task.WaitAll(
                // T1: takes the read lock first and holds it the longest.
                Task.Run(() =>
                {
                    WriteLine("T1 Waiting to lock for read");
                    using (locker.LockToRead("1"))
                    {
                        Thread.Sleep(50);
                        WriteLine("<T1read>");
                        Thread.Sleep(5000);
                    }
                    WriteLine("</T1read>");
                }),
                // T2: a second reader that starts later and finishes before T1.
                Task.Run(() =>
                {
                    Thread.Sleep(1000);
                    WriteLine("T2 Waiting to lock for read");
                    using (locker.LockToRead("1"))
                    {
                        Thread.Sleep(50);
                        WriteLine("<T2read>");
                        Thread.Sleep(2500);
                    }
                    WriteLine("</T2read>");
                }),
                // T3: a third reader that upgrades to a write lock while T1 and T2 are reading.
                Task.Run(() =>
                {
                    Thread.Sleep(1500);
                    WriteLine("T3 Waiting to lock for read");
                    using (locker.LockToRead("1"))
                    {
                        Thread.Sleep(50);
                        WriteLine("<T3read>");
                        Thread.Sleep(1000);
                        WriteLine("T3 Waiting to lock for write upgrade");
                        using (locker.UpgradeToWrite("1"))
                        {
                            Thread.Sleep(50);
                            WriteLine("<T3read.write>");
                            Thread.Sleep(1500);
                        }
                        WriteLine("</T3read.write>");
                        Thread.Sleep(1000);
                    }
                    WriteLine("</T3read>");
                })
            );

            Console.WriteLine("Press any key");
            Console.ReadKey();
            /* Output:
            Offset:00:00:00. Thread:6. T1 Waiting to lock for read
            Offset:00:00:07. Thread:6. <T1read>
            Offset:00:01:00. Thread:12. T2 Waiting to lock for read
            Offset:00:01:05. Thread:12. <T2read>
            Offset:00:01:50. Thread:14. T3 Waiting to lock for read
            Offset:00:01:55. Thread:14. <T3read>
            Offset:00:02:55. Thread:14. T3 Waiting to lock for write upgrade
            Offset:00:03:55. Thread:12. </T2read>
            Offset:00:05:07. Thread:6. </T1read>
            Offset:00:05:12. Thread:14. <T3read.write>
            Offset:00:06:62. Thread:14. </T3read.write>
            Offset:00:07:62. Thread:14. </T3read>
            Press any key
            */
        }

        /// <summary>
        /// Writes a message to the console prefixed with the time elapsed since
        /// <see cref="Start"/> (formatted as minutes:seconds:hundredths) and the managed id
        /// of the calling thread.
        /// </summary>
        /// <param name="format">
        /// Message text. It is treated as a composite format string only when
        /// <paramref name="args"/> contains at least one item; otherwise it is printed verbatim.
        /// </param>
        /// <param name="args">Optional format arguments for <paramref name="format"/>.</param>
        static void WriteLine(string format, params object[] args)
        {
            var dtOffset = DateTime.Now - Start;

            // Without arguments the text is a plain message, so it must not be run through
            // string.Format: a literal '{' or '}' would throw FormatException.
            var str = (args == null || args.Length == 0)
                ? format
                : string.Format(format, args);

            // The message is passed as an argument ({2}) instead of being concatenated into the
            // format string, so braces inside it are never parsed as placeholders.
            Console.WriteLine(
                "Offset:{1}. Thread:{0}. {2}",
                Thread.CurrentThread.ManagedThreadId,
                dtOffset.ToString(@"mm\:ss\:ff"),
                str);
        }
    }
}
using System;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections.Generic;
/// <summary>
/// Thread-safe reader/writer locking by key.
/// Latest version here: https://gist.github.com/pmunin/2e48f13dc8f6673752a2
/// ReaderWriterLockByKey adds read/write/upgrade locking by key to any dictionary-like object.
/// Internally it uses a ConcurrentDictionary of reference-counted ReaderWriterLock instances
/// (not ReaderWriterLockSlim yet); the entry of a key is created on first use and removed
/// again when its last holder releases it.
/// Does not need disposing, unlike ReaderWriterLockSlim.
/// </summary>
/// <remarks>
/// The handle returned by each Lock/Upgrade method releases the underlying ReaderWriterLock
/// when disposed, so it has to be disposed on the thread that acquired it.
/// </remarks>
/// <typeparam name="TKey">Type of the key that identifies an independent lock.</typeparam>
public class ReaderWriterLockByKey<TKey>
{
    /// <summary>
    /// <see cref="IDisposable"/> that runs a delegate when disposed.
    /// </summary>
    internal class DisposableAction : IDisposable
    {
        /// <param name="onDispose">Delegate to run on the first call to <see cref="Dispose"/>.</param>
        public DisposableAction(Action onDispose)
        {
            this.onDispose = onDispose;
        }

        Action onDispose;

        /// <summary>
        /// Runs the delegate once. Further calls do nothing, so disposing a lock handle twice
        /// cannot release a lock level (or a reference count) that belongs to another handle.
        /// </summary>
        public void Dispose()
        {
            // Atomically take ownership of the delegate; only the first caller gets it.
            var action = Interlocked.Exchange(ref onDispose, null);
            if (action != null)
                action();
        }
    }

    /// <summary>
    /// Counter with optional callbacks. Every callback is invoked while the counter's internal
    /// monitor is held. No longer used by <see cref="ReaderWriterLockByKey{TKey}"/> itself;
    /// kept unchanged so code in the same assembly that references it keeps compiling.
    /// </summary>
    internal class SemaphoreDelegate
    {
        /// <param name="onZero">Invoked by <see cref="Release"/> when the counter becomes zero.</param>
        public SemaphoreDelegate(Action<SemaphoreDelegate> onZero)
        {
            this.onZero = onZero;
        }

        /// <param name="onLock">Invoked by <see cref="Lock"/> after the counter was incremented.</param>
        /// <param name="onRelease">Invoked by <see cref="Release"/> after the counter was decremented.</param>
        /// <param name="onZero">Invoked by <see cref="Release"/> when the counter becomes zero.</param>
        public SemaphoreDelegate(Action<SemaphoreDelegate> onLock = null, Action<SemaphoreDelegate> onRelease = null, Action<SemaphoreDelegate> onZero = null)
            : this(onZero)
        {
            this.onLock = onLock;
            this.onRelease = onRelease;
        }

        private Action<SemaphoreDelegate> onZero;
        object locker = new object();
        int counter = 0;
        private Action<SemaphoreDelegate> onLock;
        private Action<SemaphoreDelegate> onRelease;

        /// <summary>
        /// Increments the counter and returns a handle whose disposal calls <see cref="Release"/>.
        /// </summary>
        public IDisposable Lock()
        {
            lock (locker)
            {
                counter++;
                if (onLock != null)
                    onLock(this);
            }
            return new DisposableAction(Release);
        }

        /// <summary>Arbitrary user state attached to this counter.</summary>
        public object State { get; set; }

        /// <summary>
        /// Decrements the counter, then invokes the release callback and, if the counter is
        /// now zero, the zero callback.
        /// </summary>
        public void Release()
        {
            lock (locker)
            {
                counter--;
                if (onRelease != null)
                    onRelease(this);
                if (counter == 0)
                    if (onZero != null)
                        onZero(this);
            }
        }
    }

    /// <summary>
    /// Per-key state: the lock itself plus the number of callers currently using it
    /// (holding it or waiting for it).
    /// </summary>
    private sealed class KeyedLock
    {
        /// <summary>Guards <see cref="Holders"/> and <see cref="Retired"/>.</summary>
        public readonly object Gate = new object();

        public readonly ReaderWriterLock RwLock = new ReaderWriterLock();

        /// <summary>Number of outstanding rentals of this entry.</summary>
        public int Holders;

        /// <summary>
        /// Set once the last holder has left and the entry was removed from the dictionary.
        /// A retired entry must never be handed out again.
        /// </summary>
        public bool Retired;
    }

    /// <summary>
    /// Creates the lock collection.
    /// </summary>
    /// <param name="keyComparer">Comparer used to match keys; null selects the default comparer.</param>
    /// <param name="timeoutMs">Default timeout in milliseconds used when a lock method is called without one.</param>
    public ReaderWriterLockByKey(IEqualityComparer<TKey> keyComparer = null, int timeoutMs = 15000)
    {
        this.entries =
            keyComparer == null
            ? new ConcurrentDictionary<TKey, KeyedLock>()
            : new ConcurrentDictionary<TKey, KeyedLock>(keyComparer);
        this.timeout = timeoutMs;
    }

    readonly ConcurrentDictionary<TKey, KeyedLock> entries;
    readonly int timeout;

    /// <summary>
    /// Returns the live entry for <paramref name="key"/> with its holder count incremented,
    /// creating the entry if needed.
    /// </summary>
    KeyedLock Rent(TKey key)
    {
        while (true)
        {
            var entry = entries.GetOrAdd(key, k => new KeyedLock());
            lock (entry.Gate)
            {
                if (!entry.Retired)
                {
                    entry.Holders++;
                    return entry;
                }
            }
            // The entry was retired between GetOrAdd and taking its gate. It has already been
            // removed from the dictionary (removal happens under the same gate), so the next
            // GetOrAdd yields a fresh entry.
        }
    }

    /// <summary>
    /// Undoes one <see cref="Rent"/>; the last holder retires the entry and removes it.
    /// </summary>
    void Return(TKey key, KeyedLock entry)
    {
        lock (entry.Gate)
        {
            entry.Holders--;
            if (entry.Holders == 0)
            {
                // Retire and remove under the gate so that Rent can never increment an entry
                // that is no longer reachable through the dictionary.
                entry.Retired = true;
                KeyedLock removed;
                entries.TryRemove(key, out removed);
            }
        }
    }

    /// <summary>
    /// Begin disposable scope of locking for reading. Must dispose to unlock.
    /// </summary>
    /// <param name="key">Key whose lock is acquired.</param>
    /// <param name="timeout">Timeout in milliseconds; null uses the default given to the constructor.</param>
    /// <returns>Handle that releases the reader lock when disposed.</returns>
    /// <exception cref="ApplicationException">The lock could not be acquired within the timeout.</exception>
    public IDisposable LockToRead(TKey key, int? timeout = null)
    {
        var entry = Rent(key);
        try
        {
            entry.RwLock.AcquireReaderLock(timeout ?? this.timeout);
        }
        catch
        {
            // Acquisition failed (e.g. timed out): give the rental back so the entry can be removed.
            Return(key, entry);
            throw;
        }
        return new DisposableAction(() =>
        {
            entry.RwLock.ReleaseReaderLock();
            Return(key, entry);
        });
    }

    /// <summary>
    /// Begin disposable scope of upgrade locking from reading to writing. Must dispose to downgrade back to reading.
    /// </summary>
    /// <param name="key">Key whose lock is upgraded.</param>
    /// <param name="timeout">Timeout in milliseconds; null uses the default given to the constructor.</param>
    /// <returns>Handle that downgrades from the writer lock when disposed.</returns>
    /// <exception cref="ApplicationException">The upgrade could not be completed within the timeout.</exception>
    public IDisposable UpgradeToWrite(TKey key, int? timeout = null)
    {
        var entry = Rent(key);
        LockCookie upgradeCookie;
        try
        {
            upgradeCookie = entry.RwLock.UpgradeToWriterLock(timeout ?? this.timeout);
        }
        catch
        {
            // Upgrade failed (e.g. timed out): give the rental back so the entry can be removed.
            Return(key, entry);
            throw;
        }
        return new DisposableAction(() =>
        {
            entry.RwLock.DowngradeFromWriterLock(ref upgradeCookie);
            Return(key, entry);
        });
    }

    /// <summary>
    /// Begin disposable scope of locking for writing by key. Must dispose to unlock.
    /// </summary>
    /// <param name="key">Key whose lock is acquired.</param>
    /// <param name="timeout">Timeout in milliseconds; null uses the default given to the constructor.</param>
    /// <returns>Handle that releases the writer lock when disposed.</returns>
    /// <exception cref="ApplicationException">The lock could not be acquired within the timeout.</exception>
    public IDisposable LockToWrite(TKey key, int? timeout = null)
    {
        var entry = Rent(key);
        try
        {
            entry.RwLock.AcquireWriterLock(timeout ?? this.timeout);
        }
        catch
        {
            // Acquisition failed (e.g. timed out): give the rental back so the entry can be removed.
            Return(key, entry);
            throw;
        }
        return new DisposableAction(() =>
        {
            entry.RwLock.ReleaseWriterLock();
            Return(key, entry);
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment