Skip to content

Instantly share code, notes, and snippets.

@b-anand
Created July 16, 2018 03:29
Show Gist options
  • Save b-anand/7aa82d43a6231567bf90fc52ed6d02d8 to your computer and use it in GitHub Desktop.
Save b-anand/7aa82d43a6231567bf90fc52ed6d02d8 to your computer and use it in GitHub Desktop.
Async Reader Writer Spin Lock
using System;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// The async reader writer spin lock.
/// </summary>
public class AsyncReaderWriterSpinLock
{
/// <summary>
/// The max writer count.
/// </summary>
public const ushort MaxWriterCount = ushort.MaxValue;
/// <summary>
/// The max reader count.
/// </summary>
public const ulong MaxReaderCount = 0x0000_7fff_ffff_ffffUL;
/// <summary>
/// The write lock delay in milliseconds.
/// </summary>
private const int WriteLockDelayInMilliseconds = 100;
/// <summary>
/// The read lock delay in milliseconds.
/// </summary>
private const int ReadLockDelayInMilliseconds = 10;
/// <summary>
/// The lock state.
/// </summary>
private long lockStateLongValue;
/// <summary>
/// The acquire write lock async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
public async Task<IDisposable> AcquireWriteLockAsync()
{
// Impact to CPU consumption due to this yield is negligible but it is needed
// when there are too many writers and readers running in synchronous mode so that
// they yield the CPU for other tasks requesting read/write lock.
await Task.Yield();
var writeLockManager = new WriteLockManager(this);
await writeLockManager.AcquireLockAsync();
return writeLockManager;
}
/// <summary>
/// The acquire read lock async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
public async Task<IDisposable> AcquireReadLockAsync()
{
// Impact to CPU consumption due to this yield is negligible but it is needed
// when there are too many writers and readers running in synchronous mode so that
// they yield the CPU for other tasks requesting read/write lock.
await Task.Yield();
var readLockManager = new ReadLockManager(this);
await readLockManager.AcquireLockAsync();
return readLockManager;
}
/// <summary>
/// The update lock state action async.
/// </summary>
/// <param name="asyncAction">
/// The async action.
/// </param>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
private async Task UpdateLockStateActionAsync(Func<LockState, Task<(bool, LockState)>> asyncAction)
{
while (true)
{
var oldValue = Interlocked.Read(ref this.lockStateLongValue);
var (success, lockState) = await asyncAction(new LockState { LockStateValue = oldValue });
if (!success)
{
continue;
}
if (oldValue == Interlocked.CompareExchange(ref this.lockStateLongValue, lockState.LockStateValue, oldValue))
{
break;
}
}
}
/// <summary>
/// The lock state.
/// </summary>
private struct LockState
{
/// <summary>
/// The lock state value.
/// </summary>
private ulong lockStateValue;
/// <summary>
/// Gets or sets the lock state value.
/// </summary>
public long LockStateValue
{
get => (long)this.lockStateValue;
set => this.lockStateValue = (ulong)value;
}
/// <summary>
/// Gets or sets a value indicating whether write lock is acquired or requested.
/// </summary>
public bool WriteLock
{
get => (this.lockStateValue & 0x8000_0000_0000_0000UL) == 0x8000_0000_0000_0000UL;
set
{
if (value)
{
this.lockStateValue |= 0x8000_0000_0000_0000UL;
}
else
{
this.lockStateValue &= ~0x8000_0000_0000_0000UL;
}
}
}
/// <summary>
/// Gets or sets the count of writers.
/// </summary>
public ushort Writers
{
get => (ushort)((this.lockStateValue >> 47) & 0xffffUL);
set
{
this.lockStateValue &= ~(0xffffUL << 47);
this.lockStateValue |= (value & 0xffffUL) << 47;
}
}
/// <summary>
/// Gets or sets the count of readers.
/// </summary>
public ulong Readers
{
get => this.lockStateValue & 0x0000_7fff_ffff_ffffUL;
set
{
if (value > MaxReaderCount)
{
throw new ArgumentOutOfRangeException($"MaxReaderCount limit reached.");
}
this.lockStateValue &= ~0x0000_7fff_ffff_ffffUL;
this.lockStateValue |= value & 0x0000_7fff_ffff_ffffUL;
}
}
}
/// <summary>
/// The write lock manager.
/// </summary>
private class WriteLockManager : IDisposable
{
/// <summary>
/// The parent lock.
/// </summary>
private readonly AsyncReaderWriterSpinLock parentSpinLock;
/// <summary>
/// Initializes a new instance of the <see cref="WriteLockManager"/> class.
/// </summary>
/// <param name="parentSpinLock">
/// The parent lock.
/// </param>
public WriteLockManager(AsyncReaderWriterSpinLock parentSpinLock)
{
this.parentSpinLock = parentSpinLock;
}
/// <summary>
/// The acquire lock async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
public async Task AcquireLockAsync()
{
await this.IncrementWritersCountAsync();
await this.AcquireWriteLockAsync();
}
/// <inheritdoc />
public void Dispose()
{
this.ReleaseWriteLock();
}
/// <summary>
/// The increment writers count async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
private async Task IncrementWritersCountAsync()
{
await this.parentSpinLock.UpdateLockStateActionAsync(
async lockState =>
{
if (lockState.Writers == MaxWriterCount)
{
await Task.Delay(TimeSpan.FromMilliseconds(WriteLockDelayInMilliseconds));
return (false, lockState);
}
lockState.Writers = (ushort)(lockState.Writers + 1);
return (true, lockState);
});
}
/// <summary>
/// The acquire write lock async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
private async Task AcquireWriteLockAsync()
{
await this.parentSpinLock.UpdateLockStateActionAsync(
async lockState =>
{
if (lockState.WriteLock || lockState.Readers > 0)
{
var delayTimeInMilliseconds = lockState.WriteLock ? WriteLockDelayInMilliseconds : ReadLockDelayInMilliseconds;
await Task.Delay(TimeSpan.FromMilliseconds(delayTimeInMilliseconds));
return (false, lockState);
}
lockState.WriteLock = true;
return (true, lockState);
});
}
/// <summary>
/// The release write lock.
/// </summary>
private void ReleaseWriteLock()
{
this.parentSpinLock.UpdateLockStateActionAsync(
lockState =>
{
lockState.Writers = (ushort)(lockState.Writers - 1);
lockState.WriteLock = false;
return Task.FromResult((true, lockState));
}).GetAwaiter().GetResult();
}
}
/// <summary>
/// The read lock manager.
/// </summary>
private class ReadLockManager : IDisposable
{
/// <summary>
/// The parent lock.
/// </summary>
private readonly AsyncReaderWriterSpinLock parentSpinLock;
/// <summary>
/// Initializes a new instance of the <see cref="ReadLockManager"/> class.
/// </summary>
/// <param name="parentSpinLock">
/// The parent lock.
/// </param>
public ReadLockManager(AsyncReaderWriterSpinLock parentSpinLock)
{
this.parentSpinLock = parentSpinLock;
}
/// <summary>
/// The acquire lock async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
public async Task AcquireLockAsync()
{
await this.IncrementReadersCountAsync();
}
/// <inheritdoc />
public void Dispose()
{
this.DecrementReadersCount();
}
/// <summary>
/// The increment readers count async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
private async Task IncrementReadersCountAsync()
{
await this.parentSpinLock.UpdateLockStateActionAsync(
async lockState =>
{
if (lockState.Writers > 0 || lockState.Readers == MaxReaderCount)
{
var delayTimeInMilliseconds = lockState.Writers > 0 ? WriteLockDelayInMilliseconds : ReadLockDelayInMilliseconds;
await Task.Delay(TimeSpan.FromMilliseconds(delayTimeInMilliseconds));
return (false, lockState);
}
lockState.Readers = (ushort)(lockState.Readers + 1);
return (true, lockState);
});
}
/// <summary>
/// The release write lock.
/// </summary>
private void DecrementReadersCount()
{
this.parentSpinLock.UpdateLockStateActionAsync(
lockState =>
{
lockState.Readers = lockState.Readers - 1;
return Task.FromResult((true, lockState));
}).GetAwaiter().GetResult();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.IntelligencePlatform.TableStore.Utilities.Task;
using Microsoft.VisualStudio.TestTools.UnitTesting;
/// <summary>
/// The async reader writer spin lock tests.
/// </summary>
[TestClass]
public class AsyncReaderWriterSpinLockTests
{
/// <summary>
/// The multiple readers tests async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
[TestMethod]
public async Task MultipleReadersTestsAsync()
{
long counter = 0;
var rwLock = new AsyncReaderWriterSpinLock();
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
tasks.Add(ReadAsync());
}
await Task.WhenAll(tasks);
counter.Should().Be(5);
async Task ReadAsync()
{
using (await rwLock.AcquireReadLockAsync())
{
Interlocked.Increment(ref counter);
while (true)
{
var localCounter = Interlocked.Read(ref counter);
if (localCounter == 5)
{
break;
}
await Task.Delay(1);
}
}
}
}
/// <summary>
/// The multiple writers tests async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
[TestMethod]
public async Task MultipleWritersTestsAsync()
{
long counter = 0;
var rwLock = new AsyncReaderWriterSpinLock();
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
tasks.Add(WriteAsync());
}
await Task.WhenAll(tasks);
async Task WriteAsync()
{
using (await rwLock.AcquireWriteLockAsync())
{
var localCounter = Interlocked.Increment(ref counter);
localCounter.Should().Be(1);
Interlocked.Decrement(ref counter);
}
}
}
/// <summary>
/// The reader writers tests async.
/// </summary>
/// <returns>
/// The <see cref="Task"/>.
/// </returns>
[TestMethod]
public async Task ReaderWriterTestsAsync()
{
var values = new[] { 0, 0, 0, 0, 0 };
var rwLock = new AsyncReaderWriterSpinLock();
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
tasks.Add(ReadAsync());
}
tasks.Add(WriteAsync());
await Task.WhenAll(tasks);
foreach (var value in values)
{
value.Should().Be(1);
}
async Task ReadAsync()
{
var continueReading = true;
while (continueReading)
{
using (await rwLock.AcquireReadLockAsync())
{
var localValues = values.ToList();
var anyZero = localValues.Any(v => v == 0);
var anyOne = localValues.Any(v => v == 1);
if (anyZero && anyOne)
{
throw new Exception($"Mix of 1 and 0 found.");
}
if (!anyZero && !anyOne)
{
throw new Exception("Not reachable.");
}
if (anyOne && !anyZero)
{
continueReading = false;
}
}
}
}
async Task WriteAsync()
{
using (await rwLock.AcquireWriteLockAsync())
{
for (var i = 0; i < values.Length; i++)
{
values[i] = 1;
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment