Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created March 12, 2014 20:45
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anaisbetts/9515910 to your computer and use it in GitHub Desktop.
Save anaisbetts/9515910 to your computer and use it in GitHub Desktop.
Async reader/writer lock via abusing ConcurrentExclusiveSchedulerPair
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace GitHub.Helpers
{
    /// <summary>
    /// Asynchronous reader/writer lock built on a
    /// <see cref="ConcurrentExclusiveSchedulerPair"/>: read leases are taken on the
    /// pair's concurrent scheduler, write leases on its exclusive scheduler.
    /// </summary>
    /// <remarks>
    /// A lease is handed out as an <see cref="IDisposable"/>; disposing it releases
    /// the lock. Every outstanding lease keeps one scheduler task blocked until it
    /// is disposed, so leases should be held briefly and must always be disposed.
    /// </remarks>
    public sealed class AsyncReaderWriterLock
    {
        // Set once by Shutdown(); every later acquire attempt is rejected.
        bool isShutdown;

        readonly TaskFactory readScheduler;
        readonly TaskFactory writeScheduler;

        /// <summary>
        /// Creates the lock and one task factory per side of a fresh scheduler pair.
        /// </summary>
        public AsyncReaderWriterLock()
        {
            var pair = new ConcurrentExclusiveSchedulerPair();

            readScheduler = new TaskFactory(
                CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ConcurrentScheduler);
            writeScheduler = new TaskFactory(
                CancellationToken.None, TaskCreationOptions.LongRunning, TaskContinuationOptions.None, pair.ExclusiveScheduler);
        }

        /// <summary>
        /// Requests a read lease.
        /// </summary>
        /// <returns>
        /// An observable that yields a single <see cref="IDisposable"/> once the
        /// lease is granted and then completes; dispose the value to release the
        /// lease. Signals <see cref="ObjectDisposedException"/> if the lock has
        /// been shut down.
        /// </returns>
        public IObservable<IDisposable> AcquireRead()
        {
            return AcquireOnScheduler(readScheduler);
        }

        /// <summary>
        /// Requests a write lease.
        /// </summary>
        /// <returns>
        /// An observable that yields a single <see cref="IDisposable"/> once the
        /// lease is granted and then completes; dispose the value to release the
        /// lease. Signals <see cref="ObjectDisposedException"/> if the lock has
        /// been shut down.
        /// </returns>
        public IObservable<IDisposable> AcquireWrite()
        {
            return AcquireOnScheduler(writeScheduler);
        }

        /// <summary>
        /// Shuts the lock down: queues one final write lease, rejects every
        /// acquire attempt made afterwards, and releases that final lease as soon
        /// as it is granted.
        /// </summary>
        /// <returns>
        /// An observable that yields <see cref="Unit.Default"/> and completes once
        /// the final write lease has been granted and released. If the lock was
        /// already shut down it signals <see cref="ObjectDisposedException"/>.
        /// </returns>
        public IObservable<Unit> Shutdown()
        {
            // NB: Just grab the write lock to shut down. The request must be made
            // before the flag is set, otherwise it would be rejected as well.
            var writeFuture = AcquireWrite();
            isShutdown = true;

            // Subscribe eagerly so the write lease is released even when the
            // caller never subscribes to the returned observable; a lazy Select
            // would otherwise leave the lock held forever. The AsyncSubject
            // replays the outcome to subscribers that arrive later.
            var ret = new AsyncSubject<Unit>();
            writeFuture
                .Select(x => { x.Dispose(); return Unit.Default; })
                .Subscribe(ret);

            return ret;
        }

        /// <summary>
        /// Queues a lease request on the given task factory.
        /// </summary>
        /// <param name="sched">
        /// Task factory bound to either the concurrent (read) or the exclusive
        /// (write) scheduler of the pair.
        /// </param>
        /// <returns>
        /// An observable that yields the lease once the queued task starts running
        /// on the scheduler, or signals <see cref="ObjectDisposedException"/> when
        /// the lock has already been shut down.
        /// </returns>
        IObservable<IDisposable> AcquireOnScheduler(TaskFactory sched)
        {
            if (isShutdown)
            {
                return Observable.Throw<IDisposable>(new ObjectDisposedException("AsyncReaderWriterLock"));
            }

            var ret = new AsyncSubject<IDisposable>();
            var gate = new AsyncSubject<Unit>();

            sched.StartNew(() =>
            {
                // NB: At this point we know that we are currently executing on the
                // scheduler (i.e. if this was the exclusive scheduler, we know that
                // all the readers have been thrown out)
                var disp = Disposable.Create(() => { gate.OnNext(Unit.Default); gate.OnCompleted(); });
                ret.OnNext(disp);
                ret.OnCompleted();

                // Trashing the returned Disposable will unlock this gate.
                // NB: Block instead of awaiting. With an async delegate StartNew
                // hands back a Task<Task> whose outer task finishes at the first
                // await, so the scheduler would consider the slot free while the
                // lease is still outstanding. Keeping this task alive is what
                // actually holds the reader or writer lock.
                gate.Wait();
            });

            return ret;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment